Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions UPDATING.md
Original file line number Diff line number Diff line change
Expand Up @@ -1239,6 +1239,10 @@ dag.get_task_instances()
dag.get_task_instances(session=your_session)
```

### wait_for_downstream is no longer blocked on past skipped tasks

The condition checked by `wait_for_downstream` has changed: previously, all past downstream tasks had to be successful; now each past downstream task must be either successful or skipped.

## Airflow 1.10.3

### New `dag_discovery_safe_mode` config option
Expand Down
4 changes: 2 additions & 2 deletions airflow/models/baseoperator.py
Original file line number Diff line number Diff line change
Expand Up @@ -128,11 +128,11 @@ class derived from this one results in the creation of a task object,
:type end_date: datetime.datetime
:param depends_on_past: when set to true, task instances will run
sequentially while relying on the previous task's schedule to
succeed. The task instance for the start_date is allowed to run.
succeed or be skipped. The task instance for the start_date is allowed to run.
:type depends_on_past: bool
:param wait_for_downstream: when set to true, an instance of task
X will wait for tasks immediately downstream of the previous instance
of task X to finish successfully before it runs. This is useful if the
of task X to finish successfully or be skipped before it runs. This is useful if the
different instances of a task X alter the same asset, and this asset
is used by tasks downstream of task X. Note that depends_on_past
is forced to True wherever wait_for_downstream is used. Also note that
Expand Down
5 changes: 3 additions & 2 deletions airflow/models/taskinstance.py
Original file line number Diff line number Diff line change
Expand Up @@ -537,7 +537,7 @@ def is_premature(self):
@provide_session
def are_dependents_done(self, session=None):
"""
Checks whether the dependents of this task instance have all succeeded.
Checks whether all dependents of this task instance have succeeded or been skipped.
This is meant to be used by wait_for_downstream.

This is useful when you do not want to start processing the next
Expand All @@ -553,7 +553,7 @@ def are_dependents_done(self, session=None):
TaskInstance.dag_id == self.dag_id,
TaskInstance.task_id.in_(task.downstream_task_ids),
TaskInstance.execution_date == self.execution_date,
TaskInstance.state == State.SUCCESS,
TaskInstance.state.in_([State.SKIPPED, State.SUCCESS]),
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(this is the only line of non-test code that is changed -- the rest is either testing or documentation)

)
count = ti[0][0]
return count == len(task.downstream_task_ids)
Expand All @@ -565,6 +565,7 @@ def _get_previous_ti(
session: Session = None
) -> Optional['TaskInstance']:
dag = self.task.dag

if dag:
dr = self.get_dagrun(session=session)

Expand Down
7 changes: 0 additions & 7 deletions tests/dags/test_issue_1225.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,13 +47,6 @@ def fail():
dag=dag1,
pool='test_backfill_pooled_task_pool',)

# DAG tests depends_on_past dependencies
dag2 = DAG(dag_id='test_depends_on_past', default_args=default_args)
dag2_task1 = DummyOperator(
task_id='test_dop_task',
dag=dag2,
depends_on_past=True,)

# DAG tests that a Dag run that doesn't complete is marked failed
dag3 = DAG(dag_id='test_dagrun_states_fail', default_args=default_args)
dag3_task1 = PythonOperator(
Expand Down
46 changes: 46 additions & 0 deletions tests/dags/test_past_dagrun_deps.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.

from datetime import datetime

from airflow.models import DAG
from airflow.operators.dummy_operator import DummyOperator

DEFAULT_DATE = datetime(2016, 1, 1)
default_args = {
    'start_date': DEFAULT_DATE,
    'owner': 'airflow',
}

# Fixture DAG exercising depends_on_past: a single task that may only run
# once its own previous instance has finished.
dag_dop = DAG(dag_id='test_depends_on_past', default_args=default_args)
dag_dop_task1 = DummyOperator(
    dag=dag_dop,
    task_id='test_dop_task',
    depends_on_past=True,
)


# Fixture DAG exercising wait_for_downstream: the upstream task of run N
# must wait for the downstream task of run N-1 to complete.
dag_wfd = DAG(dag_id='test_wait_for_downstream', default_args=default_args)
upstream_task = DummyOperator(
    dag=dag_wfd,
    task_id='upstream_task',
    wait_for_downstream=True,
)
downstream_task = DummyOperator(
    dag=dag_wfd,
    task_id='downstream_task',
)
# Equivalent to upstream_task.set_downstream(downstream_task).
upstream_task >> downstream_task
88 changes: 88 additions & 0 deletions tests/models/test_dagrun.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import unittest

from airflow import models, settings
from airflow.exceptions import AirflowException
from airflow.models import DAG, TaskInstance as TI, clear_task_instances
from airflow.models.dagrun import DagRun
from airflow.operators.dummy_operator import DummyOperator
Expand All @@ -29,6 +30,7 @@
from airflow.utils.trigger_rule import TriggerRule
from airflow.utils.types import DagRunType
from tests.models import DEFAULT_DATE
from tests.test_utils.mock_executor import MockExecutor


class TestDagRun(unittest.TestCase):
Expand Down Expand Up @@ -559,3 +561,89 @@ def with_all_tasks_removed(dag):
dagrun.verify_integrity()
flaky_ti.refresh_from_db()
self.assertEqual(State.NONE, flaky_ti.state)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not really sure where to put these integration-like tests. They only check functionality on task instances. But they require having DagRuns for the tasks because of how previous_ti works (

dr.dag = dag
if dag.catchup:
last_dagrun = dr.get_previous_scheduled_dagrun(session=session)
else:
last_dagrun = dr.get_previous_dagrun(session=session)
if last_dagrun:
return last_dagrun.get_task_instance(self.task_id, session=session)
). They also require something to actually try running the tasks (I used dag.run). And the exception they raise is because of scheduler deadlock as expected. So I could rationalize putting this here, SchedulerJobTest, or TaskInstanceTest. Let me know if someone more familiar with the code has a preference.

I'm also not sure they're strictly necessary. I think this code is reasonably unit tested through the combination of https://github.com/apache/airflow/blob/e1c1a8dad0e01a7baa50ebe02e748429d33241fd/tests/ti_deps/deps/test_prev_dagrun_dep.py and https://github.com/apache/airflow/blob/5acc221a0993296b00430626de107515acb9c2d4/tests/models/test_taskinstance.py#L519-L545. At the same time, it might be more straightforward to follow this example in an actual Dag like here, and the task_instance.run() part of this is only tested in accidentally tangent unit tests.

def test_depends_on_past(self):
    """Verify a depends_on_past task runs only after a successful or
    skipped previous instance, and deadlocks otherwise."""
    first_date = timezone.datetime(2016, 1, 1, 0, 0, 0)
    second_date = timezone.datetime(2016, 1, 2, 0, 0, 0)

    dag = models.DagBag().get_dag('test_depends_on_past')
    task = dag.tasks[0]
    self.create_dag_run(dag, execution_date=first_date)
    self.create_dag_run(dag, execution_date=second_date)
    ti1 = TI(task, first_date)
    ti2 = TI(task, second_date)

    def run_second_dagrun():
        # Backfill only the second execution date with a no-op executor.
        dag.run(
            start_date=second_date,
            end_date=second_date,
            executor=MockExecutor(),
        )

    # With no state on the 1st instance, the 2nd run deadlocks on its own.
    with self.assertRaises(AirflowException):
        run_second_dagrun()
    ti2.refresh_from_db()
    self.assertEqual(ti2.state, State.SCHEDULED)

    # 2nd run of task fails if the 1st run of the task failed.
    ti1.set_state(State.FAILED)
    with self.assertRaises(AirflowException):
        run_second_dagrun()
    ti2.refresh_from_db()
    self.assertEqual(ti2.state, State.SCHEDULED)

    # But it works after the 1st instance is marked as skipped ...
    ti1.set_state(State.SKIPPED)
    run_second_dagrun()
    ti2.refresh_from_db()
    self.assertEqual(ti2.state, State.SUCCESS)

    # ... and it also works if the 1st instance is a success.
    ti2.set_state(State.NONE)
    ti1.set_state(State.SUCCESS)
    run_second_dagrun()
    ti2.refresh_from_db()
    self.assertEqual(ti2.state, State.SUCCESS)

def test_wait_for_downstream(self):
    """Verify a wait_for_downstream task runs only once the previous
    run's downstream task succeeded or was skipped."""
    first_date = timezone.datetime(2016, 1, 1, 0, 0, 0)
    second_date = timezone.datetime(2016, 1, 2, 0, 0, 0)

    dag = models.DagBag().get_dag('test_wait_for_downstream')

    # ti.previous_ti requires a dagrun to exist.
    self.create_dag_run(dag, execution_date=first_date)
    self.create_dag_run(dag, execution_date=second_date)
    upstream, downstream = dag.tasks

    def run_second_dagrun():
        # Backfill only the second execution date with a no-op executor.
        dag.run(
            start_date=second_date,
            end_date=second_date,
            executor=MockExecutor(),
        )

    # Doesn't run while the previous day's downstream task has no state.
    upstream_ti_2 = TI(task=upstream, execution_date=second_date)
    upstream_ti_2.previous_ti.set_state(State.SUCCESS)
    self.assertEqual(upstream_ti_2.previous_ti.state, State.SUCCESS)
    with self.assertRaises(AirflowException):
        run_second_dagrun()
    upstream_ti_2.refresh_from_db()
    self.assertEqual(upstream_ti_2.state, State.SCHEDULED)

    # Doesn't run if the previous day's downstream task failed.
    downstream_ti_1 = TI(task=downstream, execution_date=first_date)
    downstream_ti_1.set_state(State.FAILED)
    with self.assertRaises(AirflowException):
        run_second_dagrun()
    upstream_ti_2.refresh_from_db()
    self.assertEqual(upstream_ti_2.state, State.SCHEDULED)

    # Runs if the previous day's downstream task was skipped ...
    downstream_ti_1 = TI(task=downstream, execution_date=first_date)
    downstream_ti_1.set_state(State.SKIPPED)
    run_second_dagrun()
    upstream_ti_2.refresh_from_db()
    self.assertEqual(upstream_ti_2.state, State.SUCCESS)

    # ... and if it succeeded.
    upstream_ti_2.set_state(State.NONE)
    downstream_ti_1.set_state(State.SUCCESS)
    run_second_dagrun()
    upstream_ti_2.refresh_from_db()
    self.assertEqual(upstream_ti_2.state, State.SUCCESS)
28 changes: 28 additions & 0 deletions tests/models/test_taskinstance.py
Original file line number Diff line number Diff line change
Expand Up @@ -684,6 +684,34 @@ def run_ti_and_assert(run_date, expected_start_date, expected_end_date,
done, fail = True, False
run_ti_and_assert(date4, date3, date4, 60, State.SUCCESS, 3, 0)

def test_respects_prev_dagrun_dep(self):
    """Verify are_dependencies_met reflects PrevDagrunDep's verdict."""
    dag = models.DAG(dag_id='test_dag')
    task = DummyOperator(dag=dag, task_id='test_task', start_date=DEFAULT_DATE)
    ti = TI(task, DEFAULT_DATE)

    dep_target = 'airflow.ti_deps.deps.prev_dagrun_dep.PrevDagrunDep.get_dep_statuses'
    failing_status = [TIDepStatus('test fail status name', False, 'test fail reason')]
    passing_status = [TIDepStatus('test pass status name', True, 'test passing reason')]

    # A failing previous-dagrun dep must block the task instance.
    with patch(dep_target, return_value=failing_status):
        self.assertFalse(ti.are_dependencies_met())
    # A passing one must let it through.
    with patch(dep_target, return_value=passing_status):
        self.assertTrue(ti.are_dependencies_met())

def test_are_dependents_done(self):
    """Verify are_dependents_done treats SKIPPED and SUCCESS as done,
    and anything else (no state, FAILED) as not done."""
    dag = models.DAG(dag_id='test_dag')
    upstream = DummyOperator(dag=dag, task_id='upstream_task', start_date=DEFAULT_DATE)
    downstream = DummyOperator(dag=dag, task_id='downstream_task', start_date=DEFAULT_DATE)
    upstream.set_downstream(downstream)
    upstream_ti = TI(upstream, DEFAULT_DATE)
    downstream_ti = TI(downstream, DEFAULT_DATE)

    # No state recorded for the downstream instance yet: not done.
    self.assertFalse(upstream_ti.are_dependents_done())

    # Exercise each terminal state and the expected verdict.
    for state, expected in [(State.SKIPPED, True),
                            (State.FAILED, False),
                            (State.SUCCESS, True)]:
        downstream_ti.set_state(state)
        self.assertEqual(expected, upstream_ti.are_dependents_done())

def test_reschedule_handling_clear_reschedules(self):
"""
Test that task reschedules clearing are handled properly
Expand Down