Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions airflow/models/abstractoperator.py
Original file line number Diff line number Diff line change
Expand Up @@ -302,15 +302,18 @@ def get_upstreams_only_setups_and_teardowns(self) -> Iterable[Operator]:
This method is meant to be used when we are clearing the task (non-upstream) and we need
to add in the *relevant* setups and their teardowns.

Relevant in this case means, the setup has a teardown that is downstream of ``self``.
Relevant in this case means, the setup has a teardown that is downstream of ``self``,
or the setup has no teardowns.
"""
downstream_teardown_ids = {
x.task_id for x in self.get_flat_relatives(upstream=False) if x.is_teardown
}
for task in self.get_flat_relatives(upstream=True):
if not task.is_setup:
continue
if not task.downstream_task_ids.isdisjoint(downstream_teardown_ids):
has_no_teardowns = not any(True for x in task.downstream_list if x.is_teardown)
# if task has no teardowns or has teardowns downstream of self
if has_no_teardowns or task.downstream_task_ids.intersection(downstream_teardown_ids):
yield task
for t in task.downstream_list:
if t.is_teardown and not t == self:
Expand Down
4 changes: 0 additions & 4 deletions airflow/models/dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -714,10 +714,6 @@ def validate_setup_teardown(self):
:meta private:
"""
for task in self.tasks:
if task.is_setup and not any(x.is_teardown for x in task.downstream_list):
raise AirflowDagInconsistent(
f"Dag has setup without teardown: dag='{self.dag_id}', task='{task.task_id}'"
)
if task.is_teardown and all(x.is_setup for x in task.upstream_list):
raise AirflowDagInconsistent(
f"Dag has teardown task without an upstream work task: dag='{self.dag_id}',"
Expand Down
86 changes: 24 additions & 62 deletions tests/models/test_dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -3623,39 +3623,46 @@ def cleared_neither(task):

def test_get_flat_relative_ids_with_setup(self):
with DAG(dag_id="test_dag", start_date=pendulum.now()) as dag:
s1, w1, w2, w3, t1 = self.make_tasks(dag, "s1, w1, w2, w3, t1")
s1, w1, w2, w3, w4, t1 = self.make_tasks(dag, "s1, w1, w2, w3, w4, t1")

s1 >> w1 >> w2 >> w3

# there is no teardown downstream of w1, so we assume w1 does not need s1
assert set(w1.get_upstreams_only_setups_and_teardowns()) == set()
# w1 is downstream of s1, and s1 has no teardown, so clearing w1 clears s1
assert set(w1.get_upstreams_only_setups_and_teardowns()) == {s1}
# same with w2 and w3
assert set(w2.get_upstreams_only_setups_and_teardowns()) == set()
assert set(w3.get_upstreams_only_setups_and_teardowns()) == set()
assert self.cleared_downstream(w2) == {w2, w3}
assert set(w2.get_upstreams_only_setups_and_teardowns()) == {s1}
assert set(w3.get_upstreams_only_setups_and_teardowns()) == {s1}
# so if we clear w2, we should also get s1, and w3, but not w1
assert self.cleared_downstream(w2) == {s1, w2, w3}

w3 >> t1

# now, w2 has a downstream teardown, but it's not connected directly to s1
# (this is how we signal "this is the teardown for this setup")
# so still, we don't regard s1 as a setup for w2
assert set(w2.get_upstreams_only_setups_and_teardowns()) == set()
assert self.cleared_downstream(w2) == {w2, w3, t1}
assert set(w2.get_upstreams_only_setups_and_teardowns()) == {s1}
# so if we clear downstream, s1 will be cleared, and t1 will be cleared too, but only by virtue of
# being downstream of w2 -- not as a result of being the teardown for s1, which it is not
assert self.cleared_downstream(w2) == {s1, w2, w3, t1}
# another consequence of not linking s1 and t1 is that when we clear upstream,
# t1 doesn't get cleared -- because it's not upstream and it's not linked to s1
assert self.cleared_upstream(w2) == {s1, w1, w2}
# note also that if we add a 4th work task after t1, it will still be "in scope" for s1
t1 >> w4
assert self.cleared_downstream(w4) == {s1, w4}

s1 >> t1

# now, we know that t1 is the teardown for s1, and it's downstream of
# w2, so we can infer that w2 requires it, so now when we clear w2,
# we will get s1 (because it's a setup for w2) and t1 (because
# it is a teardown for s1)
# now we know that t1 is the teardown for s1, which means s1 will be "torn down"
# by the time w4 runs. Therefore w4 no longer requires s1, and when we clear w4,
# s1 will not also be cleared
self.cleared_downstream(w4) == {w4}
assert set(w1.get_upstreams_only_setups_and_teardowns()) == {s1, t1}
assert self.cleared_downstream(w1) == {s1, w1, w2, w3, t1}
assert self.cleared_downstream(w1) == {s1, w1, w2, w3, t1, w4}
assert self.cleared_upstream(w1) == {s1, w1, t1}
assert set(w2.get_upstreams_only_setups_and_teardowns()) == {s1, t1}
assert set(w2.get_upstreams_follow_setups()) == {s1, w1, t1}
assert self.cleared_downstream(w2) == {s1, w2, w3, t1}
assert self.cleared_downstream(w2) == {s1, w2, w3, t1, w4}
assert self.cleared_upstream(w2) == {s1, w1, w2, t1}
assert self.cleared_downstream(w3) == {s1, w3, t1}
assert self.cleared_downstream(w3) == {s1, w3, t1, w4}
assert self.cleared_upstream(w3) == {s1, w1, w2, w3, t1}

def test_get_flat_relative_ids_with_setup_nested_ctx_mgr(self):
Expand Down Expand Up @@ -3695,32 +3702,6 @@ def test_get_flat_relative_ids_with_setup_nested_no_ctx_mgr(self):
# t1 not included because t1 is not downstream
assert self.cleared_downstream(w3) == {s2, w3, t2}

def test_setup_without_teardown(self):
"""A setup needs a teardown to define its scope."""
with DAG(dag_id="test_dag", start_date=pendulum.now()) as dag:
s1, w1, t1 = self.make_tasks(dag, "s1, w1, t1")
# s1 has no teardown: fail
with pytest.raises(AirflowDagInconsistent):
dag.validate_setup_teardown()

s1 >> w1
# w1 depends on s1 but not as a "setup" per se, since s1 doesn't have a teardown to define
# its scope
with pytest.raises(AirflowDagInconsistent):
dag.validate_setup_teardown()

w1 >> t1
# now t1 is technically downstream of s1, but we still must wire it up explicitly
# to define the setup/teardown relationship
with pytest.raises(AirflowDagInconsistent):
dag.validate_setup_teardown()

s1 >> t1
# now, s1 and t1 are linked as setups and teardowns
# anything upstream of t1 and downstream of s1 is in the scope for s1
# so now this passes validation
dag.validate_setup_teardown()

def test_get_flat_relative_ids_follows_teardowns(self):
with DAG(dag_id="test_dag", start_date=pendulum.now()) as dag:
s1, w1, w2, t1 = self.make_tasks(dag, "s1, w1, w2, t1")
Expand Down Expand Up @@ -3918,25 +3899,6 @@ def my_teardown():

def test_validate_setup_teardown_dag(self, dag_maker):
"""Test some invalid setups and teardowns in a dag"""
with dag_maker("test_dag") as dag:
s1, w1, w2, t1 = self.make_tasks(dag, "s1, w1, w2, t1")

with s1:
w1 >> t1
w2 >> t1
with pytest.raises(
AirflowDagInconsistent, match="Dag has setup without teardown: dag='test_dag', task='s1'"
):
dag.validate()

with dag_maker("test_dag") as dag:
s1, w1, w2, t1 = self.make_tasks(dag, "s1, w1, w2, t1")
s1 >> w1 >> w2 >> t1
with pytest.raises(
AirflowDagInconsistent, match="Dag has setup without teardown: dag='test_dag', task='s1'"
):
dag.validate()

with dag_maker("test_dag") as dag:
s1, w1, w2, t1 = self.make_tasks(dag, "s1, w1, w2, t1")
w1 >> w2
Expand Down