# 5. Apache Airflow

Innerhalb des MLOps-Projekts wurde auch Apache Airflow zur Automatisierung von ausgewählten Arbeitsschritten verwendet. <br><br>
Zur Demonstration von Apache Airflow wurde der erste Teil (1.) "Data Correction" automatisiert. 

# 5.1 Vorgehen

Das Vorgehen zur Integration von Apache Airflow lässt sich grundsätzlich in folgende drei Schritte aufteilen:

1. Installation von Apache Airflow
2. Erstellung der Ordnerstruktur
3. Definition der Tasks & Skripte

Im ersten Schritt wurde dabei Apache Airflow installiert. <br>Im anschließenden Schritt wurde die Ordner- und Pfadstruktur festgelegt. Dabei wurde ein Ordner("dags") erstellt in welchem sich die Datei "airflow_mlops.py" befindet. Innerhalb des "dags"-Ordners ist dabei ein weiterer Unterordner ("airflow-dag-reg") hinterlegt. Innerhalb des Unterordners werden die Skripte hinterlegt, welche zur Automatisierung ausgeführt werden sollen. <br><br>
Innerhalb der Definition der Tasks & Skripte wurde die Datei "airflow_mlops.py" erstellt. Innerhalb des Dokuments können die jeweiligen Schritte, welche automatisiert werden sollen, definiert werden. In diesem Beispiel wurde nur eine Task ("t1") zur Automatisierung der Data Correction definiert. Dementsprechend ist nur ein Skript zur Ausführung der Task(s) hinterlegt, "data_prep.py". Durch die Datei "data_prep.py" werden die verschiedenen Schritte zur Data Correction mit Pandas durchgeführt.

# 5.2 Dateiübersicht

## Airflow_mlops.py

In [None]:
#---------------------------------------
# SETUP

# The DAG object; we'll need this to instantiate a DAG
from airflow import DAG

# Operators; we need this to operate!
from airflow.operators.bash import BashOperator

# Module for manipulating dates and times
from datetime import datetime, timedelta

    # To change timezones, use Pendulum https://pendulum.eustace.io/

# Some convenience functions
from textwrap import dedent

#---------------------------------------
# DEFAULT DAG ARGUMENTS

with DAG(
    # the following string is the unique identifier for your DAG
    'airflow_mlops', 
    # These args will get passed on to each operator
    # You can override them on a per-task basis during operator initialization
    default_args={
        'depends_on_past': True,
        'email': ['my-email@example.com'],
        'email_on_failure': False,
        'email_on_retry': False,
        'retries': 1,
        'retry_delay': timedelta(minutes=5),
        # 'queue': 'bash_queue',
        # 'pool': 'backfill',
        # 'priority_weight': 10,
        # 'end_date': datetime(2022, 6, 1),
        # 'wait_for_downstream': False,
        # 'sla': timedelta(hours=2),
        # 'execution_timeout': timedelta(seconds=300),
        # 'on_failure_callback': some_function,
        # 'on_success_callback': some_other_function,
        # 'on_retry_callback': another_function,
        # 'sla_miss_callback': yet_another_function,
        # 'trigger_rule': 'all_success'
    },
    description='Automatisierung der Datacorrection',
    schedule_interval=timedelta(days=1),
    start_date=datetime(2022, 1, 1),
    catchup=False,
    tags=['mlops'],

) as dag:


    #---------------------------------------
    # DEFINE OPERATERS
    
    t1 = BashOperator(
        depends_on_past=False,
        task_id='data_prep',
        bash_command='python /Users/lukas/airflow/dags/airflow-dag-reg/data_prep.py',
    )                      

    # Task documentation
    t1.doc_md = dedent(
        """\
    #### Task Documentation
    
    Data preparation

    """
    )

    # Hier könnten weitere tasks definiert werden (Beispiel Modellerstellung und Training)
    # t2 = ....

    #----------------
    # SETTING UP DEPENDENCIES 
    # Ausführung der definierten Tasks (t1) -> hier könnten weitere Tasks ausgeführt werden (Beispiel t2...)

    t1

Sobald die Task (t1) ausgeführt wird startet das Skript "data_prep.py" und führt die Data Correction durch Anschließend wird der Datensatz lokal auf dem angegebenen Pfad(Desktop) gespeichert.

## Data_prep.py

In [None]:
""" 
    Data preparation for Simulation for Data Science Products: Used car prices

Step 1) Import data (Used car prices) with pandas
Step 2) Data corrections
Step 3) Save data as csv to local folder

"""

#------------------------------------------------------
# Setup
import pandas as pd
from pathlib import Path

from my_path import home_path, airflow_path

#------------------------------------------------------
# Import Data

df = pd.read_csv("/Users/lukas/Documents/MLOPS/PL_2/car_prices.csv", error_bad_lines=False,warn_bad_lines=True)

#------------------------------------------------------
# Data correction

# Rename columns 
df = df.rename(columns={"make":"brand","body":"type","sellingprice":"price","saledate":"date"})

# Drop irrelevant columns for this project
df = df.drop(['vin','mmr'],axis=1)

# Drop missing values
df = df.dropna()

# Lower text to avoid duplicates
df.brand = df.brand.str.lower()
df.model = df.model.str.lower()
df.type = df.type.str.lower()

#------------------------------------------------------
#Export Data to CSV

df.to_csv(r'/Users/lukas/Desktop/car_price_airflow.csv', index = False)