From 00a8b23cd1523047b03fc5b916ce64b5a5286d8f Mon Sep 17 00:00:00 2001 From: Dheeraj Turaga Date: Thu, 4 Jun 2026 15:59:15 -0500 Subject: [PATCH] Fix spurious "Failed to detach context" error on Execution API disconnects The Execution API trace-propagation dependency attaches an OpenTelemetry context before yielding and detaches it afterwards. When a client disconnects or the request is cancelled, the dependency generator is force-closed from a different asyncio task, so the detach ran against a contextvars.Token created in a different Context. OpenTelemetry caught and logged that ValueError at ERROR ("Failed to detach context") before our suppression could see it, producing alarming but harmless log noise on the Dag processor and other clients. Skip the detach only on the GeneratorExit force-close path, where the originating Context is being discarded anyway. On all same-task unwind paths -- normal completion and a route handler raising (which FastAPI throws into the generator at the yield) -- detach as before so the upstream trace context does not leak into the exception handler, the error response, or error-path spans. The route-error detach is suppressed so it can never mask the original exception. --- .../airflow/api_fastapi/execution_api/app.py | 18 ++++++- .../api_fastapi/execution_api/test_app.py | 49 ++++++++++++++++++- 2 files changed, 65 insertions(+), 2 deletions(-) diff --git a/airflow-core/src/airflow/api_fastapi/execution_api/app.py b/airflow-core/src/airflow/api_fastapi/execution_api/app.py index f10c772e03f47..049fc60bdab78 100644 --- a/airflow-core/src/airflow/api_fastapi/execution_api/app.py +++ b/airflow-core/src/airflow/api_fastapi/execution_api/app.py @@ -253,9 +253,25 @@ async def _extract_w3c_trace_context( token = otel_context.attach(ctx) try: yield - finally: + except GeneratorExit: + # Cross-task force-close (client disconnect / request cancellation): the + # finalizer runs in a different asyncio Task — and thus a different + # contextvars.Context — than attach did, so detaching the token would raise + # "Token was created in a different Context" (which OTel logs at ERROR before + # any suppression here could see it). The attached Context is being discarded + # with the dying task, so detaching is unnecessary; skip it and re-raise. + raise + except BaseException: + # A route handler raised: FastAPI throws the exception into this generator at + # the yield, in the SAME task that attach ran in. Detach so the upstream trace + # context does not stay attached for the exception handler, the error response, + # and any spans/logs emitted while unwinding. Suppress any detach error so it + # cannot mask the original exception being propagated. with contextlib.suppress(Exception): otel_context.detach(token) + raise + else: + otel_context.detach(token) def _inject_trace_context_dep(routes, mode: str) -> None: diff --git a/airflow-core/tests/unit/api_fastapi/execution_api/test_app.py b/airflow-core/tests/unit/api_fastapi/execution_api/test_app.py index 0a71e65525505..5f90c5a9b83eb 100644 --- a/airflow-core/tests/unit/api_fastapi/execution_api/test_app.py +++ b/airflow-core/tests/unit/api_fastapi/execution_api/test_app.py @@ -26,7 +26,10 @@ from fastapi.testclient import TestClient from opentelemetry import context as otel_context, propagate as otel_propagate -from airflow.api_fastapi.execution_api.app import create_task_execution_api_app +from airflow.api_fastapi.execution_api.app import ( + _extract_w3c_trace_context, + create_task_execution_api_app, +) from airflow.api_fastapi.execution_api.datamodels.taskinstance import TaskInstance from airflow.api_fastapi.execution_api.datamodels.token import TIClaims, TIToken from airflow.api_fastapi.execution_api.security import require_auth @@ -244,3 +247,47 @@ async def mock_require_auth(request: Request) -> TIToken: ): with pytest.raises(RuntimeError, match="boom"): test_client.get("/variables/k", headers={"Authorization": "Bearer fake"}) + + @staticmethod + def _make_request() -> Request: + return Request({"type": "http", "headers": []}) + + @pytest.mark.asyncio + async def test_detach_skipped_on_generator_exit(self): + """A force-closed generator (aclose -> GeneratorExit) must NOT call detach. + + On a real client disconnect the generator is finalised from a different asyncio + Task, where detach would raise "Token was created in a different Context". Closing + from the same task (as here) does not reproduce that cross-context error, but it + does exercise the GeneratorExit branch and asserts detach is skipped on it. + """ + with mock.patch.object(otel_context, "detach") as detach_spy: + gen = _extract_w3c_trace_context(self._make_request(), dependency_solver="fastapi") + await gen.asend(None) # run up to and including the yield (after attach) + await gen.aclose() # raises GeneratorExit at the yield + + detach_spy.assert_not_called() + + @pytest.mark.asyncio + async def test_detach_runs_on_route_exception(self): + """A route handler error (athrow at the yield) runs in the same task, so detach + must run -- otherwise the upstream trace context leaks into the error handling. + """ + with mock.patch.object(otel_context, "detach") as detach_spy: + gen = _extract_w3c_trace_context(self._make_request(), dependency_solver="fastapi") + await gen.asend(None) # run up to the yield + with pytest.raises(RuntimeError, match="boom"): + await gen.athrow(RuntimeError("boom")) # FastAPI's route-error finalization + + detach_spy.assert_called_once() + + @pytest.mark.asyncio + async def test_detach_runs_on_normal_completion(self): + """Normal completion still detaches the token in the request's own Context.""" + with mock.patch.object(otel_context, "detach") as detach_spy: + gen = _extract_w3c_trace_context(self._make_request(), dependency_solver="fastapi") + await gen.asend(None) # run up to the yield + with pytest.raises(StopAsyncIteration): + await gen.asend(None) # resume past the yield -> else branch -> detach + + detach_spy.assert_called_once()