# Apache Airflow for building pipelines
In data science, a data pipeline usually refers to a system that ingests, processes, and analyzes data.  Some pipelines also train and evaluate machine learning models and produce predictions.  Since I have come across quite a few job postings that ask for experience building data pipelines with Apache Airflow, I decided to give it a try and build up my own competency.  

Apache Airflow is an open-source platform for authoring, scheduling, and monitoring pipelines, and the pipelines themselves are defined in Python code.  Since I mostly practice data science in Python, Airflow is a natural choice for me.

## Project goal
Since the purpose is to build a data pipeline, I want to dig out something interesting from open data sources.  As social media has become a major channel of communication, my idea is to cross-analyze Twitter messages against other societal or environmental factors.

For example, it would be interesting to see whether human communication is affected by the weather, or whether the results of sports games spur more discussion than government policies do.

As this idea involves multiple data sources and several areas of machine learning, I have laid out the plan of attack in phases:  
- Phase 1: Ingest and aggregate weather data.  
    I'm planning to use the OpenWeatherMap API to get region-specific weather data.  Thankfully, OpenWeatherMap offers a free tier that allows 1M API calls per month, which suits my requirements well.
    
- Phase 2: Ingest Twitter data and perform topic modeling.  
    Twitter also offers free-tier API registration, which is suitable for my needs.  For now, I am planning to do topic modeling with Gensim to find trends in the subject matter of people's tweets.  
    
- Phase 3: Ingest news feeds and perform topic modeling.  
    I'll start by focusing on Canadian news, for which CBC provides RSS feeds.  I am interested in comparing the popular topics in Canadian news with the hot topics on Twitter.

- Phase 4: Semantic analysis of Twitter messages.  
    Once the trends have been found, I also want to find out whether the semantics of Twitter messages are affected by the weather and the news.

## Phase 1
Let's start by building the skeleton of a functional data pipeline for ingesting and processing weather data:
- Apache Airflow can be installed by entering `conda install -c conda-forge airflow` in the terminal
- Airflow provides an [official introduction to some basic concepts of pipeline construction](https://airflow.apache.org/docs/stable/tutorial.html)

> First, create a file at `AIRFLOW_HOME/dags/my_dag.py` and import the libraries we will need.
```python
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.operators.bash_operator import BashOperator

from datetime import datetime, date, time
from datetime import timedelta

import requests
import os
import pandas as pd
import json
import re
import altair as alt
```

> Next, create dictionaries for the default arguments and for the cities I'm interested in.  Only 3 cities are included here to make troubleshooting easier.  The best part is that the arguments can be arbitrary Python objects, such as the `datetime` and `timedelta` values below.
```python
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2020, 9, 1),
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(seconds = 5)
}
cityIds = {
    'Burnaby': 5911606,
    'Vancouver': 6173331,
    'Richmond': 6122085
}
```
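Airflow applies `default_args` to every operator attached to the DAG, and an argument passed explicitly to an operator takes precedence over the default.  As a rough mental model (not Airflow's actual implementation), the merge behaves like a dictionary update; `effective_args` below is a hypothetical helper used only for illustration.

```python
from datetime import datetime, timedelta

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2020, 9, 1),
    'retries': 1,
    'retry_delay': timedelta(seconds=5),
}

def effective_args(defaults: dict, **operator_kwargs) -> dict:
    """Hypothetical helper: explicit operator kwargs override DAG-level defaults."""
    return {**defaults, **operator_kwargs}

# e.g. give the API-calling task more retries than the other tasks
args = effective_args(default_args, retries=3)
print(args['retries'], args['retry_delay'])  # 3 0:00:05
```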

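> Define the first task, which ingests, processes, and saves the current weather data.  `OWM_KEY` is an environment variable holding the API key that OpenWeatherMap issues upon registration.  The API returns temperatures in Kelvin by default, so `absZero` is added to convert them to degrees Celsius.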
```python
def task1():
    city, timestamp, condition, temperature, humidity, visibility, cloudiness = \
        list(), list(), list(), list(), list(), list(), list()
    absZero = -273.15
    t = datetime.now()

    for name, cityId in cityIds.items():
        w = requests.get(f"http://api.openweathermap.org/data/2.5/weather?id={cityId}&appid={os.environ['OWM_KEY']}") \
            .json()
        city.append(w['name'])
        timestamp.append(t)
        condition.append(w['weather'][0]['main'].lower())
        temperature.append(w['main']['temp'] + absZero)
        humidity.append(w['main']['humidity'])
        visibility.append(w['visibility'])
        cloudiness.append(w['clouds']['all'])
    weathers = pd.DataFrame({
            'city': city,
            'timestamp': timestamp,
            'condition': condition, 
            'temperature': temperature, 
            'humidity': humidity, 
            'visibility': visibility, 
            'cloudiness': cloudiness
        })
    weathers.to_csv(f"weathers_{t.date()}T{t.hour:02}-{t.minute:02}-{t.second:02}.csv", 
                    index = False)
```
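The field extraction inside `task1()` can be pulled into a pure function, which makes it easy to test without calling the API.  This is a minimal sketch: `parse_weather` is a hypothetical helper, the sample payload is made up and contains only the fields `task1()` reads, and treating `visibility` as optional is an assumption rather than something the original code does.

```python
# Absolute zero in degrees Celsius, same value as absZero in task1()
ABS_ZERO_C = -273.15

def parse_weather(w: dict) -> dict:
    """Flatten one current-weather response into a row (hypothetical helper)."""
    return {
        'city': w['name'],
        'condition': w['weather'][0]['main'].lower(),
        'temperature': w['main']['temp'] + ABS_ZERO_C,  # Kelvin -> Celsius
        'humidity': w['main']['humidity'],
        # Assumption: visibility may be absent, so fall back to None instead of raising KeyError
        'visibility': w.get('visibility'),
        'cloudiness': w['clouds']['all'],
    }

# Made-up payload shaped after the fields task1() reads
sample = {
    'name': 'Burnaby',
    'weather': [{'main': 'Clouds'}],
    'main': {'temp': 288.15, 'humidity': 72},
    'visibility': 10000,
    'clouds': {'all': 75},
}
row = parse_weather(sample)
```

With the parsing isolated, `task1()` only has to loop over the cities, call the API, and collect the rows.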

> Task 2 reads the latest weather snapshot from disk and appends it to the historical data in `all_weathers.csv`, creating that file on the first run.  
```python
def read_csv_timestamp(path, cols = ['timestamp']):
    df = pd.read_csv(path)
    for col in cols:
        df[col] = df[col].apply(lambda x: datetime.fromisoformat(x))
    return df

def task2():
    latestCsv = sorted([x for x in os.listdir() if re.findall(r"^weathers.*\.csv$", x)])[-1]
    newWeathers = read_csv_timestamp(latestCsv)
    if not os.path.exists('all_weathers.csv'):
        allWeathers = pd.DataFrame({
                'city': list(),
                'timestamp': list(),
                'condition': list(), 
                'temperature': list(), 
                'humidity': list(), 
                'visibility': list(), 
                'cloudiness': list()
            })
    else:
        allWeathers = read_csv_timestamp("all_weathers.csv")
    # DataFrame.append was removed in pandas 2.0; pd.concat is the supported equivalent
    allWeathers = pd.concat([allWeathers, newWeathers], ignore_index = True)
    allWeathers.to_csv("all_weathers.csv", index = False)
```
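`task2()` picks the newest snapshot by sorting filenames as plain strings.  That only works because `task1()` zero-pads every timestamp field, so lexicographic order matches chronological order (without padding, `T9-55-00` would sort after `T10-05-00`).  The sketch below checks that property with the standard library only; `snapshot_name` and `latest_snapshot` are hypothetical helpers that mirror the original naming and selection logic.

```python
import re
from datetime import datetime, timedelta

def snapshot_name(t: datetime) -> str:
    # Same layout as task1(): weathers_YYYY-MM-DDTHH-MM-SS.csv, every field zero-padded
    return t.strftime("weathers_%Y-%m-%dT%H-%M-%S.csv")

def latest_snapshot(filenames):
    # Same rule as task2(): keep matching names, sort as strings, take the last one
    matches = [f for f in filenames if re.match(r"^weathers.*\.csv$", f)]
    return sorted(matches)[-1] if matches else None

# 20 snapshots taken every 10 minutes, crossing the 09 -> 10 -> 11 -> 12 hour boundaries
start = datetime(2020, 9, 1, 9, 5, 0)
names = [snapshot_name(start + timedelta(minutes=10 * k)) for k in range(20)]
# Listing order from the OS is arbitrary, and all_weathers.csv must be ignored
listing = list(reversed(names)) + ['all_weathers.csv']
print(latest_snapshot(listing))  # weathers_2020-09-01T12-15-00.csv
```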

> Task 3 loads the accumulated weather data and saves a visualization as PNG.  Note that the PNG filename includes the pipeline timestamp `ts` rather than the system time.  Because the scheduler assigns each run a logical timestamp derived from `start_date` and `schedule_interval`, the pipeline timestamp can differ from the wall-clock time at which the task actually runs.
```python
def task3(ds, ts, **kwargs):
    allWeathers = read_csv_timestamp("all_weathers.csv")
    allWeathers['date'] = allWeathers['timestamp'].apply(lambda x: x.date())
    allWeathers['time'] = allWeathers['timestamp'].apply(lambda x: x.time())
    allWeathers['time'] = allWeathers['time'].apply(lambda x: x.isoformat('seconds'))
    base = alt.Chart(allWeathers.drop(columns = ['date'])).encode(
        alt.X('time:N')
    )
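    # Note: `bar` (temperature) is built but never saved; only the humidity `line` chart below is written to PNG
    bar = base.mark_bar().encode(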
        alt.Y("temperature:Q"),
        facet = alt.Facet('city:N')
    )
    line = base.mark_line(color = 'red').encode(
        alt.Y("humidity:Q",
            scale = alt.Scale(zero = False))
    ).facet(
        alt.Facet('city:N'),
        columns = 3
    )
    line.save(f'test_{ts}.png', scale_factor = 1.2)
```
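To see why `ts` lags behind the system clock, here is a simplified model of classic Airflow scheduling: a run covering the interval that starts at X is stamped with X as its logical timestamp and is triggered only after that interval ends, at X + `schedule_interval`.  With catchup enabled (the default), the scheduler also creates runs for every elapsed interval since `start_date`, so the PNG names can carry dates well in the past.  `completed_runs` is a hypothetical helper for illustration, not part of Airflow.

```python
from datetime import datetime, timedelta

def completed_runs(start: datetime, interval: timedelta, now: datetime):
    """Simplified model (assumption): each run is stamped with the start of its
    interval and becomes eligible only once that interval has fully elapsed."""
    runs = []
    logical = start
    while logical + interval <= now:
        # (logical timestamp passed as ts, earliest wall-clock trigger time)
        runs.append((logical, logical + interval))
        logical += interval
    return runs

runs = completed_runs(datetime(2020, 9, 1), timedelta(minutes=10),
                      now=datetime(2020, 9, 1, 0, 35))
for logical, trigger in runs:
    print(logical.isoformat(), "runs no earlier than", trigger.time())
```

At 00:35 only three intervals have closed; the run stamped 00:30 will not start until 00:40.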

> With the task functions defined, we now set up the DAG and the operators that perform the tasks.  Note that `provide_context = True` in `op3` makes Airflow pass the run's context, including the scheduler timestamps `ds` and `ts`, to `task3()` as keyword arguments.
```python
dag = DAG(
    dag_id = 'my_dag',
    description = 'First DAG',
    default_args = default_args,
    schedule_interval = timedelta(minutes = 10),
)


op1 = PythonOperator(
    task_id = 'task1',
    python_callable = task1,
    dag = dag
)

op2 = PythonOperator(
    task_id = 'task2',
    python_callable = task2,
    # provide_context = True,
    dag = dag
)

op3 = PythonOperator(
    task_id = 'task3',
    python_callable = task3,
    provide_context = True,
    dag = dag
)
```
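With three cities and a 10-minute `schedule_interval`, it is worth confirming that the pipeline stays well inside the 1M-calls-per-month free tier mentioned in Phase 1.  The arithmetic below assumes one API call per city per run and a 31-day month as the worst case; it ignores any extra runs the scheduler might create when catching up.

```python
# Assumptions: one call per city per run, 10-minute interval, 31-day month
n_cities = 3
interval_minutes = 10
days_in_month = 31
free_tier_calls = 1_000_000  # monthly allowance quoted in Phase 1

runs_per_day = 24 * 60 // interval_minutes                  # 144
calls_per_month = runs_per_day * n_cities * days_in_month   # 13,392
print(f"{calls_per_month:,} calls/month, "
      f"{calls_per_month / free_tier_calls:.2%} of the free tier")
# 13,392 calls/month, 1.34% of the free tier
```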

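> Here is a trivial example showing that Airflow provides operators for running Bash commands as well as Python functions.  The `bash_command` is a Jinja template that is rendered before execution; the loop echoes the run's date `ds` and the four days after it.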
```python
op_end_command = """
{% for i in range(5) %}
    echo "{{ macros.ds_add(ds, i) }}"
{% endfor %}
"""

op_end = BashOperator(
    task_id = 'last_task',
    bash_command = op_end_command,
    dag = dag
)
```
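To see what the templated command expands to, the sketch below imitates the rendering with the standard library.  `ds_add` here is a stand-in assumed to match the behaviour of Airflow's `macros.ds_add` (shift a `YYYY-MM-DD` string by a number of days), not Airflow's own code, and `render_op_end` is a hypothetical helper that ignores the template's whitespace.

```python
from datetime import datetime, timedelta

def ds_add(ds: str, days: int) -> str:
    """Stand-in for macros.ds_add: shift a YYYY-MM-DD date string by `days`."""
    return (datetime.strptime(ds, "%Y-%m-%d") + timedelta(days=days)).strftime("%Y-%m-%d")

def render_op_end(ds: str) -> str:
    # Mirrors the Jinja loop in op_end_command: one echo per i in range(5)
    return "\n".join(f'echo "{ds_add(ds, i)}"' for i in range(5))

print(render_op_end("2020-09-01"))
# echo "2020-09-01"  ...  echo "2020-09-05"
```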

> At the end of `my_dag.py`, we define the order in which the operators execute.  The code below means op1 -> op2 -> op3 -> op_end, a simple linear chain.  A chain is the simplest kind of Directed Acyclic Graph; as the pipeline becomes more complicated, the dependencies will branch and merge, and the graph structure will become more apparent.
```python
op1 >> op2
op2 >> op3
op3 >> op_end
```
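Airflow derives a valid execution order from these dependencies.  As an analogy only (this is not how Airflow's scheduler is implemented), Python's standard-library `graphlib` (Python 3.9+) can compute a topological order for the same chain and for a branching graph; the task names in the second graph are hypothetical, sketching how later phases might fit together.

```python
from graphlib import TopologicalSorter  # Python 3.9+

# Each key maps a task to its upstream tasks, mirroring op1 >> op2 >> op3 >> op_end
chain = {
    'task2': {'task1'},
    'task3': {'task2'},
    'last_task': {'task3'},
}
chain_order = list(TopologicalSorter(chain).static_order())
print(chain_order)  # ['task1', 'task2', 'task3', 'last_task']

# Hypothetical later shape: two independent ingestion branches merging into one task
branching = {
    'aggregate_weather': {'ingest_weather'},
    'model_tweet_topics': {'ingest_tweets'},
    'join_weather_tweets': {'aggregate_weather', 'model_tweet_topics'},
}
branch_order = list(TopologicalSorter(branching).static_order())
```

For the branching graph several orders are valid; any of them runs each task only after all of its upstream tasks.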

Once `my_dag.py` is complete, we can bring up the pipeline.  From the `AIRFLOW_HOME` directory:
- run `airflow scheduler` to start the scheduler, which begins executing the tasks.
- in a new terminal, run `airflow webserver -p 8080` to start the web UI for monitoring the pipeline.

> If the pipeline is running well, the web UI should show a page like the one below:  

![](Airflow_UI.png)

> The saved visualizations can also be found in the directory:

![](saved_visualizations.png)

> I also verified that the PNG files correctly represent the aggregated weather data.  At this point, the pipeline is automatically ingesting, processing, and visualizing the weather data.  We can now move on to Phase 2: Twitter ingestion.