DAGs not executing in Airflow 3.1.2, aborting with SIGKILL -9, on a dedicated on-prem RHEL VM #62783

@ricardojosepereira

Description

Apache Airflow version

Other Airflow 3 version (please specify below)

If "Other Airflow 3 version" selected, which one?

3.1.2

What happened?

Apache Airflow 3.1.2 deployed on a dedicated on-prem RHEL 9 VM with the following providers:
apache-airflow-providers-fab 3.0.1
apache-airflow-providers-oracle 4.3.0
apache-airflow-providers-ssh 4.2.1
apache-airflow-providers-standard 1.9.1
apache-airflow-providers-postgres 6.5.2
pandas 3.0.0

All DAG executions terminate, with the scheduler log showing:
2026-03-03T09:35:00.510227Z [info ] Trying to enqueue tasks: [<TaskInstance: remote_file_watcher.detect_files scheduled__2026-03-03T09:35:00+00:00 [scheduled]>] for executor: LocalExecutor(parallelism=32) [airflow.jobs.scheduler_job_runner.SchedulerJobRunner] loc=scheduler_job_runner.py:740
2026-03-03T09:35:00.513691Z [info ] Secrets backends loaded for worker [supervisor] backend_classes=['EnvironmentVariablesBackend', 'MetastoreBackend'] count=2 loc=supervisor.py:1931
2026-03-03T09:35:00.522952Z [warning ] Starting call to 'airflow.sdk.api.client.Client.request', this is the 1st time calling it. [airflow.sdk.api.client] loc=before.py:42
2026-03-03T09:35:01.524535Z [warning ] Starting call to 'airflow.sdk.api.client.Client.request', this is the 2nd time calling it. [airflow.sdk.api.client] loc=before.py:42
2026-03-03T09:35:02.800685Z [warning ] Starting call to 'airflow.sdk.api.client.Client.request', this is the 3rd time calling it. [airflow.sdk.api.client] loc=before.py:42
2026-03-03T09:35:06.097942Z [warning ] Starting call to 'airflow.sdk.api.client.Client.request', this is the 4th time calling it. [airflow.sdk.api.client] loc=before.py:42
2026-03-03T09:35:12.054394Z [info ] Process exited [supervisor] exit_code=<Negsignal.SIGKILL: -9> loc=supervisor.py:709 pid=3999744 signal_sent=SIGKILL

The task log file does get created, but with size zero:

$ cd dag_id=remote_file_watcher/run_id=scheduled__2026-03-03T09:35:00+00:00/task_id=detect_files/
$ ll
total 0
-rw-r--r--. 1 airflow airflow 0 Mar 3 09:35 'attempt=1.log'
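Exit code `<Negsignal.SIGKILL: -9>` means the task process was killed from outside; on a dedicated VM the most common culprit is the kernel OOM killer. One way to check is to scan the kernel log (`dmesg` or `journalctl -k`) for OOM records. A minimal sketch of such a scan (the helper name and sample log excerpt are mine, not Airflow's):

```python
import re

# Phrases the kernel logs when the OOM killer terminates a process.
_OOM_PATTERN = re.compile(r"oom-kill|Out of memory|Killed process", re.IGNORECASE)

def find_oom_kills(kernel_log: str) -> list[str]:
    """Return kernel-log lines indicating the OOM killer fired."""
    return [line for line in kernel_log.splitlines() if _OOM_PATTERN.search(line)]

# Example with a fabricated dmesg-style excerpt:
sample = """\
[12345.6] python invoked oom-killer: gfp_mask=0x1100cca, order=0
[12345.7] Out of memory: Killed process 3999744 (python)
"""
print(find_oom_kills(sample))
```

If lines like these show up in the real kernel log around the task start time, the problem is memory pressure on the VM (more RAM, or a lower `parallelism` on the LocalExecutor, would be the usual mitigations) rather than anything in the DAG itself.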

What you think should happen instead?

The DAG should run the SSH command, since all the conditions for a successful execution are met. Instead, every DAG (not just this one) terminates with the same error.

How to reproduce

Create a DAG with the following code and let the scheduler trigger it:

dags/file_watcher_dag.py

from airflow.decorators import dag, task
from airflow.providers.ssh.hooks.ssh import SSHHook
from airflow.operators.python import get_current_context
from airflow.exceptions import AirflowSkipException
from airflow.models import Variable

from utils.datasets import FILES_CONFIG

REMOTE_PATH = "/db/warehous/operador_automatico/eventos/"

def check_remote_file(filename: str) -> bool:
    """Check if a file exists on the remote server via SSH"""
    ssh_hook = SSHHook(ssh_conn_id="Server22")

    with ssh_hook.get_conn() as ssh_client:
        stdin, stdout, stderr = ssh_client.exec_command(
            f"test -f {REMOTE_PATH}/{filename} && echo FOUND || echo NOT_FOUND"
        )
        # Use tolerant decoding in case remote output is not strict UTF-8
        result = stdout.read().decode("utf-8", errors="replace").strip()
        return result == "FOUND"

def skip_if_already_processed(dataset_uri: str):
    """Skip emitting the dataset if it was already emitted today.

    Uses Airflow Variables so the state is shared across all DAG runs
    during the same logical date.
    """
    context = get_current_context()
    logical_date = context["logical_date"].date()

    var_key = f"{dataset_uri}_processed_{logical_date.isoformat()}"

    already_run = Variable.get(var_key, default_var="False")
    if already_run == "True":
        raise AirflowSkipException(f"{dataset_uri} already processed today")

    # mark as processed for today
    Variable.set(var_key, "True")

@dag(
    schedule="*/5 * * * *",  # check every 5 minutes
    catchup=False,
    max_active_runs=1,  # only one DAG run at a time
    tags=["watcher"]
)
def remote_file_watcher():

    @task
    def detect_files():
        """Detect which datasets should be emitted based on remote files.

        Returns a list of dataset URIs for which the trigger file exists.
        """
        detected_dataset_uris = []

        # Open a single SSH connection and list all files once, to avoid
        # poking the server separately for each configured filename.
        ssh_hook = SSHHook(ssh_conn_id="Server22")
        with ssh_hook.get_conn() as ssh_client:
            stdin, stdout, stderr = ssh_client.exec_command(f"ls -1 {REMOTE_PATH}")
            # Use tolerant decoding to avoid UnicodeDecodeError if filenames
            # contain bytes that are not valid UTF-8
            output = stdout.read().decode("utf-8", errors="replace").splitlines()

        remote_files = set(output)

        for filename, dataset in FILES_CONFIG.items():
            if filename in remote_files:
                print(f"{filename} detected and ready to emit dataset {dataset.uri}")
                detected_dataset_uris.append(dataset.uri)
            else:
                print(f"{filename} not found, will not emit {dataset.uri}")

        return detected_dataset_uris

    @task
    def emit_dataset(detected_dataset_uris, dataset_uri: str):
        """Emit a single dataset event if its file was detected and not yet processed today."""
        if dataset_uri not in detected_dataset_uris:
            raise AirflowSkipException("File for this dataset not detected, skipping emit")

        skip_if_already_processed(dataset_uri)
        print(f"Emitting dataset event: {dataset_uri}")

    detected = detect_files()

    # Create one emit task per dataset in FILES_CONFIG, with its own outlet,
    # but reusing the same emit_dataset implementation so it's scalable.
    for filename, dataset in FILES_CONFIG.items():
        emit_dataset.override(
            task_id=f"emit_{filename}",
            outlets=[dataset],
        )(detected, dataset_uri=dataset.uri)

remote_file_watcher()
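For what it's worth, the parsing at the heart of `detect_files` can be exercised in isolation, without SSH or Airflow, to rule out the detection logic itself; the `detect` helper and the stand-in `files_config` mapping below are hypothetical (a plain dict of trigger filename to dataset URI, standing in for FILES_CONFIG):

```python
def detect(ls_output: str, files_config: dict[str, str]) -> list[str]:
    """Mirror detect_files' parsing: map remote filenames to dataset URIs.

    ls_output is the raw stdout of `ls -1`; files_config maps each trigger
    filename to the dataset URI it should emit.
    """
    remote_files = set(ls_output.splitlines())
    return [uri for filename, uri in files_config.items() if filename in remote_files]

# Hypothetical trigger files -> dataset URIs
files_config = {
    "eventos_a.csv": "dataset://eventos/a",
    "eventos_b.csv": "dataset://eventos/b",
}
print(detect("eventos_a.csv\nother.txt\n", files_config))  # -> ['dataset://eventos/a']
```

This runs fine as plain Python, which points at the failure happening in the task supervisor/execution machinery rather than in the detection code.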

Operating System

Red Hat Enterprise Linux 9.7

Versions of Apache Airflow Providers

apache-airflow-providers-fab 3.0.1
apache-airflow-providers-oracle 4.3.0
apache-airflow-providers-ssh 4.2.1
apache-airflow-providers-standard 1.9.1
apache-airflow-providers-postgres 6.5.2
pandas 3.0.0

Deployment

Virtualenv installation

Deployment details

No response

Anything else?

No response

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

Code of Conduct


Labels

area:Scheduler (including HA (high availability) scheduler) · area:core · kind:bug (this is clearly a bug) · needs-triage (label for new issues that we didn't triage yet)
