# Airflow Classification Pipeline

Let's have a look at some of the components that Airflow provides. Please note: Airflow DAGs must be defined in `.py` files, not in Jupyter notebooks. This notebook is for presentation only and cannot be used directly in Airflow.

# Directed Acyclic Graph (DAG)

Each Airflow pipeline has to be a directed acyclic graph (i.e. no loops, clear task dependencies).
We define a DAG in the following way:

```python
from datetime import datetime, timedelta

from airflow import DAG

# the default args will be passed to all tasks belonging to the DAG
default_args = {
    'owner': 'airflow',
    'start_date': datetime(2019, 8, 1),
    # ...
}

dag = DAG(
    'keras2production-pipeline',
    default_args=default_args,
    schedule_interval=timedelta(days=1),
    # ...
)
```

DAGs need to be placed into the folder `$AIRFLOW_HOME/dags`, where `$AIRFLOW_HOME` is usually set to `/usr/local/airflow`.
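
The "no loops" requirement can be illustrated without Airflow. The following is a minimal sketch using `graphlib` from Python's standard library (Python 3.9+); it is not Airflow's own validation code, and the task names are hypothetical.

```python
from graphlib import CycleError, TopologicalSorter  # standard library, Python 3.9+

# Hypothetical task names; each key maps to the set of tasks it depends on.
dependencies = {
    "train": {"load_data"},
    "evaluate": {"train"},
}

# static_order() yields the tasks in an order that respects all dependencies.
order = list(TopologicalSorter(dependencies).static_order())
print(order)  # ['load_data', 'train', 'evaluate']

# Making "load_data" depend on "evaluate" closes a loop, so no valid order exists.
cyclic = dict(dependencies, load_data={"evaluate"})
try:
    list(TopologicalSorter(cyclic).static_order())
    has_cycle = False
except CycleError:
    has_cycle = True
print(has_cycle)  # True
```

A graph with a loop has no valid execution order, which is why a pipeline must be acyclic.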

# Operators

Each pipeline consists of steps, which are called operators. An operator executes some kind of task and finishes when the task is done. There are also sensors, a special type of operator, which finish when a condition is met (e.g. another task has finished or a file exists). Don't worry too much about the difference between a normal operator and a sensor; just look up what fits your needs best in the well-documented API.

Probably the most used operator is the `PythonOperator`:

```python
from airflow.operators.python_operator import PythonOperator

def print_sth():
    print("Our slides are still there!")
    
python_task = PythonOperator(
    task_id="print",
    python_callable=print_sth,
    provide_context=False,
    dag=dag
)

```

This is an example of a sensor:

```python
from airflow.contrib.sensors.file_sensor import FileSensor

fs_task = FileSensor(
    task_id="lookup_file",
    filepath="/keras2production/slides/slides.pdf",
    dag=dag
)
```
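
Conceptually, a sensor checks its condition repeatedly until the condition holds or a timeout is reached. The following is a minimal sketch of that idea in plain Python, not Airflow's implementation. The helper `wait_for` and the file name are hypothetical; the parameter names `poke_interval` and `timeout` mirror the sensor arguments of the same name. As a simplification, the sketch returns `False` on timeout, whereas a real sensor fails the task.

```python
import os
import tempfile
import time


def wait_for(condition, poke_interval=0.1, timeout=1.0):
    """Call condition() repeatedly until it returns True or the timeout elapses."""
    deadline = time.monotonic() + timeout
    while True:
        if condition():
            return True
        if time.monotonic() >= deadline:
            return False  # simplification: give up instead of raising an error
        time.sleep(poke_interval)


with tempfile.TemporaryDirectory() as tmp:
    path = os.path.join(tmp, "slides.pdf")  # hypothetical file to wait for

    # The file does not exist yet, so the wait times out.
    missing = wait_for(lambda: os.path.exists(path), timeout=0.3)

    # Once the file exists, the first check succeeds.
    open(path, "w").close()
    found = wait_for(lambda: os.path.exists(path), timeout=0.3)

print(missing, found)  # False True
```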

# Connecting Operators

Airflow uses the bitshift operators `>>` and `<<` to connect pipeline steps together (you can also use `set_upstream()` or `set_downstream()`, but just don't...).

`task1 >> task2` means: `task1` will be executed before `task2`, and vice versa for `<<`.

**Tip**: Only use `>>` to keep your code consistent and readable.

Let's connect our operator and sensor that we defined above:

```python
fs_task >> python_task
```

That's it!
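
If you are curious how `>>` can connect objects at all: Python lets a class overload the operator via `__rshift__` and `__lshift__`. The following toy class is a minimal sketch of that mechanism, assuming nothing about Airflow's internals; `Task` and its `downstream` list are hypothetical names.

```python
class Task:
    """Toy stand-in for an operator; not Airflow's actual class."""

    def __init__(self, task_id):
        self.task_id = task_id
        self.downstream = []  # tasks that run after this one

    def __rshift__(self, other):
        # self >> other: other runs after self
        self.downstream.append(other)
        return other  # returning the right operand allows chaining: a >> b >> c

    def __lshift__(self, other):
        # self << other: self runs after other
        other.downstream.append(self)
        return other


a, b, c = Task("a"), Task("b"), Task("c")
a >> b >> c  # evaluated as (a >> b) >> c, i.e. b >> c

print([t.task_id for t in a.downstream])  # ['b']
print([t.task_id for t in b.downstream])  # ['c']
```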

# XCom

Ideally, all of our operators and sensors are independent of each other, that is, no variables, state, etc. have to be communicated between them. This makes tasks easy to exchange. Since we cannot always avoid communication between tasks, Airflow provides the XCom (_cross-communication_) system, which follows a simple push-pull idea.

You can push a variable in an operator via `xcom_push()` or the `return` statement, and pull (i.e. get) the variable in another task via `xcom_pull()`.

This is an example of how two Python operators can communicate with each other via XCom:

```python
def foo():
    x = "hello world"
    return x


def bar(**context):
    x = context["task_instance"].xcom_pull(task_ids="task1")
    print(x)
    
    
task1 = PythonOperator(
    task_id="task1",
    python_callable=foo,
    dag=dag,
)

task2 = PythonOperator(
    task_id="task2",
    python_callable=bar,
    provide_context=True,
    dag=dag,
)

task1 >> task2
```

**Note**: The pulling task needs to be provided with context, hence `provide_context=True`.
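
The push-pull idea itself can be sketched in a few lines of plain Python. This is a toy in-memory store, not how Airflow implements XCom (Airflow persists XComs in its metadata database); the class `ToyXCom` is hypothetical. The default key `"return_value"` matches the key under which Airflow stores a task's return value.

```python
class ToyXCom:
    """Toy in-memory key-value store illustrating push and pull."""

    def __init__(self):
        self._store = {}

    def push(self, task_id, value, key="return_value"):
        # Values are stored per task and per key.
        self._store[(task_id, key)] = value

    def pull(self, task_ids, key="return_value"):
        # Returns None if the task has not pushed anything under this key.
        return self._store.get((task_ids, key))


xcom = ToyXCom()


def foo():
    return "hello world"


def bar():
    return xcom.pull(task_ids="task1")


# A scheduler would run task1 first and push its return value on its behalf.
xcom.push("task1", foo())
result = bar()
print(result)  # hello world
```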

# The web interface

To monitor and manage your pipelines, Airflow offers a web interface. In the setup used here, you can find it at `localhost:8080`.

# There is so much more!

Airflow has many more concepts to offer that allow robust yet flexible execution. Some highlights are:
- Error and retry management
- Different executors (Sequential, Local, Celery), ranging from running one task at a time to parallel execution on multiple nodes
- Integration with GCP, AWS, Azure, ...
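
As a small illustration of retry management, retries can be configured through the `default_args` dictionary shown earlier. The following is a minimal sketch; `retries` and `retry_delay` are standard operator arguments, while the concrete values are assumptions chosen only for illustration.

```python
from datetime import timedelta

default_args = {
    "owner": "airflow",
    "retries": 2,                         # re-run a failed task up to two more times
    "retry_delay": timedelta(minutes=5),  # wait five minutes between attempts
}

print(default_args["retry_delay"].total_seconds())  # 300.0
```

Because `default_args` is passed to every task of the DAG, each task inherits these settings unless it overrides them.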

Check out the [documentation page](https://airflow.apache.org/) for more information.