Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 17 additions & 1 deletion airflow-core/src/airflow/api_fastapi/execution_api/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -253,9 +253,25 @@ async def _extract_w3c_trace_context(
token = otel_context.attach(ctx)
try:
yield
finally:
except GeneratorExit:
# Cross-task force-close (client disconnect / request cancellation): the
# finalizer runs in a different asyncio Task — and thus a different
# contextvars.Context — than attach did, so detaching the token would raise
# "Token was created in a different Context" (which OTel logs at ERROR before
# any suppression here could see it). The attached Context is being discarded
# with the dying task, so detaching is unnecessary; skip it and re-raise.
raise
except BaseException:
# A route handler raised: FastAPI throws the exception into this generator at
# the yield, in the SAME task that attach ran in. Detach so the upstream trace
# context does not stay attached for the exception handler, the error response,
# and any spans/logs emitted while unwinding. Suppress any detach error so it
# cannot mask the original exception being propagated.
with contextlib.suppress(Exception):
otel_context.detach(token)
raise
else:
otel_context.detach(token)
Comment thread
dheerajturaga marked this conversation as resolved.
Comment on lines 254 to +274
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you share before/after logs from an actual disconnect? The tests assert detach isn't called, but the spurious line is logged inside OTel's own detach() (except Exception: logger.exception(...)), so I'd like to see it's actually gone from the output.

Minor and optional: rather than splitting on GeneratorExit vs BaseException, you could capture the task at attach and detach only when unwinding in it:

attached_in = asyncio.current_task()
try:
    yield
finally:
    if asyncio.current_task() is attached_in:
        otel_context.detach(token)

Same result, one branch. Not a blocker.



def _inject_trace_context_dep(routes, mode: str) -> None:
Expand Down
49 changes: 48 additions & 1 deletion airflow-core/tests/unit/api_fastapi/execution_api/test_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,10 @@
from fastapi.testclient import TestClient
from opentelemetry import context as otel_context, propagate as otel_propagate

from airflow.api_fastapi.execution_api.app import create_task_execution_api_app
from airflow.api_fastapi.execution_api.app import (
_extract_w3c_trace_context,
create_task_execution_api_app,
)
from airflow.api_fastapi.execution_api.datamodels.taskinstance import TaskInstance
from airflow.api_fastapi.execution_api.datamodels.token import TIClaims, TIToken
from airflow.api_fastapi.execution_api.security import require_auth
Expand Down Expand Up @@ -244,3 +247,47 @@ async def mock_require_auth(request: Request) -> TIToken:
):
with pytest.raises(RuntimeError, match="boom"):
test_client.get("/variables/k", headers={"Authorization": "Bearer fake"})

@staticmethod
def _make_request() -> Request:
return Request({"type": "http", "headers": []})

@pytest.mark.asyncio
async def test_detach_skipped_on_generator_exit(self):
"""A force-closed generator (aclose -> GeneratorExit) must NOT call detach.

On a real client disconnect the generator is finalised from a different asyncio
Task, where detach would raise "Token was created in a different Context". Closing
from the same task (as here) does not reproduce that cross-context error, but it
does exercise the GeneratorExit branch and asserts detach is skipped on it.
"""
with mock.patch.object(otel_context, "detach") as detach_spy:
gen = _extract_w3c_trace_context(self._make_request(), dependency_solver="fastapi")
await gen.asend(None) # run up to and including the yield (after attach)
await gen.aclose() # raises GeneratorExit at the yield
Comment thread
dheerajturaga marked this conversation as resolved.

detach_spy.assert_not_called()

@pytest.mark.asyncio
async def test_detach_runs_on_route_exception(self):
"""A route handler error (athrow at the yield) runs in the same task, so detach
must run -- otherwise the upstream trace context leaks into the error handling.
"""
with mock.patch.object(otel_context, "detach") as detach_spy:
gen = _extract_w3c_trace_context(self._make_request(), dependency_solver="fastapi")
await gen.asend(None) # run up to the yield
with pytest.raises(RuntimeError, match="boom"):
await gen.athrow(RuntimeError("boom")) # FastAPI's route-error finalization

detach_spy.assert_called_once()

@pytest.mark.asyncio
async def test_detach_runs_on_normal_completion(self):
"""Normal completion still detaches the token in the request's own Context."""
with mock.patch.object(otel_context, "detach") as detach_spy:
gen = _extract_w3c_trace_context(self._make_request(), dependency_solver="fastapi")
await gen.asend(None) # run up to the yield
with pytest.raises(StopAsyncIteration):
await gen.asend(None) # resume past the yield -> else branch -> detach

detach_spy.assert_called_once()
Loading