[AIRFLOW-5880] Enforce unique task ids #6549
Conversation
airflow/models/dag.py
Outdated
@@ -1190,13 +1189,8 @@ def add_task(self, task):
        task.end_date = min(task.end_date, self.end_date)

    if task.task_id in self.task_dict:
dag = DAG("test_dag", start_date=DEFAULT_DATE)
dag.add_task(t1)
dag.add_task(t1)
Will this code still work? In my opinion, re-adding the same task should not cause an exception.
IMO it should. I often see it going wrong with for loops where people forget to make unique task ids. E.g.:
for i in range(5):
DummyOperator(task_id="task")
In this case, currently no warning or error is given and only the last defined task is saved. I think enforcing unique task ids here would be a good thing.
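The silent-overwrite behaviour is easy to reproduce without Airflow at all, since a DAG keeps its tasks in a dict keyed by task_id. A minimal plain-Python sketch (the operator stand-ins are hypothetical, not Airflow code):

```python
# Sketch of why the loop above loses tasks: a DAG stores tasks in a dict
# keyed by task_id, so a repeated id silently overwrites earlier entries.
task_dict = {}
for i in range(5):
    task_dict["task"] = f"DummyOperator #{i}"  # same task_id every iteration

print(len(task_dict))  # 1 -- four of the five tasks were silently dropped

# The usual fix: make the task_id unique per iteration.
task_dict = {}
for i in range(5):
    task_dict[f"task_{i}"] = f"DummyOperator #{i}"

print(len(task_dict))  # 5
```

Raising an exception on the duplicate key turns this silent data loss into an immediate, debuggable error.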
Adding the same object is slightly different from adding a new task with a duplicate id, but I can't think of any good reason you'd want to do what Kamil showed in practice, so I'd go for just making it an error.
Currently all DAGs are broken.
For example, in example_python_operator we initialize a new task using the following code:
run_this = PythonOperator(
task_id='print_the_context',
python_callable=print_context,
dag=dag,
)
This code sets the dag property on the task.
https://github.com/apache/airflow/blob/master/airflow/models/baseoperator.py#L410
The dag setter calls the DAG.add_task method.
https://github.com/apache/airflow/blob/master/airflow/models/baseoperator.py#L532
That method sets the dag property back on the task.
https://github.com/apache/airflow/blob/master/airflow/models/dag.py#L1202
The setter then calls DAG.add_task again.
https://github.com/apache/airflow/blob/master/airflow/models/baseoperator.py#L532
This flow means we return to the same method, so the duplicate-id condition is met and we get an exception.
So we need to allow the option of adding the same task multiple times.
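The re-entrant flow described above can be sketched with minimal stand-in classes (these are illustrative, not Airflow's real implementation): setting task.dag calls DAG.add_task, which sets task.dag back, so add_task needs a same-object guard before the duplicate-id check.

```python
# Minimal sketch (hypothetical classes, not Airflow's actual code) of the
# cycle: the `dag` setter re-enters DAG.add_task, so add_task must treat
# re-adding the exact same object as a no-op.
class DuplicateTaskIdFound(Exception):
    pass

class Task:
    def __init__(self, task_id):
        self.task_id = task_id
        self._dag = None

    @property
    def dag(self):
        return self._dag

    @dag.setter
    def dag(self, dag):
        self._dag = dag
        dag.add_task(self)          # re-enters DAG.add_task

class DAG:
    def __init__(self):
        self.task_dict = {}

    def add_task(self, task):
        # Guard: if this exact task object is already registered, do nothing.
        # Without this early return, the re-entrant call from the `dag`
        # setter would hit the duplicate check below and raise.
        if self.task_dict.get(task.task_id) is task:
            return
        if task.task_id in self.task_dict:
            raise DuplicateTaskIdFound(task.task_id)
        self.task_dict[task.task_id] = task
        task.dag = self             # triggers the setter, which re-enters

dag = DAG()
t1 = Task("print_the_context")
dag.add_task(t1)   # succeeds: the guard breaks the add_task/setter cycle
dag.add_task(t1)   # re-adding the same object is a no-op
```

With the guard in place, re-adding the same object is idempotent, while a different task reusing an existing id still raises.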
My solution:
From de7c6a66108cccdcd23927ad0e821f5150741602 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Kamil=20Bregu=C5=82a?= <kamil.bregula@polidea.com>
Date: Tue, 12 Nov 2019 17:07:04 +0100
Subject: fixup! [AIRFLOW-5880] Enforce unique task ids
---
airflow/models/dag.py | 4 ++++
1 file changed, 4 insertions(+)
diff --git a/airflow/models/dag.py b/airflow/models/dag.py
index 57091ac7d..13c121dcc 100644
--- a/airflow/models/dag.py
+++ b/airflow/models/dag.py
@@ -1170,6 +1170,10 @@ class DAG(BaseDag, LoggingMixin):
:param task: the task you want to add
:type task: task
"""
+    # if the task has already been added, do nothing
+ if task.task_id in self.task_dict and self.task_dict[task.task_id] == task:
+ return
+
if not self.start_date and not task.start_date:
raise AirflowException("Task is missing the start_date parameter")
# if the task has no start date, assign it the same as the DAG
--
2.20.1
Adding the same task should be fine. I just added another commit with the above solution.
airflow/models/baseoperator.py
Outdated
@@ -528,7 +528,7 @@ def dag(self, dag):
        elif self.has_dag() and self.dag is not dag:
            raise AirflowException(
                "The DAG assigned to {} can not be changed.".format(self))
-       elif self.task_id not in dag.task_dict:
+       elif self.task_id:
I'm missing an "if self.task_id in dag.task_dict" check and the raising of DuplicateTaskIdFound?
You mean like this:
elif self.task_id not in dag.task_dict:
dag.add_task(self)
elif self.task_id in dag.task_dict:
raise DuplicateTaskIdFound("Task id '{}' has already been added to the DAG".format(self.task_id))
I removed the "not in dag.task_dict" check: whether or not the task is in task_dict, I let it go through to add_task, where we already validate it.
I am open to changing it either way, so let me know which makes more sense to you!
LGTM.
All current DAGs should work if we want to accept the change.
Codecov Report
@@ Coverage Diff @@
## master #6549 +/- ##
==========================================
- Coverage 84.07% 83.31% -0.77%
==========================================
Files 639 646 +7
Lines 36900 37298 +398
==========================================
+ Hits 31024 31073 +49
- Misses 5876 6225 +349
Continue to review full report at Codecov.
LGTM
Great. I think we should also enforce unique dag_id / variables.
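The same pattern would carry over to dag_ids. A hypothetical sketch (the function and exception names here are invented for illustration; this is not Airflow's actual API): a registry that raises instead of silently replacing an existing entry.

```python
# Hypothetical illustration of enforcing unique dag_ids the same way:
# refuse to overwrite an existing registry entry instead of replacing it.
class DuplicateDagIdFound(Exception):
    pass

def register_dag(registry, dag_id, dag):
    # Raise rather than silently shadow a previously registered DAG.
    if dag_id in registry:
        raise DuplicateDagIdFound(
            "dag_id '{}' is already registered".format(dag_id))
    registry[dag_id] = dag

registry = {}
register_dag(registry, "example_python_operator", object())
try:
    register_dag(registry, "example_python_operator", object())
except DuplicateDagIdFound:
    pass  # the duplicate is rejected, the first registration survives
```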
To be sure: it seems we cannot enforce this if two tasks that are the same operator with the same task_id are in the dag; it will fail to catch this. Is this the expected behaviour?
No, that is not true. For example, the following would raise an error:

from airflow import DAG
from airflow.utils import timezone
from airflow.operators.bash_operator import BashOperator

with DAG("test_dag", start_date=timezone.datetime(2016, 1, 1)) as dag:
    t1 = BashOperator(task_id="t1", bash_command="true")
    t2 = BashOperator(task_id="t1", bash_command="false")
    t1 >> t2
This test exposed a bug in one of the example dags that wasn't caught by apache#6549. That will be fixed in a separate issue, but it caused the round-trip tests to fail here. Fixes apache#8720
…che#8775) This test exposed a bug in one of the example dags that wasn't caught by apache#6549. That will be fixed in a separate issue, but it caused the round-trip tests to fail here. Fixes apache#8720 (cherry picked from commit 280f1f0)
Make sure you have checked all steps below.
Jira
Description
Currently, tasks with equal ids in a DAG simply overwrite each other and only the last assigned task is saved. This can lead to unexpected behaviour, so an exception should be raised instead.
We've had a PendingDeprecationWarning on this for 4 years, time to get rid of it: 385add2
Tests
Commits
Documentation