Avoid psutil lookup race when starting supervised task subprocesses#64995
DaveT1991 wants to merge 3 commits into apache:main
Conversation
Nice. @kaxil @ashb @amoghrajesh -> that does look plausible, and the solution looks good as well. WDYT? It would explain a flurry of random task instance statuses not matching actually running tasks - there have been quite a few reports about that one.
ashb
left a comment
Mmmm, I don't like the look of this -- the other advantage psutil has is pid cycle/reuse detection.
Let me examine the issue too
ashb
left a comment
I am not convinced this is the error -- given that psutil reads from the /proc filesystem on Linux, which comes directly from kernel memory, my understanding is that this cannot be racy. Once the process pid exists, /proc is updated and available.
I'm going to need more evidence that this is actually a race. We used psutil because it's safer than just killing a pid.
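A quick stdlib-only sketch of that claim (Linux-only, uses `os.fork`; not code from this PR): the parent can see `/proc/<pid>` for a freshly forked child immediately, even if the child has already exited, because the entry lingers as a zombie until the child is reaped.

```python
import os

# Fork a child and immediately check that its /proc entry is visible to
# the parent. On Linux the entry exists as soon as fork() returns, and
# it stays around (as a zombie) until the parent reaps the child.
pid = os.fork()
if pid == 0:
    os._exit(0)  # child: exit immediately

visible = os.path.exists(f"/proc/{pid}")
os.waitpid(pid, 0)  # reap the zombie; /proc/<pid> disappears after this
print(visible)  # prints True
```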
Yes. By a quick search I saw similar issues raised and similar races at scale being reported - we likely need to look closer and look at how similar issues were solved elsewhere. But the issue is real - we have seen at least a few reports from our users that in Airflow 3 there are running tasks reported as failed that are actually still running in the background - which would definitely match the pattern described here.
And yes - I agree the solution could likely be better if we find that this kind of race really is the root cause.
https://github.com/giampaolo/psutil/blob/v7.1.0/psutil/_pslinux.py#L1645-L1651:

```python
self._raise_if_zombie()
# /proc/PID directory may still exist, but the files within
# it may not, indicating the process is gone, see:
# https://github.com/giampaolo/psutil/issues/2418
if not os.path.exists(f"{self._procfs_path}/{pid}/stat"):
    raise NoSuchProcess(pid, name) from err
raise
```

I think this is where the exception is coming from. I wonder if this could be caused by something in kube/containerd - a procfs proxy causing a delay. I couldn't find any other reports about this. If that is the case, I think I'd rather we do something like:

```python
try:
    proc = psutil.Process(pid)
except psutil.NoSuchProcess:
    time.sleep(0.1)
    proc = psutil.Process(pid)
```

However I'm still skeptical this is actually what's happening. I can find no other reports anywhere saying a pid can exist without its /proc entry.
What might be worth doing is catching the error, and then, when it happens, reading the data from the stdout/stderr sockets -- it's possible something was written on one of those. However even then, if the process has died, its procfs entry should stay around until something reaps it.
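A rough sketch of that "catch the error, then read the stdout/stderr sockets" idea (the helper name is hypothetical, not code from this PR; it assumes the supervisor holds the parent end of a socketpair for the child):

```python
import select
import socket

def drain_child_output(sock: socket.socket, timeout: float = 0.5) -> bytes:
    """Best-effort read of whatever the child wrote before it died."""
    chunks = []
    while True:
        ready, _, _ = select.select([sock], [], [], timeout)
        if not ready:
            break
        data = sock.recv(4096)
        if not data:  # EOF: the child's end of the socket is closed
            break
        chunks.append(data)
        timeout = 0.0  # after the first read, only drain what is buffered
    return b"".join(chunks)
```

Anything recovered this way (e.g. a partial traceback) could then be attached to the failure instead of losing it.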
I am also looking for those.
Thanks for the feedback @ashb. I've updated the approach based on your suggestion: instead of replacing psutil, the new version simply retries psutil.Process(pid) after a 100 ms sleep when NoSuchProcess is raised, which preserves pid-reuse detection.
```python
monkeypatch.setattr("airflow.sdk.execution_time.supervisor.psutil.Process", flaky_process)
monkeypatch.setattr("airflow.sdk.execution_time.supervisor.time.sleep", lambda _: None)
```
The monkeypatch targets here use a dotted-string that looks like a nested attribute path ("airflow.sdk.execution_time.supervisor.psutil.Process" / ".time.sleep"). With pytest's monkeypatch, the string form imports everything up to the last dot as a module, so this will try to import airflow.sdk.execution_time.supervisor.psutil and airflow.sdk.execution_time.supervisor.time (which don’t exist) and the test will error. Patch the attributes on the already-imported supervisor module objects instead (e.g. import airflow.sdk.execution_time.supervisor and set supervisor.psutil.Process / supervisor.time.sleep).
```python
def _psutil_process(pid: int) -> psutil.Process:
    try:
        return psutil.Process(pid)
    except psutil.NoSuchProcess:
        time.sleep(0.1)
        return psutil.Process(pid)
```
The PR description says this change replaces the immediate psutil lookup with an os.waitpid/os.kill-backed lightweight handle, but the implementation here still wraps the child PID in psutil.Process (with a retry). Either update the PR description to match the actual approach, or implement the described non-psutil handle so reviewers/users aren’t misled about the behavior and dependencies.
```python
except psutil.NoSuchProcess:
    time.sleep(0.1)
    return psutil.Process(pid)
```
time.sleep(0.1) is a new hard-coded retry delay. Given this module already centralizes timing knobs as constants (e.g. MIN_HEARTBEAT_INTERVAL, SOCKET_CLEANUP_TIMEOUT), consider extracting this delay (and possibly retry count/timeout) into a named constant or config-backed value so it’s easier to tune and reason about if the race needs longer retries on some platforms.
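A stdlib-only sketch of what that extraction could look like (the constant names and the `os.kill`-based probe are illustrative stand-ins, not the Airflow code, which wraps the pid in `psutil.Process`):

```python
import os
import time

# Hypothetical module-level knobs, mirroring how the supervisor module
# already centralizes timing constants (e.g. MIN_HEARTBEAT_INTERVAL).
PROCESS_LOOKUP_RETRY_DELAY: float = 0.1
PROCESS_LOOKUP_RETRIES: int = 1

def lookup_process_with_retry(pid: int) -> int:
    """Probe a pid, retrying a bounded number of times on lookup failure."""
    for attempt in range(PROCESS_LOOKUP_RETRIES + 1):
        try:
            os.kill(pid, 0)  # signal 0: existence check, sends no signal
            return pid
        except ProcessLookupError:
            if attempt == PROCESS_LOOKUP_RETRIES:
                raise
            time.sleep(PROCESS_LOOKUP_RETRY_DELAY)
    raise ProcessLookupError(pid)  # unreachable; keeps type checkers happy
```

Bounding the retries with a named constant keeps the behavior tunable per platform without touching the call sites.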
If you resolve comments from Copilot, either add a comment explaining why a suggestion is invalid or create a commit fixing it.
Note, however: I am still very, very skeptical that this is the fix.
Summary

Avoid a startup race in the task supervisor when starting forked task subprocesses.

`WatchedSubprocess.start()` currently wraps the freshly forked child with `psutil.Process(pid)` immediately. On some systems this can raise `psutil.NoSuchProcess` even though the child is the direct supervised process, which aborts task startup before the task instance is marked running and later shows up as a queued/failed mismatch.

This change uses a lightweight handle backed by `os.waitpid`/`os.kill` for directly forked children instead of performing an immediate `psutil` lookup, and adds a regression test to cover that startup path.

Closes #64974
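A rough illustration of the handle described in the summary (the class name and API are hypothetical, not the PR's actual code; `os.waitpid` only works for direct children, which is exactly the supervised-fork case):

```python
import os
import signal

class ForkedChildHandle:
    """Minimal psutil-free handle for a child this process forked itself."""

    def __init__(self, pid: int):
        self.pid = pid
        self._exit_status: int | None = None

    def is_running(self) -> bool:
        if self._exit_status is not None:
            return False
        # WNOHANG: returns (0, 0) while the child is still running, and
        # (pid, status) once it has exited (this also reaps the zombie).
        pid, status = os.waitpid(self.pid, os.WNOHANG)
        if pid == 0:
            return True
        self._exit_status = status
        return False

    def terminate(self) -> None:
        # No pid-reuse risk for a direct, unreaped child: the kernel keeps
        # the pid reserved (as a zombie) until we waitpid() it.
        if self._exit_status is None:
            os.kill(self.pid, signal.SIGTERM)
```

Because the supervisor is the parent, the kernel guarantees the pid is not recycled before `waitpid` reaps it, which addresses the pid-reuse concern raised in review without a psutil lookup.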
Testing

- `task-sdk/tests/task_sdk/execution_time/test_supervisor.py`
- `git diff --check`

I could not run the Python test suite locally in this environment because `python` is not installed here.