**Exercise 1:** Predicting red and white wine quality from our database

- Import the red wine data into a database such as MariaDB (do this outside the pipeline)
- Import the white wine data into a database such as MariaDB (do this outside the pipeline)
- Allow the pipeline’s ingestion stage to now ingest both the red wine data and the white wine
  data from the relational database, combine these into a single table and store them in a column
  store at the end of the ingestion stage
- Allow the pipeline’s validation stage to consume the data from the columnstore database and
  then store this data directly into Redis (without further validation)
- Allow the pipeline’s preparation stage to consume the data from redis and then split the data
  into appropriate training and testing sub-sets (without further cleaning / preparation), storing
  only the separate train_x, train_y, test_x and test_y split data in redis
- Allow the pipeline’s training stage to retrieve the necessary separate training and test data
  samples from redis (train_x, train_y, test_x and test_y), initiate an MLFlow run, fit the model
  then store this model and any associated relevant data into redis
- Allow the pipeline’s evaluation stage to evaluate the model using mean absolute error, mean
  squared error, r2 score and median absolute error then end the run of the mlflow experiment.
  The run should store the relevant parameters used to build the model as well as the evaluation
  metrics used to assess the model quality on the test set of data, it should also log the model to
  the relevant MLFlow data store.


Ingesting redwine and whitewine data and pushing to mariaDB after combining


In [None]:
import pandas as pd
from sqlalchemy import create_engine

eng_conn = create_engine(
    "mysql+pymysql://birat:birat%2312345@localhost:3306/wine_db")

query = """
SELECT * FROM whiteWine
UNION
SELECT * FROM redWine;
"""

wine = pd.read_sql(query, eng_conn, index_col='index')
wine.to_sql("combinedWine", eng_conn, if_exists="replace")

6497

Collection of simple helper functions for making life easy


In [None]:
import redis
import pickle
import pyarrow as pa
import pyarrow.parquet as pq
redis_conn = redis.Redis(host='127.0.0.1', port=6379)


def store_pickle(key, obj):
    redis_conn.set(key, pickle.dumps(obj))


def retrieve_pickle(key):
    return pickle.loads(redis_conn.get(key))


def store_df(key, df):
   
    if isinstance(df, pd.Series):
        df = df.to_frame()
    table = pa.Table.from_pandas(df)
    buffer_reader = pa.BufferOutputStream()
    pq.write_table(table, buffer_reader)
    serialized_df = buffer_reader.getvalue().to_pybytes()
    redis_conn.set(key, serialized_df)


def retrieve_df(key):
    data = redis_conn.get(key)
    buffer_reader = pa.BufferReader(data)
    parquet_table = pq.read_table(buffer_reader)
    return parquet_table.to_pandas()

Collecting data from mariaDB and storing in redis


In [None]:
import warnings
import numpy as np
import pandas as pd
from sqlalchemy import create_engine

eng_conn = create_engine(
    "mysql+pymysql://birat:birat%2312345@localhost:3306/wine_db")

query = "SELECT * FROM combinedWine"
wine = pd.read_sql(query, eng_conn, index_col='index')

for col in wine.columns:
    if wine[col].dtype == np.dtype('object'):
        raise TypeError(f'Column {col} is of type object')

    if wine[col].isna().sum() > 0:
        raise warnings.warn(f'Column {col} contains null values')

store_df('wine_key', wine)

Retrieving df from redis store and splitting into train_x, train_y, test_x, test_y and storing them in redis


In [None]:
from sklearn.model_selection import train_test_split

wine_df = retrieve_df('wine_key')

FEATURES = [col for col in wine_df.columns if col != 'quality']
train_x, test_x, train_y, test_y = train_test_split(
    wine_df[FEATURES], wine_df['quality'], random_state=59)

data_dict = {'train_x': train_x, 'train_y': train_y,
             'test_x': test_x, 'test_y': test_y}
for key in data_dict.keys():
    store_df(key, data_dict[key])
    print(f'{key} serialized and stored to redis.')

train_x serialized and stored to redis.
train_y serialized and stored to redis.
test_x serialized and stored to redis.
test_y serialized and stored to redis.


Retrieving train_x and train_y and training RandomForestRegressor and export model, model params and run_id to redis


In [None]:
import joblib
import mlflow
from sklearn.ensemble import RandomForestRegressor

X_train = retrieve_df('train_x')
y_train = retrieve_df('train_y')

# Enable MLflow tracking
mlflow.set_tracking_uri("http://localhost:5000")  
mlflow.set_experiment("Wine_Quality_Prediction")


with mlflow.start_run() as run:
    n_estimators = 1000
    max_depth = 15
    random_state = 5

    clf = RandomForestRegressor(
        n_estimators=n_estimators, max_depth=max_depth, random_state=random_state)
    clf.fit(X_train, y_train)
    store_pickle('rfr_model', clf)
    hyperparameters = {'n_estimators': n_estimators,
                       'max_depth': max_depth, 'random_state': random_state}
    store_pickle('rfr_hyperparameters', hyperparameters)
    store_pickle('rfr_run_id', run.info.run_id)

Retrieving model, model params and run id and logging model metrics.


In [None]:
import pandas as pd
import mlflow.pyfunc
from sklearn.metrics import mean_absolute_error, r2_score, mean_squared_error, median_absolute_error

clf = retrieve_pickle('rfr_model')
X_test = retrieve_df('test_x')
y_test = retrieve_df('test_y')
hyperparameters = retrieve_pickle('rfr_hyperparameters')
run_id = retrieve_pickle('rfr_run_id')

# Enable MLflow tracking
mlflow.set_tracking_uri("http://localhost:5000")  # or leave as default local

mlflow.set_experiment("Wine_Quality_Prediction")
with mlflow.start_run(run_id=run_id) as run:

    y_pred = clf.predict(X_test)
    metrics = {
        'abs_error': mean_absolute_error(y_pred, y_test),
        'sq_error': mean_squared_error(y_pred, y_test),
        'r2': r2_score(y_pred, y_test),
        'med_abs_error': median_absolute_error(y_pred, y_test)
    }

    # Tracks hyperparameters
    mlflow.log_params(hyperparameters)

    # Records performance
    mlflow.log_metrics(metrics)

    model_path = "wine_quality_model"
    joblib.dump(clf, model_path + ".pkl")
    mlflow.log_artifact(model_path + ".pkl")

    # Logs and optionally registers the model.
    mlflow.sklearn.log_model(clf, "model")

    model_info = mlflow.register_model(
        model_uri=f"runs:/{run.info.run_id}/model",
        name="WineQualityRandomForestModel"
    )

    print(f"Model registered with name: {model_info.name}")
    print(f"Model version: {model_info.version}")
    print(f"Run ID: {run.info.run_id}")


# Assuming model_info is a ModelVersion object
model_name = model_info.name  # Get the registered model name
model_version = model_info.version  # Get the model version

# Construct the model URI for the Model Registry
model_uri = f"models:/{model_name}/{model_version}"

print(model_name)
print(model_version)
print(model_uri)
print("Loading model from Model Registry...")
# Load the model from the Model Registry
loaded_model = mlflow.pyfunc.load_model(model_uri)

predictions = loaded_model.predict(X_test)

# Create result DataFrame
result = pd.DataFrame(X_test)
result["actual_class"] = y_test
result["predicted_class"] = predictions

print(result[:4])

**Exercise 2:** LAX to JFK Flight delays prediction

- Using the previous Airline sample data which we have used in prior exercises
  (https://developer.ibm.com/exchanges/data/all/airline/) , plan out and implement a Machine
  Learning Pipeline using Airflow and MLFlow
- To create a basic plan for this pipeline you should at least:
  - Produce an appropriate data model that describes the data (such as an ERD)
    - This data model should as a minimum:
      - Capture the attributes (columns) each relation (table) contains, along with their types.
      - It should also attempt to capture any relationships between relations (tables). Note: if there is only one table with no relations with another, then this step
    - You may also wish to attempt to produce a Data Structure Diagram, which differs from ERDs in that it allows us to identify the relationships between attributes within an entity
    - Any appropriate diagramming / CASE software may be used for this task (e.g. Draw.io, Visio, VisualParadigm, etc)


- To create a basic plan for this pipeline you should at least:
  - Produce an appropriate DAG pipeline plan that specifies the sequence of tasks that will be performed
  - Produce an appropriate DAG pipeline plan that specifies the sequence of operations that must be performed within each task (e.g. when ingesting the data, what steps need to be taken, and in what order?)
  - Detail what tools / technologies will be needed at each stage


- To implement the pipeline you should at least:
  - Provide a means of appropriately ingesting the initial dataset from a relational database
  - Provide a means of appropriately logging the attempt at training and evaluating the model to MLFlow
  - Provide a means of storing the trained and evaluated model coupled along with the MLFlow results


In [None]:
import pandas as pd
from sqlalchemy import create_engine

eng_conn = create_engine(
    "engine = create_engine('mysql+pymysql://root:sandeshpass@localhost:3308/Airline")
airline = pd.read_sql('SELECT * FROM LAXTOJFK', eng_conn)

In [None]:
airline.head()

In [None]:
import pandas as pd

data = pd.read_csv('airbnb.csv', encoding='latin-1')

In [None]:
cols = list(data.columns)
usable_cols = [col for col in cols if data[col].isna().sum() <=
               len(data[col])*0.1]

In [4]:
pd.set_option('display.max_columns', None)
data[usable_cols].head()

Unnamed: 0,Year,Quarter,Month,DayofMonth,DayOfWeek,FlightDate,Reporting_Airline,DOT_ID_Reporting_Airline,IATA_CODE_Reporting_Airline,Flight_Number_Reporting_Airline,OriginAirportID,OriginAirportSeqID,OriginCityMarketID,Origin,OriginCityName,OriginState,OriginStateFips,OriginStateName,OriginWac,DestAirportID,DestAirportSeqID,DestCityMarketID,Dest,DestCityName,DestState,DestStateFips,DestStateName,DestWac,CRSDepTime,DepTime,DepDelay,DepDelayMinutes,DepDel15,DepartureDelayGroups,DepTimeBlk,CRSArrTime,ArrTime,ArrDelay,ArrDelayMinutes,ArrDel15,ArrivalDelayGroups,ArrTimeBlk,Cancelled,Diverted,CRSElapsedTime,ActualElapsedTime,Flights,Distance,DistanceGroup
0,1998,1,1,2,5,1998-01-02,NW,19386,NW,675,13487,1348701,31650,MSP,"Minneapolis, MN",MN,27.0,Minnesota,63,14869,1486902,34614,SLC,"Salt Lake City, UT",UT,49.0,Utah,87,1640,1659.0,19.0,19.0,1.0,1.0,1600-1659,1836,1859.0,23.0,23.0,1.0,1.0,1800-1859,0.0,0.0,176.0,180.0,1.0,991.0,4
1,2009,2,5,28,4,2009-05-28,FL,20437,FL,671,13342,1334202,33342,MKE,"Milwaukee, WI",WI,55.0,Wisconsin,45,13204,1320401,31454,MCO,"Orlando, FL",FL,12.0,Florida,33,1204,1202.0,-2.0,0.0,0.0,-1.0,1200-1259,1541,1541.0,0.0,0.0,0.0,0.0,1500-1559,0.0,0.0,157.0,159.0,1.0,1066.0,5
2,2013,2,6,29,6,2013-06-29,MQ,20398,MQ,3297,11921,1192102,31921,GJT,"Grand Junction, CO",CO,8.0,Colorado,82,11298,1129803,30194,DFW,"Dallas/Fort Worth, TX",TX,48.0,Texas,74,1630,1644.0,14.0,14.0,0.0,0.0,1600-1659,1945,1942.0,-3.0,0.0,0.0,-1.0,1900-1959,0.0,0.0,135.0,118.0,1.0,773.0,4
3,2010,3,8,31,2,2010-08-31,DL,19790,DL,1806,12892,1289201,32575,LAX,"Los Angeles, CA",CA,6.0,California,91,11433,1143301,31295,DTW,"Detroit, MI",MI,26.0,Michigan,43,1305,1305.0,0.0,0.0,0.0,0.0,1300-1359,2035,2015.0,-20.0,0.0,0.0,-2.0,2000-2059,0.0,0.0,270.0,250.0,1.0,1979.0,8
4,2006,1,1,15,7,2006-01-15,US,20355,US,465,11618,1161801,31703,EWR,"Newark, NJ",NJ,34.0,New Jersey,21,11057,1105702,31057,CLT,"Charlotte, NC",NC,37.0,North Carolina,36,1820,1911.0,51.0,51.0,1.0,3.0,1800-1859,2026,2058.0,32.0,32.0,1.0,2.0,2000-2059,0.0,0.0,126.0,107.0,1.0,529.0,3


In [None]:
# List DepDelayMinutes where ['ArrDel15'] == ['ArrDelay']
delayed_flights = data[usable_cols][data['ArrDel15'] == data['ArrDelay']]

In [6]:
# Display the first few rows of the filtered data
delayed_flights.head()

Unnamed: 0,Year,Quarter,Month,DayofMonth,DayOfWeek,FlightDate,Reporting_Airline,DOT_ID_Reporting_Airline,IATA_CODE_Reporting_Airline,Flight_Number_Reporting_Airline,OriginAirportID,OriginAirportSeqID,OriginCityMarketID,Origin,OriginCityName,OriginState,OriginStateFips,OriginStateName,OriginWac,DestAirportID,DestAirportSeqID,DestCityMarketID,Dest,DestCityName,DestState,DestStateFips,DestStateName,DestWac,CRSDepTime,DepTime,DepDelay,DepDelayMinutes,DepDel15,DepartureDelayGroups,DepTimeBlk,CRSArrTime,ArrTime,ArrDelay,ArrDelayMinutes,ArrDel15,ArrivalDelayGroups,ArrTimeBlk,Cancelled,Diverted,CRSElapsedTime,ActualElapsedTime,Flights,Distance,DistanceGroup
1,2009,2,5,28,4,2009-05-28,FL,20437,FL,671,13342,1334202,33342,MKE,"Milwaukee, WI",WI,55.0,Wisconsin,45,13204,1320401,31454,MCO,"Orlando, FL",FL,12.0,Florida,33,1204,1202.0,-2.0,0.0,0.0,-1.0,1200-1259,1541,1541.0,0.0,0.0,0.0,0.0,1500-1559,0.0,0.0,157.0,159.0,1.0,1066.0,5
53,2004,4,12,21,2,2004-12-21,WN,19393,WN,1111,10423,1042302,30423,AUS,"Austin, TX",TX,48.0,Texas,74,11540,1154001,30615,ELP,"El Paso, TX",TX,48.0,Texas,74,1150,1150.0,0.0,0.0,0.0,0.0,1100-1159,1225,1225.0,0.0,0.0,0.0,0.0,1200-1259,0.0,0.0,95.0,95.0,1.0,528.0,3
56,2001,4,10,11,4,2001-10-11,WN,19393,WN,378,14107,1410701,30466,PHX,"Phoenix, AZ",AZ,4.0,Arizona,81,13796,1379601,32457,OAK,"Oakland, CA",CA,6.0,California,91,650,650.0,0.0,0.0,0.0,0.0,0600-0659,850,850.0,0.0,0.0,0.0,0.0,0800-0859,0.0,0.0,120.0,120.0,1.0,646.0,3
61,1997,2,5,26,1,1997-05-26,HP,19991,HP,2866,14107,1410701,30466,PHX,"Phoenix, AZ",AZ,4.0,Arizona,81,14679,1467902,33570,SAN,"San Diego, CA",CA,6.0,California,91,1555,1555.0,0.0,0.0,0.0,0.0,1500-1559,1659,1659.0,0.0,0.0,0.0,0.0,1600-1659,0.0,0.0,64.0,64.0,1.0,304.0,2
184,2004,4,11,26,5,2004-11-26,WN,19393,WN,80,11259,1125902,30194,DAL,"Dallas, TX",TX,48.0,Texas,74,12191,1219101,31453,HOU,"Houston, TX",TX,48.0,Texas,74,1000,1005.0,5.0,5.0,0.0,0.0,1000-1059,1100,1100.0,0.0,0.0,0.0,0.0,1100-1159,0.0,0.0,60.0,55.0,1.0,239.0,1


- Produce an appropriate DAG pipeline plan that specifies the sequence of operations that
  must be performed within each task (e.g. when ingesting the data, what steps need to be
  taken, and in what order?)


### 1. Load Data:

- A csv file of airline is loaded into pandas

### 2. Data Preprocessing:

- Columns are changed to lower memory hungry data type of float32 or int32 from their 64 bit counterparts
- Rows with missing values are imputed with mean/mode of their respective rows
- Since the dataset is huge only columns that are usable columns will be forwarded

### 3. Data Ingestion:

- A connection to mariadb and redis server is started
- Data is stored in mariadb for future use.
- Data is stored in redis for immediate use

### 4. Model Development:

- A connection to redis server is made
- Data is splitted into X_train, y_train, X_test, y_test
- XGBoost Model is fitted in the training data
- Trained model and test data is pushed to the redis server

### 5. Model Evaluation:

- A connection to the redis server is made
- Model and testing data is loaded into local variables
- Model is evalueted using RMSE, MAE, r2 score, median absolute error
- Model, metrics and its parameters are logged.


In [None]:
import pandas as pd
from sqlalchemy import create_engine
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator
import numpy as np
import pyarrow as pa
import pyarrow.parquet as pq
import pickle
import redis
from sklearn.model_selection import train_test_split
import mlflow
from xgboost import XGBClassifier
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score
from datetime import timedelta, datetime

redis_conn = redis.Redis(host='127.0.0.1', port=6379)

eng_conn = create_engine(
    "engine = create_engine('mysql+pymysql://root:biratpass@localhost:3308/Airline")


def store_pickle(key, obj):
    redis_conn.set(key, pickle.dumps(obj))


def retrieve_pickle(key):
    return pickle.loads(redis_conn.get(key))


def store_df(key, df):
    # Used to_frame here because pa.Table only support dataframe and not series while pa support Series by using method like pa.Array this causes other errors later
    if isinstance(df, pd.Series):
        df = df.to_frame()
    table = pa.Table.from_pandas(df)
    buffer_reader = pa.BufferOutputStream()
    pq.write_table(table, buffer_reader)
    serialized_df = buffer_reader.getvalue().to_pybytes()
    redis_conn.set(key, serialized_df)


def retrieve_df(key):
    data = redis_conn.get(key)
    buffer_reader = pa.BufferReader(data)
    parquet_table = pq.read_table(buffer_reader)
    return parquet_table.to_pandas()


def read_csv():
    df_airline = pd.read_csv('airbnb.csv')
    df_airline.to_sql('Airline', con=eng_conn, if_exists="replace")


def preprocess_data():
    query = "SELECT * FROM Airline"
    df_airline = pd.read_sql(query, eng_conn)
    usable_col = [col for col in df_airline.columns if df_airline[col].isna(
    ).sum() <= len(df_airline[col])*0.1]
    new_df = df_airline[usable_col]
    numerical_cols = new_df.select_dtypes(include=['int64', 'float64'])
    new_df['ArrDel15'] = new_df['ArrDel15'].dropna()
    for col in numerical_cols:
        if new_df[col].dtype == np.dtype('float64'):
            new_df[col] = new_df[col].astype('float32')
        elif new_df[col].dtype == np.dtype('int64'):
            new_df[col] = new_df[col].astype('int32')
        new_df[col] = new_df[col].fillna(new_df[col].mean())

    categorical_cols = new_df.select_dtypes(include=['object'])
    for col in categorical_cols:
        new_df[col] = new_df[col].fillna('nan')
        new_df[col] = new_df[col].astype('category')
    # Storing new_df to mariadb
    new_df.to_sql('airlinePreprocessed', con=eng_conn, if_exists="replace")

    # Storing new_df to redis
    store_df('airlinePreprocessed', new_df)


def model_training():
    airline_df = retrieve_df('airlinePreprocessed')
    FEATURES = [col for col in airline_df.columns if col != 'ArrDel15']
    X_train, X_test, y_train, y_test = train_test_split(
        airline_df[FEATURES], airline_df['ArrDel15'])

    # Enable MLflow tracking
    mlflow.set_tracking_uri("http://localhost:5000")
    mlflow.set_experiment("Airline_Flight_Delay_Prediction")
    with mlflow.start_run() as run:
        xgb = XGBClassifier(enable_categorical=True)
        xgb.fit(X_train, y_train)
        store_pickle('xgb_model', xgb)
        store_df('X_test', X_test)
        store_df('y_test', y_test)
        store_pickle('run_id', run.info.run_id)


def model_evaluation():
    X_test = retrieve_df('X_test')
    y_test = retrieve_df('y_test')
    xgb = retrieve_pickle('xgb_model')
    run_id = retrieve_pickle('run_id')

    mlflow.set_tracking_uri("http://localhost:5000")
    mlflow.set_experiment("Airline_Flight_Delay_Prediction")

    with mlflow.start_run(run_id=run_id) as run:
        y_pred = xgb.predict(X_test)
        accuracy = accuracy_score(y_test, y_pred)
        precision = precision_score(
            y_test, y_pred, average='binary', pos_label=1)
        f1 = f1_score(y_test, y_pred, average='binary', pos_label=1)
        recall = recall_score(y_test, y_pred, average='binary', pos_label=1)

        mlflow.log_metric("accuracy", accuracy)
        mlflow.log_metric("precision", precision)
        mlflow.log_metric("recall", recall)
        mlflow.log_metric("f1_score", f1)


default_args = {
    "owner": "birat gautam",
    "depends_on_past": False,
    "email": ["birat.gautam@mail.bcu.ac.uk"],
    "email_on_failure": False,
    "email_on_retry": False,
    "retries": 0,
    "retry_delay": timedelta(minutes=5)
}

with DAG(
    "lab8",
    default_args=default_args,
    description="A complete mlops pipeline for flight delay prediction",
    schedule_interval=timedelta(days=1),
    start_date=datetime(2021, 10, 10),
    catchup=False,
    tags=["Airline"]
) as dag:
    task1 = PythonOperator(
        task_id="read_csv",
        python_callable=read_csv
    )
    task2 = BashOperator(
        task_id="sleep_for_5_1",
        depends_on_past=False,
        bash_command="sleep 5",
        retries=0
    )
    task3 = PythonOperator(
        task_id="preprocess_data",
        python_callable=preprocess_data
    )
    task4 = BashOperator(
        task_id="sleep_for_5_2",
        depends_on_past=False,
        bash_command="sleep 5",
        retries=0
    )
    task5 = PythonOperator(
        task_id="model_training",
        python_callable=model_training
    )
    task6 = BashOperator(
        task_id="sleep_for_5_3",
        depends_on_past=False,
        bash_command="sleep 5",
        retries=0
    )
    task7 = PythonOperator(
        task_id="model_evaluation",
        python_callable=model_evaluation
    )

    task1 >> task2 >> task3 >> task4 >> task5 >> task6 >> task7