Task Killed because of Recorded pid does not match the current pid  #38113

@keysersoza

Apache Airflow version

Other Airflow 2 version (please specify below)

If "Other Airflow 2 version" selected, which one?

2.5.3

What happened?

The task is killed with the warning "Recorded pid 248 does not match the current pid 249".

What you think should happen instead?

The task should run

How to reproduce

  1. Create a DAG
  2. Use a BashOperator to run a task
  3. Run a command that activates a virtual environment and runs a Python script in it

Operating System

Linux

Versions of Apache Airflow Providers

No response

Deployment

Official Apache Airflow Helm Chart

Deployment details

We use the Helm chart, but the workers are started separately using plain Kubernetes deployment files.
We use CeleryKubernetesExecutor.

Airflow is installed as a pip package in a virtual environment baked into the worker Docker image.

The worker Docker image is then started in Kubernetes with the following command:

["source /venv/bin/activate && /venv/bin/python3.7 -m airflow celery worker -q my-queue"]

Anything else?

The other DAGs work fine. The crashing DAG has the following characteristics:

  1. Runs a BashOperator that activates a virtual environment and runs a Python script in it
  2. Defines the DAG as follows:
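from datetime import datetime

from airflow.decorators import dag
from airflow.models.param import Param

@dag(
    schedule_interval=None,
    default_args={
        "owner": "..."
    },
    start_date=datetime.now(),
    catchup=False,
    dag_id='...',
    tags=["..."],
    params={
        "...": Param("..."),
        "...": Param("..."),
        "...": Param(1),
        "...": Param(100),
        "...": Param(0.32),
        "...": Param({}, type="object")
    },
    max_active_runs=3,
)
# (decorated DAG function elided in the report)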

The only task is a BashOperator:

op = BashOperator(
    task_id='ExecuteRunner',
    queue="my-queue",
    bash_command="source ...;cd /home/.../data/{{ params....}};source ...;source ...;source setPyVenv;python ....py",
    env={"AIRFLOW_DAG_RUN_ID": '{{ run_id }}', "AIRFLOW_DAG_NAME": '{{ dag.dag_id }}'},
    append_env=True,
    run_as_user="my-user"
)

The user is added to the Docker image as a sudo user; the sudoers file has been edited to allow impersonation:

# User privilege specification
root    ALL=(ALL:ALL) ALL

airflow ALL=(ALL) NOPASSWD: ALL
# Members of the admin group may gain root privileges
%admin ALL=(ALL) ALL

# Allow members of group sudo to execute any command
%sudo   ALL=(ALL:ALL) ALL

The error occurs as soon as the Python script starts executing; everything before that point is logged successfully:

[2024-03-13, 15:19:53 UTC] {local_task_job.py:260} WARNING - Recorded pid 248 does not match the current pid 249
[2024-03-13, 15:19:53 UTC] {process_utils.py:133} INFO - Sending Signals.SIGTERM to group 249. PIDs of all processes in the group: [253, 309, 249]
[2024-03-13, 15:19:53 UTC] {process_utils.py:84} INFO - Sending the signal Signals.SIGTERM to group 249
[2024-03-13, 15:19:53 UTC] {taskinstance.py:1488} ERROR - Received SIGTERM. Terminating subprocesses.
[2024-03-13, 15:19:53 UTC] {subprocess.py:104} INFO - Sending SIGTERM signal to process group
[2024-03-13, 15:19:53 UTC] {taskinstance.py:1776} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/venv/lib/python3.7/site-packages/airflow/operators/bash.py", line 191, in execute
    cwd=self.cwd,
  File "/venv/lib/python3.7/site-packages/airflow/hooks/subprocess.py", line 91, in run_command
    for raw_line in iter(self.sub_process.stdout.readline, b""):
  File "/venv/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1490, in signal_handler
    raise AirflowException("Task received SIGTERM signal")
airflow.exceptions.AirflowException: Task received SIGTERM signal
[2024-03-13, 15:19:53 UTC] {taskinstance.py:1332} INFO - Marking task as FAILED. dag_id=..., task_id=ExecuteRunner, execution_date=20240313T151945, start_date=20240313T151948, end_date=20240313T151953
[2024-03-13, 15:19:53 UTC] {base_task_runner.py:116} INFO - Job 3938175: Subtask ExecuteRunner Traceback (most recent call last):
[2024-03-13, 15:19:53 UTC] {base_task_runner.py:116} INFO - Job 3938175: Subtask ExecuteRunner   File "/venv/bin/airflow", line 8, in <module>
[2024-03-13, 15:19:53 UTC] {base_task_runner.py:116} INFO - Job 3938175: Subtask ExecuteRunner     sys.exit(main())
[2024-03-13, 15:19:53 UTC] {base_task_runner.py:116} INFO - Job 3938175: Subtask ExecuteRunner   File "/venv/lib/python3.7/site-packages/airflow/__main__.py", line 48, in main
[2024-03-13, 15:19:53 UTC] {base_task_runner.py:116} INFO - Job 3938175: Subtask ExecuteRunner     args.func(args)
[2024-03-13, 15:19:53 UTC] {base_task_runner.py:116} INFO - Job 3938175: Subtask ExecuteRunner   File "/venv/lib/python3.7/site-packages/airflow/cli/cli_parser.py", line 52, in command
[2024-03-13, 15:19:53 UTC] {base_task_runner.py:116} INFO - Job 3938175: Subtask ExecuteRunner     return func(*args, **kwargs)
[2024-03-13, 15:19:53 UTC] {base_task_runner.py:116} INFO - Job 3938175: Subtask ExecuteRunner   File "/venv/lib/python3.7/site-packages/airflow/utils/cli.py", line 108, in wrapper
[2024-03-13, 15:19:53 UTC] {base_task_runner.py:116} INFO - Job 3938175: Subtask ExecuteRunner     return f(*args, **kwargs)
[2024-03-13, 15:19:53 UTC] {base_task_runner.py:116} INFO - Job 3938175: Subtask ExecuteRunner   File "/venv/lib/python3.7/site-packages/airflow/cli/commands/task_command.py", line 396, in task_run
[2024-03-13, 15:19:53 UTC] {base_task_runner.py:116} INFO - Job 3938175: Subtask ExecuteRunner     _run_task_by_selected_method(args, dag, ti)
[2024-03-13, 15:19:53 UTC] {base_task_runner.py:116} INFO - Job 3938175: Subtask ExecuteRunner   File "/venv/lib/python3.7/site-packages/airflow/cli/commands/task_command.py", line 196, in _run_task_by_selected_method
[2024-03-13, 15:19:53 UTC] {base_task_runner.py:116} INFO - Job 3938175: Subtask ExecuteRunner     _run_raw_task(args, ti)
[2024-03-13, 15:19:53 UTC] {base_task_runner.py:116} INFO - Job 3938175: Subtask ExecuteRunner   File "/venv/lib/python3.7/site-packages/airflow/cli/commands/task_command.py", line 273, in _run_raw_task
[2024-03-13, 15:19:53 UTC] {base_task_runner.py:116} INFO - Job 3938175: Subtask ExecuteRunner     pool=args.pool,
[2024-03-13, 15:19:53 UTC] {base_task_runner.py:116} INFO - Job 3938175: Subtask ExecuteRunner   File "/venv/lib/python3.7/site-packages/airflow/utils/session.py", line 75, in wrapper
[2024-03-13, 15:19:53 UTC] {base_task_runner.py:116} INFO - Job 3938175: Subtask ExecuteRunner     return func(*args, session=session, **kwargs)
[2024-03-13, 15:19:53 UTC] {base_task_runner.py:116} INFO - Job 3938175: Subtask ExecuteRunner   File "/venv/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1383, in _run_raw_task
[2024-03-13, 15:19:53 UTC] {base_task_runner.py:116} INFO - Job 3938175: Subtask ExecuteRunner     self._execute_task_with_callbacks(context, test_mode)
[2024-03-13, 15:19:53 UTC] {base_task_runner.py:116} INFO - Job 3938175: Subtask ExecuteRunner   File "/venv/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1529, in _execute_task_with_callbacks
[2024-03-13, 15:19:53 UTC] {base_task_runner.py:116} INFO - Job 3938175: Subtask ExecuteRunner     result = self._execute_task(context, task_orig)
[2024-03-13, 15:19:53 UTC] {base_task_runner.py:116} INFO - Job 3938175: Subtask ExecuteRunner   File "/venv/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1589, in _execute_task
[2024-03-13, 15:19:53 UTC] {base_task_runner.py:116} INFO - Job 3938175: Subtask ExecuteRunner     result = execute_callable(context=context)
[2024-03-13, 15:19:53 UTC] {base_task_runner.py:116} INFO - Job 3938175: Subtask ExecuteRunner   File "/venv/lib/python3.7/site-packages/airflow/operators/bash.py", line 191, in execute
[2024-03-13, 15:19:53 UTC] {base_task_runner.py:116} INFO - Job 3938175: Subtask ExecuteRunner     cwd=self.cwd,
[2024-03-13, 15:19:53 UTC] {base_task_runner.py:116} INFO - Job 3938175: Subtask ExecuteRunner   File "/venv/lib/python3.7/site-packages/airflow/hooks/subprocess.py", line 91, in run_command
[2024-03-13, 15:19:53 UTC] {base_task_runner.py:116} INFO - Job 3938175: Subtask ExecuteRunner     for raw_line in iter(self.sub_process.stdout.readline, b""):
[2024-03-13, 15:19:53 UTC] {base_task_runner.py:116} INFO - Job 3938175: Subtask ExecuteRunner   File "/venv/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1490, in signal_handler
[2024-03-13, 15:19:53 UTC] {base_task_runner.py:116} INFO - Job 3938175: Subtask ExecuteRunner     raise AirflowException("Task received SIGTERM signal")
[2024-03-13, 15:19:53 UTC] {base_task_runner.py:116} INFO - Job 3938175: Subtask ExecuteRunner airflow.exceptions.AirflowException: Task received SIGTERM signal
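The log above lists the PIDs in the signalled process group ([253, 309, 249]) but not how they relate to each other. The following is a minimal diagnostic sketch, not part of Airflow, that prints the parent chain of a given PID on Linux by reading /proc. The helper names `parent_pid` and `ancestry` are made up for illustration, and it assumes /proc is mounted in the worker container.

```python
import os
from typing import List


def parent_pid(pid: int) -> int:
    """Return the parent PID of `pid` by reading /proc/<pid>/stat (Linux only)."""
    with open(f"/proc/{pid}/stat", "r", errors="replace") as fh:
        stat = fh.read()
    # The command name is wrapped in parentheses and may itself contain spaces
    # or parentheses, so split after the last ')'.
    fields = stat.rsplit(")", 1)[1].split()
    # fields[0] is the process state, fields[1] is the parent PID.
    return int(fields[1])


def ancestry(pid: int) -> List[int]:
    """Walk from `pid` up through its parents and return the chain of PIDs."""
    chain = [pid]
    while chain[-1] > 1:
        ppid = parent_pid(chain[-1])
        if ppid == 0:
            # The parent lives outside this PID namespace (or there is none).
            break
        chain.append(ppid)
    return chain


if __name__ == "__main__":
    # Example: show the chain for the current process.
    print(ancestry(os.getpid()))
```

Running `ancestry(<pid>)` inside the worker container while the task is still alive, with one of the PIDs from the log, would show how many process levels sit between the task process and the job that supervises it.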

Full traceback from worker:

Traceback (most recent call last):
  File "/venv/lib/python3.7/site-packages/airflow/executors/celery_executor.py", line 130, in _execute_in_fork
    args.func(args)
  File "/venv/lib/python3.7/site-packages/airflow/cli/cli_parser.py", line 52, in command
    return func(*args, **kwargs)
  File "/venv/lib/python3.7/site-packages/airflow/utils/cli.py", line 108, in wrapper
    return f(*args, **kwargs)
  File "/venv/lib/python3.7/site-packages/airflow/cli/commands/task_command.py", line 396, in task_run
    _run_task_by_selected_method(args, dag, ti)
  File "/venv/lib/python3.7/site-packages/airflow/cli/commands/task_command.py", line 194, in _run_task_by_selected_method
    _run_task_by_local_task_job(args, ti)
  File "/venv/lib/python3.7/site-packages/airflow/cli/commands/task_command.py", line 253, in _run_task_by_local_task_job
    run_job.run()
  File "/venv/lib/python3.7/site-packages/airflow/jobs/base_job.py", line 258, in run
    self._execute()
  File "/venv/lib/python3.7/site-packages/airflow/jobs/local_task_job.py", line 188, in _execute
    self.heartbeat()
  File "/venv/lib/python3.7/site-packages/airflow/jobs/base_job.py", line 239, in heartbeat
    self.heartbeat_callback(session=session)
  File "/venv/lib/python3.7/site-packages/airflow/utils/session.py", line 72, in wrapper
    return func(*args, **kwargs)
  File "/venv/lib/python3.7/site-packages/airflow/jobs/local_task_job.py", line 262, in heartbeat_callback
    raise AirflowException("PID of job runner does not match")
airflow.exceptions.AirflowException: PID of job runner does not match
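To make the warning easier to reason about, here is a simplified model of a recorded-versus-current PID comparison. It is a sketch under stated assumptions, not the Airflow source: the function name `runner_pid_matches`, the `parent_of` callback, and the two process tables are hypothetical, and the rule that the parent of the recorded PID is compared when `run_as_user` is set is my assumption about how such a check accounts for the extra sudo level.

```python
from typing import Callable, Optional


def runner_pid_matches(
    recorded_pid: Optional[int],
    runner_pid: int,
    run_as_user: bool,
    parent_of: Callable[[int], int],
) -> bool:
    """Simplified model of a recorded-vs-current PID check (not Airflow source)."""
    if recorded_pid is None:
        # Nothing recorded yet, so there is nothing to compare.
        return True
    if run_as_user:
        # Assumption: impersonation adds one process level (e.g. sudo), so the
        # parent of the recorded PID is compared instead of the PID itself.
        recorded_pid = parent_of(recorded_pid)
    return recorded_pid == runner_pid


# Hypothetical process tables (child -> parent), for illustration only.
with_extra_level = {250: 249, 249: 248}
without_extra_level = {249: 248}

# The task process (250) is a child of the runner process (249): match.
print(runner_pid_matches(250, 249, True, with_extra_level.__getitem__))

# No extra level: the recorded PID equals the runner PID (249), so comparing
# its parent (248) against 249 fails.
print(runner_pid_matches(249, 249, True, without_extra_level.__getitem__))
```

In this model the second call compares 248 against 249, the same pair of numbers as in the warning. Whether that is what actually happened on the worker is not established by the logs; it is only one scenario consistent with them.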

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

Labels: area:core, kind:bug, needs-triage