diff --git a/airflow/models/abstractoperator.py b/airflow/models/abstractoperator.py index 9a8f88ce7df24..0c6d89fff3b5d 100644 --- a/airflow/models/abstractoperator.py +++ b/airflow/models/abstractoperator.py @@ -302,7 +302,8 @@ def get_upstreams_only_setups_and_teardowns(self) -> Iterable[Operator]: This method is meant to be used when we are clearing the task (non-upstream) and we need to add in the *relevant* setups and their teardowns. - Relevant in this case means, the setup has a teardown that is downstream of ``self``. + Relevant in this case means, the setup has a teardown that is downstream of ``self``, + or the setup has no teardowns. """ downstream_teardown_ids = { x.task_id for x in self.get_flat_relatives(upstream=False) if x.is_teardown @@ -310,7 +311,9 @@ def get_upstreams_only_setups_and_teardowns(self) -> Iterable[Operator]: for task in self.get_flat_relatives(upstream=True): if not task.is_setup: continue - if not task.downstream_task_ids.isdisjoint(downstream_teardown_ids): + has_no_teardowns = not any(True for x in task.downstream_list if x.is_teardown) + # if task has no teardowns or has teardowns downstream of self + if has_no_teardowns or task.downstream_task_ids.intersection(downstream_teardown_ids): yield task for t in task.downstream_list: if t.is_teardown and not t == self: diff --git a/airflow/models/dag.py b/airflow/models/dag.py index 391be9e582367..1d380274f63a4 100644 --- a/airflow/models/dag.py +++ b/airflow/models/dag.py @@ -714,10 +714,6 @@ def validate_setup_teardown(self): :meta private: """ for task in self.tasks: - if task.is_setup and not any(x.is_teardown for x in task.downstream_list): - raise AirflowDagInconsistent( - f"Dag has setup without teardown: dag='{self.dag_id}', task='{task.task_id}'" - ) if task.is_teardown and all(x.is_setup for x in task.upstream_list): raise AirflowDagInconsistent( f"Dag has teardown task without an upstream work task: dag='{self.dag_id}'," diff --git a/tests/models/test_dag.py b/tests/models/test_dag.py index 14527fd7facc3..494c7c7c66123 100644 --- a/tests/models/test_dag.py +++ b/tests/models/test_dag.py @@ -3623,39 +3623,46 @@ def cleared_neither(task): def test_get_flat_relative_ids_with_setup(self): with DAG(dag_id="test_dag", start_date=pendulum.now()) as dag: - s1, w1, w2, w3, t1 = self.make_tasks(dag, "s1, w1, w2, w3, t1") + s1, w1, w2, w3, w4, t1 = self.make_tasks(dag, "s1, w1, w2, w3, w4, t1") s1 >> w1 >> w2 >> w3 - # there is no teardown downstream of w1, so we assume w1 does not need s1 - assert set(w1.get_upstreams_only_setups_and_teardowns()) == set() + # w1 is downstream of s1, and s1 has no teardown, so clearing w1 clears s1 + assert set(w1.get_upstreams_only_setups_and_teardowns()) == {s1} # same with w2 and w3 - assert set(w2.get_upstreams_only_setups_and_teardowns()) == set() - assert set(w3.get_upstreams_only_setups_and_teardowns()) == set() - assert self.cleared_downstream(w2) == {w2, w3} + assert set(w2.get_upstreams_only_setups_and_teardowns()) == {s1} + assert set(w3.get_upstreams_only_setups_and_teardowns()) == {s1} + # so if we clear w2, we should also get s1, and w3, but not w1 + assert self.cleared_downstream(w2) == {s1, w2, w3} w3 >> t1 # now, w2 has a downstream teardown, but it's not connected directly to s1 - # (this is how we signal "this is the teardown for this setup") - # so still, we don't regard s1 as a setup for w2 - assert set(w2.get_upstreams_only_setups_and_teardowns()) == set() - assert self.cleared_downstream(w2) == {w2, w3, t1} + assert set(w2.get_upstreams_only_setups_and_teardowns()) == {s1} + # so if we clear downstream then s1 will be cleared, and t1 will be cleared but only by virtue of + # being downstream of w2 -- not as a result of being the teardown for s1, which it ain't + assert self.cleared_downstream(w2) == {s1, w2, w3, t1} + # and, another consequence of not linking s1 and t1 is that when we clear upstream, note that + # t1 doesn't get cleared -- cus it's not upstream and it's not linked to s1 + assert self.cleared_upstream(w2) == {s1, w1, w2} + # note also that if we add a 4th work task after t1, it will still be "in scope" for s1 + t1 >> w4 + assert self.cleared_downstream(w4) == {s1, w4} s1 >> t1 - # now, we know that t1 is the teardown for s1, and it's downstream of - # w2, so we can infer that w2 requires it, so now when we clear w2, - # we will get s1 (because it's a setup for w2) and t1 (because - # it is a teardown for s1) + # now, we know that t1 is the teardown for s1, so now we know that s1 will be "torn down" + # by the time w4 runs, so we now know that w4 no longer requires s1, so when we clear w4, + # s1 will not also be cleared + self.cleared_downstream(w4) == {w4} assert set(w1.get_upstreams_only_setups_and_teardowns()) == {s1, t1} - assert self.cleared_downstream(w1) == {s1, w1, w2, w3, t1} + assert self.cleared_downstream(w1) == {s1, w1, w2, w3, t1, w4} assert self.cleared_upstream(w1) == {s1, w1, t1} assert set(w2.get_upstreams_only_setups_and_teardowns()) == {s1, t1} assert set(w2.get_upstreams_follow_setups()) == {s1, w1, t1} - assert self.cleared_downstream(w2) == {s1, w2, w3, t1} + assert self.cleared_downstream(w2) == {s1, w2, w3, t1, w4} assert self.cleared_upstream(w2) == {s1, w1, w2, t1} - assert self.cleared_downstream(w3) == {s1, w3, t1} + assert self.cleared_downstream(w3) == {s1, w3, t1, w4} assert self.cleared_upstream(w3) == {s1, w1, w2, w3, t1} def test_get_flat_relative_ids_with_setup_nested_ctx_mgr(self): @@ -3695,32 +3702,6 @@ def test_get_flat_relative_ids_with_setup_nested_no_ctx_mgr(self): # t1 not included because t1 is not downstream assert self.cleared_downstream(w3) == {s2, w3, t2} - def test_setup_without_teardown(self): - """A setup needs a teardown to define its scope.""" - with DAG(dag_id="test_dag", start_date=pendulum.now()) as dag: - s1, w1, t1 = self.make_tasks(dag, "s1, w1, t1") - # s1 has no teardown: fail - with pytest.raises(AirflowDagInconsistent): - dag.validate_setup_teardown() - - s1 >> w1 - # w1 depends on s1 but not as a "setup" per se, since s1 doesn't have a teardown to define - # its scope - with pytest.raises(AirflowDagInconsistent): - dag.validate_setup_teardown() - - w1 >> t1 - # now t1 is technically downstream of s1, but we still must wire it up explicitly - # to define the setup/teardown relationship - with pytest.raises(AirflowDagInconsistent): - dag.validate_setup_teardown() - - s1 >> t1 - # now, s1 and t1 are linked as setups and teardowns - # anything upstream of t1 and downstream of s1 is in the scope for s1 - # so now this passes validation - dag.validate_setup_teardown() - def test_get_flat_relative_ids_follows_teardowns(self): with DAG(dag_id="test_dag", start_date=pendulum.now()) as dag: s1, w1, w2, t1 = self.make_tasks(dag, "s1, w1, w2, t1") @@ -3918,25 +3899,6 @@ def my_teardown(): def test_validate_setup_teardown_dag(self, dag_maker): """Test some invalid setups and teardowns in a dag""" - with dag_maker("test_dag") as dag: - s1, w1, w2, t1 = self.make_tasks(dag, "s1, w1, w2, t1") - - with s1: - w1 >> t1 - w2 >> t1 - with pytest.raises( - AirflowDagInconsistent, match="Dag has setup without teardown: dag='test_dag', task='s1'" - ): - dag.validate() - - with dag_maker("test_dag") as dag: - s1, w1, w2, t1 = self.make_tasks(dag, "s1, w1, w2, t1") - s1 >> w1 >> w2 >> t1 - with pytest.raises( - AirflowDagInconsistent, match="Dag has setup without teardown: dag='test_dag', task='s1'" - ): - dag.validate() - with dag_maker("test_dag") as dag: s1, w1, w2, t1 = self.make_tasks(dag, "s1, w1, w2, t1") w1 >> w2