Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: Uses the hash of the CSR as a secret label #188

Merged
merged 1 commit into from
May 31, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 22 additions & 10 deletions lib/charms/tls_certificates_interface/v3/tls_certificates.py
Original file line number Diff line number Diff line change
Expand Up @@ -317,7 +317,7 @@ def _on_all_certificates_invalidated(self, event: AllCertificatesInvalidatedEven

# Increment this PATCH version before using `charmcraft publish-lib` or reset
# to 0 if you are raising the major API version
LIBPATCH = 14
LIBPATCH = 15

PYDEPS = ["cryptography", "jsonschema"]

Expand Down Expand Up @@ -1093,6 +1093,13 @@ def generate_csr( # noqa: C901
return signed_certificate.public_bytes(serialization.Encoding.PEM)


def get_sha256_hex(data: str) -> str:
"""Calculate the hash of the provided data and return the hexadecimal representation."""
digest = hashes.Hash(hashes.SHA256())
digest.update(data.encode())
return digest.finalize().hex()


def csr_matches_certificate(csr: str, cert: str) -> bool:
"""Check if a CSR matches a certificate.

Expand Down Expand Up @@ -1872,12 +1879,15 @@ def _on_relation_changed(self, event: RelationChangedEvent) -> None:
]
for certificate in provider_certificates:
if certificate.csr in requirer_csrs:
csr_in_sha256_hex = get_sha256_hex(certificate.csr)
if certificate.revoked:
with suppress(SecretNotFoundError):
logger.debug(
"Removing secret with label %s", f"{LIBID}-{certificate.csr}"
"Removing secret with label %s",
f"{LIBID}-{csr_in_sha256_hex}",
)
secret = self.model.get_secret(label=f"{LIBID}-{certificate.csr}")
secret = self.model.get_secret(
label=f"{LIBID}-{csr_in_sha256_hex}")
secret.remove_all_revisions()
self.on.certificate_invalidated.emit(
reason="revoked",
Expand All @@ -1889,20 +1899,22 @@ def _on_relation_changed(self, event: RelationChangedEvent) -> None:
else:
try:
logger.debug(
"Setting secret with label %s", f"{LIBID}-{certificate.csr}"
"Setting secret with label %s", f"{LIBID}-{csr_in_sha256_hex}"
)
secret = self.model.get_secret(label=f"{LIBID}-{csr_in_sha256_hex}")
secret.set_content(
{"certificate": certificate.certificate, "csr": certificate.csr}
)
secret = self.model.get_secret(label=f"{LIBID}-{certificate.csr}")
secret.set_content({"certificate": certificate.certificate})
secret.set_info(
expire=self._get_next_secret_expiry_time(certificate),
)
except SecretNotFoundError:
logger.debug(
"Creating new secret with label %s", f"{LIBID}-{certificate.csr}"
"Creating new secret with label %s", f"{LIBID}-{csr_in_sha256_hex}"
)
secret = self.charm.unit.add_secret(
{"certificate": certificate.certificate},
label=f"{LIBID}-{certificate.csr}",
{"certificate": certificate.certificate, "csr": certificate.csr},
label=f"{LIBID}-{csr_in_sha256_hex}",
expire=self._get_next_secret_expiry_time(certificate),
)
self.on.certificate_available.emit(
Expand Down Expand Up @@ -1965,7 +1977,7 @@ def _on_secret_expired(self, event: SecretExpiredEvent) -> None:
"""
if not event.secret.label or not event.secret.label.startswith(f"{LIBID}-"):
return
csr = event.secret.label[len(f"{LIBID}-") :]
csr = event.secret.get_content()["csr"]
provider_certificate = self._find_certificate_in_relation_data(csr)
if not provider_certificate:
# A secret expired but we did not find matching certificate. Cleaning up
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@
from unittest.mock import Mock, patch

import pytest
from charms.tls_certificates_interface.v3.tls_certificates import (
get_sha256_hex as get_sha256_hex,
)
from ops import testing

from tests.unit.charms.tls_certificates_interface.v3.certificates import (
Expand Down Expand Up @@ -695,6 +698,7 @@ def test_given_csr_in_unit_relation_data_and_certificate_revoked_in_remote_relat
chain = ["certificate 1", "certiicate 2", "certificate 3"]
csr = "whatever csr"
certificate = "whatever certificate"
patch_load_pem_x509_certificate.return_value = self.setup_mock_certificate_object()
unit_relation_data = {
"certificate_signing_requests": json.dumps([{"certificate_signing_request": csr}])
}
Expand All @@ -711,24 +715,37 @@ def test_given_csr_in_unit_relation_data_and_certificate_revoked_in_remote_relat
"chain": chain,
"certificate_signing_request": csr,
"certificate": certificate,
"revoked": True,
}
]
)
}
secret_id = self.harness.add_model_secret(
owner=self.harness.model.unit.name, content={"certificate": "old data"}
)
secret = self.harness.model.get_secret(id=secret_id)
secret.set_info(label=f"{LIBID}-{csr}")

patch_load_pem_x509_certificate.return_value = self.setup_mock_certificate_object()
self.harness.update_relation_data(
relation_id=relation_id,
app_or_unit=self.remote_app,
key_values=remote_app_relation_data,
)
csr_sha256_hex = get_sha256_hex(csr)
secret = self.harness.model.get_secret(label=f"{LIBID}-{csr_sha256_hex}")
secret_id = secret.get_info().id

revoked_remote_app_relation_data = {
"certificates": json.dumps(
[
{
"ca": ca_certificate,
"chain": chain,
"certificate_signing_request": csr,
"certificate": certificate,
"revoked": True,
}
]
)
}
self.harness.update_relation_data(
relation_id=relation_id,
app_or_unit=self.remote_app,
key_values=revoked_remote_app_relation_data,
)
with pytest.raises(RuntimeError):
self.harness.get_secret_revisions(secret_id)

Expand Down Expand Up @@ -865,7 +882,8 @@ def test_given_csr_in_unit_relation_data_and_certificate_in_remote_relation_data
key_values=remote_app_relation_data,
)

secret = self.harness.model.get_secret(label=f"{LIBID}-{csr}")
csr_sha256_hex = get_sha256_hex(csr)
secret = self.harness.model.get_secret(label=f"{LIBID}-{csr_sha256_hex}")
assert secret.get_content()["certificate"] == certificate
assert secret.get_info().expires == expiry_time - timedelta(hours=168)

Expand Down Expand Up @@ -916,7 +934,8 @@ def test_given_csr_in_unit_relation_data_and_certificate_in_remote_relation_data
key_values=remote_app_relation_data,
)

secret = self.harness.model.get_secret(label=f"{LIBID}-{csr}")
csr_sha256_hex = get_sha256_hex(csr)
secret = self.harness.model.get_secret(label=f"{LIBID}-{csr_sha256_hex}")
assert secret.get_content(refresh=True)["certificate"] == certificate
assert secret.get_info().expires == expiry_time - timedelta(hours=168)

Expand Down Expand Up @@ -1344,7 +1363,8 @@ def test_given_expired_certificate_in_relation_data_when_secret_expired_then_cer
app_or_unit=self.remote_app,
key_values=remote_app_relation_data,
)
secret = self.harness.model.get_secret(label=f"{LIBID}-{csr}")
csr_sha256_hex = get_sha256_hex(csr)
secret = self.harness.model.get_secret(label=f"{LIBID}-{csr_sha256_hex}")

self.harness.trigger_secret_expiration(secret.get_info().id, 0)

Expand Down Expand Up @@ -1400,7 +1420,8 @@ def test_given_expired_certificate_and_other_certificates_in_relation_data_when_
app_or_unit=self.remote_app,
key_values=remote_app_relation_data,
)
secret = self.harness.model.get_secret(label=f"{LIBID}-{csr}")
csr_sha256_hex = get_sha256_hex(csr)
secret = self.harness.model.get_secret(label=f"{LIBID}-{csr_sha256_hex}")

self.harness.trigger_secret_expiration(secret.get_info().id, 0)

Expand Down Expand Up @@ -1450,7 +1471,8 @@ def test_given_expired_certificate_in_relation_data_when_secret_expired_then_sec
app_or_unit=self.remote_app,
key_values=remote_app_relation_data,
)
secret = self.harness.model.get_secret(label=f"{LIBID}-{csr}")
csr_sha256_hex = get_sha256_hex(csr)
secret = self.harness.model.get_secret(label=f"{LIBID}-{csr_sha256_hex}")
secret_id = secret.get_info().id

self.harness.trigger_secret_expiration(secret_id, 0)
Expand Down Expand Up @@ -1499,7 +1521,8 @@ def test_given_almost_expiring_certificate_in_relation_data_when_secret_expired_
app_or_unit=self.remote_app,
key_values=remote_app_relation_data,
)
secret = self.harness.model.get_secret(label=f"{LIBID}-{csr}")
csr_sha256_hex = get_sha256_hex(csr)
secret = self.harness.model.get_secret(label=f"{LIBID}-{csr_sha256_hex}")

self.harness.trigger_secret_expiration(secret.get_info().id, 0)

Expand Down Expand Up @@ -1549,13 +1572,14 @@ def test_given_almost_expiring_certificate_in_relation_data_when_secret_expired_
app_or_unit=self.remote_app,
key_values=remote_app_relation_data,
)
secret = self.harness.model.get_secret(label=f"{LIBID}-{csr}")
csr_sha256_hex = get_sha256_hex(csr)
secret = self.harness.model.get_secret(label=f"{LIBID}-{csr_sha256_hex}")

self.harness.trigger_secret_expiration(secret.get_info().id, 0)

assert secret.get_info().expires == expiry_time

def test_given_secret_not_owner_by_lib_when_secret_expired_then_secret_revisions_are_not_removed( # noqa: E501
def test_given_secret_not_owned_by_lib_when_secret_expired_then_secret_revisions_are_not_removed( # noqa: E501
self,
):
secret_id = self.harness.add_model_secret(
Expand All @@ -1568,23 +1592,69 @@ def test_given_secret_not_owner_by_lib_when_secret_expired_then_secret_revisions

assert self.harness.get_secret_revisions(secret_id)

@patch('cryptography.x509.load_pem_x509_certificate')
def test_given_certificate_not_found_in_relation_data_when_secret_expired_then_secret_revisions_are_removed( # noqa: E501
self,
self, patch_load_pem_x509_certificate
):
secret_id = self.harness.add_model_secret(
owner=self.harness.charm.unit.name, content={"certificate": "brisket"}
relation_id = self.create_certificates_relation()
ca_certificate = "whatever certificate"
chain = ["certificate 1", "certiicate 2", "certificate 3"]
csr = "whatever csr"
certificate = "whatever certificate"
unit_relation_data = {
"certificate_signing_requests": json.dumps([{"certificate_signing_request": csr}])
}
self.harness.update_relation_data(
relation_id=relation_id,
app_or_unit=self.harness.charm.unit.name,
key_values=unit_relation_data,
)
secret = self.harness.model.get_secret(id=secret_id)
secret.set_info(label=f"{LIBID}-brisket")
remote_app_relation_data = {
"certificates": json.dumps(
[
{
"ca": ca_certificate,
"chain": chain,
"certificate_signing_request": csr,
"certificate": certificate,
},
]
)
}
start_time = datetime.combine(datetime.today(), datetime.min.time(), tzinfo=timezone.utc)

expiry_time = start_time - timedelta(seconds=10)

patch_load_pem_x509_certificate.return_value = self.setup_mock_certificate_object(

expiry_time=expiry_time,

start_time=start_time,

)
self.harness.update_relation_data(
relation_id=relation_id,
app_or_unit=self.remote_app,
key_values=remote_app_relation_data,
)
csr_sha256_hex = get_sha256_hex(csr)
secret = self.harness.model.get_secret(label=f"{LIBID}-{csr_sha256_hex}")
secret_id = secret.get_info().id

self.harness.update_relation_data(
relation_id=relation_id,
app_or_unit=self.remote_app,
key_values={},
)

self.harness.trigger_secret_expiration(secret_id, 0)

with pytest.raises(RuntimeError):
self.harness.get_secret_revisions(secret_id)

@patch('cryptography.x509.load_pem_x509_certificate')
def test_given_certificate_invalid_in_relation_data_when_secret_expired_then_secret_revisions_are_removed( # noqa: E501
self,
self, patch_load_pem_x509_certificate
):
relation_id = self.create_certificates_relation()
ca_certificate = "whatever certificate"
Expand All @@ -1611,16 +1681,24 @@ def test_given_certificate_invalid_in_relation_data_when_secret_expired_then_sec
]
)
}
start_time = datetime.combine(datetime.today(), datetime.min.time(), tzinfo=timezone.utc)

expiry_time = start_time - timedelta(seconds=10)

patch_load_pem_x509_certificate.return_value = self.setup_mock_certificate_object(

expiry_time=expiry_time,

start_time=start_time,

)
self.harness.update_relation_data(
relation_id=relation_id,
app_or_unit=self.remote_app,
key_values=remote_app_relation_data,
)
secret_id = self.harness.add_model_secret(
owner=self.harness.charm.unit.name, content={"certificate": certificate}
)
secret = self.harness.model.get_secret(id=secret_id)
secret.set_info(label=f"{LIBID}-{csr}")
csr_sha256_hex = get_sha256_hex(csr)
secret = self.harness.model.get_secret(label=f"{LIBID}-{csr_sha256_hex}")
secret_id = secret.get_info().id

self.harness.trigger_secret_expiration(secret_id, 0)
Expand Down