# Terminal statuses that mark the whole execution trace as failed.
# Stored as plain status values so they can be compared with the ``status``
# strings recorded in the execution's status history.
_ERROR_TERMINAL_STATUSES = frozenset(
    s.value
    for s in (
        bts.ContainerExecutionStatus.FAILED,
        bts.ContainerExecutionStatus.SYSTEM_ERROR,
    )
)


def _exit_code_attrs(*, execution: bts.ExecutionNode) -> dict[str, object]:
    """Return the ``execution.exit_code`` attribute if an exit code is recorded.

    Returns an empty dict when the execution has no linked container
    execution or the container execution has no exit code.
    """
    # Check the foreign key before touching the relationship attribute.
    # NOTE(review): presumably this avoids loading the relationship when
    # nothing is linked - confirm against the ORM model.
    if execution.container_execution_id is None:
        return {}
    container_execution = execution.container_execution
    if container_execution is None or container_execution.exit_code is None:
        return {}
    return {"execution.exit_code": container_execution.exit_code}


def _error_attrs(*, execution: bts.ExecutionNode, status: str) -> dict[str, object]:
    """Build the extra span attributes for a terminal status span.

    Args:
        execution: Execution node whose ``extra_data`` and container
            execution are inspected.
        status: Status value of the history entry the span is emitted for.

    Returns:
        A dict of extra attributes, empty for any status not listed below:

        * ``FAILED``: ``error.message`` (orchestration error message, if
          present in ``extra_data``) and ``execution.exit_code`` (if recorded).
        * ``SYSTEM_ERROR``: ``error.message`` (exception message) and
          ``exception.stacktrace`` (full exception text), each only if present
          in ``extra_data``.
        * ``SUCCEEDED``: ``execution.exit_code`` (if recorded).
    """
    extra = execution.extra_data or {}
    attrs: dict[str, object] = {}

    def copy_extra(*, key: str, attr_name: str) -> None:
        # Copy one optional extra_data entry onto the span attributes.
        value = extra.get(key)
        if value is not None:
            attrs[attr_name] = value

    # Compare by ``.value``: ``status`` is a plain string, and the module's
    # other status sets are built from ``.value`` as well. This stays correct
    # whether or not the enum mixes in ``str``.
    if status == bts.ContainerExecutionStatus.FAILED.value:
        copy_extra(
            key=bts.EXECUTION_NODE_EXTRA_DATA_ORCHESTRATION_ERROR_MESSAGE_KEY,
            attr_name="error.message",
        )
        attrs.update(_exit_code_attrs(execution=execution))
    elif status == bts.ContainerExecutionStatus.SYSTEM_ERROR.value:
        copy_extra(
            key=bts.EXECUTION_NODE_EXTRA_DATA_SYSTEM_ERROR_EXCEPTION_MESSAGE_KEY,
            attr_name="error.message",
        )
        copy_extra(
            key=bts.EXECUTION_NODE_EXTRA_DATA_SYSTEM_ERROR_EXCEPTION_FULL_KEY,
            attr_name="exception.stacktrace",
        )
    elif status == bts.ContainerExecutionStatus.SUCCEEDED.value:
        attrs.update(_exit_code_attrs(execution=execution))
    return attrs
class TestErrorDataAttrs:
    """Error details on status spans and the error status of the root span."""

    @staticmethod
    def _status_span(exporter: InMemorySpanExporter, status: str):
        """Return the first finished span whose ``execution.status`` equals *status*."""
        return next(
            span
            for span in exporter.get_finished_spans()
            if span.attributes.get("execution.status") == status
        )

    @staticmethod
    def _root_span(exporter: InMemorySpanExporter):
        """Return the first finished span named ``execution``."""
        return next(
            span for span in exporter.get_finished_spans() if span.name == "execution"
        )

    def test_failed_span_carries_orchestration_error_message(
        self, span_exporter: InMemorySpanExporter
    ) -> None:
        """A FAILED span exposes the orchestration error message."""
        extra_data = {
            bts.EXECUTION_NODE_EXTRA_DATA_ORCHESTRATION_ERROR_MESSAGE_KEY: "missing outputs"
        }
        node = _make_execution(
            statuses=["QUEUED", "RUNNING", "FAILED"],
            extra=extra_data,
        )
        execution_tracing.try_emit_execution_trace(execution=node)

        span = self._status_span(span_exporter, "FAILED")
        assert span.attributes["error.message"] == "missing outputs"

    def test_system_error_span_carries_exception_message_and_stacktrace(
        self, span_exporter: InMemorySpanExporter
    ) -> None:
        """A SYSTEM_ERROR span exposes both the exception message and stacktrace."""
        extra_data = {
            bts.EXECUTION_NODE_EXTRA_DATA_SYSTEM_ERROR_EXCEPTION_MESSAGE_KEY: "RuntimeError",
            bts.EXECUTION_NODE_EXTRA_DATA_SYSTEM_ERROR_EXCEPTION_FULL_KEY: "Traceback...",
        }
        node = _make_execution(
            statuses=["QUEUED", "SYSTEM_ERROR"],
            extra=extra_data,
        )
        execution_tracing.try_emit_execution_trace(execution=node)

        span = self._status_span(span_exporter, "SYSTEM_ERROR")
        assert span.attributes["error.message"] == "RuntimeError"
        assert span.attributes["exception.stacktrace"] == "Traceback..."

    def test_root_span_marked_error_on_failed(
        self, span_exporter: InMemorySpanExporter
    ) -> None:
        """The root span status is ERROR when the execution ends FAILED."""
        from opentelemetry.trace import StatusCode

        node = _make_execution(statuses=["QUEUED", "FAILED"])
        execution_tracing.try_emit_execution_trace(execution=node)

        root_span = self._root_span(span_exporter)
        assert root_span.status.status_code == StatusCode.ERROR

    def test_root_span_not_marked_error_on_succeeded(
        self, span_exporter: InMemorySpanExporter
    ) -> None:
        """The root span status is not ERROR when the execution ends SUCCEEDED."""
        from opentelemetry.trace import StatusCode

        node = _make_execution(statuses=["QUEUED", "SUCCEEDED"])
        execution_tracing.try_emit_execution_trace(execution=node)

        root_span = self._root_span(span_exporter)
        assert root_span.status.status_code != StatusCode.ERROR