Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 30 additions & 23 deletions lib/charms/tempo_coordinator_k8s/v0/charm_tracing.py
Original file line number Diff line number Diff line change
Expand Up @@ -266,20 +266,20 @@ def _remove_stale_otel_sdk_packages():
if name.startswith("opentelemetry_"):
otel_distributions[name].append(distribution)

otel_logger.debug(f"Found {len(otel_distributions)} opentelemetry distributions")
otel_logger.debug("Found %d opentelemetry distributions", len(otel_distributions))

# If we have multiple distributions with the same name, remove any that have 0 associated files
for name, distributions_ in otel_distributions.items():
if len(distributions_) <= 1:
continue

otel_logger.debug(
f"Package {name} has multiple ({len(distributions_)}) distributions."
"Package %s has multiple (%d) distributions.", name, len(distributions_)
)
for distribution in distributions_:
if not distribution.files: # Not None or empty list
path = distribution._path # type: ignore
otel_logger.info(f"Removing empty distribution of {name} at {path}.")
otel_logger.info("Removing empty distribution of %s at %s.", name, path)
shutil.rmtree(path)

otel_logger.debug("Successfully applied _remove_stale_otel_sdk_packages patch. ")
Expand Down Expand Up @@ -350,7 +350,7 @@ def _remove_stale_otel_sdk_packages():
# Increment this PATCH version before using `charmcraft publish-lib` or reset
# to 0 if you are raising the major API version

LIBPATCH = 10
LIBPATCH = 11

PYDEPS = ["opentelemetry-exporter-otlp-proto-http==1.21.0"]

Expand Down Expand Up @@ -430,7 +430,8 @@ def _prune(self, queue: Sequence[bytes]) -> Sequence[bytes]:
if overflow > 0:
n_dropped_spans += overflow
logger.warning(
f"charm tracing buffer exceeds max history length ({self._max_event_history_length} events)"
"charm tracing buffer exceeds max history length (%d events)",
self._max_event_history_length,
)

new_spans = deque(queue[-self._max_event_history_length :])
Expand All @@ -446,19 +447,21 @@ def _prune(self, queue: Sequence[bytes]) -> Sequence[bytes]:
# only do this once
if not logged_drop:
logger.warning(
f"charm tracing buffer exceeds size limit ({self._max_buffer_size_mib}MiB)."
"charm tracing buffer exceeds size limit (%dMiB).",
self._max_buffer_size_mib,
)
logged_drop = True

if n_dropped_spans > 0:
dev_logger.debug(
f"charm tracing buffer overflow: dropped {n_dropped_spans} older spans. "
f"Please increase the buffer limits, or ensure the spans can be flushed."
"charm tracing buffer overflow: dropped %d older spans. "
"Please increase the buffer limits, or ensure the spans can be flushed.",
n_dropped_spans,
)
return new_spans

def _save(self, spans: Sequence[ReadableSpan], replace: bool = False):
dev_logger.debug(f"saving {len(spans)} new spans to buffer")
dev_logger.debug("saving %d new spans to buffer", len(spans))
old = [] if replace else self.load()
queue = old + [self._serialize(spans)]
new_buffer = self._prune(queue)
Expand All @@ -480,7 +483,7 @@ def _write(self, spans: Sequence[bytes]):
# ensure the destination folder exists
db_file_dir = self._db_file.parent
if not db_file_dir.exists():
dev_logger.info(f"creating buffer dir: {db_file_dir}")
dev_logger.info("creating buffer dir: %s", db_file_dir)
db_file_dir.mkdir(parents=True)

self._db_file.write_bytes(self._SPANSEP.join(spans))
Expand All @@ -496,15 +499,15 @@ def load(self) -> List[bytes]:
try:
spans = self._db_file.read_bytes().split(self._SPANSEP)
except Exception:
logger.exception(f"error parsing {self._db_file}")
logger.exception("error parsing %s", self._db_file)
return []
return spans

def drop(self, n_spans: Optional[int] = None):
"""Drop some currently buffered spans from the cache file."""
current = self.load()
if n_spans:
dev_logger.debug(f"dropping {n_spans} spans from buffer")
dev_logger.debug("dropping %d spans from buffer", n_spans)
new = current[n_spans:]
else:
dev_logger.debug("emptying buffer")
Expand Down Expand Up @@ -693,7 +696,7 @@ def _get_tracing_endpoint(
)

dev_logger.debug(
f"Setting up span exporter to endpoint: {tracing_endpoint}/v1/traces"
"Setting up span exporter to endpoint: %s/v1/traces", tracing_endpoint
)
return f"{tracing_endpoint}/v1/traces"

Expand All @@ -711,13 +714,17 @@ def _get_server_cert(

if server_cert is None:
logger.warning(
f"{charm_type}.{server_cert_attr} is None; sending traces over INSECURE connection."
"%s.%s is None; sending traces over INSECURE connection.",
charm_type,
server_cert_attr,
)
return
if not isinstance(server_cert, (str, Path)):
logger.warning(
f"{charm_type}.{server_cert_attr} has unexpected type {type(server_cert)}; "
f"sending traces over INSECURE connection."
"%s.%s has unexpected type %s; sending traces over INSECURE connection.",
charm_type,
server_cert_attr,
type(server_cert),
)
return
path = Path(server_cert)
Expand Down Expand Up @@ -862,13 +869,13 @@ def wrap_init(self: CharmBase, framework: Framework, *args, **kwargs):

# log a trace id, so we can pick it up from the logs (and jhack) to look it up in tempo.
root_trace_id = hex(span.get_span_context().trace_id)[2:] # strip 0x prefix
logger.debug(f"Starting root trace with id={root_trace_id!r}.")
logger.debug("Starting root trace with id=%r.", root_trace_id)

span_token = opentelemetry.context.attach(ctx) # type: ignore

@contextmanager
def wrap_event_context(event_name: str):
dev_logger.debug(f"entering event context: {event_name}")
dev_logger.debug("entering event context: %s", event_name)
# when the framework enters an event context, we create a span.
with _span("event: " + event_name) as event_context_span:
if event_context_span:
Expand Down Expand Up @@ -1059,7 +1066,7 @@ def _autoinstrument(
Minimum 10MiB.
:param buffer_path: path to buffer file to use for saving buffered spans.
"""
dev_logger.debug(f"instrumenting {charm_type}")
dev_logger.debug("instrumenting %s", charm_type)
_setup_root_span_initializer(
charm_type,
tracing_endpoint_attr,
Expand All @@ -1083,12 +1090,12 @@ def trace_type(cls: _T) -> _T:
It assumes that this class is only instantiated after a charm type decorated with `@trace_charm`
has been instantiated.
"""
dev_logger.debug(f"instrumenting {cls}")
dev_logger.debug("instrumenting %s", cls)
for name, method in inspect.getmembers(cls, predicate=inspect.isfunction):
dev_logger.debug(f"discovered {method}")
dev_logger.debug("discovered %s", method)

if method.__name__.startswith("__"):
dev_logger.debug(f"skipping {method} (dunder)")
dev_logger.debug("skipping %s (dunder)", method)
continue

# the span title in the general case should be:
Expand Down Expand Up @@ -1134,7 +1141,7 @@ def trace_function(function: _F, name: Optional[str] = None) -> _F:


def _trace_callable(callable: _F, qualifier: str, name: Optional[str] = None) -> _F:
dev_logger.debug(f"instrumenting {callable}")
dev_logger.debug("instrumenting %s", callable)

# sig = inspect.signature(callable)
@functools.wraps(callable)
Expand Down
14 changes: 13 additions & 1 deletion src/charm.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@
from charms.operator_libs_linux.v2 import snap
from charms.rolling_ops.v0.rollingops import RollingOpsManager, RunWithLock
from charms.tempo_coordinator_k8s.v0.charm_tracing import trace_charm
from cryptography.x509 import load_pem_x509_certificate
from cryptography.x509.oid import NameOID
from ops import (
ActionEvent,
ActiveStatus,
Expand Down Expand Up @@ -378,6 +380,17 @@ def _post_snap_refresh(self, refresh: charm_refresh.Machines):

Called after snap refresh
"""
try:
if raw_cert := self.get_secret(UNIT_SCOPE, "internal-cert"):
cert = load_pem_x509_certificate(raw_cert.encode())
if (
cert.subject.get_attributes_for_oid(NameOID.COMMON_NAME)[0].value
!= self._unit_ip
):
self.tls.generate_internal_peer_cert()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add an INFO-level log message so it is visible when the cert is regenerated (to simplify production troubleshooting). A warning is also fine.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added info log inside generate_internal_peer_cert().

except Exception:
logger.exception("Unable to check or update internal cert")
Comment on lines +383 to +392
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Regenerate the cert if the common name is not the IP before the upgrade.


if not self._patroni.start_patroni():
self.set_unit_status(ops.BlockedStatus("Failed to start PostgreSQL"), refresh=refresh)
return
Expand Down Expand Up @@ -1403,7 +1416,6 @@ def _on_leader_elected(self, event: LeaderElectedEvent) -> None: # noqa: C901

if not self.get_secret(APP_SCOPE, "internal-ca"):
self.tls.generate_internal_peer_ca()
self.tls.generate_internal_peer_cert()
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the internal cert generates before the IP is set to the peer data, the hostname will be used as common name. It also causes issues on Juju 4.

self.update_config()

# Don't update connection endpoints in the first time this event run for
Expand Down
16 changes: 10 additions & 6 deletions src/relations/tls.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,21 +71,22 @@ def _get_peer_addrs(self) -> set[str]:
peer_addrs.add(addr)
return peer_addrs

def _get_common_name(self):
def _get_common_name(self) -> str:
return self.charm.unit_peer_data.get("database-address") or self.host

def _get_peer_common_name(self) -> str:
return self.charm.unit_peer_data.get("database-peers-address") or self.host
Comment on lines +77 to +78
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should use the peer address here, in case it differs from the relation address.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Makes sense — we might have different Juju spaces, and since these certificates are used for backups on replicas, the peer address is the right choice.


def __init__(self, charm: "PostgresqlOperatorCharm", peer_relation: str):
super().__init__(charm, "client-relations")
self.charm = charm
self.peer_relation = peer_relation
unit_id = self.charm.unit.name.split("/")[1]
self.host = f"{self.charm.app.name}-{unit_id}"
if self.charm.unit_peer_data:
common_name = self._get_common_name()
client_addresses = self._get_client_addrs()
peer_addresses = self._get_peer_addrs()
else:
common_name = self.host
client_addresses = set()
peer_addresses = set()
self.common_hosts = {self.host}
Expand All @@ -97,7 +98,7 @@ def __init__(self, charm: "PostgresqlOperatorCharm", peer_relation: str):
TLS_CLIENT_RELATION,
certificate_requests=[
CertificateRequestAttributes(
common_name=common_name,
common_name=self._get_common_name(),
sans_ip=frozenset(client_addresses),
sans_dns=frozenset({
*self.common_hosts,
Expand All @@ -114,7 +115,7 @@ def __init__(self, charm: "PostgresqlOperatorCharm", peer_relation: str):
TLS_PEER_RELATION,
certificate_requests=[
CertificateRequestAttributes(
common_name=common_name,
common_name=self._get_peer_common_name(),
sans_ip=frozenset(self._get_peer_addrs()),
sans_dns=frozenset({
*self.common_hosts,
Expand Down Expand Up @@ -239,7 +240,7 @@ def generate_internal_peer_cert(self) -> None:
private_key = generate_private_key()
csr = generate_csr(
private_key,
common_name=self._get_common_name(),
common_name=self._get_peer_common_name(),
sans_ip=frozenset(self._get_peer_addrs()),
sans_dns=frozenset({
*self.common_hosts,
Expand All @@ -252,3 +253,6 @@ def generate_internal_peer_cert(self) -> None:
self.charm.set_secret(UNIT_SCOPE, "internal-key", str(private_key))
self.charm.set_secret(UNIT_SCOPE, "internal-cert", str(cert))
self.charm.push_tls_files_to_workload()
logger.info(
"Internal peer certificate generated. Please use a proper TLS operator if possible."
)
19 changes: 12 additions & 7 deletions tests/integration/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -1215,9 +1215,13 @@ async def backup_operations(
charm,
) -> None:
"""Basic set of operations for backup testing in different cloud providers."""
use_tls = all([tls_certificates_app_name, tls_config, tls_channel])
# Deploy S3 Integrator and TLS Certificates Operator.
await ops_test.model.deploy(s3_integrator_app_name)
await ops_test.model.deploy(tls_certificates_app_name, config=tls_config, channel=tls_channel)
if use_tls:
await ops_test.model.deploy(
tls_certificates_app_name, config=tls_config, channel=tls_channel
)

# Deploy and relate PostgreSQL to S3 integrator (one database app for each cloud for now
# as archive_mode is disabled after restoring the backup) and to TLS Certificates Operator
Expand All @@ -1231,12 +1235,13 @@ async def backup_operations(
config={"profile": "testing"},
)

await ops_test.model.relate(
f"{database_app_name}:client-certificates", f"{tls_certificates_app_name}:certificates"
)
await ops_test.model.relate(
f"{database_app_name}:peer-certificates", f"{tls_certificates_app_name}:certificates"
)
if use_tls:
await ops_test.model.relate(
f"{database_app_name}:client-certificates", f"{tls_certificates_app_name}:certificates"
)
await ops_test.model.relate(
f"{database_app_name}:peer-certificates", f"{tls_certificates_app_name}:certificates"
)
async with ops_test.fast_forward(fast_interval="60s"):
await ops_test.model.wait_for_idle(apps=[database_app_name], status="active", timeout=1000)

Expand Down
9 changes: 3 additions & 6 deletions tests/integration/test_backups_ceph.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,6 @@
logger = logging.getLogger(__name__)

S3_INTEGRATOR_APP_NAME = "s3-integrator"
tls_certificates_app_name = "self-signed-certificates"
tls_channel = "latest/stable"
tls_config = {"ca-common-name": "Test CA"}

backup_id, value_before_backup, value_after_backup = "", None, None

Expand Down Expand Up @@ -167,9 +164,9 @@ async def test_backup_ceph(ops_test: OpsTest, cloud_configs, cloud_credentials,
await backup_operations(
ops_test,
S3_INTEGRATOR_APP_NAME,
tls_certificates_app_name,
tls_config,
tls_channel,
None,
None,
None,
Comment on lines +167 to +169
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Test on microceph with the internal certs, without using a TLS operator.

cloud_credentials,
"ceph",
cloud_configs,
Expand Down