In [39]:
# The DAG object; we'll need this to instantiate a DAG
from airflow import DAG

# Operators; we need this to operate!
from airflow.operators.bash_operator import BashOperator

In [40]:
# https://github.com/apache/incubator-airflow/blob/master/airflow/models.py#L2168
# Defines params you can pass in
from datetime import datetime, timedelta

# Baseline keyword arguments handed to the DAG through default_args=...
# A task may override any of them by passing the same keyword explicitly
# (t2 does so with retries=3).
default_args = dict(
    owner='airflow',
    depends_on_past=False,
    start_date=datetime(2015, 6, 1),
    email=['airflow@example.com'],
    email_on_failure=False,
    email_on_retry=False,
    retries=1,
    retry_delay=timedelta(minutes=5),
    # Optional settings, left disabled:
    # queue='bash_queue',
    # pool='backfill',
    # priority_weight=10,
    # end_date=datetime(2016, 1, 1),
)


In [41]:
# The DAG is the container that the tasks below attach to via dag=dag.
#   dag_id            -> 'tutorial'
#   schedule_interval -> timedelta(days=1), i.e. one day
dag = DAG(
    dag_id='tutorial',
    default_args=default_args,
    schedule_interval=timedelta(days=1),
)

In [42]:
# Instantiating an operator object creates a task.
# task_id is the task's unique identifier.

# t1 runs the shell command `date`.
t1 = BashOperator(
    dag=dag,
    task_id='print_date',
    bash_command='date',
)

# t2 runs `sleep 5`; it passes retries=3 explicitly, whereas
# default_args sets retries to 1.
t2 = BashOperator(
    dag=dag,
    task_id='sleep',
    bash_command='sleep 5',
    retries=3,
)

But how do you know which arguments will be passed in? Here are the PRECEDENCE RULES:

(1 - most important) Explicitly passed arguments
(2) Values that exist in the default_args dictionary
(3 - default) The operator’s default value, if one exists

In [43]:
# Note: a task must include or inherit the arguments task_id and owner,
# otherwise Airflow will raise an exception.

In [44]:
# bash_command is a Jinja template (http://jinja.pocoo.org/docs/dev/).
# Placeholders used below:
#   {{ ds }}                   - the "date stamp" of the run
#   {{ macros.ds_add(ds, 7) }} - presumably ds shifted by 7 days; see the
#                                Airflow macros reference to confirm
#   {{ params.my_param }}      - the value supplied through params= on t3

templated_command = """
    {% for i in range(5) %}
        echo "{{ ds }}"
        echo "{{ macros.ds_add(ds, 7) }}"
        echo "{{ params.my_param }}"
    {% endfor %}
"""

# t3 runs the template above, with my_param filled in from params.
t3 = BashOperator(
    dag=dag,
    task_id='templated',
    params={'my_param': 'Parameter I passed in'},
    bash_command=templated_command,
)

In [45]:
# Setting up dependencies: t1 must run successfully before t2 or t3 may run.
#
#        t1
#       /  \
#     t2    t3
#
# The two calls below are the mirror image of
#   t2.set_upstream(t1)
#   t3.set_upstream(t1)
# and are equivalent to
#   dag.set_dependency('print_date', 'sleep')
#   dag.set_dependency('print_date', 'templated')

t1.set_downstream(t2)
t1.set_downstream(t3)

In [46]:
%%bash
# Print the list of active DAGs.
airflow list_dags

# Print the list of tasks in the "tutorial" DAG.
airflow list_tasks tutorial

# Print the hierarchy of tasks in the "tutorial" DAG as a tree.
airflow list_tasks tutorial --tree

[2018-07-23 18:45:00,070] {__init__.py:45} INFO - Using executor SequentialExecutor
[2018-07-23 18:45:00,112] {models.py:189} INFO - Filling up the DagBag from /Users/sgupta/airflow/dags


-------------------------------------------------------------------
DAGS
-------------------------------------------------------------------
example_bash_operator
example_branch_dop_operator_v3
example_branch_operator
example_http_operator
example_passing_params_via_test_command
example_python_operator
example_short_circuit_operator
example_skip_dag
example_subdag_operator
example_subdag_operator.section-1
example_subdag_operator.section-2
example_trigger_controller_dag
example_trigger_target_dag
example_xcom
latest_only
latest_only_with_trigger
test_utils
tutorial

[2018-07-23 18:45:00,634] {__init__.py:45} INFO - Using executor SequentialExecutor
[2018-07-23 18:45:00,669] {models.py:189} INFO - Filling up the DagBag from /Users/sgupta/airflow/dags
print_date
sleep
templated
[2018-07-23 18:45:01,192

In [48]:
%%bash
# Run the print_date task of the "tutorial" DAG once in test mode,
# passing 2015-06-01 as the date argument.
airflow test tutorial print_date 2015-06-01


[2018-07-23 18:47:21,610] {__init__.py:45} INFO - Using executor SequentialExecutor
[2018-07-23 18:47:21,664] {models.py:189} INFO - Filling up the DagBag from /Users/sgupta/airflow/dags


Traceback (most recent call last):
  File "/usr/local/bin/airflow", line 27, in <module>
    args.func(args)
  File "/usr/local/lib/python2.7/site-packages/airflow/bin/cli.py", line 528, in test
    ti.run(ignore_task_deps=True, ignore_ti_state=True, test_mode=True)
  File "/usr/local/lib/python2.7/site-packages/airflow/utils/db.py", line 50, in wrapper
    result = func(*args, **kwargs)
  File "/usr/local/lib/python2.7/site-packages/airflow/models.py", line 1577, in run
    session=session)
  File "/usr/local/lib/python2.7/site-packages/airflow/utils/db.py", line 50, in wrapper
    result = func(*args, **kwargs)
  File "/usr/local/lib/python2.7/site-packages/airflow/models.py", line 1338, in _check_and_change_state_before_execution
    self.refresh_from_db(session=session, lock_for_update=True)
  File "/usr/local/lib/python2.7/site-packages/airflow/utils/db.py", line 50, in wrapper
    result = func(*args, **kwargs)
  File "/usr/local/lib/python2.7/site-packages/airflow/models.py", li