From c802f2ba7c6c7ab4edfb4d74af56c8766e3241e4 Mon Sep 17 00:00:00 2001
From: Morgan Wowk
Date: Fri, 22 May 2026 15:27:41 -0700
Subject: [PATCH] Add cache hit/miss context to root execution span

execution.cache.hit (bool), execution.cache_key, and
execution.cache.reused_from_id (on cache hits) allow tracing cache
efficiency and which source execution was reused.
---
 .../instrumentation/execution_tracing.py      | 13 ++++++++
 .../instrumentation/test_execution_tracing.py | 30 +++++++++++++++++++
 2 files changed, 43 insertions(+)

diff --git a/cloud_pipelines_backend/instrumentation/execution_tracing.py b/cloud_pipelines_backend/instrumentation/execution_tracing.py
index 6b749ac..50bfe0f 100644
--- a/cloud_pipelines_backend/instrumentation/execution_tracing.py
+++ b/cloud_pipelines_backend/instrumentation/execution_tracing.py
@@ -128,6 +128,18 @@ def _cloud_provider_attrs(*, execution: bts.ExecutionNode) -> dict[str, object]:
     return {"execution.cloud_provider": provider}
 
 
+def _cache_attrs(*, execution: bts.ExecutionNode) -> dict[str, object]:
+    """Return cache-key and cache hit/miss attributes for the root execution span."""
+    attrs: dict[str, object] = {}
+    if execution.container_execution_cache_key is not None:  # omit key when unset
+        attrs["execution.cache_key"] = execution.container_execution_cache_key
+    reused_from = (execution.extra_data or {}).get("reused_from_execution_node_id")
+    attrs["execution.cache.hit"] = reused_from is not None  # always emitted
+    if reused_from is not None:  # source id is recorded only on a hit
+        attrs["execution.cache.reused_from_id"] = reused_from
+    return attrs
+
+
 def _ns(*, dt: datetime.datetime) -> int:
     """Return *dt* as nanoseconds since the Unix epoch (required by OTel SDK).
 
@@ -161,6 +173,7 @@ def emit_execution_trace(*, execution: bts.ExecutionNode) -> None:
             "execution.id": execution.id,
             **_launcher_type_attrs(execution=execution),
             **_cloud_provider_attrs(execution=execution),
+            **_cache_attrs(execution=execution),
         },
         start_time=_ns(dt=first_time),
     )
diff --git a/tests/instrumentation/test_execution_tracing.py b/tests/instrumentation/test_execution_tracing.py
index fbd8f9c..1c859d5 100644
--- a/tests/instrumentation/test_execution_tracing.py
+++ b/tests/instrumentation/test_execution_tracing.py
@@ -288,3 +288,33 @@ def test_root_span_omits_cloud_provider_when_annotation_absent(
             s for s in span_exporter.get_finished_spans() if s.name == "execution"
         )
         assert "execution.cloud_provider" not in (root.attributes or {})
+
+
+class TestCacheAttrs:
+    def test_cache_miss_sets_hit_false(
+        self, span_exporter: InMemorySpanExporter
+    ) -> None:
+        execution = _make_execution(statuses=["QUEUED", "SUCCEEDED"])
+        execution_tracing.try_emit_execution_trace(execution=execution)
+
+        root = next(
+            s for s in span_exporter.get_finished_spans() if s.name == "execution"
+        )
+        assert root.attributes["execution.cache.hit"] is False  # set on a miss too
+
+    def test_cache_hit_sets_hit_true_and_reused_from_id(
+        self, span_exporter: InMemorySpanExporter
+    ) -> None:
+        execution = _make_execution(
+            statuses=["QUEUED", "SUCCEEDED"],
+            extra={"reused_from_execution_node_id": "source-execution-id"},
+        )
+        execution_tracing.try_emit_execution_trace(execution=execution)
+
+        root = next(
+            s for s in span_exporter.get_finished_spans() if s.name == "execution"
+        )
+        assert root.attributes["execution.cache.hit"] is True
+        assert (
+            root.attributes["execution.cache.reused_from_id"] == "source-execution-id"
+        )