dag_processing code needs to handle OSError("handle is closed") in poll() and recv() calls #22191

Closed
dlesco opened this issue Mar 11, 2022 · 11 comments · Fixed by #22685


dlesco commented Mar 11, 2022

Apache Airflow version

2.1.4

What happened

The problem also exists in the latest version of the Airflow code, but I experienced it in 2.1.4.

This is the root cause of the problems experienced in issue #13542.

I'll provide a stack trace below. The problem is in the code of airflow/dag_processing/processor.py (and manager.py): all poll() and recv() calls on the multiprocessing communication channels need to be wrapped in exception handlers that handle OSError("handle is closed"). If one looks at the Python multiprocessing source code, it raises this exception when the channel's handle has been closed.

This occurs in Airflow when a DAG File Processor has been killed or terminated; the Airflow code closes the communication channel when it kills or terminates a DAG File Processor process (for example, when a dag_file_processor_timeout occurs). This killing or terminating happens asynchronously (in another process) from the process calling poll() or recv() on the communication channel, which is why an exception handler is needed. A pre-check that the handle is open is not good enough, because the process doing the kill or terminate may close the handle between the pre-check and the actual poll() or recv() call (a race condition).
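
A minimal sketch of the kind of defensive wrapper this implies (illustrative only, not the actual Airflow patch; the helper name is made up):

from multiprocessing.connection import Connection

def poll_processor_channel(parent_channel: Connection) -> bool:
    """Poll a DAG file processor's channel, treating a closed handle as
    'nothing to read' instead of letting OSError escape."""
    try:
        # The kill/terminate path may close this handle at any moment, even
        # between a pre-check and the poll() itself, so the only safe option
        # is to catch the exception here.
        return parent_channel.poll()
    except OSError:
        # "handle is closed": the processor was killed or terminated and its
        # channel torn down, so report that there is nothing to receive.
        return False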

What you expected to happen

Here is the stack trace of the occurrence I saw:

[2022-03-08 17:41:06,101] {taskinstance.py:914} DEBUG - <TaskInstance: staq_report_daily.gs.wait_staq_csv_file 2022-03-06 17:15:00+00:00 [running]> dependency 'Not In Retry Period' PASSED: True, The context specified that being in a retry period was permitted.
[2022-03-08 17:41:06,101] {taskinstance.py:904} DEBUG - Dependencies all met for <TaskInstance: staq_report_daily.gs.wait_staq_csv_file 2022-03-06 17:15:00+00:00 [running]>
[2022-03-08 17:41:06,119] {scheduler_job.py:1196} DEBUG - Skipping SLA check for <DAG: gdai_gcs_sync> because no tasks in DAG have SLAs
[2022-03-08 17:41:06,119] {scheduler_job.py:1196} DEBUG - Skipping SLA check for <DAG: unity_creative_import_process> because no tasks in DAG have SLAs
[2022-03-08 17:41:06,119] {scheduler_job.py:1196} DEBUG - Skipping SLA check for <DAG: sales_dm_to_bq> because no tasks in DAG have SLAs
[2022-03-08 17:44:50,454] {settings.py:302} DEBUG - Disposing DB connection pool (PID 1902)
Process ForkProcess-1:
Traceback (most recent call last):
  File "/opt/python3.8/lib/python3.8/multiprocessing/process.py", line 315, in _bootstrap
    self.run()
  File "/opt/python3.8/lib/python3.8/multiprocessing/process.py", line 108, in run
    self._target(*self._args, **self._kwargs)
  File "/opt/python3.8/lib/python3.8/site-packages/airflow/dag_processing/manager.py", line 370, in _run_processor_manager
    processor_manager.start()
  File "/opt/python3.8/lib/python3.8/site-packages/airflow/dag_processing/manager.py", line 610, in start
    return self._run_parsing_loop()
  File "/opt/python3.8/lib/python3.8/site-packages/airflow/dag_processing/manager.py", line 671, in _run_parsing_loop
    self._collect_results_from_processor(processor)
  File "/opt/python3.8/lib/python3.8/site-packages/airflow/dag_processing/manager.py", line 981, in _collect_results_from_processor
    if processor.result is not None:
  File "/opt/python3.8/lib/python3.8/site-packages/airflow/dag_processing/processor.py", line 321, in result
    if not self.done:
  File "/opt/python3.8/lib/python3.8/site-packages/airflow/dag_processing/processor.py", line 286, in done
    if self._parent_channel.poll():
  File "/opt/python3.8/lib/python3.8/multiprocessing/connection.py", line 255, in poll
    self._check_closed()
  File "/opt/python3.8/lib/python3.8/multiprocessing/connection.py", line 136, in _check_closed
    raise OSError("handle is closed")
OSError: handle is closed

This corresponded in time to the following log entries:

% kubectl logs airflow-scheduler-58c997dd98-n8xr8 -c airflow-scheduler --previous | egrep 'Ran scheduling loop in|[[]heartbeat[]]'
[2022-03-08 17:40:47,586] {scheduler_job.py:813} DEBUG - Ran scheduling loop in 0.56 seconds
[2022-03-08 17:40:49,146] {scheduler_job.py:813} DEBUG - Ran scheduling loop in 0.56 seconds
[2022-03-08 17:40:50,675] {base_job.py:227} DEBUG - [heartbeat]
[2022-03-08 17:40:50,687] {scheduler_job.py:813} DEBUG - Ran scheduling loop in 0.54 seconds
[2022-03-08 17:40:52,144] {scheduler_job.py:813} DEBUG - Ran scheduling loop in 0.46 seconds
[2022-03-08 17:40:53,620] {scheduler_job.py:813} DEBUG - Ran scheduling loop in 0.47 seconds
[2022-03-08 17:40:55,085] {scheduler_job.py:813} DEBUG - Ran scheduling loop in 0.46 seconds
[2022-03-08 17:40:56,169] {base_job.py:227} DEBUG - [heartbeat]
[2022-03-08 17:40:56,180] {scheduler_job.py:813} DEBUG - Ran scheduling loop in 0.49 seconds
[2022-03-08 17:40:57,667] {scheduler_job.py:813} DEBUG - Ran scheduling loop in 0.49 seconds
[2022-03-08 17:40:59,148] {scheduler_job.py:813} DEBUG - Ran scheduling loop in 0.48 seconds
[2022-03-08 17:41:00,618] {scheduler_job.py:813} DEBUG - Ran scheduling loop in 0.47 seconds
[2022-03-08 17:41:01,742] {base_job.py:227} DEBUG - [heartbeat]
[2022-03-08 17:41:01,757] {scheduler_job.py:813} DEBUG - Ran scheduling loop in 0.58 seconds
[2022-03-08 17:41:03,133] {scheduler_job.py:813} DEBUG - Ran scheduling loop in 0.55 seconds
[2022-03-08 17:41:04,664] {scheduler_job.py:813} DEBUG - Ran scheduling loop in 0.53 seconds
[2022-03-08 17:44:50,649] {base_job.py:227} DEBUG - [heartbeat]
[2022-03-08 17:44:50,814] {scheduler_job.py:813} DEBUG - Ran scheduling loop in 225.15 seconds

You can see that when this exception occurred, there was a hang in the scheduler for almost 4 minutes, no scheduling loops, and no scheduler_job heartbeats.

This hang probably also caused stuck queued jobs, as issue #13542 describes.

How to reproduce

This is hard to reproduce because it is a race condition. But you might be able to reproduce it by putting top-level code in a DAG file that calls sleep, so that the file takes longer to parse than the core dag_file_processor_timeout setting. That would cause the parsing process to be terminated, creating the conditions for this bug to occur.
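
For example, a DAG file along these lines (a hypothetical slow_parse.py; the 300-second sleep is only chosen to comfortably exceed a typical dag_file_processor_timeout) should get its parsing process killed once the timeout elapses:

# slow_parse.py -- hypothetical DAG file whose top-level code sleeps for longer
# than [core] dag_file_processor_timeout, so the DAG file processor parsing it
# gets killed and the race described above can occur.
import time
from datetime import datetime

from airflow import DAG
from airflow.operators.dummy import DummyOperator

time.sleep(300)  # runs at parse time, before the DAG object is even created

with DAG(dag_id="slow_parse_example", start_date=datetime(2022, 1, 1), schedule_interval=None) as dag:
    DummyOperator(task_id="noop")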

Operating System

NAME="Ubuntu" VERSION="18.04.6 LTS (Bionic Beaver)" ID=ubuntu ID_LIKE=debian PRETTY_NAME="Ubuntu 18.04.6 LTS" VERSION_ID="18.04" HOME_URL="https://www.ubuntu.com/" SUPPORT_URL="https://help.ubuntu.com/" BUG_REPORT_URL="https://bugs.launchpad.net/ubuntu/" PRIVACY_POLICY_URL="https://www.ubuntu.com/legal/terms-and-policies/privacy-policy" VERSION_CODENAME=bionic UBUNTU_CODENAME=bionic

Versions of Apache Airflow Providers

Not relevant, this is a core dag_processing issue.

Deployment

Composer

Deployment details

"composer-1.17.6-airflow-2.1.4"

In order to isolate the scheduler from interference by other processes (such as airflow-workers) running on the same machine, we created an additional node-pool for the scheduler and ran these k8s patches to move the scheduler onto it.

New node pool definition:

    {
      name              = "scheduler-pool"
      machine_type      = "n1-highcpu-8"
      autoscaling       = false
      node_count        = 1
      disk_type         = "pd-balanced"
      disk_size         = 64
      image_type        = "COS"
      auto_repair       = true
      auto_upgrade      = true
      max_pods_per_node = 32
    },

patch.sh

#!/bin/bash
if [ $# -lt 1 ]; then
  echo "Usage: $0 namespace"
  echo "Description: Isolate airflow-scheduler onto it's own node-pool (scheduler-pool)."
  echo "Options:"
  echo "  namespace: kubernetes namespace used by Composer"
  exit 1
fi

namespace=$1

set -eu
set -o pipefail

scheduler_patch="$(cat airflow-scheduler-patch.yaml)"
fluentd_patch="$(cat composer-fluentd-daemon-patch.yaml)"

set -x

kubectl -n default patch daemonset composer-fluentd-daemon -p "${fluentd_patch}"
kubectl -n ${namespace} patch deployment airflow-scheduler -p "${scheduler_patch}"

composer-fluentd-daemon-patch.yaml

spec:
  template:
    spec:
      nodeSelector: null
      affinity:
        nodeAffinity:
          requiredDuringSchedulingIgnoredDuringExecution:
            nodeSelectorTerms:
            - matchExpressions:
              - key: cloud.google.com/gke-nodepool
                operator: In
                values:
                - default-pool
                - scheduler-pool

airflow-scheduler-patch.yaml

spec:
  template:
    spec:
      nodeSelector:
        cloud.google.com/gke-nodepool: scheduler-pool
      containers:
      - name: gcs-syncd
        resources:
          limits:
            memory: 2Gi

Anything else

Regarding the checkbox below about submitting a PR: I could submit one, but it would be untested code; I don't really have the environment set up to test the patch.

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

Code of Conduct

  • I agree to follow this project's Code of Conduct

dlesco added the area:core and kind:bug labels Mar 11, 2022

boring-cyborg bot commented Mar 11, 2022

Thanks for opening your first issue here! Be sure to follow the issue template!

@uranusjr (Member) commented:

Feel free to submit a pull request to handle the exception! We can figure out how to test the solution in the review process.

BTW I don't know what your current fix looks like, but OSError has an errno attribute; checking that in the error-handling code may be appropriate as well. (Not sure, I don't even know what errno this error has right now.)
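
For what it's worth, CPython constructs this particular exception with only a message (the raise OSError("handle is closed") visible in the traceback above), so its errno attribute comes back empty; a quick illustrative check:

try:
    raise OSError("handle is closed")  # mirrors multiprocessing/connection.py:_check_closed
except OSError as exc:
    print(exc.errno)  # prints None -- no errno is attached to this OSError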


dlesco commented Mar 16, 2022

I plan to submit a PR within the next two weeks.


pcolladosoto commented Mar 21, 2022

Trying to explain things...

Our team has run into this issue time and time again. We have tried different combinations of both Airflow and Python versions to no avail.

TL;DR

When a DAGFileProcessor hangs and is killed due to a timeout, we believe the self.waitables and self._processors attributes of the DAGFileProcessorManager are not being updated as they should be. This causes an unhandled exception when trying to receive data on a pipe end (i.e. file descriptor) which has already been closed.

The long read...

We're running a decoupled Airflow deployment within a k8s cluster. We are currently using a 3-container pod where one container runs the Web Server, another executes the Scheduler and the third implements Flower (we're using the CeleryExecutor). The backbone of the deployment is implemented through a StatefulSet that runs the Celery executors themselves.

The trace we were seeing on the scheduler time and time again was:

Process ForkServerProcess-1:
Traceback (most recent call last):
  File "/usr/local/lib/python3.7/multiprocessing/process.py", line 297, in _bootstrap
    self.run()
  File "/usr/local/lib/python3.7/multiprocessing/process.py", line 99, in run
    self._target(*self._args, **self._kwargs)
  File "/home/airflow/.local/lib/python3.7/site-packages/airflow/dag_processing/manager.py", line 370, in _run_processor_manager
    processor_manager.start()
  File "/home/airflow/.local/lib/python3.7/site-packages/airflow/dag_processing/manager.py", line 610, in start
    return self._run_parsing_loop()
  File "/home/airflow/.local/lib/python3.7/site-packages/airflow/dag_processing/manager.py", line 671, in _run_parsing_loop
    self._collect_results_from_processor(processor)
  File "/home/airflow/.local/lib/python3.7/site-packages/airflow/dag_processing/manager.py", line 981, in _collect_results_from_processor
    if processor.result is not None:
  File "/home/airflow/.local/lib/python3.7/site-packages/airflow/dag_processing/processor.py", line 321, in result
    if not self.done:
  File "/home/airflow/.local/lib/python3.7/site-packages/airflow/dag_processing/processor.py", line 286, in done
    if self._parent_channel.poll():
  File "/usr/local/lib/python3.7/multiprocessing/connection.py", line 255, in poll
    self._check_closed()
  File "/usr/local/lib/python3.7/multiprocessing/connection.py", line 136, in _check_closed
    raise OSError("handle is closed")
OSError: handle is closed

This has been thrown by Airflow 2.1.3, but we've seen very similar (if not equal) variations with versions all the way up to Airflow 2.2.4.

Given we traced the problem down to the way multiprocessing synchronisation was being handled, we played around with multiprocessing's start method through the mp_start_method configuration parameter, which isn't included in the stock configuration example:

def _get_multiprocessing_start_method(self) -> str:
    """
    Determine method of creating new processes by checking if the
    mp_start_method is set in configs, else, it uses the OS default.
    """
    if conf.has_option('core', 'mp_start_method'):
        return conf.get('core', 'mp_start_method')
    method = multiprocessing.get_start_method()
    if not method:
        raise ValueError("Failed to determine start method")
    return method

The containers we are using leverage fork as the default way of creating new processes. After trying that one out we moved on to using spawn and ended up settling on forkserver. No matter the start method we leveraged, we ran into the same issue over and over again.
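
For anyone who wants to confirm which start method their manager would actually pick up, a small illustrative snippet mirroring the logic quoted above:

# Mirrors _get_multiprocessing_start_method(): an explicit [core] mp_start_method
# setting wins, otherwise the platform default is used ('fork' on Linux).
import multiprocessing

from airflow.configuration import conf

if conf.has_option('core', 'mp_start_method'):
    print(conf.get('core', 'mp_start_method'))
else:
    print(multiprocessing.get_start_method())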

For a while we coped with this behaviour by just restarting the Airflow deployment on an hourly basis, but we decided to set some time apart today to delve a bit deeper into all this. The good news is that after a thorough investigation we noticed a pattern that preceded the crash.

In order to pin it down, we ran ps(1) on the scheduler container. We also monitored the DAG Processor Manager log (which we have at /opt/airflow/logs/dag_processor_manager/dag_processor_manager.log given our Airflow home is /opt/airflow) and we took a look at the scheduler's log through kubectl logs, given it's sent to stdout/stderr. The pattern itself goes something like:

  1. A DAGFileProcessor gets stuck for longer than dag_file_processor_timeout, as seen in the output of ps.
  2. As soon as the timeout is exceeded, the DAGFileProcessorManager kills the stuck DAGFileProcessor.
  3. When the DAGFileProcessorManager tries to collect results back from the different DAGFileProcessors it crashes.

The above led us to believe something was a bit off in the way the DAGFileProcessors were being killed. Given our Docker-based deployment allowed for it, we retrieved a copy of the stock manager.py and processor.py files and added a bit of logging through self.log.debug(). The following appeared in our DAGFileProcessorManager log:

[2022-03-21 13:01:00,747] {manager.py:1163} DEBUG - FOO - Looking for DAG processors to terminate due to timeouts!
[2022-03-21 13:01:00,748] {manager.py:1172} ERROR - Processor for /opt/airflow/dags/dags-airflow/spark_streaming.py with PID 965 started at 2022-03-21T13:00:10.536124+00:00 has timed out, killing it.
[2022-03-21 13:01:00,748] {manager.py:1178} DEBUG - FOO - # of waitables BEFORE killing timed out processor: 2
[2022-03-21 13:01:00,748] {manager.py:1180} DEBUG - FOO - # of waitables AFTER killing timed out processor: 2
[2022-03-21 13:01:00,748] {manager.py:1013} DEBUG - 1/1 DAG parsing processes running
[2022-03-21 13:01:00,748] {manager.py:1015} DEBUG - 2 file paths queued for processing
[2022-03-21 13:01:00,758] {manager.py:978} DEBUG - Processor for /opt/airflow/dags/dags-airflow/spark_streaming.py finished
[2022-03-21 13:01:00,758] {manager.py:982} DEBUG - FOO - Trying to get result for processor with PID: 965

Can you see how the number of waitables (more on that later) doesn't change even though we're killing a DAGFileProcessor? We believe that's what's causing the trouble...

Note we added the - FOO - token to the log entries we introduced so that we can easily grep for them. These entries were generated with calls to self.log.debug() within the _kill_timed_out_processors() function. The 'stock' version is:

def _kill_timed_out_processors(self):
    """Kill any file processors that timeout to defend against process hangs."""
    now = timezone.utcnow()
    for file_path, processor in self._processors.items():
        duration = now - processor.start_time
        if duration > self._processor_timeout:
            self.log.error(
                "Processor for %s with PID %s started at %s has timed out, killing it.",
                file_path,
                processor.pid,
                processor.start_time.isoformat(),
            )
            Stats.decr('dag_processing.processes')
            Stats.incr('dag_processing.processor_timeouts')
            # TODO: Remove after Airflow 2.0
            Stats.incr('dag_file_processor_timeouts')
            processor.kill()

After we added additional logging it looked like:

def _kill_timed_out_processors(self):
    """Kill any file processors that timeout to defend against process hangs."""
    self.log.debug("FOO - Looking for DAG processors to terminate due to timeouts!")
    now = timezone.utcnow()
    processors_to_remove = []
    for file_path, processor in self._processors.items():
        duration = now - processor.start_time
        if duration > self._processor_timeout:
            self.log.error(
                "Processor for %s with PID %s started at %s has timed out, killing it.",
                file_path,
                processor.pid,
                processor.start_time.isoformat(),
            )
            Stats.decr('dag_processing.processes')
            Stats.incr('dag_processing.processor_timeouts')
            # TODO: Remove after Airflow 2.0
            Stats.incr('dag_file_processor_timeouts')
            self.log.debug(f"FOO - # of waitables BEFORE killing timed out processor: {len(self.waitables)}")
            processor.kill()
            self.log.debug(f"FOO - # of waitables AFTER killing timed out processor: {len(self.waitables)}")

You can see how we call the kill() method (which basically wraps the processor's _kill_process() method) on the timed-out processor. We believe the key to all this resides on line 246 of:

def _kill_process(self) -> None:
    if self._process is None:
        raise AirflowException("Tried to kill process before starting!")
    if self._process.is_alive() and self._process.pid:
        self.log.warning("Killing DAGFileProcessorProcess (PID=%d)", self._process.pid)
        os.kill(self._process.pid, signal.SIGKILL)
    if self._parent_channel:
        self._parent_channel.close()

Notice how the end of the communication pipe opened on line 187 (below) is being closed on that line!

_parent_channel, _child_channel = context.Pipe(duplex=False)

That's exactly the same pipe end (i.e. file descriptor) the DAGFileProcessorManager tries to read from later on! If we look at the traceback we included before, it's line 286 that ultimately triggers the OSError:

if self._parent_channel.poll():

What it's trying to do is poll() a closed pipe. If we take a look at multiprocessing's implementation we'll see how, as shown in the traceback, it calls _check_closed() on the pipe's file descriptor (i.e. handle) before proceeding: this, as seen before, triggers the OSError.
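
A standalone reproduction of that failure mode, with no Airflow involved at all:

# Polling the read end of a multiprocessing Pipe after closing it raises the
# same OSError("handle is closed") seen in the tracebacks above.
import multiprocessing

parent_conn, child_conn = multiprocessing.Pipe(duplex=False)
parent_conn.close()

try:
    parent_conn.poll()
except OSError as exc:
    print(exc)  # handle is closed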

So... why are we trying to collect results from a DAGFileProcessor we killed due to a timeout? In order to answer that we took a walk through _run_parsing_loop():

def _run_parsing_loop(self):
    # In sync mode we want timeout=None -- wait forever until a message is received
    if self._async_mode:
        poll_time = 0.0
    else:
        poll_time = None

    self._refresh_dag_dir()
    self.prepare_file_path_queue()

    if self._async_mode:
        # If we're in async mode, we can start up straight away. If we're
        # in sync mode we need to be told to start a "loop"
        self.start_new_processes()
    while True:
        loop_start_time = time.monotonic()
        ready = multiprocessing.connection.wait(self.waitables.keys(), timeout=poll_time)
        if self._signal_conn in ready:
            agent_signal = self._signal_conn.recv()

            self.log.debug("Received %s signal from DagFileProcessorAgent", agent_signal)
            if agent_signal == DagParsingSignal.TERMINATE_MANAGER:
                self.terminate()
                break
            elif agent_signal == DagParsingSignal.END_MANAGER:
                self.end()
                sys.exit(os.EX_OK)
            elif agent_signal == DagParsingSignal.AGENT_RUN_ONCE:
                # continue the loop to parse dags
                pass
            elif isinstance(agent_signal, CallbackRequest):
                self._add_callback_to_queue(agent_signal)
            else:
                raise ValueError(f"Invalid message {type(agent_signal)}")

        if not ready and not self._async_mode:
            # In "sync" mode we don't want to parse the DAGs until we
            # are told to (as that would open another connection to the
            # SQLite DB which isn't a good practice

            # This shouldn't happen, as in sync mode poll should block for
            # ever. Lets be defensive about that.
            self.log.warning(
                "wait() unexpectedly returned nothing ready after infinite timeout (%r)!", poll_time
            )
            continue

        for sentinel in ready:
            if sentinel is self._signal_conn:
                continue

            processor = self.waitables.get(sentinel)
            if not processor:
                continue

            self._collect_results_from_processor(processor)
            self.waitables.pop(sentinel)
            self._processors.pop(processor.file_path)

        self._refresh_dag_dir()
        self._find_zombies()

        self._kill_timed_out_processors()

        # Generate more file paths to process if we processed all the files
        # already.
        if not self._file_path_queue:
            self.emit_metrics()
            self.prepare_file_path_queue()

        self.start_new_processes()

        # Update number of loop iteration.
        self._num_run += 1

        if not self._async_mode:
            self.log.debug("Waiting for processors to finish since we're using sqlite")
            # Wait until the running DAG processors are finished before
            # sending a DagParsingStat message back. This means the Agent
            # can tell we've got to the end of this iteration when it sees
            # this type of message
            self.wait_until_finished()

            # Collect anything else that has finished, but don't kick off any more processors
            self.collect_results()

        self._print_stat()

        all_files_processed = all(self.get_last_finish_time(x) is not None for x in self.file_paths)
        max_runs_reached = self.max_runs_reached()

        try:
            self._signal_conn.send(
                DagParsingStat(
                    max_runs_reached,
                    all_files_processed,
                )
            )
        except BlockingIOError:
            # Try again next time around the loop!

            # It is better to fail, than it is deadlock. This should
            # "almost never happen" since the DagParsingStat object is
            # small, and in async mode this stat is not actually _required_
            # for normal operation (It only drives "max runs")
            self.log.debug("BlockingIOError received trying to send DagParsingStat, ignoring")

        if max_runs_reached:
            self.log.info(
                "Exiting dag parsing loop as all files have been processed %s times", self._max_runs
            )
            break

        if self._async_mode:
            loop_duration = time.monotonic() - loop_start_time
            if loop_duration < 1:
                poll_time = 1 - loop_duration
            else:
                poll_time = 0.0

It basically runs an infinite loop (unless we specify a maximum number of runs) that calls multiprocessing.connection.wait based on the contents of self.waitables. This attribute is a dictionary containing references to the different DAGFileProcessors spawned by the DAGFileProcessorManager. Entries are added on line 1034 within start_new_processes():

def start_new_processes(self):
    """Start more processors if we have enough slots and files to process"""
    while self._parallelism - len(self._processors) > 0 and self._file_path_queue:
        file_path = self._file_path_queue.pop(0)
        # Stop creating duplicate processor i.e. processor with the same filepath
        if file_path in self._processors.keys():
            continue
        callback_to_execute_for_file = self._callback_to_execute[file_path]
        processor = self._processor_factory(
            file_path, callback_to_execute_for_file, self._dag_ids, self._pickle_dags
        )
        del self._callback_to_execute[file_path]
        Stats.incr('dag_processing.processes')
        processor.start()
        self.log.debug("Started a process (PID: %s) to generate tasks for %s", processor.pid, file_path)
        self._processors[file_path] = processor
        self.waitables[processor.waitable_handle] = processor

However, this dictionary is not updated when a processor is killed due to a timeout. You can check that out in the snippet we included above. Thus, after the timed-out DAGFileProcessor is killed, the infinite loop in _run_parsing_loop() will assume the underlying process is ready (it's done, as we've effectively terminated it) and will try to read from the pipe end we closed in _kill_process(), thus triggering the exception and bringing everything down. In other words, we believe the self.waitables attribute is not being updated as it should be when DAGFileProcessors are terminated due to timeouts. The same is true for the self._processors attribute on the DAGFileProcessorManager: after all, _kill_timed_out_processors() iterates over its contents... If we don't update it too, we'll see how we try to kill an already terminated process over and over again.

After some testing we arrived at the following implementation of _kill_timed_out_processors():

def _kill_timed_out_processors(self):
    """Kill any file processors that timeout to defend against process hangs."""
    self.log.debug("FOO - ** Entering _kill_timed_out_processors() **")

    # We'll get a clear picture of what these two attributes look like before
    # killing anything.
    self.log.debug(f"FOO - We'll iterate over: {self._processors}")
    self.log.debug(f"FOO - Current waitables dir: {self.waitables}")
    now = timezone.utcnow()
    processors_to_remove = []
    for file_path, processor in self._processors.items():
        duration = now - processor.start_time
        if duration > self._processor_timeout:
            self.log.error(
                "Processor for %s with PID %s started at %s has timed out, killing it.",
                file_path,
                processor.pid,
                processor.start_time.isoformat(),
            )
            Stats.decr('dag_processing.processes')
            Stats.incr('dag_processing.processor_timeouts')
            # TODO: Remove after Airflow 2.0
            Stats.incr('dag_file_processor_timeouts')

            # Add some logging to check stuff out
            self.log.debug(f"FOO - # of waitables BEFORE killing timed out processor: {len(self.waitables)}")
            self.log.debug(f"FOO - We'll iterate over: {self._processors}")
            self.log.debug(f"FOO - Current waitables dir: {self.waitables}")

            # Kill the hanged processor
            processor.kill()

            # Update self.waitables to avoid asking for results later on
            self.waitables.pop(processor.waitable_handle)

            # Make a note of the file_paths we are to remove later on: we feel a bit uneasy about modifying the
            # container we're currently iterating over...
            processors_to_remove.append(file_path)

            # Add some logging to check how stuff is doing...
            self.log.debug(f"FOO - # of waitables AFTER killing timed out processor: {len(self.waitables)}")
            self.log.debug(f"FOO - We'll iterate over: {self._processors}")
            self.log.debug(f"FOO - Current waitables dir: {self.waitables}")

    # Clean up `self._processors` too!
    for proc in processors_to_remove:
        self._processors.pop(proc)

    # And after we're done take a look at the final state
    self.log.debug(f"FOO - Processors after cleanup: {self._processors}")
    self.log.debug(f"FOO - Waitables after cleanup:  {self.waitables}")
    self.log.debug(f"FOO - ** Leaving _kill_timed_out_processors() **")

We know the above can surely be written in a more succinct/better way: we're by no means good programmers!

Against all odds, the code above seems to prevent the crash! 🎉 It does, however, spawn zombies when we kill the DAGFileProcessor: it's not being wait()ed for...

We decided to also play around with the DAGFileProcessor's _kill_process() method a bit in the name of science to try and prevent that zombie from spawning:

def _kill_process(self) -> None:
    if self._process is None:
        raise AirflowException("Tried to kill process before starting!")

    if self._process.is_alive() and self._process.pid:
        self.log.warning("Killing DAGFileProcessorProcess (PID=%d)", self._process.pid)
        os.kill(self._process.pid, signal.SIGKILL)

        # Reap the resulting zombie! Note the call to `waitpid()` blocks unless we
        # leverage the `WNOHANG` (https://docs.python.org/3/library/os.html#os.WNOHANG)
        # option. Given we were just playing around we decided not to bother with that...
        self.log.warning(f"FOO - Waiting to reap the zombie spawned from PID {self._process.pid}")
        os.waitpid(self._process.pid, 0)
        self.log.warning(f"FOO - Reaped the zombie spawned from PID {self._process.pid}")
    if self._parent_channel:
        self._parent_channel.close()

From what we could see, the above reaped the zombie like we initially expected it to.

So, after all this nonsense we just wanted to end by saying that we believe it's the way the DAGFileProcessorManager's attributes are being cleaned up that crashes Airflow for us. In our experience this is triggered by a DAGFileProcessor being forcefully terminated after a timeout.

We would also like to thank everybody making Airflow possible: it's one heck of a tool!

Feel free to ask for more details and, if we got anything wrong (it wouldn't be the first time), please do let us know!


potiuk commented Mar 27, 2022

@pcolladosoto - I ❤️ your detailed description and explanation. It reads like a good crime story 🗡️

Fantastic investigation and outcome.

How about you clean it up a bit and submit a PR fixing it?

@pcolladosoto (Contributor) commented:

Hi @potiuk! I'm glad you found our investigation useful and that you had fun reading through it. Reading so many Agatha Christie books has to pay off at some point 😜

I would be more than happy to polish it all up and open a Pull Request so that the changes are incorporated into Airflow itself. I'll do my best to find some time to do it throughout the week.

And thanks a ton for the kind words! I really appreciate it 😋


potiuk commented Mar 28, 2022

Cool. If you have any questions during the contribution process - happy to help - just "@" me. And even if you are not sure about some of the decisions, we can discuss it in the PR and iterate before we merge (and drag-in more of the minds here to make it really good)

ashb added this to the Airflow 2.3.0 milestone Mar 28, 2022

dlesco commented Mar 30, 2022

This looks like a better approach than what I was thinking.

One comment on the _kill_process change. self._process is an instance of multiprocessing.Process. An alternative to using os.waitpid is to use self._process.join(optional_timeout), which is multiprocessing's higher-level interface to the OS waitpid code. (Python versions >=3.7 also support a kill() method on the multiprocessing Process, but probably os.kill was used in order to support older versions of Python).

The question is whether or not to use a timeout for the join. Because SIGKILL was used, doing a join afterwards should always work, since KILL (unlike SIGTERM, which might not end the process) should always work. I don't know if the community wants to be extra cautious and have a timeout on the join, and if a timeout occurs, abnormally end the whole process with an exception, because the OS did something unexpected. Abending would be better than hanging forever.
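
A sketch of that cautious variant (the 30-second timeout and the helper name kill_and_reap are made up for illustration, not a concrete proposal):

import os
import signal
from multiprocessing import Process

from airflow.exceptions import AirflowException

def kill_and_reap(process: Process, timeout: float = 30.0) -> None:
    """Hypothetical helper: SIGKILL a child process, then reap it via join().

    SIGKILL cannot be caught or ignored, so join() should return almost
    immediately; the timeout only guards against the OS doing something
    unexpected, in which case failing loudly beats hanging forever.
    """
    if process.pid is None:
        raise AirflowException("Tried to kill process before starting!")
    os.kill(process.pid, signal.SIGKILL)
    process.join(timeout=timeout)  # multiprocessing's interface to os.waitpid()
    if process.is_alive():
        raise AirflowException(f"Process {process.pid} survived SIGKILL unexpectedly")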

@pcolladosoto (Contributor) commented:

That's one of the issues I wanted to bring up on the PR... As you say, SIGKILL should terminate the process in any circumstance. However, I do lean towards adding an optional (maybe hardcoded?) timeout too. Even though forcefully waiting on the process should not be a problem in a normal scenario, it can indeed become an indefinitely blocking operation in the event of the kernel misbehaving.

The other key aspect is thinking about how we want that to be handled. Should we halt it all with an exception? Should we just cope with the (hopefully) few zombies that could be spawned and just show something in the log? I'm going to open a PR right now to try and move this discussion there. That way we can iterate over the available choices and decide what's best.

Thanks a ton for the input!

@pcolladosoto (Contributor) commented:

Hi! As seen above I've just opened a related PR: #22685

It would be great if we could continue the discussion over there and iterate over a few solutions until we arrive at the final thing.

Does the PR look good @potiuk or should I polish it up a bit more?

Thanks a ton for your time!


potiuk commented Apr 1, 2022

It does look good. I think it's the simplest but also the most efficient solution.
