From 3bb4cc1b0f951437a8b1fbc8089b038d92624541 Mon Sep 17 00:00:00 2001 From: Morgan Wowk Date: Thu, 28 May 2026 15:39:20 -0700 Subject: [PATCH] bugsnag: propagate cloud_provider into contextual logging for queued executions Adds CLOUD_PROVIDER_ANNOTATION_KEY to common_annotations so the annotation key has a single definition in the OSS layer. In the queued-execution processing span, reads the cloud_provider value from task_spec annotations (already in memory) and adds it to the contextual logging context when present. The existing _before_notify hook then includes it automatically in the tangle_context tab on every Bugsnag event raised during that execution's processing. --- .../launchers/common_annotations.py | 1 + cloud_pipelines_backend/orchestrator_sql.py | 14 +++++++++++++- 2 files changed, 14 insertions(+), 1 deletion(-) diff --git a/cloud_pipelines_backend/launchers/common_annotations.py b/cloud_pipelines_backend/launchers/common_annotations.py index 01ecdd34..951239e2 100644 --- a/cloud_pipelines_backend/launchers/common_annotations.py +++ b/cloud_pipelines_backend/launchers/common_annotations.py @@ -1,6 +1,7 @@ ### TaskSpec annotations that are relevant to multiple launchers. # Annotations added by the Orchestrator +CLOUD_PROVIDER_ANNOTATION_KEY = "cloud-pipelines.net/orchestration/cloud_provider" PIPELINE_RUN_CREATED_BY_ANNOTATION_KEY = ( "cloud-pipelines.net/orchestration/pipeline_run.created_by" ) diff --git a/cloud_pipelines_backend/orchestrator_sql.py b/cloud_pipelines_backend/orchestrator_sql.py index cabb11a8..7f3a7bdd 100644 --- a/cloud_pipelines_backend/orchestrator_sql.py +++ b/cloud_pipelines_backend/orchestrator_sql.py @@ -131,7 +131,7 @@ def internal_process_queued_executions_queue(self, session: orm.Session): self._queued_executions_queue_idle = False start_timestamp = time.monotonic_ns() - with contextual_logging.logging_context(execution_id=queued_execution.id): + with _execution_logging_context(queued_execution): _logger.info("Before processing queued execution") try: self.internal_process_one_queued_execution( @@ -1051,6 +1051,18 @@ def _calculate_container_execution_cache_key( return cache_key +def _execution_logging_context(execution: bts.ExecutionNode): + ctx: dict[str, typing.Any] = {"execution_id": execution.id} + cloud_provider = ( + (execution.task_spec or {}) + .get("annotations", {}) + .get(common_annotations.CLOUD_PROVIDER_ANNOTATION_KEY) + ) + if cloud_provider is not None: + ctx["cloud_provider"] = cloud_provider + return contextual_logging.logging_context(**ctx) + + def _get_current_time() -> datetime.datetime: return datetime.datetime.now(tz=datetime.timezone.utc)