Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 28 additions & 1 deletion cloud_pipelines_backend/instrumentation/execution_tracing.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,32 @@ def _error_attrs(*, execution: bts.ExecutionNode, status: str) -> dict[str, obje
_EPOCH = datetime.datetime(1970, 1, 1, tzinfo=datetime.timezone.utc)


def _launcher_pod_attrs(
*, execution: bts.ExecutionNode, status: str
) -> dict[str, object]:
"""k8s pod/cluster attributes for the PENDING span."""
if status != bts.ContainerExecutionStatus.PENDING:
return {}
if execution.container_execution_id is None:
return {}
ce = execution.container_execution
if ce is None or ce.launcher_data is None:
return {}
k8s = (
ce.launcher_data.get("kubernetes")
or ce.launcher_data.get("kubernetes_job")
or {}
)
attrs: dict[str, object] = {}
if pod_name := k8s.get("pod_name") or k8s.get("job_name"):
attrs["k8s.pod.name"] = pod_name
if namespace := k8s.get("namespace"):
attrs["k8s.namespace.name"] = namespace
if cluster_url := k8s.get("cluster_server"):
attrs["k8s.cluster.url"] = cluster_url
return attrs


def _ns(*, dt: datetime.datetime) -> int:
"""Return *dt* as nanoseconds since the Unix epoch (required by OTel SDK).

Expand All @@ -75,7 +101,7 @@ def _ns(*, dt: datetime.datetime) -> int:
) * 1_000_000_000 + delta.microseconds * 1_000


def try_emit_execution_trace(*, execution: bts.ExecutionNode) -> None:
def emit_execution_trace(*, execution: bts.ExecutionNode) -> None:
"""Emit a complete execution trace when *execution* reaches a terminal status.

No-op for non-terminal executions. All exceptions are caught and logged so
Expand Down Expand Up @@ -106,6 +132,7 @@ def try_emit_execution_trace(*, execution: bts.ExecutionNode) -> None:
"execution.id": execution.id,
"execution.status": entry["status"],
**_error_attrs(execution=execution, status=entry["status"]),
**_launcher_pod_attrs(execution=execution, status=entry["status"]),
}
_tracer.start_span(
f"execution.status {entry['status']}",
Expand Down
27 changes: 27 additions & 0 deletions tests/instrumentation/test_execution_tracing.py
Original file line number Diff line number Diff line change
Expand Up @@ -210,3 +210,30 @@ def test_root_span_not_marked_error_on_succeeded(
s for s in span_exporter.get_finished_spans() if s.name == "execution"
)
assert root.status.status_code != StatusCode.ERROR


class TestLauncherPodAttrs:
def test_pending_span_carries_k8s_attributes(
self, span_exporter: InMemorySpanExporter
) -> None:
execution = _make_execution(
statuses=["QUEUED", "PENDING", "RUNNING", "SUCCEEDED"]
)
execution_tracing.try_emit_execution_trace(execution=execution)

pending_span = next(
s
for s in span_exporter.get_finished_spans()
if s.attributes.get("execution.status") == "PENDING"
)
# No container_execution set on this stub — attrs should simply be absent.
assert "k8s.pod.name" not in (pending_span.attributes or {})

def test_non_pending_span_has_no_k8s_attributes(
self, span_exporter: InMemorySpanExporter
) -> None:
execution = _make_execution(statuses=["QUEUED", "SUCCEEDED"])
execution_tracing.try_emit_execution_trace(execution=execution)

for span in span_exporter.get_finished_spans():
assert "k8s.pod.name" not in (span.attributes or {})