Skip to content

Commit

Permalink
[AIRFLOW-5268] Apply same DAG naming conventions as in literature (#5874
Browse files Browse the repository at this point in the history
)
  • Loading branch information
BasPH authored and ashb committed Oct 7, 2019
1 parent 8aa7a01 commit 8f6ca53
Show file tree
Hide file tree
Showing 6 changed files with 84 additions and 33 deletions.
21 changes: 14 additions & 7 deletions airflow/models/dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
import warnings
from collections import OrderedDict, defaultdict
from datetime import timedelta, datetime
from typing import Union, Optional, Iterable, Dict, Type, Callable, List
from typing import Union, Optional, Iterable, Dict, Type, Callable, List, TYPE_CHECKING

import jinja2
import pendulum
Expand Down Expand Up @@ -57,6 +57,9 @@
from airflow.utils.sqlalchemy import UtcDateTime, Interval
from airflow.utils.state import State

if TYPE_CHECKING:
from airflow.models.baseoperator import BaseOperator # Avoid circular dependency

install_aliases()

ScheduleInterval = Union[str, timedelta, relativedelta]
Expand Down Expand Up @@ -232,7 +235,7 @@ def __init__(
self._description = description
# set file location to caller source path
self.fileloc = sys._getframe().f_back.f_code.co_filename
self.task_dict = dict() # type: Dict[str, TaskInstance]
self.task_dict = dict() # type: Dict[str, BaseOperator]

# set timezone from start_date
if start_date and start_date.tzinfo:
Expand Down Expand Up @@ -777,7 +780,13 @@ def get_task_instances(

@property
def roots(self):
return [t for t in self.tasks if not t.downstream_list]
"""Return nodes with no parents. These are first to execute and are called roots or root nodes."""
return [task for task in self.tasks if not task.upstream_list]

@property
def leaves(self):
"""Return nodes with no children. These are last to execute and are called leaves or leaf nodes."""
return [task for task in self.tasks if not task.downstream_list]

def topological_sort(self):
"""
Expand Down Expand Up @@ -1103,13 +1112,11 @@ def pickle(self, session=None):
return dp

def tree_view(self):
"""
Shows an ascii tree representation of the DAG
"""
"""Print an ASCII tree representation of the DAG."""
def get_downstream(task, level=0):
print((" " * level * 4) + str(task))
level += 1
for t in task.upstream_list:
for t in task.downstream_list:
get_downstream(t, level)

for t in self.roots:
Expand Down
15 changes: 8 additions & 7 deletions airflow/models/dagrun.py
Original file line number Diff line number Diff line change
Expand Up @@ -299,20 +299,21 @@ def update_state(self, session=None):
duration = (timezone.utcnow() - start_dttm).total_seconds() * 1000
Stats.timing("dagrun.dependency-check.{}".format(self.dag_id), duration)

root_ids = [t.task_id for t in dag.roots]
roots = [t for t in tis if t.task_id in root_ids]
leaf_tis = [ti for ti in tis if ti.task_id in {t.task_id for t in dag.leaves}]

# if all roots finished and at least one failed, the run failed
if (not unfinished_tasks and
any(r.state in (State.FAILED, State.UPSTREAM_FAILED) for r in roots)):
if not unfinished_tasks and any(
leaf_ti.state in {State.FAILED, State.UPSTREAM_FAILED} for leaf_ti in leaf_tis
):
self.log.info('Marking run %s failed', self)
self.set_state(State.FAILED)
dag.handle_callback(self, success=False, reason='task_failure',
session=session)

# if all roots succeeded and no unfinished tasks, the run succeeded
elif not unfinished_tasks and all(r.state in (State.SUCCESS, State.SKIPPED)
for r in roots):
# if all leafs succeeded and no unfinished tasks, the run succeeded
elif not unfinished_tasks and all(
leaf_ti.state in {State.SUCCESS, State.SKIPPED} for leaf_ti in leaf_tis
):
self.log.info('Marking run %s successful', self)
self.set_state(State.SUCCESS)
dag.handle_callback(self, success=True, reason='success', session=session)
Expand Down
18 changes: 9 additions & 9 deletions airflow/www/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -1532,14 +1532,14 @@ def tree(self, session=None):
# expand/collapse functionality. After 5,000 nodes we stop and fall
# back on a quick DFS search for performance. See PR #320.
node_count = [0]
node_limit = 5000 / max(1, len(dag.roots))
node_limit = 5000 / max(1, len(dag.leaves))

def recurse_nodes(task, visited):
visited.add(task)
node_count[0] += 1

children = [
recurse_nodes(t, visited) for t in task.upstream_list
recurse_nodes(t, visited) for t in task.downstream_list
if node_count[0] < node_limit or t not in visited]

# D3 tree uses children vs _children to define what is
Expand Down Expand Up @@ -1567,7 +1567,7 @@ def set_duration(tid):
}
for d in dates],
children_key: children,
'num_dep': len(task.upstream_list),
'num_dep': len(task.downstream_list),
'operator': task.task_type,
'retries': task.retries,
'owner': task.owner,
Expand Down Expand Up @@ -1630,18 +1630,18 @@ def graph(self, session=None):
}
})

def get_upstream(task):
for t in task.upstream_list:
def get_downstream(task):
for t in task.downstream_list:
edge = {
'u': t.task_id,
'v': task.task_id,
'u': task.task_id,
'v': t.task_id,
}
if edge not in edges:
edges.append(edge)
get_upstream(t)
get_downstream(t)

for t in dag.roots:
get_upstream(t)
get_downstream(t)

dt_nr_dr_data = get_date_time_num_runs_dag_runs_form_data(request, session, dag)
dt_nr_dr_data['arrange'] = arrange
Expand Down
2 changes: 1 addition & 1 deletion airflow/www_rbac/templates/airflow/graph.html
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@

// Set edges
edges.forEach(function(edge) {
g.setEdge(edge.u, edge.v)
g.setEdge(edge.source_id, edge.target_id)
});

var render = dagreD3.render(),
Expand Down
18 changes: 9 additions & 9 deletions airflow/www_rbac/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -1257,14 +1257,14 @@ def tree(self, session=None):
# expand/collapse functionality. After 5,000 nodes we stop and fall
# back on a quick DFS search for performance. See PR #320.
node_count = [0]
node_limit = 5000 / max(1, len(dag.roots))
node_limit = 5000 / max(1, len(dag.leaves))

def recurse_nodes(task, visited):
visited.add(task)
node_count[0] += 1

children = [
recurse_nodes(t, visited) for t in task.upstream_list
recurse_nodes(t, visited) for t in task.downstream_list
if node_count[0] < node_limit or t not in visited]

# D3 tree uses children vs _children to define what is
Expand Down Expand Up @@ -1292,7 +1292,7 @@ def set_duration(tid):
}
for d in dates],
children_key: children,
'num_dep': len(task.upstream_list),
'num_dep': len(task.downstream_list),
'operator': task.task_type,
'retries': task.retries,
'owner': task.owner,
Expand Down Expand Up @@ -1364,18 +1364,18 @@ def graph(self, session=None):
}
})

def get_upstream(task):
for t in task.upstream_list:
def get_downstream(task):
for t in task.downstream_list:
edge = {
'u': t.task_id,
'v': task.task_id,
'source_id': task.task_id,
'target_id': t.task_id,
}
if edge not in edges:
edges.append(edge)
get_upstream(t)
get_downstream(t)

for t in dag.roots:
get_upstream(t)
get_downstream(t)

dt_nr_dr_data = get_date_time_num_runs_dag_runs_form_data(request, session, dag)
dt_nr_dr_data['arrange'] = arrange
Expand Down
43 changes: 43 additions & 0 deletions tests/models/test_dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,13 @@
# under the License.

import datetime
import io
import logging
import os
import re
import unittest
from tempfile import NamedTemporaryFile
from tests.compat import mock

import pendulum
import six
Expand Down Expand Up @@ -802,3 +804,44 @@ def test_dag_naive_default_args_start_date_with_timezone(self):

dag = DAG('DAG', default_args=default_args)
self.assertEqual(dag.timezone.name, local_tz.name)

def test_roots(self):
"""Verify if dag.roots returns the root tasks of a DAG."""
with DAG("test_dag", start_date=DEFAULT_DATE) as dag:
t1 = DummyOperator(task_id="t1")
t2 = DummyOperator(task_id="t2")
t3 = DummyOperator(task_id="t3")
t4 = DummyOperator(task_id="t4")
t5 = DummyOperator(task_id="t5")
[t1, t2] >> t3 >> [t4, t5]

self.assertCountEqual(dag.roots, [t1, t2])

def test_leaves(self):
"""Verify if dag.leaves returns the leaf tasks of a DAG."""
with DAG("test_dag", start_date=DEFAULT_DATE) as dag:
t1 = DummyOperator(task_id="t1")
t2 = DummyOperator(task_id="t2")
t3 = DummyOperator(task_id="t3")
t4 = DummyOperator(task_id="t4")
t5 = DummyOperator(task_id="t5")
[t1, t2] >> t3 >> [t4, t5]

self.assertCountEqual(dag.leaves, [t4, t5])

def test_tree_view(self):
"""Verify correctness of dag.tree_view()."""
with DAG("test_dag", start_date=DEFAULT_DATE) as dag:
t1 = DummyOperator(task_id="t1")
t2 = DummyOperator(task_id="t2")
t3 = DummyOperator(task_id="t3")
t1 >> t2 >> t3

with mock.patch('sys.stdout', new_callable=io.StringIO) as mock_stdout:
dag.tree_view()
stdout = mock_stdout.getvalue()

stdout_lines = stdout.split("\n")
self.assertIn('t1', stdout_lines[0])
self.assertIn('t2', stdout_lines[1])
self.assertIn('t3', stdout_lines[2])

0 comments on commit 8f6ca53

Please sign in to comment.