# Programming Apache Airflow with Python


<img style="float: right;" src="airflow-small.png" />

Austin Godber

@godber

http://desertpy.com

"Airflow is a platform to programmatically author, schedule and monitor workflows."

"Airflow is not a data streaming solution."

It's more about running a workflow than it is about the things the workflow accomplishes.

(NOTE: All unattributed quotes come from the [official Airflow docs](http://airflow.apache.org))

# Top Level Features (1/2)

* Author, execute and track workflows or Directed Acyclic Graphs (DAGs)
* Complete CLI and web based GUI
* Many different task types, or "Operators" (e.g. Bash, Python)
* Operator templating with Jinja
* Connections expose external systems to DAGs

# Top Level Features (2/2)

* Variables store arbitrary content accessible by DAGs
* Extensible Web Interface
* XComs allow tasks to (slowly) exchange (limited) info
* Branching allows conditional execution of tasks
* SubDAGs make repeating tasks easier

<img src="airflow.gif" />


# Setup

Quick-and-dirty dev setup with sequential execution, backed by SQLite:

```bash
mkvirtualenv -p `which python3` airflow
export SLUGIFY_USES_TEXT_UNIDECODE=yes
# quote the extras so shells such as zsh do not treat the brackets as a glob
pip3 install 'apache-airflow[mysql,redis,slack,crypto,password,ssh]'
airflow initdb
```

Start the webserver and start the simple scheduler:

```bash
# bind the webserver to 127.0.0.1 and use SSH tunneling for access
# (run this from the directory that holds airflow.cfg, ~/airflow by default)
sed -i 's/^web_server_host.*$/web_server_host = 127.0.0.1/' airflow.cfg
airflow webserver -p 8080
# in another terminal
workon airflow
airflow scheduler
```

## Setup Options

* There is a lengthy list of extra packages you can install
  * Different DB Backends - MySQL, PostgreSQL
  * Different Executors - Dask, Celery, etc.

# Tutorial

The [example tutorial from the docs](http://airflow.apache.org/tutorial.html)

Loading modules:

```python
from airflow import DAG
from airflow.operators.bash_operator \
  import BashOperator
from datetime import datetime, timedelta
```

Specify the `default_args` to be passed to the DAG constructor:

```python
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2018, 10, 1),
    'email': ['airflow@example.com'],
    'email_on_failure': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}
dag = DAG('tutorial', default_args=default_args)
```

Define the tasks (Operators) to run ...

```python
t1 = BashOperator(
    task_id='print_date',
    bash_command='date',
    dag=dag)

t2 = BashOperator(
    task_id='sleep',
    bash_command='sleep 5',
    retries=3,
    dag=dag)
```
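Note that `t2` sets `retries=3` even though `default_args` already sets `retries` to 1. The sketch below is a plain-Python illustration (no Airflow needed) of that override behaviour: arguments passed to a task take precedence over the shared defaults. The helper `resolve_task_args` is hypothetical and is not part of Airflow's API.

```python
from datetime import timedelta

# Shared defaults, trimmed from the tutorial's default_args.
default_args = {
    'owner': 'airflow',
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}


def resolve_task_args(defaults, **task_kwargs):
    """Hypothetical helper: explicit task arguments win over defaults."""
    resolved = dict(defaults)
    resolved.update(task_kwargs)
    return resolved


t1_args = resolve_task_args(default_args, task_id='print_date')
t2_args = resolve_task_args(default_args, task_id='sleep', retries=3)

print(t1_args['retries'])  # 1, inherited from default_args
print(t2_args['retries'])  # 3, overridden on the task
```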

Define a Jinja template for a templated operator:

```python
templated_command = """
    {% for i in range(5) %}
        echo "{{ ds }}"
        echo "{{ macros.ds_add(ds, 7)}}"
        echo "{{ params.my_param }}"
    {% endfor %}
"""
```

Use the template defined above:

```python
t3 = BashOperator(
    task_id='templated',
    bash_command=templated_command,
    params={'my_param': 'Parameter I passed in'},
    dag=dag)
```

Define the relationships between tasks:

```python
t2.set_upstream(t1)
t3.set_upstream(t1)
```
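To make the resulting graph concrete, here is a plain-Python sketch (no Airflow involved) of the dependencies those two calls describe. The `upstream` dictionary and the `runnable` helper are illustrative assumptions, not how the scheduler is implemented.

```python
# Map each task to the set of tasks that must finish before it can start.
upstream = {
    'print_date': set(),
    'sleep': {'print_date'},      # t2.set_upstream(t1)
    'templated': {'print_date'},  # t3.set_upstream(t1)
}


def runnable(upstream, done):
    """Return tasks not yet done whose upstream tasks have all finished."""
    return sorted(
        task for task, deps in upstream.items()
        if task not in done and deps <= done
    )


first_wave = runnable(upstream, done=set())
second_wave = runnable(upstream, done={'print_date'})
print(first_wave)   # ['print_date']
print(second_wave)  # ['sleep', 'templated']
```

The same edges can also be declared from the other side with `t1.set_downstream(t2)`.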

"One thing to wrap your head around (it may not be very intuitive for everyone at first) is that this Airflow Python script is really just a configuration file specifying the DAG’s structure as code. The actual tasks defined here will run in a different context from the context of this script. Different tasks run on different workers at different points in time, which means that this script cannot be used to cross communicate between tasks. Note that for this purpose we have a more advanced feature called XCom."

"People sometimes think of the DAG definition file as a place where they can do some actual data processing - that is not the case at all! The script’s purpose is to define a DAG object. It needs to evaluate quickly (seconds, not minutes) since the scheduler will execute it periodically to reflect the changes if any."

# Concepts

# DagRuns

Understanding `start_date` and `schedule_interval` is key to understanding how your DAGs are going to run.

"The first `DagRun` to be created will be based on the `min(start_date)` for all your task. From that point on, the scheduler creates new `DagRun`s based on your `schedule_interval` and the corresponding task instances run as your dependencies are met."

# DagRuns Example

Say today is October 24th, 2018 and you create a DAG with a `start_date` of `2018-10-01` and the default `schedule_interval`, which is daily. The first `DagRun` gets a date stamp (`ds`) of `2018-10-01` and is scheduled immediately, followed by one with a `ds` of `2018-10-02`, and so on. Whether these run right away depends on your dependencies. In total, 23 `DagRun`s are created, covering `2018-10-01` through `2018-10-23`, because the run for a given `ds` only starts once that day is over.
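The count of 23 can be checked with a few lines of standard-library Python. This is only a sketch of the date arithmetic, assuming a daily interval and that a run stamped `ds` starts once the day it covers has ended. The function `daily_run_dates` is made up for illustration and is not an Airflow API.

```python
from datetime import date, timedelta


def daily_run_dates(start_date, today):
    """List the ds values of daily DagRuns whose interval has fully elapsed."""
    interval = timedelta(days=1)
    runs = []
    ds = start_date
    # A run stamped `ds` covers [ds, ds + interval) and only starts once
    # that interval is over.
    while ds + interval <= today:
        runs.append(ds.isoformat())
        ds += interval
    return runs


runs = daily_run_dates(date(2018, 10, 1), date(2018, 10, 24))
print(len(runs))  # 23
print(runs[0])    # 2018-10-01
print(runs[-1])   # 2018-10-23
```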

# Operators

* An operator describes the task to be run and all operators inherit from the `BaseOperator`.
* There are three types of operators:
  * **actions** - tell another system to perform an action
  * **transfers** - move data from one system to another
  * **sensors** - keep running until a condition is met (e.g. file appears in S3)
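The sensor idea, "keep running until a condition is met", boils down to a polling loop. The sketch below is plain Python, not Airflow's sensor implementation. The parameter names `poke_interval` and `timeout` are borrowed from Airflow's sensor arguments, while `wait_for` and `file_has_appeared` are hypothetical stand-ins.

```python
import time


def wait_for(poke, poke_interval=0.01, timeout=1.0):
    """Call `poke` repeatedly until it returns True or `timeout` seconds pass."""
    deadline = time.monotonic() + timeout
    while not poke():
        if time.monotonic() >= deadline:
            raise TimeoutError('condition was not met before the timeout')
        time.sleep(poke_interval)
    return True


# Hypothetical condition: becomes true on the third check, standing in
# for something like "the file has appeared in S3".
attempts = []


def file_has_appeared():
    attempts.append(1)
    return len(attempts) >= 3


result = wait_for(file_has_appeared)
print(result, len(attempts))  # True 3
```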

# Operators (cont.)

* `BashOperator`, `PythonOperator`, `PythonVirtualenvOperator`, `SimpleHttpOperator`
* `BranchPythonOperator`, `CheckOperator` (sql check)
* `EmailOperator`, `SlackOperator`, `DiscordWebhookOperator`
* `DockerOperator`, `*Hive*`, `(MySql|Postgres|Sqlite)Operator`
* `GenericTransfer` (sql), `S3TransferOperator`
* `ExternalTaskSensor`, `HdfsSensor`, `HttpSensor`, `S3KeySensor`

There are too many to list here; see https://airflow.apache.org/code.html#id3

# Template Variables

<img src="airflow-variables-pt1.png" />

https://airflow.apache.org/code.html#default-variables

# Template Variables (cont.)

* Object attributes and methods can be accessed with dot notation, e.g. `{{task.owner}}`
  * See individual Model documentation for details on attributes
* Variables set in Airflow can be accessed through `var`
  * `{{var.value.<VARIABLE_NAME>}}`
  * `{{var.json.<VARIABLE_NAME>}}`
    * `{{var.json.<VARIABLE_NAME>.<KEY>}}`
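The difference between `var.value` and `var.json` is easiest to see side by side. The following is a standard-library analogue, not Airflow code: `stored_variables`, `var_value` and `var_json` are hypothetical names, and the variable contents are invented for illustration.

```python
import json

# Hypothetical variable store: each Variable is kept as a string.
stored_variables = {
    'environment': 'dev',
    'plot_settings': '{"station": "PHX", "years": [2016, 2017]}',
}


def var_value(name):
    """Analogue of {{ var.value.<NAME> }}: the raw stored string."""
    return stored_variables[name]


def var_json(name):
    """Analogue of {{ var.json.<NAME> }}: the string parsed as JSON."""
    return json.loads(stored_variables[name])


print(var_value('environment'))              # dev
print(var_json('plot_settings')['station'])  # PHX
```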

# Template Macros

Macros can be used in your templates to manipulate variables or strings

* `ds_add(ds, days)` - add or subtract days from date string
* `ds_format(ds, input_format, output_format)` - reformat date string
* `random()` - returns random number between 0 and 1 (excluding 1)

https://airflow.apache.org/code.html#macros
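To get a feel for what the date macros return, here is a standard-library sketch that mirrors their documented behaviour. These are re-implementations for illustration only, not Airflow's own functions, and they assume `ds` is a `YYYY-MM-DD` string.

```python
from datetime import datetime, timedelta
import random


def ds_add(ds, days):
    """Shift a YYYY-MM-DD string by `days` (negative values subtract)."""
    shifted = datetime.strptime(ds, '%Y-%m-%d') + timedelta(days=days)
    return shifted.strftime('%Y-%m-%d')


def ds_format(ds, input_format, output_format):
    """Parse `ds` using `input_format` and re-emit it in `output_format`."""
    return datetime.strptime(ds, input_format).strftime(output_format)


print(ds_add('2018-10-01', 7))   # 2018-10-08
print(ds_add('2018-10-01', -1))  # 2018-09-30
print(ds_format('2018-10-01', '%Y-%m-%d', '%m/%d/%y'))  # 10/01/18
print(0 <= random.random() < 1)  # True
```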

* Keep tasks IDEMPOTENT!!!
* Manage connection details and secrets with Connections and Variables
* Remove a DAG's records with `airflow delete_dag my_dag_id`
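On the idempotency point: a task that is retried or re-run for the same date should leave the same result behind. One common way to get there, sketched below in plain Python, is to key the output on the date stamp and overwrite rather than append. The function name, file layout and sample rows are all assumptions made for illustration.

```python
import os
import tempfile


def write_daily_report(output_dir, ds, rows):
    """Write the report for `ds`, replacing any output from an earlier run."""
    path = os.path.join(output_dir, 'report-{}.csv'.format(ds))
    # Mode 'w' truncates, so a retry overwrites instead of appending.
    with open(path, 'w') as handle:
        for row in rows:
            handle.write(','.join(str(value) for value in row) + '\n')
    return path


output_dir = tempfile.mkdtemp()
rows = [('PHX', 31.5), ('TUS', 29.0)]
first = write_daily_report(output_dir, '2018-10-15', rows)
second = write_daily_report(output_dir, '2018-10-15', rows)  # simulated retry

with open(second) as handle:
    content = handle.read()
print(first == second)      # True
print(content.count('\n'))  # 2
```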


# Resources

* https://github.com/jghoman/awesome-apache-airflow
* https://github.com/airflow-plugins/

* https://github.com/airflow-plugins/Getting-Started/blob/master/Tutorial/creating-ui-modification.md

```bash
$ airflow test plot-phx-gsod get_data 2018-10-15
[2018-10-17 03:18:22,683] {__init__.py:51} INFO - Using executor SequentialExecutor
[2018-10-17 03:18:22,799] {models.py:258} INFO - Filling up the DagBag from /srv/airflow/airflow/dags
```

```bash
$ airflow list_tasks plot-phx-gsod
[2018-10-17 03:19:36,323] {__init__.py:51} INFO - Using executor SequentialExecutor
[2018-10-17 03:19:36,428] {models.py:258} INFO - Filling up the DagBag from /srv/airflow/airflow/dags
extract_data
get_data
get_isd_history
plot_phx
```