Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 10 additions & 12 deletions sunbeam-python/sunbeam/features/interface/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,11 @@
LOG = logging.getLogger()


def normalize_pem(pem_bytes: bytes) -> bytes:
"""Normalize pem line endings to LF."""
return pem_bytes.replace(b"\r\n", b"\n").replace(b"\r", b"\n")


def get_all_registered_groups(cli: click.Group) -> dict:
"""Get all the registered groups from cli object.

Expand Down Expand Up @@ -91,9 +96,9 @@ def validate_ca_certificate(
ctx: click.core.Context, param: click.core.Option, value: str
) -> str:
try:
ca_bytes = base64.b64decode(value)
ca_bytes = normalize_pem(base64.b64decode(value))
x509.load_pem_x509_certificate(ca_bytes)
return value
return base64.b64encode(ca_bytes).decode()
except (binascii.Error, TypeError, ValueError) as e:
LOG.debug(e)
raise click.BadParameter(str(e))
Expand All @@ -106,7 +111,7 @@ def validate_ca_chain(
return None

try:
chain_bytes = base64.b64decode(value)
chain_bytes = normalize_pem(base64.b64decode(value))
chain_list = re.findall(
pattern=(
"(?=-----BEGIN CERTIFICATE-----)(.*?)(?<=-----END CERTIFICATE-----)"
Expand All @@ -120,15 +125,15 @@ def validate_ca_chain(
for cert in chain_list:
x509.load_pem_x509_certificate(cert.encode())

return value
return base64.b64encode(chain_bytes).decode()

# Check if the chain is in correct order
for i in range(len(chain_list) - 1):
cert = x509.load_pem_x509_certificate(chain_list[i].encode())
issuer = x509.load_pem_x509_certificate(chain_list[i + 1].encode())
cert.verify_directly_issued_by(issuer)

return value
return base64.b64encode(chain_bytes).decode()
except (
binascii.Error,
TypeError,
Expand Down Expand Up @@ -211,13 +216,6 @@ def generate_ca_chain(certificate: str, ca_certificate: str, ca_chain: str) -> s
if not certificate_decoded or not ca_certificate_decoded or not ca_chain_decoded:
raise binascii.Error("Unable to decode one of the certificates")

# Normalize line endings to LF to ensure consistent comparison and output.
# Certificates with CRLF line endings may otherwise be treated as different
# from equivalent certificates with LF line endings.
certificate_decoded = certificate_decoded.replace("\r\n", "\n")
ca_certificate_decoded = ca_certificate_decoded.replace("\r\n", "\n")
ca_chain_decoded = ca_chain_decoded.replace("\r\n", "\n")

# If ca_certificate is already part of ca_chain, do not add it to the final ca chain
# manual-tls-certificates checks if the final ca_chain is in proper order and each
# certificate is signed by the successor one.
Expand Down
11 changes: 9 additions & 2 deletions sunbeam-python/sunbeam/features/tls/common.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# SPDX-FileCopyrightText: 2024 - Canonical Ltd
# SPDX-License-Identifier: Apache-2.0

import base64
import binascii
import json
import logging
Expand Down Expand Up @@ -40,6 +41,7 @@
generate_ca_chain,
get_subject_from_csr,
is_certificate_valid,
normalize_pem,
)
from sunbeam.features.interface.v1.base import BaseFeatureGroup
from sunbeam.features.interface.v1.openstack import (
Expand Down Expand Up @@ -625,15 +627,20 @@ def prompt(
if not cert or not is_certificate_valid(cert):
raise click.ClickException("Not a valid certificate")

# Normalize CRLF to LF in the leaf cert before storing
cert_normalized = base64.b64encode(
normalize_pem(base64.b64decode(cert))
).decode()

self.process_certs[subject] = {
"app": app,
"unit": unit_name,
"relation_id": relation_id,
"csr": csr,
"certificate": cert,
"certificate": cert_normalized,
}
variables["certificates"].setdefault(subject, {})
variables["certificates"][subject]["certificate"] = cert
variables["certificates"][subject]["certificate"] = cert_normalized

questions.write_answers(self.client, self._CONFIG, variables)

Expand Down
16 changes: 16 additions & 0 deletions sunbeam-python/tests/unit/sunbeam/features/test_tls.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,21 @@ def is_certificate_valid():
yield p


@pytest.fixture()
def normalize_pem():
with patch.object(tls, "normalize_pem", return_value=b"fake-cert-bytes") as p:
yield p


@pytest.fixture()
Comment thread
Raven-182 marked this conversation as resolved.
def b64_encode_decode():
with (
patch.object(tls.base64, "b64decode", return_value=b"fake-cert-bytes"),
patch.object(tls.base64, "b64encode", return_value=b"fake-cert-encoded"),
):
yield


@pytest.fixture()
def get_outstanding():
with patch.object(
Expand Down Expand Up @@ -236,6 +251,7 @@ def test_prompt(
write_answers,
get_subject_from_csr,
is_certificate_valid,
b64_encode_decode,
):
certs_to_process = [
{
Expand Down
51 changes: 30 additions & 21 deletions sunbeam-python/tests/unit/sunbeam/features/test_utils.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,15 @@
# SPDX-FileCopyrightText: 2023 - Canonical Ltd
# SPDX-License-Identifier: Apache-2.0

import base64
from unittest.mock import patch

from sunbeam.features.interface.utils import (
decode_base64_as_string,
encode_base64_as_string,
generate_ca_chain,
validate_ca_certificate,
validate_ca_chain,
)


Expand All @@ -23,27 +28,31 @@ def test_generate_ca_chain():
assert ca_chain_decoded == expected_chain


def test_generate_ca_chain_deduplicates_ca_cert_with_crlf():
"""Test that ca_certificate with CRLF endings is not duplicated when
def test_validate_ca_certificate_normalizes_crlf():
"""validate_ca_certificate strips CRLF and returns clean base64."""
cert_crlf = b"-----BEGIN CERTIFICATE-----\r\nDATA\r\n-----END CERTIFICATE-----\r\n"
cert_lf = b"-----BEGIN CERTIFICATE-----\nDATA\n-----END CERTIFICATE-----\n"
encoded = base64.b64encode(cert_crlf).decode()

ca_chain already contains an equivalent certificate with LF endings.
"""
cert1 = "CERT1"
# ca_certificate uses CRLF line endings
cert2_crlf = (
"-----BEGIN CERTIFICATE-----\r\nISSUINGCA\r\n-----END CERTIFICATE-----\r\n"
)
# ca_chain already contains the same cert with LF line endings
cert2_lf = "-----BEGIN CERTIFICATE-----\nISSUINGCA\n-----END CERTIFICATE-----\n"
cert3 = "-----BEGIN CERTIFICATE-----\nROOTCA\n-----END CERTIFICATE-----\n"
ca_chain_input = cert2_lf + "\n" + cert3
with patch("sunbeam.features.interface.utils.x509.load_pem_x509_certificate"):
result = validate_ca_certificate(None, None, encoded)

ca_chain = generate_ca_chain(
encode_base64_as_string(cert1),
encode_base64_as_string(cert2_crlf),
encode_base64_as_string(ca_chain_input),
assert base64.b64decode(result) == cert_lf


def test_validate_ca_chain_normalizes_crlf():
"""validate_ca_chain strips CRLF and returns clean base64."""
chain_crlf = (
b"-----BEGIN CERTIFICATE-----\r\nISSUINGCA\r\n-----END CERTIFICATE-----\r\n"
b"-----BEGIN CERTIFICATE-----\r\nROOTCA\r\n-----END CERTIFICATE-----\r\n"
)
ca_chain_decoded = decode_base64_as_string(ca_chain)
# cert2 must appear only once in the output
assert ca_chain_decoded.count("ISSUINGCA") == 1
assert ca_chain_decoded.count("ROOTCA") == 1
chain_lf = (
b"-----BEGIN CERTIFICATE-----\nISSUINGCA\n-----END CERTIFICATE-----\n"
b"-----BEGIN CERTIFICATE-----\nROOTCA\n-----END CERTIFICATE-----\n"
)
encoded = base64.b64encode(chain_crlf).decode()

with patch("sunbeam.features.interface.utils.x509.load_pem_x509_certificate"):
result = validate_ca_chain(None, None, encoded)

assert base64.b64decode(result) == chain_lf
Loading