# Scheduling Machine Learning Pipelines using Apache Airflow

In this workshop, you will use Airflow to schedule a basic machine learning pipeline. The workshop consists of 3 assignments.

1. Schedule a basic 'hello world' example on Airflow
2. Schedule a machine learning pipeline on Airflow
3. Improve the pipeline by creating your own custom Airflow operator

Along the way, you will come across blockquotes like the one below. They contain supplemental information related to the assignments. This information is not needed to complete the assignments, but offers additional reading material for Airflow enthusiasts.

> This is a blockquote with supplemental information!

You are provided with a random username for this workshop. This username is used to interact with the resources on AWS. Your username is stored in the environment variable `WORKSHOP_USER`. Execute the cell below to print it.

```
!echo $WORKSHOP_USER
```

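
If you would rather read the username from Python, for example to build your bucket name for assignment 2, a minimal sketch could look like this. The helper names are hypothetical; only the environment variable and the bucket naming pattern come from this workshop. Use it in the notebook only: the DAG files are executed by the Airflow scheduler, which may not have this variable set.

```python
import os


def get_workshop_user():
    """Return the workshop username, with a clear error if WORKSHOP_USER is unset."""
    # os.environ.get returns None instead of raising a bare KeyError
    user = os.environ.get("WORKSHOP_USER")
    if not user:
        raise RuntimeError("WORKSHOP_USER is not set; run this inside the workshop environment")
    return user


def bucket_name(user):
    """Build the per-user bucket name used in assignment 2 (hypothetical helper)."""
    return f"pydata-eindhoven-2019-airflow-{user}"
```

In a notebook cell, `print(bucket_name(get_workshop_user()))` then shows the bucket you will work with.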

## Assignment 1: Hello World

In this assignment, we are going to schedule a simple workflow on Airflow to get familiar with its concepts. Airflow uses the following concepts for workflows:

- DAG ([directed acyclic graph](https://en.wikipedia.org/wiki/Directed_acyclic_graph)): a description of the order in which work should take place
- Operator: a class that acts as a template for carrying out some work
- Task: a parameterized instance of an operator
- Task Instance: a task that 
     1. has been assigned to a DAG and 
     2. has a state associated with a specific run of the DAG
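
To make "the order in which work should take place" concrete, here is a small sketch that is independent of Airflow. It uses Python's standard-library `graphlib` (Python 3.9+) on a hypothetical four-task workflow. This is not how Airflow's scheduler is implemented; it only shows that a DAG admits an execution order in which every task comes after its upstream tasks, and that a cycle makes such an order impossible.

```python
from graphlib import CycleError, TopologicalSorter

# Hypothetical workflow: each task maps to the set of tasks it depends on.
dependencies = {
    "extract": set(),
    "clean": {"extract"},
    "validate": {"extract"},
    "load": {"clean", "validate"},
}

# static_order() yields every task after all of its upstream tasks.
order = list(TopologicalSorter(dependencies).static_order())
print(order)

# A cycle means there is no valid order, which is why workflows must be acyclic.
try:
    list(TopologicalSorter({"a": {"b"}, "b": {"a"}}).static_order())
except CycleError as err:
    print("not a DAG:", err.args[0])
```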

The code below defines a DAG in Airflow with the following properties:

- The DAG starts on 2019-11-27 and runs every day at midnight.
- The DAG contains two tasks: one is an instance of the [`PythonOperator`](https://github.com/apache/airflow/blob/1.10.4/airflow/operators/python_operator.py), the other an instance of the [`BashOperator`](https://github.com/apache/airflow/blob/1.10.4/airflow/operators/bash_operator.py). Both tasks simply print a word to the logs.
- Task 'print_hello' runs first, followed by task 'print_world'.

Inspect the code and try to understand how the properties above are defined:

```python
import datetime

from airflow.models import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator

with DAG(
    dag_id='hello_world',
    schedule_interval='@daily',                 # run once a day at midnight
    start_date=datetime.datetime(2019, 11, 27)  # first schedule interval starts here
) as dag:
    # PythonOperator calls print('hello') when the task instance runs
    print_hello_operator = PythonOperator(
        task_id='print_hello',
        python_callable=print,
        op_args=['hello']
    )

    # BashOperator runs a shell command; its output ends up in the task log
    print_world_operator = BashOperator(
        task_id='print_world',
        bash_command='echo world'
    )

    # print_hello runs first, then print_world
    print_hello_operator >> print_world_operator
```

> In this DAG, we use the `'@daily'` schedule interval, which means that the DAG will run every day at midnight (equivalent to the cron expression `0 0 * * *`). For more complex schedules, a [cron expression](https://en.wikipedia.org/wiki/Cron#CRON_expression) can be used.
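
One detail that often surprises newcomers: in Airflow 1.x, the run for a schedule interval is triggered after that interval ends, so the run with execution date 2019-11-27 starts around midnight on 2019-11-28. And because catchup is enabled by default, the scheduler creates a run for every interval between `start_date` and now, which is why several runs appear at once in the UI. The sketch below is a simplified model of that behaviour for `'@daily'` only (it ignores time zones, `end_date` and `catchup=False`); the function name is hypothetical.

```python
import datetime


def daily_runs(start_date, now):
    """Simplified model: (execution_date, earliest_trigger_time) pairs for '@daily'."""
    interval = datetime.timedelta(days=1)
    runs = []
    execution_date = start_date
    # A run is only triggered once its whole interval lies in the past.
    while execution_date + interval <= now:
        runs.append((execution_date, execution_date + interval))
        execution_date += interval
    return runs


runs = daily_runs(datetime.datetime(2019, 11, 27), datetime.datetime(2019, 11, 30, 9, 0))
for execution_date, trigger_time in runs:
    print(execution_date.date(), "->", trigger_time)
```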

> The bitshift operator `>>` is overloaded for operators in Airflow. It is used to define dependencies between tasks in a DAG. For more information about defining relationships between tasks, check out the [bitshift composition](https://airflow.apache.org/concepts.html#bitshift-composition) and [relationship helper](https://airflow.apache.org/concepts.html#relationship-helper) in the Airflow documentation.
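
To see how `>>` can be overloaded at all, here is a toy sketch using Python's `__rshift__` method. `ToyTask` is hypothetical and far simpler than Airflow's `BaseOperator`, but like Airflow's operators, `a >> b` returns `b`, which is what makes chains such as `a >> b >> c` work. The last lines also create and chain tasks in a loop, one of the benefits of defining workflows in Python.

```python
class ToyTask:
    """Hypothetical stand-in for an Airflow operator that only records dependencies."""

    def __init__(self, task_id):
        self.task_id = task_id
        self.downstream = []

    def __rshift__(self, other):
        # `self >> other` means: other runs after self.
        self.downstream.append(other)
        # Returning `other` allows chaining: (a >> b) >> c
        return other

    def __repr__(self):
        return f"ToyTask({self.task_id!r})"


hello, world, goodbye = ToyTask("print_hello"), ToyTask("print_world"), ToyTask("print_goodbye")
hello >> world >> goodbye

# Tasks can also be created dynamically and chained pairwise in a loop.
tasks = [ToyTask(f"step_{i}") for i in range(3)]
for upstream, downstream in zip(tasks, tasks[1:]):
    upstream >> downstream
```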

This is all we need to define a DAG that can be scheduled on Airflow. Because DAGs are defined in Python, we have a lot of freedom in how we design them. We could, for example, dynamically create tasks by looping over lists. Furthermore, defining DAGs as code makes it easy to keep track of their versions in a source control system.

The next step is to actually run this example on Airflow. The Airflow scheduler periodically scans a folder, the DAGs folder, for files that define DAGs, and collects the DAGs it finds in a so-called DagBag. This Jupyter notebook server has a folder called `dags`, which is shared with the Airflow scheduler via a network file system. Any Python file that we put there will be picked up by the scheduler.
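
The scheduler does not import every Python file it finds. In Airflow 1.10, with DAG discovery safe mode enabled (the default), a file is only parsed if its contents include both the strings `DAG` and `airflow`, so helper scripts in the DAGs folder are usually skipped. The sketch below imitates that heuristic in plain Python; it is a simplification (it ignores `.airflowignore` files, zip files and the actual parsing), and the function name is hypothetical.

```python
import os
import tempfile


def find_candidate_dag_files(dags_folder):
    """Simplified sketch: return .py files (relative paths) that *might* define a DAG."""
    candidates = []
    for root, _dirs, files in os.walk(dags_folder, followlinks=True):
        for name in files:
            if not name.endswith(".py"):
                continue
            path = os.path.join(root, name)
            with open(path, "rb") as fp:
                content = fp.read()
            # Heuristic: both byte strings must occur somewhere in the file.
            if all(marker in content for marker in (b"DAG", b"airflow")):
                candidates.append(os.path.relpath(path, dags_folder))
    return sorted(candidates)


# Demo with a throwaway folder that mimics the workshop layout.
with tempfile.TemporaryDirectory() as dags:
    os.makedirs(os.path.join(dags, "transform_scripts"))
    with open(os.path.join(dags, "hello_world.py"), "w") as f:
        f.write("from airflow.models import DAG\n")
    with open(os.path.join(dags, "transform_scripts", "preprocess.py"), "w") as f:
        f.write("import csv\n")
    found = find_candidate_dag_files(dags)

print(found)
```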

- Copy the code snippet above. Go to the dags folder in the file explorer, press the 'New' button in the upper right corner, and create a **Text File**. Name it `hello_world.py`. The name of the file is flexible, but should end with `.py`. Paste the copied code inside.
- Go to port 8080 of your personal load balancer (you are now on 8888). Your DAG should appear within a few seconds.
- The DAG is turned off at first. Turn it on and refresh the page regularly to see what happens. In the 'Recent Tasks' and 'DAG Runs' columns, you will see the circles change. Hover over them to see what they mean. After a while, only the leftmost, dark green circles are filled, showing that all tasks and DAG runs have completed successfully.
- Click on 'hello_world' in the 'DAG' column. You are brought to the tree view, showing all DAG runs and task instances and their status. They are all dark green, meaning they all completed successfully. 
- Press the lower left square, which is the 'print_hello' task instance of the DAG run of 27-11-2019. You will see a menu with various actions that can be performed on the task instance. Press 'View Log'. This brings you to the logs produced by the task instance, where we can see that it has printed 'hello', as expected.
- Click on 'Graph View'. Here we can see the graphical representation of our DAG. We can see that 'print_hello' is followed by 'print_world', as we defined this in the DAG definition above.

Congratulations, you have scheduled your first DAG on Airflow, and learned the basic features of the Airflow UI. This DAG does not do anything useful yet though. In the next assignment, we will create a DAG that will process actual data.


## Assignment 2: Machine Learning Pipeline

In this assignment, we will create an Airflow DAG to schedule a basic machine learning pipeline. The pipeline will use the famous Iris dataset and consists of 2 steps:

1. Preprocess the dataset by adding some new features
2. Train a predictive model on the dataset

The goal is to schedule this training pipeline on a regular interval. This makes sure your model gets updated frequently with the latest data. 

> In this example, we will use the same dataset for every run, but in reality you would like to use a new dataset every time the pipeline is run. This can be done using Airflow's [templating](https://airflow.apache.org/macros.html) mechanism.
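
In Airflow 1.10, `S3FileTransformOperator` declares `source_s3_key` and `dest_s3_key` as template fields, so a key such as `s3://<bucket>/{{ ds }}/raw_training_data.csv` is rendered separately for every run, with `{{ ds }}` replaced by the run's execution date in `YYYY-MM-DD` format. The hypothetical helper below only mimics that single substitution to show the resulting keys; Airflow itself renders the full Jinja template. Note that inside an f-string the braces have to be doubled.

```python
import datetime

bucket = "example-bucket"  # hypothetical bucket name

# In an f-string, '{{{{ ds }}}}' produces the literal text '{{ ds }}' for Airflow to render.
templated_key = f"s3://{bucket}/{{{{ ds }}}}/raw_training_data.csv"


def render_ds(template, execution_date):
    """Mimic only Airflow's {{ ds }} macro: the execution date as YYYY-MM-DD."""
    return template.replace("{{ ds }}", execution_date.strftime("%Y-%m-%d"))


for day in (27, 28):
    print(render_ds(templated_key, datetime.datetime(2019, 11, day)))
```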

You are provided with two scripts in the folder `transform_scripts`. Each script transforms an input file and stores the result in an output file; the locations of the input and output files are passed to the scripts as arguments. The first script, `preprocess.py`, takes a CSV with raw training data and outputs a CSV with preprocessed training data. The second script, `train.py`, takes a CSV with preprocessed training data and outputs a pickled machine learning model. You are also provided with an S3 bucket containing the raw training data. Our DAG should do the following:

1. Retrieve the raw training data from S3, apply the `preprocess.py` transform script to it, and send the preprocessed CSV back to S3
2. Retrieve the preprocessed training data from S3, apply the `train.py` transform script to it, and send the pickled model to S3
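
For reference, in Airflow 1.10 the `S3FileTransformOperator` downloads the source object to a local temporary file, runs the transform script as a separate process with the local source and destination paths as its first two arguments, and then uploads the destination file to `dest_s3_key`. The script must therefore be executable on its own (a shebang line plus the executable bit). Below is a minimal sketch of a script with that interface. It is not the provided `preprocess.py`: the column names `petal_length` and `petal_width` and the `petal_area` feature are assumptions for illustration.

```python
#!/usr/bin/env python3
"""Hypothetical preprocessing script, called as: script SOURCE_CSV DEST_CSV."""
import csv
import sys


def transform(source_path, dest_path):
    """Copy the CSV and add a hypothetical 'petal_area' feature column."""
    with open(source_path, newline="") as src, open(dest_path, "w", newline="") as dst:
        reader = csv.DictReader(src)
        writer = csv.DictWriter(dst, fieldnames=list(reader.fieldnames) + ["petal_area"])
        writer.writeheader()
        for row in reader:
            # Assumes numeric petal_length and petal_width columns.
            row["petal_area"] = str(float(row["petal_length"]) * float(row["petal_width"]))
            writer.writerow(row)


if __name__ == "__main__":
    if len(sys.argv) != 3:
        print(f"usage: {sys.argv[0]} SOURCE_CSV DEST_CSV", file=sys.stderr)
    else:
        transform(sys.argv[1], sys.argv[2])
```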

Inspect the DAG below. It uses the `S3FileTransformOperator` to first preprocess the data and then train the model. Airflow can securely store connection information for external systems in its database, and operators refer to a stored connection by its ID. The connection to your bucket has already been set up for you under the ID `'s3'`, which is why both operators set `source_aws_conn_id` and `dest_aws_conn_id` to `'s3'`.

Since you have your own S3 bucket for this workshop, you need to specify your user in the DAG to make it complete. Fill in your user in the DAG code below and copy it. In the `dags` folder, create a new file called `ml_pipeline.py` and paste the copied code inside. The DAG also uses the transform scripts, so we also need to copy the `transform_scripts` folder into the `dags` folder. This can be done in the Jupyter file explorer, or by executing the cell below.

```
!cp -R transform_scripts dags/transform_scripts
```

```python
import datetime

from airflow.models import DAG
from airflow.operators.s3_file_transform_operator import S3FileTransformOperator

user = ''  # TODO fill in your user here
bucket = f'pydata-eindhoven-2019-airflow-{user}'

with DAG(
    dag_id='ml_pipeline',
    schedule_interval='@daily',
    start_date=datetime.datetime(2019, 11, 27)
) as dag:
    # Download the raw CSV, run preprocess.py on it and upload the result.
    # 's3' is the ID of the connection stored in Airflow.
    preprocess_operator = S3FileTransformOperator(
        task_id='preprocess',
        transform_script='transform_scripts/preprocess.py',
        source_s3_key=f's3://{bucket}/raw_training_data.csv',
        dest_s3_key=f's3://{bucket}/preprocessed_training_data.csv',
        source_aws_conn_id='s3',
        dest_aws_conn_id='s3',
        replace=True  # overwrite the output of previous runs
    )

    # Download the preprocessed CSV, run train.py on it and upload the pickled model
    train_operator = S3FileTransformOperator(
        task_id='train',
        transform_script='transform_scripts/train.py',
        source_s3_key=f's3://{bucket}/preprocessed_training_data.csv',
        dest_s3_key=f's3://{bucket}/trained_model.pkl',
        source_aws_conn_id='s3',
        dest_aws_conn_id='s3',
        replace=True
    )

    # Train only after preprocessing has succeeded
    preprocess_operator >> train_operator
```

In the Airflow UI, on port 8080, you should see your second DAG appear very soon. As in the first assignment, turn it on and refresh regularly to see the DAG runs succeed.

> We ran this pipeline on a single machine for this workshop. For large datasets or a large number of jobs, this does not scale well. Since Airflow is not a big data processing tool, we recommend pushing computation to external systems as much as possible. In this example, we could, for instance, run the training workload on [AWS Sagemaker](https://aws.amazon.com/sagemaker/).
>
> Airflow also provides various options for scaling. An example is the Kubernetes executor, which spawns a Kubernetes pod for every task instance that runs. This allows Airflow to run a large number of tasks in parallel, each with as many resources as it requires. To get started with this, check out [this blog](https://towardsdatascience.com/kubernetesexecutor-for-airflow-e2155e0f909c) by Brecht de Vlieger.

Now that the DAG runs are finished, we can check which files are generated on S3. Airflow uses the concept of [hooks](https://airflow.apache.org/concepts.html#hooks) to interact with external sources. Hooks usually only require a connection ID. Operators like the `S3FileTransformOperator` utilize these hooks to communicate with external systems.

We can use the [`S3Hook`](https://github.com/apache/airflow/blob/1.10.4/airflow/hooks/S3_hook.py) to list the resources contained in our S3 bucket. This is done in the code snippet below. Execute the code below to find out the contents of your bucket. Don't forget to add your user here again.