Prevent race condition in trying to collect result from DagFileProcessor #11306

Merged
38 changes: 35 additions & 3 deletions airflow/jobs/scheduler_job.py
@@ -120,6 +120,7 @@ def file_path(self) -> str:
@staticmethod
def _run_file_processor(
result_channel: MultiprocessingConnection,
parent_channel: MultiprocessingConnection,
file_path: str,
pickle_dags: bool,
dag_ids: Optional[List[str]],
@@ -131,6 +132,8 @@ def _run_file_processor(

:param result_channel: the connection to use for passing back the result
:type result_channel: multiprocessing.Connection
:param parent_channel: the parent end of the channel to close in the child
:type parent_channel: multiprocessing.Connection
:param file_path: the file to process
:type file_path: str
:param pickle_dags: whether to pickle the DAGs found in the file and
@@ -149,6 +152,13 @@ def _run_file_processor(
# This helper runs in the newly created process
log: logging.Logger = logging.getLogger("airflow.processor")

# Since we share all open FDs from the parent, we need to close the parent side of the pipe here in
# the child, else it won't get closed properly until we exit.
log.info("Closing parent pipe")

parent_channel.close()
del parent_channel

set_context(log, file_path)
setproctitle("airflow scheduler - DagFileProcessor {}".format(file_path))

@@ -183,23 +193,25 @@ def _run_file_processor(
log.exception("Got an exception! Propagating...")
raise
finally:
- result_channel.close()
# We re-initialized the ORM within this Process above so we need to
# tear it down manually here
settings.dispose_orm()

result_channel.close()

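The close/del pair above is the heart of the fix: after a fork the child holds a copy of every FD the parent had open, and a pipe only reports EOF once every copy of its write end is closed. A minimal standalone sketch (plain `multiprocessing` with the "fork" start method; names like `child_main` are illustrative, not from this PR) of both halves of that discipline:

```python
# Minimal sketch, assuming the "fork" start method (as on Linux).
import multiprocessing


def child_main(recv_conn, send_conn):
    # The child inherited BOTH ends; close the read end it never uses,
    # mirroring parent_channel.close() in _run_file_processor above.
    recv_conn.close()
    send_conn.send("result")
    send_conn.close()


if __name__ == "__main__":
    ctx = multiprocessing.get_context("fork")
    recv_conn, send_conn = ctx.Pipe(duplex=False)
    proc = ctx.Process(target=child_main, args=(recv_conn, send_conn))
    proc.start()
    # Drop the parent's copy of the write end; if this were skipped, the
    # second recv() below would block forever instead of raising EOFError.
    send_conn.close()
    print(recv_conn.recv())    # -> "result"
    try:
        recv_conn.recv()       # every write end is now closed -> EOFError
    except EOFError:
        print("child closed its end of the pipe")
    proc.join()
```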
def start(self) -> None:
"""
Launch the process and start processing the DAG.
"""
start_method = self._get_multiprocessing_start_method()
context = multiprocessing.get_context(start_method)

- self._parent_channel, _child_channel = context.Pipe()
_parent_channel, _child_channel = context.Pipe(duplex=False)
process = context.Process(
target=type(self)._run_file_processor,
args=(
_child_channel,
_parent_channel,
self.file_path,
self._pickle_dags,
self._dag_ids,
@@ -212,6 +224,15 @@ def start(self) -> None:
self._start_time = timezone.utcnow()
process.start()

# Close the child side of the pipe now that the subprocess has started -- otherwise our open
# copy would prevent it from closing in some cases
_child_channel.close()
del _child_channel

# Don't store it on self until after we've started the child process - we don't want to keep it from
# getting GCd/closed
self._parent_channel = _parent_channel

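Worth noting on the `Pipe(duplex=False)` change: with a one-way pipe the stdlib returns the connections in (receiver, sender) order, so the parent keeps the read end and the child writes results into the other. A tiny illustration of the assumed stdlib behaviour, not PR code:

```python
import multiprocessing

# duplex=False yields a one-way pipe: the first connection receives,
# the second sends -- hence the (_parent_channel, _child_channel) order above.
recv_conn, send_conn = multiprocessing.Pipe(duplex=False)
send_conn.send({"dag_count": 3})
assert recv_conn.recv() == {"dag_count": 3}
# Misusing an end fails fast: recv_conn.send(...) raises OSError because
# that connection is read-only (and send_conn.recv() likewise).
```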
def kill(self) -> None:
"""
Kill the process launched to process the file, and ensure consistent state.
@@ -245,6 +266,8 @@ def _kill_process(self) -> None:
if self._process.is_alive() and self._process.pid:
self.log.warning("Killing DAGFileProcessorProcess (PID=%d)", self._process.pid)
os.kill(self._process.pid, signal.SIGKILL)
if self._parent_channel:
self._parent_channel.close()

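The new `close()` in `_kill_process` matters because SIGKILL only ends the child; the parent's read FD stays open until closed explicitly (or garbage-collected). A hedged sketch of the same pattern, with `proc` and `parent_conn` standing in for the real attributes:

```python
import os
import signal


def kill_and_cleanup(proc, parent_conn):
    """Forcibly end a child process and release our end of its result pipe."""
    if proc.is_alive() and proc.pid:
        os.kill(proc.pid, signal.SIGKILL)  # POSIX-only, as in the PR
    proc.join()  # reap the child so it doesn't linger as a zombie
    # Close our read end too, or the FD leaks until the scheduler exits.
    if parent_conn is not None and not parent_conn.closed:
        parent_conn.close()
```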
@property
def pid(self) -> int:
@@ -293,7 +316,16 @@ def done(self) -> bool:
self._parent_channel.close()
return True
except EOFError:
- pass
# If we get an EOFError, it means the child end of the pipe has been closed. This only happens
# in the finally block. But due to a possible race condition, the process may not yet have
# terminated (it could still be doing cleanup/Python shutdown). So we kill it here, after a
# "suitable" timeout.
self._done = True
# Arbitrary timeout -- error/race condition only, so this doesn't need to be tunable.
self._process.join(timeout=5)
if self._process.is_alive():
# Didn't shut down cleanly - kill it
self._kill_process()

if not self._process.is_alive():
self._done = True
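Pulling the `done()` logic together: `EOFError` from the pipe proves the child closed its write end, not that the process has exited, because interpreter shutdown may still be in flight. A condensed sketch of the collect-with-grace-period pattern (the helper name and signature are mine, not Airflow's):

```python
def collect_result(proc, parent_conn, grace_seconds=5):
    """Fetch a child's result, tolerating the close-vs-exit race."""
    try:
        result = parent_conn.recv()       # normal path: data before EOF
    except EOFError:
        # Pipe closed, but the child may still be mid-shutdown.
        result = None
        proc.join(timeout=grace_seconds)  # arbitrary grace period
        if proc.is_alive():
            proc.kill()                   # SIGKILL (Python 3.7+)
            proc.join()
    finally:
        parent_conn.close()
    return result
```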