diff --git a/cloud_pipelines_backend/launchers/common_annotations.py b/cloud_pipelines_backend/launchers/common_annotations.py index 01ecdd3..951239e 100644 --- a/cloud_pipelines_backend/launchers/common_annotations.py +++ b/cloud_pipelines_backend/launchers/common_annotations.py @@ -1,6 +1,7 @@ ### TaskSpec annotations that are relevant to multiple launchers. # Annotations added by the Orchestrator +CLOUD_PROVIDER_ANNOTATION_KEY = "cloud-pipelines.net/orchestration/cloud_provider" PIPELINE_RUN_CREATED_BY_ANNOTATION_KEY = ( "cloud-pipelines.net/orchestration/pipeline_run.created_by" ) diff --git a/cloud_pipelines_backend/orchestrator_sql.py b/cloud_pipelines_backend/orchestrator_sql.py index cabb11a..7f3a7bd 100644 --- a/cloud_pipelines_backend/orchestrator_sql.py +++ b/cloud_pipelines_backend/orchestrator_sql.py @@ -131,7 +131,7 @@ def internal_process_queued_executions_queue(self, session: orm.Session): self._queued_executions_queue_idle = False start_timestamp = time.monotonic_ns() - with contextual_logging.logging_context(execution_id=queued_execution.id): + with _execution_logging_context(queued_execution): _logger.info("Before processing queued execution") try: self.internal_process_one_queued_execution( @@ -1051,6 +1051,18 @@ def _calculate_container_execution_cache_key( return cache_key +def _execution_logging_context(execution: bts.ExecutionNode): + ctx: dict[str, typing.Any] = {"execution_id": execution.id} + cloud_provider = ( + (execution.task_spec or {}) + .get("annotations", {}) + .get(common_annotations.CLOUD_PROVIDER_ANNOTATION_KEY) + ) + if cloud_provider is not None: + ctx["cloud_provider"] = cloud_provider + return contextual_logging.logging_context(**ctx) + + def _get_current_time() -> datetime.datetime: return datetime.datetime.now(tz=datetime.timezone.utc)