Cluster policies on mapped tasks cause broken DAGs #37013

Closed
2 tasks done
nathadfield opened this issue Jan 25, 2024 · 3 comments · Fixed by #37828
Labels
affected_version:2.8 Issues Reported for 2.8 area:core area:dynamic-task-mapping AIP-42 kind:bug This is a clearly a bug

Comments

@nathadfield
Collaborator

nathadfield commented Jan 25, 2024

Apache Airflow version

2.8.1

What happened?

Defining a task policy that affects an operator in a mapped task causes a DAG Import Error.

How to reproduce

Consider the following Airflow task policy, which sets execution_timeout to 30 seconds for every BashOperator.

# e.g. in airflow_local_settings.py
from abc import ABC
from datetime import timedelta

from airflow.models.baseoperator import BaseOperator


class TimedOperator(BaseOperator, ABC):
    timeout: timedelta


def task_policy(task: TimedOperator):
    # Called for each task when a DAG file is parsed.
    if task.operator_name == 'BashOperator':
        task.execution_timeout = timedelta(seconds=30)
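As a rough illustration of how a task policy is applied (our understanding is that Airflow calls `task_policy` once per task while parsing a DAG file), here is a minimal sketch that runs the same policy logic over plain-Python stand-ins. `FakeTask` and `apply_policy` are hypothetical helpers for illustration, not Airflow APIs.

```python
from dataclasses import dataclass
from datetime import timedelta
from typing import Callable, List, Optional


@dataclass
class FakeTask:
    """Hypothetical stand-in for an Airflow operator (not the real BaseOperator)."""
    task_id: str
    operator_name: str
    execution_timeout: Optional[timedelta] = None


def task_policy(task) -> None:
    # Same logic as the policy above: cap BashOperator tasks at 30 seconds.
    if task.operator_name == "BashOperator":
        task.execution_timeout = timedelta(seconds=30)


def apply_policy(tasks: List[FakeTask], policy: Callable) -> None:
    # Hypothetical simplification of DAG parsing: invoke the policy once per task.
    for task in tasks:
        policy(task)


tasks = [
    FakeTask("single_task", "BashOperator"),
    FakeTask("py_task", "PythonOperator"),
]
apply_policy(tasks, task_policy)
for t in tasks:
    print(t.task_id, t.execution_timeout)  # single_task 0:00:30, then py_task None
```

Because an ordinary attribute can be reassigned freely, the policy's assignment succeeds for a regular (non-mapped) task.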

If we then create the following DAG and run it, we can see that the task does indeed get killed after 30 seconds according to the policy.

from datetime import datetime
from airflow import models

from airflow.operators.bash import BashOperator

with models.DAG(
    dag_id='policy_test',
    start_date=datetime(2024, 1, 1),
    catchup=False,
    schedule='0 0 * * *',
) as dag:

    task1 = BashOperator(
        task_id='single_task',
        bash_command='echo "Hello World!"; sleep 60; echo "Goodbye World!"',
    )

Task log:
[2024-01-25, 15:35:21 UTC] {task_command.py:423} INFO - Running <TaskInstance: policy_test.single_task scheduled__2024-01-24T00:00:00+00:00 [running]> on host 67fd7cade381
[2024-01-25, 15:35:21 UTC] {taskinstance.py:2480} INFO - Exporting env vars: AIRFLOW_CTX_DAG_OWNER='airflow' AIRFLOW_CTX_DAG_ID='policy_test' AIRFLOW_CTX_TASK_ID='single_task' AIRFLOW_CTX_EXECUTION_DATE='2024-01-24T00:00:00+00:00' AIRFLOW_CTX_TRY_NUMBER='1' AIRFLOW_CTX_DAG_RUN_ID='scheduled__2024-01-24T00:00:00+00:00'
[2024-01-25, 15:35:21 UTC] {subprocess.py:63} INFO - Tmp dir root location: /tmp
[2024-01-25, 15:35:21 UTC] {subprocess.py:75} INFO - Running command: ['/bin/bash', '-c', 'echo "Hello World!"; sleep 60; echo "Goodbye World!"']
[2024-01-25, 15:35:21 UTC] {subprocess.py:86} INFO - Output:
[2024-01-25, 15:35:21 UTC] {subprocess.py:93} INFO - Hello World!
[2024-01-25, 15:35:51 UTC] {timeout.py:68} ERROR - Process timed out, PID: 4315
[2024-01-25, 15:35:51 UTC] {subprocess.py:104} INFO - Sending SIGTERM signal to process group
[2024-01-25, 15:35:51 UTC] {taskinstance.py:2698} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/usr/local/lib/python3.11/site-packages/airflow/models/taskinstance.py", line 428, in _execute_task
    result = execute_callable(context=context, **execute_callable_kwargs)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/airflow/operators/bash.py", line 203, in execute
    result = self.subprocess_hook.run_command(
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/airflow/hooks/subprocess.py", line 91, in run_command
    for raw_line in iter(self.sub_process.stdout.readline, b""):
  File "/usr/local/lib/python3.11/site-packages/airflow/utils/timeout.py", line 69, in handle_timeout
    raise AirflowTaskTimeout(self.error_message)
airflow.exceptions.AirflowTaskTimeout: Timeout, PID: 4315

However, if we add a dynamically mapped BashOperator task to the same DAG, the DAG fails to import with the following error.

    # Added inside the same `with models.DAG(...)` block:
    mapped_task = BashOperator.partial(
        task_id='mapped_tasks',
    ).expand(
        bash_command=[
            'echo "Hello World!"; sleep 60; echo "Goodbye World!"',
            'echo "Hello World!"; sleep 60; echo "Goodbye World!"',
        ]
    )

[Screenshot, 2024-01-25 15:37:48: the resulting DAG Import Error]
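The import failure is consistent with the later discussion in this thread: if a mapped task exposes `execution_timeout` as a property with no setter, the policy's assignment raises `AttributeError`. The minimal sketch below reproduces that failure mode without Airflow. `ReadOnlyMappedTask` is a hypothetical stand-in, and keeping `.partial()` arguments in a `partial_kwargs` dict is an assumption for illustration.

```python
from datetime import timedelta
from typing import Any, Dict, Optional


class ReadOnlyMappedTask:
    """Hypothetical mapped-task stand-in whose attributes are read-only properties."""

    def __init__(self, partial_kwargs: Dict[str, Any]) -> None:
        self.operator_name = "BashOperator"
        # Assumption: arguments passed to .partial() are kept in a dict.
        self.partial_kwargs = partial_kwargs

    @property
    def execution_timeout(self) -> Optional[timedelta]:
        # Getter only: no setter is defined, so assignment raises AttributeError.
        return self.partial_kwargs.get("execution_timeout")


def task_policy(task) -> None:
    if task.operator_name == "BashOperator":
        task.execution_timeout = timedelta(seconds=30)


mapped = ReadOnlyMappedTask({"task_id": "mapped_tasks"})
error: Optional[AttributeError] = None
try:
    task_policy(mapped)
except AttributeError as exc:
    error = exc
    print(f"Policy failed: {exc}")
```

The exact error wording differs between Python versions, so it is not reproduced here.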

Operating System

PRETTY_NAME="Debian GNU/Linux 11 (bullseye)"
NAME="Debian GNU/Linux"
VERSION_ID="11"
VERSION="11 (bullseye)"
VERSION_CODENAME=bullseye
ID=debian
HOME_URL="https://www.debian.org/"
SUPPORT_URL="https://www.debian.org/support"
BUG_REPORT_URL="https://bugs.debian.org/"

Deployment

Astronomer

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

Code of Conduct

@nathadfield nathadfield added kind:bug This is a clearly a bug area:core needs-triage label for new issues that we didn't triage yet area:dynamic-task-mapping AIP-42 labels Jan 25, 2024
@RNHTTR RNHTTR removed the needs-triage label for new issues that we didn't triage yet label Jan 27, 2024
@RNHTTR
Collaborator

RNHTTR commented Jan 27, 2024

Possibly related: #28313

@eladkal eladkal added the affected_version:2.8 Issues Reported for 2.8 label Feb 8, 2024
@eladkal
Contributor

eladkal commented Feb 8, 2024

@cccs-seb, will you have time to check whether this is a regression due to #28313?

@nathadfield nathadfield self-assigned this Mar 1, 2024
@nathadfield
Collaborator Author

I'm going to fix this, as I think it's quite simple: it just needs more setter methods on MappedOperator. I was initially planning to add setters for execution_timeout and queue, but should I just add them for every attribute?

Basically, I think that any task attribute that a policy modifies on a dynamically mapped task will fail unless it has a setter method.
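The comment above asks whether setters should be added only for execution_timeout and queue or for every attribute. One way to avoid hand-writing each getter/setter pair is a small property factory. This is a hypothetical sketch: `WritableMappedTask`, `_partial_kwarg_property`, and storing values in `partial_kwargs` are assumptions for illustration, not necessarily how #37828 implemented the fix.

```python
from datetime import timedelta
from typing import Any, Dict


def _partial_kwarg_property(name: str) -> property:
    """Build a property whose getter and setter both go through self.partial_kwargs."""

    def getter(self) -> Any:
        return self.partial_kwargs.get(name)

    def setter(self, value: Any) -> None:
        self.partial_kwargs[name] = value

    return property(getter, setter, doc=f"Read/write access to partial_kwargs[{name!r}].")


class WritableMappedTask:
    """Hypothetical mapped-task stand-in with setters for policy-editable attributes."""

    # Assumed attribute list; which attributes get setters is a design decision.
    execution_timeout = _partial_kwarg_property("execution_timeout")
    queue = _partial_kwarg_property("queue")

    def __init__(self, partial_kwargs: Dict[str, Any]) -> None:
        self.operator_name = "BashOperator"
        self.partial_kwargs = partial_kwargs


def task_policy(task) -> None:
    if task.operator_name == "BashOperator":
        task.execution_timeout = timedelta(seconds=30)


mapped = WritableMappedTask({"task_id": "mapped_tasks"})
task_policy(mapped)
print(mapped.execution_timeout)  # 0:00:30
```

Explicit per-attribute `@x.setter` methods are more verbose but easier to type-check and document individually; a factory keeps the list of policy-editable attributes in one place.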


3 participants