# Moving and Transforming Data with Airflow

<br/>

![Airflow](https://raw.githubusercontent.com/apache/incubator-airflow/master/airflow/www/static/pin_100.png)

<br/>

*[Daniel Moreno](https://github.com/demorenoc)*

*Data Science & Analytics - [Mercadoni](https://www.mercadoni.com.co)*

*PyCon Colombia (2017-02-11)*

# Hi
<br/>
I am Daniel.

I am a statistician.


<i class="fa fa-github" aria-hidden="true"></i> [@demorenoc](https://github.com/demorenoc)
<br/>
<i class="fa fa-linkedin" aria-hidden="true"></i> [in/demorenoc](https://www.linkedin.com/in/demorenoc)

### Currently:
<img src="https://s29.postimg.org/pm48ubxfr/mercadonilogobig_3x.png" alt="Mercadoni" style="width: 300px;"/>

### Previously:
<img src="https://tappsi.co/wp-content/press-kit/logo/logo-principal-02.png" alt="Tappsi" style="width: 300px;"/>

# Outline

- Motivation

- Wishlist

- Airflow

- Airflow - Components
  - API and Concepts
  - Scheduler
  - Web UI
  - CLI

- Airflow - Deployment (brief)

- Resources and References

# Motivation

- Modern companies run complex, diverse processes powered by technology, producing vast and varied amounts of data

![](http://www.intorobotics.com/wp-content/uploads/2014/10/HRInternetofThingsStandardsiStockphoto27050372-1393867723706.jpg)


# Motivation

- *Continuously available* and *usable* data is key to the success of modern companies.
- "Clean" data is vital for:
  - Better decision making
  - Continuous improvement (Measure-Analyze-Improve)
  - Research and Development
  
![](http://databasesquad.net/wp-content/uploads/2014/08/data-warehouse-services.jpg)

# Motivation

- **Moving and transforming data** can get costly, especially when it is needed continuously:
  - Often overlooked in early stages
  - A large diversity of tools produces complex, specialized "stacks"
  - Cleaning is often said to take around 80% of the time in data analysis
  - Task repetition and redundancy

![](http://vignette3.wikia.nocookie.net/lego/images/6/6c/Janitor.jpg)

# Motivation

## How does your company manage its data *workflows*?

![](img/mercadoni-etl.png)

# Wishlist

![](https://decisions.com/wp-content/uploads/2015/11/99-c.png)

# Wishlist

- Compact, homogeneous **code**

- Simplicity (defining workflows and **collaborating**)

- Flexibility (modularity + extensibility)

- Traceability/accountability and ownership

- Operational metadata

- Scalability

- Automation (**programmatic**/dynamic)

- Useful UI

- Gentle learning curve

# Airflow

<br/>

![Airflow](https://raw.githubusercontent.com/apache/incubator-airflow/master/airflow/www/static/pin_100.png)

![Incubator](http://incubator.apache.org/images/egg-logo2.png)

Originally by [Maxime Beauchemin @mistercrunch](https://github.com/mistercrunch) from Airbnb.
<br/>

https://github.com/apache/incubator-airflow

https://airflow.incubator.apache.org/

<br/>

```sh
pip install airflow
pip install "airflow[mysql]"
pip install "airflow[postgres]"
pip install "airflow[crypto, password]"
```

# Alternatives

- Luigi by Spotify <img src="https://raw.githubusercontent.com/spotify/luigi/master/doc/luigi.png" alt="Luigi" style="width: 150px;"/>

- Pinball by Pinterest

- See some comparisons:
     - [Luigi vs Airflow vs Pinball](http://bytepawn.com/luigi-airflow-pinball.html#luigi-airflow-pinball)
     - [Apache Airflow - Presentation by Sumit Maheshwari (Qubole)](http://www.slideshare.net/sumitmaheshwari007/apache-airflow)

## For Hadoop

- Azkaban by LinkedIn

- Apache Oozie (originally by Yahoo!) ![](http://oozie.apache.org/images/oozie_200x.png)

# Airflow - Components

![Airflow-Architecture](https://image.slidesharecdn.com/june101115amairbnbbeaucheminv2-150701231554-lva1-app6892/95/airflow-an-open-source-platform-to-author-and-monitor-data-pipelines-23-638.jpg)

Image from [Maxime Beauchemin's original presentation in the 2015 Hadoop Summit](http://www.slideshare.net/Hadoop_Summit/airflow-an-open-source-platform-to-author-and-monitor-data-pipelines) 

# Airflow - Components

- API (http://airflow.incubator.apache.org/code.html#api-reference)
```python
from airflow import DAG
from airflow.operators import SimpleHttpOperator, MySqlOperator
```

- Executor ("backend" system)
    - Sequential
    - Local
    - Celery (and Mesos)

- Scheduler
        airflow scheduler -n 10

- Web UI
        airflow webserver -p 80

- CLI
        airflow --help
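The difference between the Sequential and Local executors can be pictured with a toy model: the first runs one task at a time, the second hands independent tasks to a bounded pool of workers. The sketch below uses threads and hypothetical helper names (`run_sequential`, `run_local`); it only illustrates the idea and is not Airflow's code (the real LocalExecutor works with local worker processes).

```python
from concurrent.futures import ThreadPoolExecutor


def run_sequential(tasks):
    """Run task callables one after another (one task at a time)."""
    return [task() for task in tasks]


def run_local(tasks, parallelism=4):
    """Run independent task callables on at most `parallelism` workers."""
    with ThreadPoolExecutor(max_workers=parallelism) as pool:
        futures = [pool.submit(task) for task in tasks]
        # Collect results in submission order, waiting for each to finish
        return [future.result() for future in futures]


# Five independent "tasks" that each compute a square
tasks = [lambda i=i: i * i for i in range(5)]
print(run_sequential(tasks))            # [0, 1, 4, 9, 16]
print(run_local(tasks, parallelism=2))  # [0, 1, 4, 9, 16]
```

Both produce the same results; what changes is how many tasks may be in flight at once, which is what settings such as `parallelism` bound in a real deployment.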

# Airflow - API and Concepts

![](img/dag-example-0.png)

# Airflow - API and Concepts

### Workflow Building Blocks - DAG

- Building a workflow -- a set of tasks with a dependency structure -- is the main goal.
- Workflows are modeled as DAGs: Directed Acyclic Graphs.

```python
from airflow import DAG
from datetime import datetime, timedelta

# Defaults applied to every task attached to this DAG
default_args = {'owner': 'daniel', 'email': ['daniel.moreno@mercadoni.com'],
                'start_date': datetime(2017, 2, 10), 'depends_on_past': False,
                'retries': 1, 'retry_delay': timedelta(seconds = 30)}

dag = DAG(dag_id              = 'model',
          schedule_interval   = timedelta(minutes = 10), # or cron: '*/10 * * * *'
          default_args        = default_args,
          dagrun_timeout      = timedelta(minutes = 3),
          template_searchpath = ['path/to/queries'])  # where templated files (e.g. .sql) are looked up
```
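Why the graph must be acyclic: a scheduler needs an order in which every task runs after its upstream tasks, and such an order exists only if there are no cycles. Below is a minimal sketch of that idea using Kahn's algorithm. The hypothetical `topological_order` helper takes a mapping from each task id to the ids it depends on (the kind of relation Airflow tasks declare with `set_upstream`). Airflow performs its own dependency checks; this is not its implementation.

```python
from collections import deque


def topological_order(upstream):
    """Return task ids in an order where every task follows its dependencies.

    `upstream` maps a task id to the set of task ids it depends on.
    Raises ValueError if the graph has a cycle (i.e. it is not a DAG).
    """
    # Collect every task, including ones only mentioned as dependencies
    tasks = set(upstream)
    for deps in upstream.values():
        tasks |= set(deps)
    # Count unmet dependencies per task and build the reverse edges
    pending = {t: len(set(upstream.get(t, ()))) for t in tasks}
    downstream = {t: [] for t in tasks}
    for task, deps in upstream.items():
        for dep in set(deps):
            downstream[dep].append(task)
    # Start from tasks with no dependencies; release children as parents finish
    ready = deque(sorted(t for t, n in pending.items() if n == 0))
    order = []
    while ready:
        task = ready.popleft()
        order.append(task)
        for child in sorted(downstream[task]):
            pending[child] -= 1
            if pending[child] == 0:
                ready.append(child)
    if len(order) != len(tasks):
        raise ValueError("cycle detected: not a DAG")
    return order


print(topological_order({'mysql_task': {'bash_task'}, 'py_task': {'mysql_task'}}))
# ['bash_task', 'mysql_task', 'py_task']
```

If two tasks depended on each other, no task in the cycle would ever reach zero pending dependencies, so the function raises instead of returning a partial order.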

# Airflow - API and Concepts

### Workflow Building Blocks - Operators and Tasks

- Operators ⟶ Tasks ⟶ Task instances
- Operators are Task "factories"
- When a Task is executed (for a given execution date), a Task instance is produced

```python
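from airflow.operators import BashOperator, MySqlOperator, PythonOperator

def py_function(arg):
    # Called by PythonOperator with the values given in `op_kwargs`
    print(arg)

# `dag` is the DAG object defined on the previous slide
bash_task = BashOperator(task_id = 'bash_task',
                         bash_command = 'echo "Hello PyCon"',
                         dag = dag)

mysql_task = MySqlOperator(task_id = 'mysql_task',
                           mysql_conn_id = 'mysql_db_conn',
                           sql = 'query.sql',  # file found via template_searchpath
                           dag = dag)

py_task = PythonOperator(task_id = 'py_task',
                         python_callable = py_function,
                         op_kwargs = {'arg' : 'value'},
                         dag = dag)

# Dependencies: bash_task -> mysql_task -> py_task
mysql_task.set_upstream(bash_task)
py_task.set_upstream(mysql_task)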
```
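The Operator ⟶ Task ⟶ Task instance chain can be modeled in a few lines of plain Python. In this toy sketch (the `ToyOperator` class and its `execute` signature are hypothetical, not Airflow's API), instantiating the class gives a task, and every execution of that task for a given date returns a separate task-instance record with its own state.

```python
from datetime import datetime


class ToyOperator:
    """Operator class: describes a kind of work; instances are tasks."""

    def __init__(self, task_id, action):
        self.task_id = task_id
        self.action = action  # callable taking the execution date

    def execute(self, execution_date):
        """Run the task for one date and return a task-instance record."""
        try:
            result, state = self.action(execution_date), "success"
        except Exception:
            # Any error marks this particular task instance as failed
            result, state = None, "failed"
        return {"task_id": self.task_id, "execution_date": execution_date,
                "state": state, "result": result}


# Instantiating the operator yields a task...
greet = ToyOperator("greet", lambda d: "Hello PyCon " + d.strftime("%Y-%m-%d"))
# ...and each execution of that task yields a distinct task instance
ti_1 = greet.execute(datetime(2017, 2, 10))
ti_2 = greet.execute(datetime(2017, 2, 11))
print(ti_1["state"], ti_1["result"])  # success Hello PyCon 2017-02-10
```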

# Airflow - API and Concepts

### Workflow Building Blocks - Operators

- Local:
    - BashOperator
    - PythonOperator
    - DockerOperator
    - S3FileTransformOperator

- Remote:
    - SimpleHttpOperator
    - [Hive|Pig|BigQuery]Operator
    - [MySql|Postgres|Sqlite|Jdbc|MsSql|Oracle]Operator
    - [PrestoCheck|PrestoValueCheck|PrestoIntervalCheck]Operator

- Messaging:
    - EmailOperator
    - [SlackAPI|SlackAPIPost]Operator

# Airflow - API and Concepts

### Workflow Building Blocks - Operators

- Transfer:
    - GenericTransfer
    - [MySqlToHive|S3ToHive|HiveToMySql]Transfer
    - [PrestoToMySql|HiveToDruid]Transfer

- Flow:
    - [BranchPython|ShortCircuit]Operator
    - [TriggerDagRun|SubDag]Operator
    - DummyOperator

- Sensors:
    - [ExternalTask|Hdfs|HivePartition|Http|MetastorePartition]Sensor
    - [S3Key|S3Prefix|Sql|TimeDelta|Time|WebHdfs]Sensor

- Other (contributed):
    - SSH, GCS, Spark SQL, Sqoop, Vertica, AWS EMR, HipChat


# Airflow - API and Concepts

### A real-life DAG

![](img/dag-example.png)

# Airflow - API and Concepts

### Key Elements

- Hooks: the building blocks of operators

   ```python
   from airflow.hooks import MySqlHook
   
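   # The conn_id refers to a Connection stored in Airflow's metadata DB
   mysql_db = MySqlHook(mysql_conn_id = "mysql_dwh")
   # get_first runs the query and returns only the first row
   result = mysql_db.get_first(sql = "SELECT 'Airflow is cool' AS txt;")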
   ```
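A hook's job is to turn a connection id into a live client and expose convenient query helpers. The sketch below mimics that pattern with the standard library's `sqlite3`. The `ToySqliteHook` class and the `CONNECTIONS` dict are hypothetical stand-ins (Airflow keeps connections in its metadata DB and ships its own `SqliteHook`); the helper names `get_conn`, `get_records` and `get_first` follow those of Airflow's DB-API hooks, but the bodies are simplified.

```python
import sqlite3

# Hypothetical connection registry: conn_id -> SQLite database path.
# In Airflow, connections live in the metadata DB and are edited in the UI/CLI.
CONNECTIONS = {"sqlite_dwh": ":memory:"}


class ToySqliteHook:
    """Hook-like wrapper: resolve a conn_id, then offer query helpers."""

    def __init__(self, conn_id):
        self.database = CONNECTIONS[conn_id]

    def get_conn(self):
        # Open a fresh DB-API connection for the configured database
        return sqlite3.connect(self.database)

    def get_records(self, sql, parameters=()):
        # Run a query and return all rows as a list of tuples
        conn = self.get_conn()
        try:
            return conn.execute(sql, parameters).fetchall()
        finally:
            conn.close()

    def get_first(self, sql, parameters=()):
        # Run a query and return only the first row (or None)
        conn = self.get_conn()
        try:
            return conn.execute(sql, parameters).fetchone()
        finally:
            conn.close()


hook = ToySqliteHook("sqlite_dwh")
print(hook.get_first("SELECT 'Airflow is cool' AS txt;"))  # ('Airflow is cool',)
```

Keeping credentials behind a `conn_id` is what lets the same DAG code run against different databases: only the connection entry changes.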

# Airflow - API and Concepts

### Key Elements

- Connections

![connections-ui](img/connection.gif)

# Airflow - API and Concepts

### Further Elements

- Variables and XComs ⟶ Share "state"

- Templates (Jinja) and Macros ⟶ Patterns and/or Parametrization

- Pools and queues ⟶ Resources

- SubDAGs, Branches and trigger rules ⟶ Flow

- SLAs and policies ⟶ Quality and accountability

- Documentation and notes ⟶ Collaboration and maintainability
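Trigger rules decide whether a task may run based on the states of its upstream tasks; `all_success` is the default. The function below is a simplified, hypothetical sketch of a few of Airflow's rule names (`all_success`, `all_failed`, `all_done`, `one_success`, `one_failed`, `dummy`). It only considers the states `success`, `failed`, `skipped` and `running`; the real rules also handle states such as `upstream_failed`, and `one_success`/`one_failed` can fire before all upstream tasks finish.

```python
FINISHED = {"success", "failed", "skipped"}


def trigger_rule_met(rule, upstream_states):
    """Return True if a task with `rule` may run, given its upstream states."""
    states = list(upstream_states)
    if rule == "all_success":
        return all(s == "success" for s in states)
    if rule == "all_failed":
        return all(s == "failed" for s in states)
    if rule == "all_done":
        # Every upstream task has finished, whatever the outcome
        return all(s in FINISHED for s in states)
    if rule == "one_success":
        return any(s == "success" for s in states)
    if rule == "one_failed":
        return any(s == "failed" for s in states)
    if rule == "dummy":
        # Dependencies are for show only: always run
        return True
    raise ValueError("unknown trigger rule: {}".format(rule))


# A cleanup task should run once upstream is done, even if something failed
print(trigger_rule_met("all_done", ["success", "failed"]))     # True
print(trigger_rule_met("all_success", ["success", "failed"]))  # False
```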

# Airflow - Scheduler


<img src="https://www.tangentehr.com/final/wp-content/uploads/2015/03/Enterprise-Schedular-2-512x512.png" alt="scheduler" style="width: 100px;"/>

        airflow scheduler -n $NUM
        
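- Monitors the DAGs and triggers them, creating `DAG Runs` and sending their tasks to the executor

- The first `DAG Run` covers the interval that begins at the DAG's `start_date`; it is triggered once that interval has ended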

- Subsequent runs based on `schedule_interval` (cron expression or `datetime.timedelta`)     
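How `start_date` and a `timedelta` `schedule_interval` translate into DAG Runs can be sketched as follows: each run is stamped with the start of the interval it covers and is triggered once that interval has ended. The `scheduled_runs` helper is hypothetical and ignores cron schedules, `end_date` and other scheduler options; it simply enumerates the intervals that have completed by `now`.

```python
from datetime import datetime, timedelta


def scheduled_runs(start_date, schedule_interval, now):
    """List (execution_date, triggered_at) pairs for completed intervals.

    A run is stamped with the *start* of its interval but only becomes
    due once the whole interval has elapsed.
    """
    runs = []
    execution_date = start_date
    while execution_date + schedule_interval <= now:
        runs.append((execution_date, execution_date + schedule_interval))
        execution_date += schedule_interval
    return runs


# Same start_date and interval as the 'model' DAG, checked at 00:35
runs = scheduled_runs(datetime(2017, 2, 10), timedelta(minutes=10),
                      now=datetime(2017, 2, 10, 0, 35))
for execution_date, triggered_at in runs:
    print(execution_date.strftime("%H:%M"), "->", triggered_at.strftime("%H:%M"))
# 00:00 -> 00:10
# 00:10 -> 00:20
# 00:20 -> 00:30
```

The interval starting at 00:30 is not due yet at 00:35, which is why a freshly deployed DAG appears to run "one interval late".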


# Airflow - Web UI

![](http://airflow.incubator.apache.org/_images/airflow.gif)

# Airflow - CLI

<img src="http://cdn.osxdaily.com/wp-content/uploads/2014/07/iterm2-icon.jpg" alt="cli" style="width: 80px;"/>

Rich set of commands for DevOps: `airflow [-h] <command>`

**Core services**

        webserver -p 80
        scheduler -n $NUM

**Meta-DB operations**

        initdb resetdb upgradedb

**Operate on DAGs**

        pause unpause run trigger_dag backfill dag_state task_state clear

**Development and test**

        list_dags list_tasks variables render test

# Airflow - Deployment

### Setup

- Set up a metadata DB (SQLAlchemy-powered) for the backend (executor)

- Set up the Airflow home environment variable and the config file:
    - `$AIRFLOW_HOME`
    - `$AIRFLOW_HOME/airflow.cfg`

  Key settings (in the `[core]` section): `airflow_home`, `dags_folder`, `executor`, `sql_alchemy_conn`, `parallelism`, `dag_concurrency`

- Initialize the DB (once)

        airflow initdb
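`airflow.cfg` is an INI-style file, so the key settings listed above can be inspected with Python's `configparser`. The excerpt below is hypothetical (paths, credentials and numbers are placeholders), and the `get_setting` helper is an illustrative assumption that mimics Airflow's documented `AIRFLOW__{SECTION}__{KEY}` environment-variable overrides; check the configuration docs of your Airflow version for the exact precedence rules.

```python
import configparser
import os

# Hypothetical excerpt of $AIRFLOW_HOME/airflow.cfg (placeholder values)
SAMPLE_CFG = """
[core]
airflow_home = /home/airflow
dags_folder = /home/airflow/dags
executor = LocalExecutor
sql_alchemy_conn = mysql://airflow:secret@localhost/airflow
parallelism = 32
dag_concurrency = 16
"""


def get_setting(parser, section, key, environ=os.environ):
    """Return a setting; an AIRFLOW__SECTION__KEY env var wins over the file."""
    env_key = "AIRFLOW__{}__{}".format(section.upper(), key.upper())
    if env_key in environ:
        return environ[env_key]
    return parser.get(section, key)


parser = configparser.ConfigParser()
parser.read_string(SAMPLE_CFG)
print(get_setting(parser, "core", "executor", environ={}))  # LocalExecutor
```

Overriding through environment variables is handy for secrets such as `sql_alchemy_conn`, which then need not be written to the file.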

# Airflow - Deployment

### Start the services

        airflow webserver -p 80
        airflow scheduler -n $NUM

### Run as a service

- Set up Airflow (Web UI and Scheduler) as services with systemd or upstart (launchd on OS X):

   - [`scripts/systemd`](https://github.com/apache/incubator-airflow/tree/master/scripts/systemd)
   - [`scripts/upstart`](https://github.com/apache/incubator-airflow/tree/master/scripts/upstart)    

            initctl start airflow-webserver
            initctl start airflow-scheduler

# Resources and References

- GitHub: https://github.com/apache/incubator-airflow

- Docs: http://airflow.incubator.apache.org/

- Wiki: https://cwiki.apache.org/confluence/display/AIRFLOW/Airflow+Home
    - Further links: https://cwiki.apache.org/confluence/display/AIRFLOW/Airflow+Links
    - Common pitfalls: https://cwiki.apache.org/confluence/display/AIRFLOW/Common+Pitfalls

- [Maxime Beauchemin's original presentation](http://www.slideshare.net/Hadoop_Summit/airflow-an-open-source-platform-to-author-and-monitor-data-pipelines)

- ETL with Airflow (deep example): https://github.com/gtoonstra/etl-with-airflow


# Colophon

#### [**demorenoc.github.io/slides/pycon-co-2017/airflow**](https://demorenoc.github.io/slides/pycon-co-2017/airflow)

This presentation was written in a [Jupyter Notebook](https://jupyter.org) (available [here](https://github.com/demorenoc/slides/blob/gh-pages/pycon-co-2017/airflow/moving-tranforming-data-airflow.ipynb)) and compiled into a [reveal.js](http://lab.hakim.se/reveal-js) presentation with [`nbconvert`](https://nbconvert.readthedocs.io/en/latest/), using the default slides template and [CDNJS](https://cdnjs.com/). It is published online on [GitHub](https://github.com/demorenoc/slides) thanks to [GitHub Pages](https://pages.github.com/).

This presentation was prepared by [Daniel Moreno](https://github.com/demorenoc) for a talk given at [PyCon.co](http://www.pycon.co/), and its original content (not much, really) is under the [CC-BY 4.0](http://creativecommons.org/licenses/by/4.0/) license.

[![CC BY](http://i.creativecommons.org/l/by/4.0/88x31.png)](http://creativecommons.org/licenses/by/4.0/ "WEB")