Skip to content

Commit

Permalink
chore: update charm libraries
Browse files Browse the repository at this point in the history
  • Loading branch information
telcobot committed Mar 8, 2024
1 parent fef2e2e commit d01524c
Showing 1 changed file with 94 additions and 90 deletions.
184 changes: 94 additions & 90 deletions lib/charms/lego_base_k8s/v0/lego_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,14 +32,11 @@ class ExampleAcmeCharm(AcmeClient):
def __init__(self, *args):
super().__init__(*args, plugin="namecheap")
self._server = "https://acme-staging-v02.api.letsencrypt.org/directory"
self.framework.observe(self.on.config_changed, self._on_config_changed)
def _on_config_changed(self, _):
if not self._validate_registrar_config():
return
if not self.validate_generic_acme_config():
return
self.unit.status = ActiveStatus()
def _validate_plugin_config(self) -> str:
if not self._api_key:
return "API key was not provided"
return ""
@property
def _plugin_config(self):
Expand All @@ -50,25 +47,21 @@ def _plugin_config(self):
- Inherit from AcmeClient
- Call `super().__init__(*args, plugin="")` with the lego plugin name
- Observe `ConfigChanged` to a method called `_on_config_changed`
- `_on_config_changed` must follow those requirements:
- Validate its specific configuration, blocking if invalid
- Validate generic configuration, by calling `self.validate_generic_acme_config()`,
returning immediately when it returns `False`
- Sets the status to Active
- Accept any kind of events
- Implement the `_validate_plugin_config` method,
it should validate the plugin specific configuration,
returning a string with an error message if the
plugin specific configuration is invalid, otherwise an empty string.
- Implement the `_plugin_config` property, returning a dictionary of its specific
configuration. Keys must be capitalized and follow the plugins documentation from
lego.
Charms that leverage this library also need to specify a `certificates` integration in their
`metadata.yaml` file:
- Specify a `certificates` integration in their
`metadata.yaml` file:
```yaml
provides:
certificates:
interface: tls-certificates
```
They shouldalso specify a `logging` integration in their `metadata.yaml` file:
- Specify a `logging` integration in their `metadata.yaml` file:
```yaml
requires:
logging:
Expand All @@ -79,7 +72,7 @@ def _plugin_config(self):
import logging
import re
from abc import abstractmethod
from typing import Dict, List, Optional, Union
from typing import Dict, List, Optional
from urllib.parse import urlparse

from charms.loki_k8s.v1.loki_push_api import LogForwarder
Expand All @@ -89,9 +82,9 @@ def _plugin_config(self):
)
from cryptography import x509
from cryptography.x509.oid import NameOID
from ops.charm import CharmBase
from ops.charm import CharmBase, CollectStatusEvent
from ops.framework import EventBase
from ops.model import ActiveStatus, BlockedStatus
from ops.model import ActiveStatus, BlockedStatus, WaitingStatus
from ops.pebble import ExecError

# The unique Charmhub library identifier, never change it
Expand All @@ -102,7 +95,7 @@ def _plugin_config(self):

# Increment this PATCH version before using `charmcraft publish-lib` or reset
# to 0 if you are raising the major API version
LIBPATCH = 6
LIBPATCH = 7


logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -130,44 +123,89 @@ def __init__(self, *args, plugin: str):
self.tls_certificates.on.certificate_creation_request,
self._on_certificate_creation_request,
)
self.framework.observe(self.on.config_changed, self._sync_certificates)
self.framework.observe(self.on.update_status, self._sync_certificates)
self.framework.observe(self.on.collect_unit_status, self._on_collect_status)
self._plugin = plugin

def validate_generic_acme_config(self) -> bool:
"""Validate generic ACME config."""
if not self._email:
self.unit.status = BlockedStatus("Email address was not provided")
return False
if not self._server:
self.unit.status = BlockedStatus("ACME server was not provided")
return False
if not self._email_is_valid(self._email):
self.unit.status = BlockedStatus("Invalid email address")
return False
if not self._server_is_valid(self._server):
self.unit.status = BlockedStatus("Invalid ACME server")
return False
return True
def _on_collect_status(self, event: CollectStatusEvent) -> None:
"""Handle the collect status event."""
if not self._container.can_connect():
event.add_status(WaitingStatus("Waiting to be able to connect to vault unit"))
return

if (err := self.validate_generic_acme_config()):
event.add_status(
BlockedStatus(err)
)
return
if (err := self._validate_plugin_config()):
event.add_status(
BlockedStatus(err)
)
return
event.add_status(ActiveStatus())

def _sync_certificates(self, event: EventBase) -> None:
"""Go through all the certificates relations and handle outstanding requests."""
if not self._container.can_connect():
return
if (err := self.validate_generic_acme_config()):
logger.error(err)
return
if (err := self._validate_plugin_config()):
logger.error(err)
return
for relation in self.model.relations.get(CERTIFICATES_RELATION_NAME, []):
outstanding_requests = self.tls_certificates.get_outstanding_certificate_requests(
relation_id=relation.id
)
for request in outstanding_requests:
self._generate_signed_certificate(
csr=request.csr,
relation_id=relation.id,
)

def _on_certificate_creation_request(self, event: CertificateCreationRequestEvent) -> None:
"""Handle certificate creation request event."""
if not self._container.can_connect():
return
if (err := self.validate_generic_acme_config()):
logger.error(err)
return
if (err := self._validate_plugin_config()):
logger.error(err)
return
self._generate_signed_certificate(event.certificate_signing_request, event.relation_id)

@abstractmethod
def _on_config_changed(self, event: EventBase) -> None:
"""Validate configuration and sets status accordingly.
def _validate_plugin_config(self) -> str:
"""Validate plugin specific configuration.
Implementations need to follow the following steps:
Implementations need to validate the plugin specific configuration
And return either an empty string if valid
Or an the status message if invalid.
1. Validate their specific configuration, setting the status
to `Blocked` if invalid and returning immediately.
2. Validate generic configuration by calling
`self.validate_generic_acme_config()`, returning immediately
if it returns `False`.
3. Set the status to `Active` and return.
Returns:
str: Error message if invalid, otherwise an empty string.
"""
pass

Args:
event (EventBase): Any Juju event
def validate_generic_acme_config(self) -> str:
"""Validate generic ACME config.
Returns:
None
str: Error message if invalid, otherwise an empty string.
"""
if not self._email:
return "Email address was not provided"
if not self._server:
return "ACME server was not provided"
if not self._email_is_valid(self._email):
return "Invalid email address"
if not self._server_is_valid(self._server):
return "Invalid ACME server"
return ""

@staticmethod
def _get_subject_from_csr(certificate_signing_request: str) -> str:
Expand All @@ -177,7 +215,7 @@ def _get_subject_from_csr(certificate_signing_request: str) -> str:
if isinstance(subject_value, bytes):
return subject_value.decode()
else:
return subject_value
return str(subject_value)

def _push_csr_to_workload(self, csr: str) -> None:
"""Push CSR to workload container."""
Expand All @@ -193,53 +231,16 @@ def _execute_lego_cmd(self) -> bool:
logger.info(f"Return message: {stdout}, {error}")
except ExecError as e:
logger.error("Exited with code %d. Stderr:", e.exit_code)
for line in e.stderr.splitlines():
for line in e.stderr.splitlines(): # type: ignore
logger.error(" %s", line)
return False
return True

def _pull_certificates_from_workload(self, csr_subject: str) -> List[Union[bytes, str]]:
def _pull_certificates_from_workload(self, csr_subject: str) -> List[str]:
"""Pull certificates from workload container."""
chain_pem = self._container.pull(path=f"{self._certs_path}{csr_subject}.crt")
return list(chain_pem.read().split("\n\n"))

def _sync_certificates(self, event: EventBase) -> None:
"""Goes through all the certificates relations and handles outstanding requests."""
self._on_config_changed(event)
if not isinstance(self.unit.status, ActiveStatus):
logger.debug(
"Charm is not active, skipping certificate generation, \
will try again in during the next update status event."
)
return
for relation in self.model.relations.get(CERTIFICATES_RELATION_NAME, []):
outstanding_requests = self.tls_certificates.get_outstanding_certificate_requests(
relation_id=relation.id
)
for request in outstanding_requests:
self._generate_signed_certificate(
csr=request.csr,
relation_id=relation.id,
)

def _on_certificate_creation_request(self, event: CertificateCreationRequestEvent) -> None:
"""Handle certificate creation request event.
- Retrieves subject from CSR
- Pushes CSR to workload container
- Executes lego command in workload
- Pulls certificates from workload
- Sends certificates to requesting charm
"""
self._on_config_changed(event)
if not isinstance(self.unit.status, ActiveStatus):
logger.debug(
"Charm is not active, skipping certificate generation, \
will try again in during the next update status event."
)
return
self._generate_signed_certificate(event.certificate_signing_request, event.relation_id)

def _generate_signed_certificate(self, csr: str, relation_id: int):
"""Generate signed certificate from the ACME provider."""
if not self.unit.is_leader():
Expand All @@ -250,7 +251,10 @@ def _generate_signed_certificate(self, csr: str, relation_id: int):
return
csr_subject = self._get_subject_from_csr(csr)
if len(csr_subject) > 64:
logger.error("Subject is too long (> 64 characters): %s", csr_subject)
logger.error(
"Failed to request certificate, \
Subject is too long (> 64 characters): %s", csr_subject
)
return
logger.info("Received Certificate Creation Request for domain %s", csr_subject)
self._push_csr_to_workload(csr=csr)
Expand Down

0 comments on commit d01524c

Please sign in to comment.