# Airflow Tutorial

- Getting-started tutorial: https://my.oschina.net/u/2306127/blog/1843515
- Documentation (tutorial): https://airflow.incubator.apache.org/tutorial.html
- Project source code: https://github.com/apache/incubator-airflow

In [11]:
help(DAG)

Help on class DAG in module airflow.models:

class DAG(airflow.dag.base_dag.BaseDag, airflow.utils.logging.LoggingMixin)
 |  A dag (directed acyclic graph) is a collection of tasks with directional
 |  dependencies. A dag also has a schedule, a start end an end date
 |  (optional). For each schedule, (say daily or hourly), the DAG needs to run
 |  each individual tasks as their dependencies are met. Certain tasks have
 |  the property of depending on their own past, meaning that they can't run
 |  until their previous schedule (and upstream tasks) are completed.
 |  
 |  DAGs essentially act as namespaces for tasks. A task_id can only be
 |  added once to a DAG.
 |  
 |  :param dag_id: The id of the DAG
 |  :type dag_id: string
 |  :param description: The description for the DAG to e.g. be shown on the webserver
 |  :type description: string
 |  :param schedule_interval: Defines how often that DAG runs, this
 |      timedelta object gets added to your latest task instance's
 |      exe
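The docstring above describes a DAG as a set of tasks with directional dependencies, where each task runs once its upstream tasks have completed. As a minimal sketch in plain Python (not Airflow's scheduler), the tutorial's three task IDs can be modeled as a dependency map and grouped into runnable batches with the standard-library `graphlib` module. This assumes Python 3.9+ for `graphlib` (the notebook itself runs Python 3.6), and `run_batches` is a hypothetical helper.

```python
from graphlib import TopologicalSorter

# Each key maps a task_id to the set of task_ids it depends on (its upstream tasks).
# These IDs mirror the tutorial DAG: print_date runs first, sleep and templated follow.
upstream = {
    "print_date": set(),
    "sleep": {"print_date"},
    "templated": {"print_date"},
}

def run_batches(deps):
    """Group tasks into batches; every task in a batch has all its upstream tasks done."""
    sorter = TopologicalSorter(deps)
    sorter.prepare()  # raises graphlib.CycleError if the graph is not acyclic
    batches = []
    while sorter.is_active():
        ready = sorted(sorter.get_ready())  # tasks whose dependencies are all met
        batches.append(ready)
        sorter.done(*ready)  # mark them finished so downstream tasks become ready
    return batches

print(run_batches(upstream))  # [['print_date'], ['sleep', 'templated']]
```

Tasks in the same batch have no dependency on each other, so a scheduler is free to run them in parallel; the acyclic requirement is what guarantees every task eventually becomes ready.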

## Define the DAG

In [1]:
"""
Code that goes along with the Airflow tutorial located at:
https://github.com/airbnb/airflow/blob/master/airflow/example_dags/tutorial.py
"""
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta


default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2015, 6, 1),
    'email': ['airflow@example.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
    # 'queue': 'bash_queue',
    # 'pool': 'backfill',
    # 'priority_weight': 10,
    # 'end_date': datetime(2016, 1, 1),
}

# No schedule_interval is passed, so the DAG uses Airflow's default (daily)
dag = DAG('tutorial', default_args=default_args)

# t1, t2 and t3 are examples of tasks created by instantiating operators
t1 = BashOperator(
    task_id='print_date',
    bash_command='date',
    dag=dag)

t2 = BashOperator(
    task_id='sleep',
    bash_command='sleep 5',
    retries=3,
    dag=dag)

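# Jinja template: ds is the execution date (YYYY-MM-DD), macros.ds_add(ds, 7)
# is that date plus 7 days, and params.my_param comes from the params argument
templated_command = """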
    {% for i in range(5) %}
        echo "{{ ds }}"
        echo "{{ macros.ds_add(ds, 7)}}"
        echo "{{ params.my_param }}"
    {% endfor %}
"""

t3 = BashOperator(
    task_id='templated',
    bash_command=templated_command,
    params={'my_param': 'Parameter I passed in'},
    dag=dag)

# print_date (t1) must finish before sleep (t2) and templated (t3) can run
t2.set_upstream(t1)
t3.set_upstream(t1)

[2018-07-21 14:23:30,273] {__init__.py:57} INFO - Using executor SequentialExecutor
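In `templated_command`, `{{ ds }}` is the execution date as a `YYYY-MM-DD` string, `{{ macros.ds_add(ds, 7) }}` shifts that date by seven days, and `{{ params.my_param }}` is taken from the operator's `params` argument. The rough plain-Python sketch below shows what the Jinja loop expands to for `ds='2015-06-01'` (the tutorial's `start_date`), ignoring Jinja's whitespace. `ds_add` here is a local re-implementation intended to mirror the macro, not an import from Airflow, and `render_templated_command` is a hypothetical helper.

```python
from datetime import datetime, timedelta

def ds_add(ds, days):
    """Add `days` (may be negative) to a YYYY-MM-DD date string."""
    shifted = datetime.strptime(ds, "%Y-%m-%d") + timedelta(days=days)
    return shifted.strftime("%Y-%m-%d")

def render_templated_command(ds, my_param, repeats=5):
    """Hypothetical helper: expand the template's for-loop by hand into echo lines."""
    lines = []
    for _ in range(repeats):  # {% for i in range(5) %}
        lines.append(f'echo "{ds}"')              # {{ ds }}
        lines.append(f'echo "{ds_add(ds, 7)}"')   # {{ macros.ds_add(ds, 7) }}
        lines.append(f'echo "{my_param}"')        # {{ params.my_param }}
    return "\n".join(lines)

print(render_templated_command("2015-06-01", "Parameter I passed in", repeats=1))
# echo "2015-06-01"
# echo "2015-06-08"
# echo "Parameter I passed in"
```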


#### Inspect the task definitions

In [2]:
?t2

[0;31mType:[0m        BashOperator
[0;31mString form:[0m <Task(BashOperator): sleep>
[0;31mFile:[0m        /srv/conda/lib/python3.6/site-packages/airflow/operators/bash_operator.py
[0;31mDocstring:[0m  
Execute a Bash script, command or set of commands.

:param bash_command: The command, set of commands or reference to a
    bash script (must be '.sh') to be executed.
:type bash_command: string
:param xcom_push: If xcom_push is True, the last line written to stdout
    will also be pushed to an XCom when the bash command completes.
:type xcom_push: bool
:param env: If env is not None, it must be a mapping that defines the
    environment variables for the new process; these are used instead
    of inheriting the current process environment, which is the default
    behavior. (templated)
:type env: dict
:type output_encoding: output encoding of bash command
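The docstring says that `env`, when given, replaces the inherited environment rather than extending it, and that with `xcom_push=True` the last line written to stdout is what gets pushed to an XCom. A minimal standard-library sketch of those two behaviors follows, assuming a `bash` binary is available; `run_bash` is a hypothetical helper, not BashOperator's actual code.

```python
import os
import shutil
import subprocess

def run_bash(bash_command, env=None):
    """Run bash_command with bash and return its last stdout line (None if no output).

    env=None inherits the current process environment; a dict replaces it
    entirely, as the docstring describes for BashOperator's `env` parameter.
    """
    bash = shutil.which("bash")  # resolve the full path now, since a replaced env may lack PATH
    if bash is None:
        raise RuntimeError("bash not found on PATH")
    result = subprocess.run(
        [bash, "-c", bash_command],
        env=env,
        stdout=subprocess.PIPE,
        check=True,  # raise CalledProcessError on a non-zero exit status
    )
    lines = result.stdout.decode("utf-8").splitlines()
    # The last stdout line: the value the docstring says xcom_push=True pushes.
    return lines[-1].strip() if lines else None

print(run_bash("echo first; echo last"))  # last
# To add variables without losing the inherited ones, copy os.environ yourself:
print(run_bash('echo "$MY_VAR"', env=dict(os.environ, MY_VAR="hello")))  # hello
```

Because a supplied `env` is a full replacement, passing `env={"MY_VAR": "hello"}` alone would also drop variables such as `PATH`; merging with `os.environ` as above keeps them.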


In [3]:
?t3

[0;31mType:[0m        BashOperator
[0;31mString form:[0m <Task(BashOperator): templated>
[0;31mFile:[0m        /srv/conda/lib/python3.6/site-packages/airflow/operators/bash_operator.py
[0;31mDocstring:[0m  
Execute a Bash script, command or set of commands.

:param bash_command: The command, set of commands or reference to a
    bash script (must be '.sh') to be executed.
:type bash_command: string
:param xcom_push: If xcom_push is True, the last line written to stdout
    will also be pushed to an XCom when the bash command completes.
:type xcom_push: bool
:param env: If env is not None, it must be a mapping that defines the
    environment variables for the new process; these are used instead
    of inheriting the current process environment, which is the default
    behavior. (templated)
:type env: dict
:type output_encoding: output encoding of bash command


In [4]:
?t1

[0;31mType:[0m        BashOperator
[0;31mString form:[0m <Task(BashOperator): print_date>
[0;31mFile:[0m        /srv/conda/lib/python3.6/site-packages/airflow/operators/bash_operator.py
[0;31mDocstring:[0m  
Execute a Bash script, command or set of commands.

:param bash_command: The command, set of commands or reference to a
    bash script (must be '.sh') to be executed.
:type bash_command: string
:param xcom_push: If xcom_push is True, the last line written to stdout
    will also be pushed to an XCom when the bash command completes.
:type xcom_push: bool
:param env: If env is not None, it must be a mapping that defines the
    environment variables for the new process; these are used instead
    of inheriting the current process environment, which is the default
    behavior. (templated)
:type env: dict
:type output_encoding: output encoding of bash command


In [5]:
dag.active_tasks

[<Task(BashOperator): print_date>,
 <Task(BashOperator): sleep>,
 <Task(BashOperator): templated>]

In [6]:
dag.tree_view()

<Task(BashOperator): sleep>
    <Task(BashOperator): print_date>
<Task(BashOperator): templated>
    <Task(BashOperator): print_date>
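Judging from the output, `tree_view()` starts from each task that has no downstream task and then walks up its upstream tasks with deeper indentation, which is why `print_date` appears under both `sleep` and `templated`. The plain-Python sketch below reproduces that layout from a dependency map; `tree_lines` is a hypothetical helper, not Airflow's implementation, and the `BashOperator` label is hard-coded because every task in this example uses that operator.

```python
# Upstream map for the tutorial DAG: task_id -> list of upstream task_ids.
upstream = {
    "print_date": [],
    "sleep": ["print_date"],
    "templated": ["print_date"],
}

def tree_lines(upstream):
    """Return lines shaped like dag.tree_view(): end tasks first, upstream tasks indented."""
    has_downstream = {u for ups in upstream.values() for u in ups}
    ends = [t for t in upstream if t not in has_downstream]  # tasks nothing depends on
    lines = []

    def walk(task, level):
        lines.append(" " * 4 * level + f"<Task(BashOperator): {task}>")
        for parent in upstream[task]:  # recurse into upstream tasks, one level deeper
            walk(parent, level + 1)

    for task in ends:
        walk(task, 0)
    return lines

print("\n".join(tree_lines(upstream)))
```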


In [7]:
!airflow test first_dag print_date 2015-06-01

[2018-07-21 14:23:50,730] {__init__.py:57} INFO - Using executor SequentialExecutor
[2018-07-21 14:23:51,304] {models.py:167} INFO - Filling up the DagBag from /home/jovyan/airflow/dags
Traceback (most recent call last):
  File "/srv/conda/lib/python3.6/site-packages/sqlalchemy/engine/base.py", line 1193, in _execute_context
    context)
  File "/srv/conda/lib/python3.6/site-packages/sqlalchemy/engine/default.py", line 509, in do_execute
    cursor.execute(statement, parameters)
sqlite3.OperationalError: no such table: task_instance

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/srv/conda/bin/airflow", line 28, in <module>
    args.func(args)
  File "/srv/conda/lib/python3.6/site-packages/airflow/bin/cli.py", line 585, in test
    ti.run(ignore_task_deps=True, ignore_ti_state=True, test_mode=True)
  File "/srv/conda/lib/python3.6/site-packages/airflow/utils/db.py", line 53, in wrapper
    result = func(*args, **kwargs)

In [19]:
!airflow test first_dag sleep 2015-06-01

[2018-07-21 12:03:01,781] {__init__.py:57} INFO - Using executor SequentialExecutor
[2018-07-21 12:03:02,357] {models.py:167} INFO - Filling up the DagBag from /home/jovyan/airflow/dags
Traceback (most recent call last):
  File "/srv/conda/lib/python3.6/site-packages/sqlalchemy/engine/base.py", line 1193, in _execute_context
    context)
  File "/srv/conda/lib/python3.6/site-packages/sqlalchemy/engine/default.py", line 509, in do_execute
    cursor.execute(statement, parameters)
sqlite3.OperationalError: no such table: task_instance

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/srv/conda/bin/airflow", line 28, in <module>
    args.func(args)
  File "/srv/conda/lib/python3.6/site-packages/airflow/bin/cli.py", line 585, in test
    ti.run(ignore_task_deps=True, ignore_ti_state=True, test_mode=True)
  File "/srv/conda/lib/python3.6/site-packages/airflow/utils/db.py", line 53, in wrapper
    result = func(*args, **kwargs)

In [15]:
dag.run()

OperationalError: (sqlite3.OperationalError) no such table: job [SQL: 'INSERT INTO job (dag_id, state, job_type, start_date, end_date, latest_heartbeat, executor_class, hostname, unixname) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)'] [parameters: ('tutorial', 'running', 'BackfillJob', '2018-07-21 11:57:02.407213', None, '2018-07-21 11:57:02.407221', 'SequentialExecutor', 'jupyter-openthings-2ddatabook-2d7grw66kv', 'jovyan')] (Background on this error at: http://sqlalche.me/e/e3q8)
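All three failures above (`no such table: task_instance`, `no such table: job`) indicate that Airflow's SQLite metadata database has not been initialized; in Airflow 1.x this is done with `airflow initdb` (Airflow 2 renamed it `airflow db init`). The standard-library sketch below checks a SQLite file for those tables. It assumes the metadata database sits at the default `~/airflow/airflow.db` (that is, `AIRFLOW_HOME=~/airflow` with the default `sql_alchemy_conn`), and `missing_tables` is a hypothetical helper.

```python
import os
import sqlite3

def missing_tables(db_path, required=("task_instance", "job")):
    """Return the required table names that are absent from a SQLite database file."""
    if not os.path.exists(db_path):
        return list(required)  # no database file at all
    # Open read-only so the check never creates or modifies the file.
    conn = sqlite3.connect(f"file:{db_path}?mode=ro", uri=True)
    try:
        rows = conn.execute("SELECT name FROM sqlite_master WHERE type = 'table'").fetchall()
    finally:
        conn.close()
    present = {name for (name,) in rows}
    return [t for t in required if t not in present]

# Assumed default location of the metadata DB (AIRFLOW_HOME=~/airflow).
db = os.path.expanduser("~/airflow/airflow.db")
missing = missing_tables(db)
if missing:
    print("Metadata DB not initialized; missing tables:", missing)
    print("Run `airflow initdb` (Airflow 1.x) before `airflow test` or `dag.run()`.")
else:
    print("Required tables present.")
```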

## Run the DAG and inspect it

In [3]:
!python ~/airflow/dags/airflowfirst.py

[2018-07-21 11:45:16,001] {__init__.py:57} INFO - Using executor SequentialExecutor


#### List the DAGs

In [4]:
!airflow list_dags

[2018-07-21 11:45:41,296] {__init__.py:57} INFO - Using executor SequentialExecutor
[2018-07-21 11:45:41,847] {models.py:167} INFO - Filling up the DagBag from /home/jovyan/airflow/dags


-------------------------------------------------------------------
DAGS
-------------------------------------------------------------------
example_bash_operator
example_branch_dop_operator_v3
example_branch_operator
example_http_operator
example_passing_params_via_test_command
example_python_operator
example_short_circuit_operator
example_skip_dag
example_subdag_operator
example_subdag_operator.section-1
example_subdag_operator.section-2
example_trigger_controller_dag
example_trigger_target_dag
example_xcom
first_dag
latest_only
latest_only_with_trigger
test_utils
tutorial



#### List the tasks in a DAG

In [1]:
!airflow list_tasks first_dag --tree

[2018-07-21 11:44:36,769] {__init__.py:57} INFO - Using executor SequentialExecutor
[2018-07-21 11:44:37,318] {models.py:167} INFO - Filling up the DagBag from /home/jovyan/airflow/dags
<Task(BashOperator): sleep>
    <Task(BashOperator): print_date>
<Task(BashOperator): templated>
    <Task(BashOperator): print_date>


