In [1]:
# Defining a DAG for model training
# Components:
# 1. Data ingestion
# 2. Data preprocessing
# 3. Model training
# 4. Model evaluation
# 5. Model deployment

In [2]:
## Local
# export AIRFLOW_HOME=~/airflow
# airflow standalone
# or
# airflow db init
# airflow users create --role Admin --username admin --email admin --firstname admin --lastname admin --password admin
# airflow webserver -p 8080
# airflow scheduler

In [None]:
## With docker compose:
# curl -LfO 'https://airflow.apache.org/docs/apache-airflow/latest/docker-compose.yaml'
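# Add AIRFLOW__CORE__ENABLE_XCOM_PICKLING: 'true' to the environment variables section of the downloaded docker-compose.yaml
# (the pipeline tasks pass pandas objects between each other through XCom, which cannot be encoded as JSON)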
# mkdir -p ./dags ./logs ./plugins
# echo -e "AIRFLOW_UID=$(id -u)" > .env
# docker compose up airflow-init
# docker compose up

# To stop the stack and remove its volumes and orphaned containers:
# docker compose down --volumes --remove-orphans

In [26]:
%%writefile airflow-docker/dags/training_module/training_model_pipeline.py
# Imports
import os
from datetime import datetime, timedelta
from airflow import DAG
from airflow.decorators import task, dag, task_group

import logging

# # Set the logging level
# logging.basicConfig(level=logging.INFO)
# # Specify the format of the log messages, the date format and the log level
# logging.basicConfig(format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', datefmt='%d-%b-%y %H:%M:%S')
# logger = logging.getLogger(__name__)

# Define the DAG
# Task outputs here are pandas / numpy objects, which cannot be encoded as JSON.
# The tasks therefore use dill (use_dill=True); the setup notes for this project also
# call for AIRFLOW__CORE__ENABLE_XCOM_PICKLING to be enabled on the Airflow side.
@dag("training_model_pipeline", schedule_interval=None, start_date=datetime(2021, 1, 1), catchup=False)
def training_model_pipeline():
    """Train and evaluate a random forest classifier on the iris dataset.

    Steps, in order: ingest_data, preprocess_data, train_test_split,
    train_model, evaluate_model. The DAG has no schedule
    (schedule_interval=None) and catchup is disabled.

    Every step runs as a virtualenv task. Each task body imports what it needs
    locally and receives all of its inputs as arguments (tunable values are
    exposed as defaulted parameters) instead of reading names from the
    enclosing module.
    """

    @task.virtualenv(requirements=['pandas', 'scikit-learn', 'numpy'], use_dill=True)
    def ingest_data():
        """Load the iris dataset and return it as a single DataFrame.

        Returns:
            A pandas DataFrame with one column per iris feature and a
            "target" column holding the class label.
        """
        from sklearn import datasets
        import pandas as pd

        print("Ingesting data")
        # Load iris dataset from sklearn
        print("Loading iris dataset")
        iris = datasets.load_iris()
        print("Data loaded")

        # Put the features and the label into one dataframe
        print("Creating dataframe")
        df = pd.DataFrame(iris.data, columns=iris.feature_names)
        df["target"] = iris.target
        print("Dataframe created")
        return df

    @task.virtualenv(requirements=['pandas', 'scikit-learn', 'numpy'], use_dill=True)
    def preprocess_data(df):
        """Drop every row of ``df`` that contains a missing value.

        Args:
            df: DataFrame produced by the ingestion step.

        Returns:
            A new DataFrame without the rows that had missing values.
        """
        print("Preprocessing data")
        cleaned = df.dropna()
        print("Data preprocessed")
        return cleaned

    @task.virtualenv(requirements=['pandas', 'scikit-learn', 'numpy'], use_dill=True, multiple_outputs=True)
    def train_test_split(df, test_size=0.2, random_state=42):
        """Split ``df`` into train and test features / labels.

        Args:
            df: DataFrame with a "target" column; all other columns are
                used as features.
            test_size: Fraction of rows held out for testing.
            random_state: Seed for the shuffle, so that repeated runs produce
                the same split. Pass None for a different split on every run.

        Returns:
            A dict with the keys "X_train", "X_test", "y_train" and "y_test".
        """
        # Aliased so the sklearn helper is not confused with this task,
        # which carries the same name.
        from sklearn.model_selection import train_test_split as sk_train_test_split

        print("Splitting data")
        X = df.drop("target", axis=1)
        y = df["target"]

        X_train, X_test, y_train, y_test = sk_train_test_split(
            X, y, test_size=test_size, random_state=random_state
        )
        print("Data split")
        return {"X_train": X_train,
                "X_test": X_test,
                "y_train": y_train,
                "y_test": y_test}

    @task.virtualenv(requirements=['pandas', 'scikit-learn', 'numpy', 'dill'], use_dill=True)
    def train_model(X_train, y_train, random_state=42):
        """Fit a random forest and save it to disk with dill.

        Args:
            X_train: Training features.
            y_train: Training labels.
            random_state: Seed for the forest, so that repeated runs on the
                same data produce the same model. Pass None to disable.

        Returns:
            The path (str) of the saved model file.
        """
        from sklearn.ensemble import RandomForestClassifier
        import dill
        from pathlib import Path
        from datetime import datetime

        print("Training model")
        model = RandomForestClassifier(random_state=random_state)
        model.fit(X_train, y_train)
        print("Model trained")

        # Save the model in airflow data folder
        # NOTE(review): the evaluation task reads this file back by path, so the
        # folder must be visible to whichever worker runs that task - confirm.
        model_dir = Path("/opt/airflow/data/")
        model_dir.mkdir(parents=True, exist_ok=True)

        # Timestamped file name so earlier models are not overwritten.
        model_path = str(model_dir / f"model_{datetime.now().strftime('%Y%m%d%H%M%S')}.pkl")

        with open(model_path, "wb") as f:
            dill.dump(model, f)
        return model_path

    @task.virtualenv(requirements=['pandas', 'scikit-learn', 'numpy', 'dill'], use_dill=True, multiple_outputs=True)
    def evaluate_model(model, X_test, y_test):
        """Load the saved model and score it on the held-out data.

        Args:
            model: Path of the dill file written by the training step
                (a path, not an estimator object).
            X_test: Test features.
            y_test: Test labels.

        Returns:
            A dict with the keys "accuracy", "report" (text classification
            report) and "matrix" (confusion matrix).
        """
        from sklearn.metrics import accuracy_score, classification_report, confusion_matrix
        import dill

        print("Loading model")
        # SECURITY: dill.load can execute arbitrary code; only load files that
        # this pipeline wrote itself.
        with open(model, "rb") as f:
            clf = dill.load(f)

        print("Model loaded")

        print("Evaluating model")
        y_pred = clf.predict(X_test)
        accuracy = accuracy_score(y_test, y_pred)
        report = classification_report(y_test, y_pred)
        matrix = confusion_matrix(y_test, y_pred)
        print("Model evaluated")
        print(f"Accuracy: {accuracy}")
        return {"accuracy": accuracy, "report": report, "matrix": matrix}

    # Build the DAG: each call registers a task and wires its inputs to upstream outputs.
    raw_df = ingest_data()
    clean_df = preprocess_data(raw_df)
    split_data = train_test_split(clean_df)
    model_path = train_model(split_data["X_train"], split_data["y_train"])
    evaluate_model(model_path, split_data["X_test"], split_data["y_test"])
    
# Instantiate the DAG by calling the decorated factory at module level.
# NOTE: this assignment rebinds the module-level name `dag`, which up to this line
# referred to the `dag` decorator imported from airflow.decorators.
dag = training_model_pipeline()

Overwriting airflow-docker/dags/training_module/training_model_pipeline.py
