Fix PythonVirtualenvOperator failing to access Variables/Connections in Airflow 3.x #61629
andreahlert wants to merge 1 commit into apache:main
…in Airflow 3.x
In the normal forked execution path, the supervisor communicates with the task process via fd 0 (stdin), but the __AIRFLOW_SUPERVISOR_FD env var is never set. When PythonVirtualenvOperator launches a subprocess via Popen, the virtualenv process cannot re-establish supervisor comms, causing Variable.get() and Connection.get() to fail silently.
This fix explicitly detects the SUPERVISOR_COMMS socket fd in the operator and propagates it via __AIRFLOW_SUPERVISOR_FD to the subprocess. It also hardens the virtualenv script template to catch ImportError (not just ModuleNotFoundError) and wraps reinit_supervisor_comms() in try/except to prevent script crashes.
Closes: apache#58724
What is the error you see in this case? How does it present to the user/task logs?
ashb left a comment
Not yet convinced this is the right behaviour
    try:
        reinit_supervisor_comms()
    except Exception:
        pass
No. If expect_airflow is true, then an error here should fail the task.
Regardless, silently ignoring the error is the worst of all paths.
Yeah you're right about the silent pass, I'll add a warning to stderr.
But I'd keep the try/except itself. reinit failing doesn't mean airflow isn't there, it just means the socket setup broke.
If the callable actually needs Variables/Connections it'll fail with a proper error at that point anyway. Crashing the whole task on a socket error before we even get to run user code feels wrong to me.
The real fix is the fd propagation in python.py, this is just a fallback.
What do ya think?
This should never happen. It means your environment is broken. If you don't want this, set expect_airflow to false.
Oh, this isn't in an expect_airflow block.
Still, if it imports at all it should be complete, else all sorts of things will break.
I think either move this (without the exception handling) into the expect_airflow block, or still import it, but only fail/log if expect_airflow is set.
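To make the two behaviours under discussion concrete, here is a minimal sketch of "fail when Airflow is expected, otherwise warn and continue". The names `init_comms` and `broken_reinit` are hypothetical and exist only for illustration; `reinit` stands in for `reinit_supervisor_comms()`, whose real signature and failure modes are not shown in this thread.

```python
import sys


def init_comms(reinit, expect_airflow):
    """Hypothetical helper: run reinit() and decide how to treat a failure.

    If expect_airflow is true, the error propagates and the task fails.
    Otherwise a warning goes to stderr and the caller continues without comms.
    """
    try:
        reinit()
    except Exception as exc:
        if expect_airflow:
            # Airflow was promised to the callable, so a broken channel is fatal.
            raise
        print(f"WARNING: supervisor comms unavailable: {exc!r}", file=sys.stderr)
        return False
    return True


def broken_reinit():
    # Stand-in for a reinit call whose socket setup fails.
    raise OSError("socket setup failed")
```

With `expect_airflow=False` the failure is reported but never swallowed silently, which is the one outcome both sides of the thread agree must be avoided.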
    env_vars = dict(os.environ) if self.inherit_env else {}
    if fd := os.getenv("__AIRFLOW_SUPERVISOR_FD"):
        env_vars["__AIRFLOW_SUPERVISOR_FD"] = fd
    elif AIRFLOW_V_3_0_PLUS and "__AIRFLOW_SUPERVISOR_FD" not in env_vars:
Normal forked execution. The supervisor dup2's the comms socket onto fd 0 and never sets __AIRFLOW_SUPERVISOR_FD in env. So when the virtualenv operator spawns a Popen on top of that, getenv returns None and we land here.
We grab the fd from the existing SUPERVISOR_COMMS and pass it through. Otherwise reinit in the child defaults to fd 0, which is stdin at that point because Popen redirected it.
Connections/vars work without this change - stdin is not a normal stdin at this point, it's already a socket, so the default to 0 is correct and works.
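The claim that fd 0 can itself be a socket is easy to check outside Airflow. The sketch below is a Unix-only illustration using only the standard library: a `socketpair` end is handed to a child as its stdin, and the child reports whether fd 0 is a socket before reading from it. It does not reproduce what the supervisor or the operator actually do with stdin; `run_child_with_socket_stdin` is a hypothetical name.

```python
import socket
import subprocess
import sys

# Child program: check whether fd 0 is a socket, then read one line from it.
CHILD = (
    "import os, stat, sys\n"
    "is_sock = stat.S_ISSOCK(os.fstat(0).st_mode)\n"
    "line = sys.stdin.readline().strip()\n"
    "print(is_sock, line)\n"
)


def run_child_with_socket_stdin():
    """Spawn a child whose stdin is one end of a socketpair (Unix only)."""
    parent_sock, child_sock = socket.socketpair()
    with parent_sock, child_sock:
        proc = subprocess.Popen(
            [sys.executable, "-c", CHILD],
            stdin=child_sock.fileno(),  # becomes fd 0 in the child
            stdout=subprocess.PIPE,
            text=True,
        )
        parent_sock.sendall(b"hello\n")
        out, _ = proc.communicate(timeout=30)
    return out.strip()


if __name__ == "__main__":
    print(run_child_with_socket_stdin())
```

Whether the virtualenv subprocess ends up with the same fd 0 depends on how the operator invokes Popen, which is the point being debated here.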
If the SDK is installed but some transitive dep is missing (httpx, structlog, etc), the import blows up with ImportError, not ModuleNotFoundError. The script just dies before the callable even runs - pretty bad UX.
How is that possible? The SDK lists those modules as dependencies.
Does it show the error in task logs?
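For reference on the two exception types in this exchange: in standard Python, `ModuleNotFoundError` is a subclass of `ImportError`, raised when a module cannot be located, while a plain `ImportError` typically appears when the module is found but a name cannot be imported from it. The small sketch below classifies both cases; `classify_import_failure` and the `surely_missing_*` names are hypothetical and chosen so that they do not exist.

```python
def classify_import_failure(statement):
    """Run an import statement and report which exception class it raises."""
    try:
        exec(statement, {})
    except ModuleNotFoundError:
        # Must be checked first: it is a subclass of ImportError.
        return "ModuleNotFoundError"
    except ImportError:
        return "ImportError"
    return "ok"


if __name__ == "__main__":
    print(classify_import_failure("import surely_missing_module_xyz"))
    print(classify_import_failure("from os import surely_missing_name_xyz"))
    print(issubclass(ModuleNotFoundError, ImportError))
```

Because of the subclass relationship, `except ImportError` on its own already catches both.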
What
Fixes PythonVirtualenvOperator (and ExternalPythonOperator) failing to access Variable.get(), Connection.get(), and XCom via the Task SDK in Airflow 3.x environments (particularly on Kubernetes).
Closes: #58724
Why
In the normal forked execution path, the supervisor communicates with the task process via a Unix socketpair mapped to fd 0 (stdin). However, the __AIRFLOW_SUPERVISOR_FD environment variable is never set in this path - it is only set by InProcessTestSupervisor when using dag.test().
When PythonVirtualenvOperator launches a subprocess via Popen(close_fds=False), while fd 0 is technically inherited, the virtualenv subprocess has no reliable way to know which fd carries the supervisor comms channel. The reinit_supervisor_comms() function defaults to fd 0, but this is fragile and breaks when:
- the airflow.sdk.execution_time.task_runner import fails with ImportError (not just ModuleNotFoundError) due to missing transitive dependencies in the virtualenv
- reinit_supervisor_comms() itself crashes (e.g., socket validation fails), killing the entire virtualenv script
Without supervisor comms, the secrets backend falls back to EnvironmentVariablesBackend only, silently losing access to all DB-stored Variables and Connections.

How
Two-pronged fix:
1. Operator side (python.py)
In _BasePythonVirtualenvOperator._execute_python_callable_in_subprocess(), added an elif block that:
- detects the SUPERVISOR_COMMS socket fd from task_runner
- marks it inheritable via os.set_inheritable()
- propagates it via the __AIRFLOW_SUPERVISOR_FD env var
This makes the fd propagation explicit rather than relying on implicit fd 0 inheritance.
2. Template side (python_virtualenv_script.jinja2)
- Changed except ModuleNotFoundError to except (ModuleNotFoundError, ImportError) to handle cases where task_runner can be found but has failing transitive dependencies
- Wrapped the reinit_supervisor_comms() call in try/except Exception to prevent the entire virtualenv script from crashing if socket communication setup fails

Testing
The existing test test_reinit_supervisor_comms in task-sdk/tests/task_sdk/execution_time/test_supervisor.py validates the basic mechanism (subprocess reinits comms and fetches a connection). The fix ensures this mechanism is properly triggered in production by explicitly propagating the fd.
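The operator-side mechanism described under How (find the fd, make it inheritable, advertise it through an env var) can be sketched with the standard library alone. This is an illustration under stated assumptions, not the PR's code: a `socketpair` stands in for SUPERVISOR_COMMS, `spawn_with_comms_fd` is a hypothetical name, and the child simply writes a marker back instead of calling reinit_supervisor_comms(). Unix only.

```python
import os
import socket
import subprocess
import sys

ENV_KEY = "__AIRFLOW_SUPERVISOR_FD"  # env var name taken from the PR text

# Child program: rebuild a socket object from the advertised fd and reply on it.
CHILD = (
    "import os, socket\n"
    "fd = int(os.environ['__AIRFLOW_SUPERVISOR_FD'])\n"
    "sock = socket.socket(fileno=fd)\n"
    "sock.sendall(b'child-ready')\n"
)


def spawn_with_comms_fd():
    """Pass one end of a socketpair to a child via an env var (Unix only)."""
    parent_sock, child_sock = socket.socketpair()  # stand-in for the comms socket
    with parent_sock, child_sock:
        fd = child_sock.fileno()
        os.set_inheritable(fd, True)  # Python fds are non-inheritable by default
        env = dict(os.environ)
        env[ENV_KEY] = str(fd)
        # close_fds=False keeps inheritable fds open in the child.
        subprocess.run(
            [sys.executable, "-c", CHILD], env=env, close_fds=False, check=True
        )
        return parent_sock.recv(1024)


if __name__ == "__main__":
    print(spawn_with_comms_fd())
```

Advertising the fd number explicitly means the child does not have to assume the channel lives on fd 0, which is the design choice the PR argues for.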