# Get In Control Of Your Workflows With Airflow


Christian Trebing, Europython 2016

@ctrebing

## Imagine: 
- you are a data driven company
- each night you get data from your customers and this data wants to be processed
- processing happens is done in separate steps (for example booking, machine learning, decision taking)
- if errors happen, you want to get an overview on what happened when
- as you might have already guessed: you have a tight time schedule each night

What options do you have?

## Doing it with cron
- works for the start
- only time triggers possible, no dependency
- error handling is hard

![gantt chart](images/cron_chart.png)

![gantt chart](images/cron_chart_ideal.png)

![gantt chart](images/cron_chart_exception.png)

## Writing a workflow processing tool
- we did that for the start. and not just one. 

- start is easy, everything is great.
- soon you reach the limits. Then you either have to invest much more than you thought initially or live with the limits
 - some ideas: concurrency, traceability, manual triggers, external interfaces, ui


# Using an open source workflow processing tool
- We evaluated multiple ones and decided for airflow

# Why did we decide for airflow?
- written in python. we know that and we like it.
- also workflows are defined in python code
- view of present and past runs, logging features
- extensible through plugins
- active development (apache incubator project)
- nice ui, possibility to define a REST interface
- relatively lightweight: two processes on a server + some database

In [None]:
from airflow import DAG
from airflow.operators import BookData, Predict, Decide

dag_id = "daily_processing"
schedule_interval = '0 22 * * *'

default_args = {
    'retries': 2,
    'retry_delay': timedelta(minutes=5)
}

dag = DAG(
    dag_id,
    start_date=datetime.date(2016, 12, 7),
    schedule_interval=schedule_interval,
    default_args=default_args)

book = BookData(dag=dag)

predict = Predict(dag=dag)
predict.set_upstream(book)

decide = Decide(dag=dag)
decide.set_upstream(predict)

![straight flow](images/straight_flow.jpg)

In [None]:
# Fan In / Fan Out

book = BookData(dag=dag)

predict_ger = Predict(dag=dag, country='GER')
predict_ger.set_upstream(book)

predict_uk = Predict(dag=dag, country='UK')
predict_uk.set_upstream(book)

decide = Decide(dag=dag)
decide.set_upstream(predict_ger)
decide.set_upstream(predict_uk)

![diamond](images/diamond.jpg)

# On the UI
- Dag overview (start screen)
- run view
- tree view
- runtimes
- gantt chart
- log view

# DAG Overview (start screen)

![DAG Overview](images/dag_overview.jpg)

# DAG Run View
![DAG Run](images/dag_run.jpg)


# Tree View
![DAG Run](images/dag_tree_view.jpg)

# Runtimes
![DAG Run](images/dag_task_duration.jpg)


# Gantt Chart
![DAG Run](images/dag_gantt.jpg)

# Log View
![DAG Log](images/dag_log.jpg)

# Operators

Many basic operators are included in airflow:
- BashOperator
- SimpleHttpOperator
- PostgresOperator / SqliteOperator
- PythonOperator
- EmailOperator
- ...

Also there are sensors to wait for things:
- HttpSensor
- HdfsSensor
- SqlSensor
- ...


# But I need a different operator...

![async http request](images/async_http.png)

1. I could use a SimpleHttpOperator and afterwards an HttpSensor
 - would work functional wise
 - but wouldn't it be nice to see the execution time directly as the operator run time?
 
2. Time for a new operator!

In [None]:
# operator implementation

import time, logging
from airflow import models, hooks

class Decide(models.BaseOperator):
    @airflow_utils.apply_defaults
    def __init__(self, **kwargs):
        super(Decide, self).__init__(
            task_id='decide',
            **kwargs)
        self.http_conn_id = 'DECISION_SERVER'
        self.endpoint_job_start = 'decide/'
        self.endpoint_job_status = 'job_status/'

    def execute(self, context):
        http = hooks.HttpHook(method='POST', http_conn_id=self.http_conn_id)
        response = http.run(endpoint=self.endpoint_job_start)
        job_id = response.json()['job_id']
        logging.info('started decision job with job id {}'.format(job_id))
        self.wait_for_job(job_id)

    def wait_for_job(self, job_id):
        job_status = None
        http = hooks.HttpHook(method='GET', http_conn_id=self.http_conn_id)
        while not job_status == 'FINISHED':
            time.sleep(1)
            response = http.run(endpoint=self.endpoint_job_status + str(job_id))
            job_status = response.json()['status']
            logging.info('status of decision job {} is {}'.format(job_id, job_status))

# Plugin Concept
- own operators
- own blueprints
- in the airflow configuration, give path to plugin

# Plugin Implementation

In [None]:
from airflow.plugins_manager import AirflowPlugin

from plugins import blueprints
from plugins import operators


# Defining the plugin class
class EuropythonPlugin(AirflowPlugin):
    name = "europython_plugin"
    operators = [
        operators.BookData,
        operators.Predict,
        operators.Decide
    ]
    flask_blueprints = [blueprints.TriggerBlueprint]

# Defining own Blueprints

Extends the web server

For example: currently, no REST API exists to ask trigger dags or ask for the state of a dag run

you can add your own blueprints that run within the webserver and can access all airflow functionality
- add as a flask blueprint
- we defined endpoints for the above (trigger dags/ask for state of a dag run)
- need to be careful of maintaining them through an airflow version upgrade

for implementation, see the example git repo

In [None]:
curl -X POST localhost:8080/trigger/daily_processing    
{ "dag_id": "daily_processing", "run_id": "external_trigger_2016-07-19T15:12:28.398352" }

In [None]:
curl localhost:8080/trigger/daily_processing/external_trigger_2016-07-19T15:12:28.398352
{ "dag_id": "daily_processing",
  "execution_date": "2016-07-19T15:12:28",
  "run_id": "external_trigger_2016-07-19T15:12:28.398352",
  "state": "running"}

# Deployment / What happens inside airflow

Two processes and a database
- Scheduler
- Webserver
- different databases supported: postgres, sqlite(with restrictions), ...

Executor: different possibilities exist
- SequentialExecutor (within scheduler)
- LocalExecutor (...)
- Celery Framework

# How we use it
- automatic and manual triggers
- one airflow instance per system we manage
- database: sometimes postgres, sometimes sqlite
- lightweight executors, only triggers http requests
- contributing to airflow with pull requests
 - external_triggers functionality (PR 503/540)
 - plugin detection mechanism (PR 730)

# Challenges / Pitfalls
- scheduling
- start time and backfill

# Scheduling

![execution_date](images/execution_date.png)
- start date: when did it start really
- execution date: 
 - more like a description for that run
 - always one iteration back in time
 - comes from ETL scenarios where data was available only on the next day

# Start Time and Backfill

- for every dag, you have to define a start time
- if the dag has a schedule, the scheduler will trigger a backfill to that date

![backfill](images/backfill.png)

When you know the start time at design time of your dag, this is fine.

If not, you have to take care what date to enter:
- it should not be too much in the past, otherwise backfill will be triggered
- ideally it should be one iteration before your first intended run

# If you want to dig deeper:


[https://github.com/apache/incubator-airflow](https://github.com/apache/incubator-airflow)

airflow documentation [http://pythonhosted.org/airflow/](http://pythonhosted.org/airflow/)

common pitfalls (from airflow wiki) [https://cwiki.apache.org/confluence/display/AIRFLOW/Common+Pitfalls](https://cwiki.apache.org/confluence/display/AIRFLOW/Common+Pitfalls)

plugin example from this talk: [https://github.com/blue-yonder/airflow-plugin-demo](https://github.com/blue-yonder/airflow-plugin-demo)