diff --git a/airflow-core/src/airflow/api_fastapi/execution_api/app.py b/airflow-core/src/airflow/api_fastapi/execution_api/app.py index f10c772e03f47..049fc60bdab78 100644 --- a/airflow-core/src/airflow/api_fastapi/execution_api/app.py +++ b/airflow-core/src/airflow/api_fastapi/execution_api/app.py @@ -253,9 +253,25 @@ async def _extract_w3c_trace_context( token = otel_context.attach(ctx) try: yield - finally: + except GeneratorExit: + # Cross-task force-close (client disconnect / request cancellation): the + # finalizer runs in a different asyncio Task — and thus a different + # contextvars.Context — than attach did, so detaching the token would raise + # "Token was created in a different Context" (which OTel logs at ERROR before + # any suppression here could see it). The attached Context is being discarded + # with the dying task, so detaching is unnecessary; skip it and re-raise. + raise + except BaseException: + # A route handler raised: FastAPI throws the exception into this generator at + # the yield, in the SAME task that attach ran in. Detach so the upstream trace + # context does not stay attached for the exception handler, the error response, + # and any spans/logs emitted while unwinding. Suppress any detach error so it + # cannot mask the original exception being propagated. with contextlib.suppress(Exception): otel_context.detach(token) + raise + else: + otel_context.detach(token) def _inject_trace_context_dep(routes, mode: str) -> None: diff --git a/airflow-core/tests/unit/api_fastapi/execution_api/test_app.py b/airflow-core/tests/unit/api_fastapi/execution_api/test_app.py index 0a71e65525505..5f90c5a9b83eb 100644 --- a/airflow-core/tests/unit/api_fastapi/execution_api/test_app.py +++ b/airflow-core/tests/unit/api_fastapi/execution_api/test_app.py @@ -26,7 +26,10 @@ from fastapi.testclient import TestClient from opentelemetry import context as otel_context, propagate as otel_propagate -from airflow.api_fastapi.execution_api.app import create_task_execution_api_app +from airflow.api_fastapi.execution_api.app import ( + _extract_w3c_trace_context, + create_task_execution_api_app, +) from airflow.api_fastapi.execution_api.datamodels.taskinstance import TaskInstance from airflow.api_fastapi.execution_api.datamodels.token import TIClaims, TIToken from airflow.api_fastapi.execution_api.security import require_auth @@ -244,3 +247,47 @@ async def mock_require_auth(request: Request) -> TIToken: ): with pytest.raises(RuntimeError, match="boom"): test_client.get("/variables/k", headers={"Authorization": "Bearer fake"}) + + @staticmethod + def _make_request() -> Request: + return Request({"type": "http", "headers": []}) + + @pytest.mark.asyncio + async def test_detach_skipped_on_generator_exit(self): + """A force-closed generator (aclose -> GeneratorExit) must NOT call detach. + + On a real client disconnect the generator is finalised from a different asyncio + Task, where detach would raise "Token was created in a different Context". Closing + from the same task (as here) does not reproduce that cross-context error, but it + does exercise the GeneratorExit branch and asserts detach is skipped on it. + """ + with mock.patch.object(otel_context, "detach") as detach_spy: + gen = _extract_w3c_trace_context(self._make_request(), dependency_solver="fastapi") + await gen.asend(None) # run up to and including the yield (after attach) + await gen.aclose() # raises GeneratorExit at the yield + + detach_spy.assert_not_called() + + @pytest.mark.asyncio + async def test_detach_runs_on_route_exception(self): + """A route handler error (athrow at the yield) runs in the same task, so detach + must run -- otherwise the upstream trace context leaks into the error handling. + """ + with mock.patch.object(otel_context, "detach") as detach_spy: + gen = _extract_w3c_trace_context(self._make_request(), dependency_solver="fastapi") + await gen.asend(None) # run up to the yield + with pytest.raises(RuntimeError, match="boom"): + await gen.athrow(RuntimeError("boom")) # FastAPI's route-error finalization + + detach_spy.assert_called_once() + + @pytest.mark.asyncio + async def test_detach_runs_on_normal_completion(self): + """Normal completion still detaches the token in the request's own Context.""" + with mock.patch.object(otel_context, "detach") as detach_spy: + gen = _extract_w3c_trace_context(self._make_request(), dependency_solver="fastapi") + await gen.asend(None) # run up to the yield + with pytest.raises(StopAsyncIteration): + await gen.asend(None) # resume past the yield -> else branch -> detach + + detach_spy.assert_called_once()