Skip to content

Commit

Permalink
chore: Uses the hash of the CSR as a secret label (#188)
Browse files Browse the repository at this point in the history
  • Loading branch information
saltiyazan committed May 31, 2024
1 parent 24b86a1 commit e8fde14
Show file tree
Hide file tree
Showing 2 changed files with 127 additions and 37 deletions.
32 changes: 22 additions & 10 deletions lib/charms/tls_certificates_interface/v3/tls_certificates.py
Original file line number Diff line number Diff line change
Expand Up @@ -317,7 +317,7 @@ def _on_all_certificates_invalidated(self, event: AllCertificatesInvalidatedEven

# Increment this PATCH version before using `charmcraft publish-lib` or reset
# to 0 if you are raising the major API version
LIBPATCH = 14
LIBPATCH = 15

PYDEPS = ["cryptography", "jsonschema"]

Expand Down Expand Up @@ -1093,6 +1093,13 @@ def generate_csr( # noqa: C901
return signed_certificate.public_bytes(serialization.Encoding.PEM)


def get_sha256_hex(data: str) -> str:
"""Calculate the hash of the provided data and return the hexadecimal representation."""
digest = hashes.Hash(hashes.SHA256())
digest.update(data.encode())
return digest.finalize().hex()


def csr_matches_certificate(csr: str, cert: str) -> bool:
"""Check if a CSR matches a certificate.
Expand Down Expand Up @@ -1872,12 +1879,15 @@ def _on_relation_changed(self, event: RelationChangedEvent) -> None:
]
for certificate in provider_certificates:
if certificate.csr in requirer_csrs:
csr_in_sha256_hex = get_sha256_hex(certificate.csr)
if certificate.revoked:
with suppress(SecretNotFoundError):
logger.debug(
"Removing secret with label %s", f"{LIBID}-{certificate.csr}"
"Removing secret with label %s",
f"{LIBID}-{csr_in_sha256_hex}",
)
secret = self.model.get_secret(label=f"{LIBID}-{certificate.csr}")
secret = self.model.get_secret(
label=f"{LIBID}-{csr_in_sha256_hex}")
secret.remove_all_revisions()
self.on.certificate_invalidated.emit(
reason="revoked",
Expand All @@ -1889,20 +1899,22 @@ def _on_relation_changed(self, event: RelationChangedEvent) -> None:
else:
try:
logger.debug(
"Setting secret with label %s", f"{LIBID}-{certificate.csr}"
"Setting secret with label %s", f"{LIBID}-{csr_in_sha256_hex}"
)
secret = self.model.get_secret(label=f"{LIBID}-{csr_in_sha256_hex}")
secret.set_content(
{"certificate": certificate.certificate, "csr": certificate.csr}
)
secret = self.model.get_secret(label=f"{LIBID}-{certificate.csr}")
secret.set_content({"certificate": certificate.certificate})
secret.set_info(
expire=self._get_next_secret_expiry_time(certificate),
)
except SecretNotFoundError:
logger.debug(
"Creating new secret with label %s", f"{LIBID}-{certificate.csr}"
"Creating new secret with label %s", f"{LIBID}-{csr_in_sha256_hex}"
)
secret = self.charm.unit.add_secret(
{"certificate": certificate.certificate},
label=f"{LIBID}-{certificate.csr}",
{"certificate": certificate.certificate, "csr": certificate.csr},
label=f"{LIBID}-{csr_in_sha256_hex}",
expire=self._get_next_secret_expiry_time(certificate),
)
self.on.certificate_available.emit(
Expand Down Expand Up @@ -1965,7 +1977,7 @@ def _on_secret_expired(self, event: SecretExpiredEvent) -> None:
"""
if not event.secret.label or not event.secret.label.startswith(f"{LIBID}-"):
return
csr = event.secret.label[len(f"{LIBID}-") :]
csr = event.secret.get_content()["csr"]
provider_certificate = self._find_certificate_in_relation_data(csr)
if not provider_certificate:
# A secret expired but we did not find matching certificate. Cleaning up
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@
from unittest.mock import Mock, patch

import pytest
from charms.tls_certificates_interface.v3.tls_certificates import (
get_sha256_hex as get_sha256_hex,
)
from ops import testing

from tests.unit.charms.tls_certificates_interface.v3.certificates import (
Expand Down Expand Up @@ -695,6 +698,7 @@ def test_given_csr_in_unit_relation_data_and_certificate_revoked_in_remote_relat
chain = ["certificate 1", "certiicate 2", "certificate 3"]
csr = "whatever csr"
certificate = "whatever certificate"
patch_load_pem_x509_certificate.return_value = self.setup_mock_certificate_object()
unit_relation_data = {
"certificate_signing_requests": json.dumps([{"certificate_signing_request": csr}])
}
Expand All @@ -711,24 +715,37 @@ def test_given_csr_in_unit_relation_data_and_certificate_revoked_in_remote_relat
"chain": chain,
"certificate_signing_request": csr,
"certificate": certificate,
"revoked": True,
}
]
)
}
secret_id = self.harness.add_model_secret(
owner=self.harness.model.unit.name, content={"certificate": "old data"}
)
secret = self.harness.model.get_secret(id=secret_id)
secret.set_info(label=f"{LIBID}-{csr}")

patch_load_pem_x509_certificate.return_value = self.setup_mock_certificate_object()
self.harness.update_relation_data(
relation_id=relation_id,
app_or_unit=self.remote_app,
key_values=remote_app_relation_data,
)
csr_sha256_hex = get_sha256_hex(csr)
secret = self.harness.model.get_secret(label=f"{LIBID}-{csr_sha256_hex}")
secret_id = secret.get_info().id

revoked_remote_app_relation_data = {
"certificates": json.dumps(
[
{
"ca": ca_certificate,
"chain": chain,
"certificate_signing_request": csr,
"certificate": certificate,
"revoked": True,
}
]
)
}
self.harness.update_relation_data(
relation_id=relation_id,
app_or_unit=self.remote_app,
key_values=revoked_remote_app_relation_data,
)
with pytest.raises(RuntimeError):
self.harness.get_secret_revisions(secret_id)

Expand Down Expand Up @@ -865,7 +882,8 @@ def test_given_csr_in_unit_relation_data_and_certificate_in_remote_relation_data
key_values=remote_app_relation_data,
)

secret = self.harness.model.get_secret(label=f"{LIBID}-{csr}")
csr_sha256_hex = get_sha256_hex(csr)
secret = self.harness.model.get_secret(label=f"{LIBID}-{csr_sha256_hex}")
assert secret.get_content()["certificate"] == certificate
assert secret.get_info().expires == expiry_time - timedelta(hours=168)

Expand Down Expand Up @@ -916,7 +934,8 @@ def test_given_csr_in_unit_relation_data_and_certificate_in_remote_relation_data
key_values=remote_app_relation_data,
)

secret = self.harness.model.get_secret(label=f"{LIBID}-{csr}")
csr_sha256_hex = get_sha256_hex(csr)
secret = self.harness.model.get_secret(label=f"{LIBID}-{csr_sha256_hex}")
assert secret.get_content(refresh=True)["certificate"] == certificate
assert secret.get_info().expires == expiry_time - timedelta(hours=168)

Expand Down Expand Up @@ -1344,7 +1363,8 @@ def test_given_expired_certificate_in_relation_data_when_secret_expired_then_cer
app_or_unit=self.remote_app,
key_values=remote_app_relation_data,
)
secret = self.harness.model.get_secret(label=f"{LIBID}-{csr}")
csr_sha256_hex = get_sha256_hex(csr)
secret = self.harness.model.get_secret(label=f"{LIBID}-{csr_sha256_hex}")

self.harness.trigger_secret_expiration(secret.get_info().id, 0)

Expand Down Expand Up @@ -1400,7 +1420,8 @@ def test_given_expired_certificate_and_other_certificates_in_relation_data_when_
app_or_unit=self.remote_app,
key_values=remote_app_relation_data,
)
secret = self.harness.model.get_secret(label=f"{LIBID}-{csr}")
csr_sha256_hex = get_sha256_hex(csr)
secret = self.harness.model.get_secret(label=f"{LIBID}-{csr_sha256_hex}")

self.harness.trigger_secret_expiration(secret.get_info().id, 0)

Expand Down Expand Up @@ -1450,7 +1471,8 @@ def test_given_expired_certificate_in_relation_data_when_secret_expired_then_sec
app_or_unit=self.remote_app,
key_values=remote_app_relation_data,
)
secret = self.harness.model.get_secret(label=f"{LIBID}-{csr}")
csr_sha256_hex = get_sha256_hex(csr)
secret = self.harness.model.get_secret(label=f"{LIBID}-{csr_sha256_hex}")
secret_id = secret.get_info().id

self.harness.trigger_secret_expiration(secret_id, 0)
Expand Down Expand Up @@ -1499,7 +1521,8 @@ def test_given_almost_expiring_certificate_in_relation_data_when_secret_expired_
app_or_unit=self.remote_app,
key_values=remote_app_relation_data,
)
secret = self.harness.model.get_secret(label=f"{LIBID}-{csr}")
csr_sha256_hex = get_sha256_hex(csr)
secret = self.harness.model.get_secret(label=f"{LIBID}-{csr_sha256_hex}")

self.harness.trigger_secret_expiration(secret.get_info().id, 0)

Expand Down Expand Up @@ -1549,13 +1572,14 @@ def test_given_almost_expiring_certificate_in_relation_data_when_secret_expired_
app_or_unit=self.remote_app,
key_values=remote_app_relation_data,
)
secret = self.harness.model.get_secret(label=f"{LIBID}-{csr}")
csr_sha256_hex = get_sha256_hex(csr)
secret = self.harness.model.get_secret(label=f"{LIBID}-{csr_sha256_hex}")

self.harness.trigger_secret_expiration(secret.get_info().id, 0)

assert secret.get_info().expires == expiry_time

def test_given_secret_not_owner_by_lib_when_secret_expired_then_secret_revisions_are_not_removed( # noqa: E501
def test_given_secret_not_owned_by_lib_when_secret_expired_then_secret_revisions_are_not_removed( # noqa: E501
self,
):
secret_id = self.harness.add_model_secret(
Expand All @@ -1568,23 +1592,69 @@ def test_given_secret_not_owner_by_lib_when_secret_expired_then_secret_revisions

assert self.harness.get_secret_revisions(secret_id)

@patch('cryptography.x509.load_pem_x509_certificate')
def test_given_certificate_not_found_in_relation_data_when_secret_expired_then_secret_revisions_are_removed( # noqa: E501
self,
self, patch_load_pem_x509_certificate
):
secret_id = self.harness.add_model_secret(
owner=self.harness.charm.unit.name, content={"certificate": "brisket"}
relation_id = self.create_certificates_relation()
ca_certificate = "whatever certificate"
chain = ["certificate 1", "certiicate 2", "certificate 3"]
csr = "whatever csr"
certificate = "whatever certificate"
unit_relation_data = {
"certificate_signing_requests": json.dumps([{"certificate_signing_request": csr}])
}
self.harness.update_relation_data(
relation_id=relation_id,
app_or_unit=self.harness.charm.unit.name,
key_values=unit_relation_data,
)
secret = self.harness.model.get_secret(id=secret_id)
secret.set_info(label=f"{LIBID}-brisket")
remote_app_relation_data = {
"certificates": json.dumps(
[
{
"ca": ca_certificate,
"chain": chain,
"certificate_signing_request": csr,
"certificate": certificate,
},
]
)
}
start_time = datetime.combine(datetime.today(), datetime.min.time(), tzinfo=timezone.utc)

expiry_time = start_time - timedelta(seconds=10)

patch_load_pem_x509_certificate.return_value = self.setup_mock_certificate_object(

expiry_time=expiry_time,

start_time=start_time,

)
self.harness.update_relation_data(
relation_id=relation_id,
app_or_unit=self.remote_app,
key_values=remote_app_relation_data,
)
csr_sha256_hex = get_sha256_hex(csr)
secret = self.harness.model.get_secret(label=f"{LIBID}-{csr_sha256_hex}")
secret_id = secret.get_info().id

self.harness.update_relation_data(
relation_id=relation_id,
app_or_unit=self.remote_app,
key_values={},
)

self.harness.trigger_secret_expiration(secret_id, 0)

with pytest.raises(RuntimeError):
self.harness.get_secret_revisions(secret_id)

@patch('cryptography.x509.load_pem_x509_certificate')
def test_given_certificate_invalid_in_relation_data_when_secret_expired_then_secret_revisions_are_removed( # noqa: E501
self,
self, patch_load_pem_x509_certificate
):
relation_id = self.create_certificates_relation()
ca_certificate = "whatever certificate"
Expand All @@ -1611,16 +1681,24 @@ def test_given_certificate_invalid_in_relation_data_when_secret_expired_then_sec
]
)
}
start_time = datetime.combine(datetime.today(), datetime.min.time(), tzinfo=timezone.utc)

expiry_time = start_time - timedelta(seconds=10)

patch_load_pem_x509_certificate.return_value = self.setup_mock_certificate_object(

expiry_time=expiry_time,

start_time=start_time,

)
self.harness.update_relation_data(
relation_id=relation_id,
app_or_unit=self.remote_app,
key_values=remote_app_relation_data,
)
secret_id = self.harness.add_model_secret(
owner=self.harness.charm.unit.name, content={"certificate": certificate}
)
secret = self.harness.model.get_secret(id=secret_id)
secret.set_info(label=f"{LIBID}-{csr}")
csr_sha256_hex = get_sha256_hex(csr)
secret = self.harness.model.get_secret(label=f"{LIBID}-{csr_sha256_hex}")
secret_id = secret.get_info().id

self.harness.trigger_secret_expiration(secret_id, 0)
Expand Down

0 comments on commit e8fde14

Please sign in to comment.