DAG fails serialization if template_field contains execution_timeout #29819

Closed · Fixed by #29821

cosinequanon opened this issue Feb 28, 2023 · 1 comment
Labels
affected_version:2.5 · area:core · kind:bug

Milestone
Airflow 2.5.2
Comments


cosinequanon commented Feb 28, 2023

Apache Airflow version

2.5.1

What happened

If an Operator includes execution_timeout in its template_fields, the DAG serializes correctly but throws an error during deserialization. This crashes the entire scheduler and breaks the application.

What you think should happen instead

The scheduler should never go down because of user-written DAG code; this should probably raise an error during serialization instead.
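
For illustration only, a hypothetical serialization-time guard along these lines could surface the problem early. This is just a sketch of the idea, not the actual change in #29821; the helper name and call site are made up.

from airflow.exceptions import AirflowException

# Keys that deserialization force-converts back to timedelta.
_TIMEDELTA_KEYS = {"retry_delay", "execution_timeout", "sla", "max_retry_delay"}

def _check_template_fields(op):
    # Hypothetical helper: reject template_fields that cannot survive the
    # string round-trip because deserialization expects a timedelta.
    clash = _TIMEDELTA_KEYS.intersection(op.template_fields)
    if clash:
        raise AirflowException(
            f"Operator {op.task_id!r} declares {sorted(clash)} in template_fields, "
            "but these attributes are deserialized as timedelta and would crash the scheduler."
        )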

How to reproduce

Define an operator like this:

from datetime import timedelta

from airflow.models.baseoperator import BaseOperator


class ExecutionTimeoutOperator(BaseOperator):
    template_fields = ("execution_timeout",)

    def __init__(self, execution_timeout: timedelta, **kwargs):
        super().__init__(**kwargs)
        self.execution_timeout = execution_timeout

then make a DAG like this:

from datetime import datetime, timedelta

from airflow import DAG

dag = DAG(
    "serialize_with_default",
    schedule_interval="0 12 * * *",
    start_date=datetime(2023, 2, 28),
    catchup=False,
    default_args={
        "execution_timeout": timedelta(days=4),
    },
)

with dag:
    execution = ExecutionTimeoutOperator(task_id="execution", execution_timeout=timedelta(hours=1))

That will break the scheduler; you can force the stack trace by doing this:

from airflow.models import DagBag
db = DagBag('dags/', read_dags_from_db=True)
db.get_dag('serialize_with_default')
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/usr/local/lib/python3.9/site-packages/airflow/utils/session.py", line 75, in wrapper
    return func(*args, session=session, **kwargs)
  File "/usr/local/lib/python3.9/site-packages/airflow/models/dagbag.py", line 190, in get_dag
    self._add_dag_from_db(dag_id=dag_id, session=session)
  File "/usr/local/lib/python3.9/site-packages/airflow/models/dagbag.py", line 265, in _add_dag_from_db
    dag = row.dag
  File "/usr/local/lib/python3.9/site-packages/airflow/models/serialized_dag.py", line 218, in dag
    dag = SerializedDAG.from_dict(self.data)
  File "/usr/local/lib/python3.9/site-packages/airflow/serialization/serialized_objects.py", line 1287, in from_dict
    return cls.deserialize_dag(serialized_obj["dag"])
  File "/usr/local/lib/python3.9/site-packages/airflow/serialization/serialized_objects.py", line 1194, in deserialize_dag
    v = {task["task_id"]: SerializedBaseOperator.deserialize_operator(task) for task in v}
  File "/usr/local/lib/python3.9/site-packages/airflow/serialization/serialized_objects.py", line 1194, in <dictcomp>
    v = {task["task_id"]: SerializedBaseOperator.deserialize_operator(task) for task in v}
  File "/usr/local/lib/python3.9/site-packages/airflow/serialization/serialized_objects.py", line 955, in deserialize_operator
    cls.populate_operator(op, encoded_op)
  File "/usr/local/lib/python3.9/site-packages/airflow/serialization/serialized_objects.py", line 864, in populate_operator
    v = cls._deserialize_timedelta(v)
  File "/usr/local/lib/python3.9/site-packages/airflow/serialization/serialized_objects.py", line 513, in _deserialize_timedelta
    return datetime.timedelta(seconds=seconds)
TypeError: unsupported type for timedelta seconds component: str

Operating System

macOS 13.1 (22C65)

Versions of Apache Airflow Providers

apache-airflow-providers-amazon==5.1.0
apache-airflow-providers-apache-hdfs==3.2.0
apache-airflow-providers-apache-hive==5.1.1
apache-airflow-providers-apache-spark==4.0.0
apache-airflow-providers-celery==3.1.0
apache-airflow-providers-cncf-kubernetes==5.1.1
apache-airflow-providers-common-sql==1.3.3
apache-airflow-providers-datadog==3.1.0
apache-airflow-providers-ftp==3.3.0
apache-airflow-providers-http==4.1.1
apache-airflow-providers-imap==3.1.1
apache-airflow-providers-jdbc==3.3.0
apache-airflow-providers-jenkins==3.2.0
apache-airflow-providers-mysql==4.0.0
apache-airflow-providers-pagerduty==3.1.0
apache-airflow-providers-postgres==5.4.0
apache-airflow-providers-presto==4.2.1
apache-airflow-providers-slack==7.2.0
apache-airflow-providers-sqlite==3.3.1
apache-airflow-providers-ssh==3.4.0

Deployment

Docker-Compose

Deployment details

I could reproduce this with Docker Compose and in a Helm-backed deployment, so I don't think it's really related to the deployment details.

Anything else

In the serialization code there are two pieces of logic that are in direct conflict with each other. The first dictates how template fields are serialized; from the code:

# Store all template_fields as they are if there are JSON Serializable
# If not, store them as strings

and the second special-cases a few argument names that must be deserialized in a specific way:

elif k in {"retry_delay", "execution_timeout", "sla", "max_retry_delay"}:
    v = cls._deserialize_timedelta(v)

So during serialization Airflow sees that execution_timeout is a template field and serializes it as a string; then during deserialization the same key matches the special-cased names, gets forced through the timedelta conversion, and BOOM!
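
To see the clash in isolation, here is a minimal sketch of the round trip. It is a simplification of the real code paths, not the actual Airflow functions:

import datetime

# The template-field branch effectively stores the non-JSON-serializable
# timedelta as its string representation.
serialized_value = str(datetime.timedelta(hours=1))  # "1:00:00"

# The special-cased branch then feeds that string straight back into
# timedelta, mirroring _deserialize_timedelta(seconds=...):
datetime.timedelta(seconds=serialized_value)
# TypeError: unsupported type for timedelta seconds component: str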

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

Code of Conduct

  • I agree to follow this project's Code of Conduct
@cosinequanon added the area:core, kind:bug, and needs-triage labels Feb 28, 2023
boring-cyborg bot commented Feb 28, 2023

Thanks for opening your first issue here! Be sure to follow the issue template!

@hussein-awala added the affected_version:2.5 label and removed the needs-triage label Feb 28, 2023
@potiuk added this to the Airflow 2.5.2 milestone Mar 4, 2023