<a href="https://colab.research.google.com/github/BrianKipngeno/MLOPS-pipeline-using-Apache-Airflow/blob/main/MLOPS_using_Apache_airflow.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Introduction

Machine Learning Operations (MLOps) integrates machine learning model development and deployment into a robust, automated pipeline.

I’ll demonstrate how to build an MLOps pipeline with Apache Airflow: preprocessing the data, training a model to predict app usage, and automating the preprocessing step as a scheduled Airflow task.

# Overview

The given dataset contains app usage behaviour with five key columns:

- Date (usage day)
- App (e.g., Instagram, WhatsApp)
- Usage (minutes spent)
- Notifications (alerts received)
- Times Opened (app launches)

The goal of this pipeline is to streamline the process of analyzing screentime data by automating its preprocessing and utilizing machine learning to predict app usage.

To automate this, we will design an Airflow DAG that runs the data preprocessing step on a daily schedule.

# Building an MLOps Pipeline using Apache Airflow

## Preprocessing

In [None]:
# Let's begin with the necessary data preprocessing steps

import pandas as pd
from sklearn.preprocessing import MinMaxScaler

# load the dataset
data = pd.read_csv('/content/screentime_analysis (1).csv')


In [None]:
data.head()

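|   | Date | App | Usage (minutes) | Notifications | Times Opened |
|---|------|-----|-----------------|---------------|--------------|
| 0 | 2024-08-07 | Instagram | 81 | 24 | 57 |
| 1 | 2024-08-08 | Instagram | 90 | 30 | 53 |
| 2 | 2024-08-26 | Instagram | 112 | 33 | 17 |
| 3 | 2024-08-22 | Instagram | 82 | 11 | 38 |
| 4 | 2024-08-12 | Instagram | 59 | 47 | 16 |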


In [None]:
# check for missing values in each column
print(data.isnull().sum())

Date               0
App                0
Usage (minutes)    0
Notifications      0
Times Opened       0
dtype: int64


In [None]:
# Checking for duplicates
print(data.duplicated().sum())

0


In [None]:
# Checking the datatypes
data.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 200 entries, 0 to 199
Data columns (total 5 columns):
 #   Column           Non-Null Count  Dtype 
---  ------           --------------  ----- 
 0   Date             200 non-null    object
 1   App              200 non-null    object
 2   Usage (minutes)  200 non-null    int64 
 3   Notifications    200 non-null    int64 
 4   Times Opened     200 non-null    int64 
dtypes: int64(3), object(2)
memory usage: 7.9+ KB


In [None]:
# convert the Date column to datetime and extract temporal features
data['Date'] = pd.to_datetime(data['Date'])
data['DayOfWeek'] = data['Date'].dt.dayofweek
data['Month'] = data['Date'].dt.month
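In `dt.dayofweek`, pandas numbers weekdays from Monday=0 to Sunday=6. The check below applies that encoding to the first five dates shown by `data.head()`. The `DayName` column is added only to make the numbers readable and is not one of the notebook's features.

```python
import pandas as pd

# The first five dates from the data.head() output above
dates = pd.to_datetime(pd.Series(['2024-08-07', '2024-08-08', '2024-08-26',
                                  '2024-08-22', '2024-08-12']))

features = pd.DataFrame({
    'Date': dates,
    'DayOfWeek': dates.dt.dayofweek,   # Monday=0 ... Sunday=6
    'DayName': dates.dt.day_name(),    # readability only, not a model feature
    'Month': dates.dt.month,
})
print(features)
```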

In [None]:
# Encoding the categorical 'App' column using one-hot encoding
data = pd.get_dummies(data, columns=['App'], drop_first=True)
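With `drop_first=True`, the first category in sorted order is dropped and becomes the reference level, encoded as all zeros. Because the Airflow DAG later in this notebook re-encodes the data on every run, a batch that lacks some app would otherwise produce a different set of dummy columns. The sketch below assumes a fixed, known list of apps; `KNOWN_APPS` and `encode_apps` are hypothetical names. It converts App to a `pd.Categorical` with fixed categories, because `pd.get_dummies` creates one column per category, including categories absent from the batch.

```python
import pandas as pd

# Hypothetical list of apps seen at training time (sorted order)
KNOWN_APPS = ['Instagram', 'WhatsApp']


def encode_apps(df):
    """Hypothetical helper: one-hot encode App with a stable set of columns."""
    df = df.copy()
    # Fixed categories keep the dummy columns identical across batches
    df['App'] = pd.Categorical(df['App'], categories=KNOWN_APPS)
    # drop_first=True drops 'Instagram', which becomes the all-zeros reference
    return pd.get_dummies(df, columns=['App'], drop_first=True, dtype=int)


full = encode_apps(pd.DataFrame({'App': ['Instagram', 'WhatsApp', 'Instagram']}))
single = encode_apps(pd.DataFrame({'App': ['WhatsApp']}))  # only one app present
print(full)
print(single)
```

Both batches end up with the same `App_WhatsApp` column, even though the second one contains a single app.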

In [None]:
# Scaling numerical features using MinMaxScaler
scaler = MinMaxScaler()
data[['Notifications', 'Times Opened']] = scaler.fit_transform(data[['Notifications', 'Times Opened']])
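With its default `feature_range=(0, 1)`, `MinMaxScaler` rescales each column independently as x' = (x - min) / (max - min). The check below runs on the five Notifications values shown by `data.head()`. The real scaler uses the min and max over all 200 rows, so its outputs differ from these.

```python
import numpy as np
from sklearn.preprocessing import MinMaxScaler

# Notifications values from the first five rows of data.head()
x = np.array([[24], [30], [33], [11], [47]], dtype=float)

# Manual min-max formula: (x - min) / (max - min), here min=11 and max=47
manual = (x - x.min()) / (x.max() - x.min())
scaled = MinMaxScaler().fit_transform(x)

print(np.allclose(manual, scaled))  # True
print(scaled.ravel().round(3))
```

Because the scaler here is fit on the full dataset before the train/test split, the test rows influence `min` and `max`. Fitting it on the training rows only avoids that leakage.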


In [None]:
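# Feature engineering: a lag feature and an interaction feature
# note: shift(1) takes the preceding row, which is the previous day's usage
# only if the rows are sorted by app and date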
data['Previous_Day_Usage'] = data['Usage (minutes)'].shift(1)
data['Notifications_x_TimesOpened'] = data['Notifications'] * data['Times Opened']
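The `data.head()` output shows Instagram dates in the order 2024-08-07, 2024-08-08, 2024-08-26, and so on. The rows are therefore neither date-sorted nor grouped by app, so `shift(1)` returns the preceding row's usage. The sketch below computes a per-app lag instead, assuming it runs before `pd.get_dummies` replaces the App column. It sorts by App and Date, then shifts within each app. The five Instagram rows come from `data.head()`; the two WhatsApp rows are hypothetical. Dates can have gaps (2024-08-08 to 2024-08-12), so this gives the previous *recorded* day.

```python
import pandas as pd

# Instagram rows copied from data.head(); WhatsApp rows are hypothetical
raw = pd.DataFrame({
    'Date': ['2024-08-07', '2024-08-08', '2024-08-26', '2024-08-22', '2024-08-12',
             '2024-08-07', '2024-08-08'],
    'App': ['Instagram'] * 5 + ['WhatsApp'] * 2,
    'Usage (minutes)': [81, 90, 112, 82, 59, 40, 55],
})
raw['Date'] = pd.to_datetime(raw['Date'])

# Sort chronologically within each app so that shift(1) looks back in time
raw = raw.sort_values(['App', 'Date']).reset_index(drop=True)

# Shift within each app only; each app's first row has no history and gets NaN
raw['Previous_Day_Usage'] = raw.groupby('App')['Usage (minutes)'].shift(1)

print(raw)
```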


In [None]:
# Let's save the preprocessed data to a file
data.to_csv('preprocessed_screentime_analysis.csv', index=False)

## Summary of preprocessing

- The above code performs data preprocessing to prepare the screentime dataset for machine learning.

- It begins by loading the dataset and ensuring data quality through checks for missing values and duplicates.

- It then processes the Date column to extract useful temporal features like DayOfWeek and Month.

- The App column is transformed using one-hot encoding to convert it into a numeric format.

- The process scales the numerical columns Notifications and Times Opened with MinMaxScaler, which maps each column to the [0, 1] range.

- Feature engineering creates a lag feature (Previous_Day_Usage, the usage value from the preceding row) and an interaction feature (Notifications_x_TimesOpened) to give the model additional signals.

## Training the Model

In [None]:
# We will train a Random Forest model to predict app usage

from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestRegressor
from sklearn.metrics import mean_absolute_error

# split data into features and target variable
X = data.drop(columns=['Usage (minutes)', 'Date'])
y = data['Usage (minutes)']

# train-test split
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

# train the model
model = RandomForestRegressor(random_state=42)
model.fit(X_train, y_train)

# evaluate the model
predictions = model.predict(X_test)
mae = mean_absolute_error(y_test, predictions)
print(f'Mean Absolute Error: {mae}')

Mean Absolute Error: 15.398500000000002


- We split the preprocessed data into training and testing sets, train a Random Forest Regressor, and evaluate its performance.

- First, the code separates the target variable (Usage (minutes)) from the features, dropping the raw Date column, and performs an 80-20 train-test split. The training data is used to fit the RandomForestRegressor. The trained model then generates predictions on the test set, and the Mean Absolute Error (MAE) quantifies the average absolute difference between the predicted and actual values.

- The MAE of 15.3985 indicates that, on average, the model’s predicted screentime differs from the actual screentime by approximately 15.4 minutes. Whether that is good depends on the typical usage values and on how a simple baseline scores; either way, there is still room to reduce this error and make predictions more precise.

# Automating Preprocessing with a Pipeline using Apache Airflow

Apache Airflow automates workflows defined as Directed Acyclic Graphs (DAGs). Each node is a task, and each edge is a dependency that determines the order in which tasks run. Here, we will use a DAG to build a pipeline that preprocesses the data daily.
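To make the DAG idea concrete without Airflow itself, the sketch below uses Python's standard-library `graphlib.TopologicalSorter` (Python 3.9+) on a hypothetical three-task pipeline (`preprocess`, `train`, `evaluate`). This is not how Airflow schedules tasks internally. It only shows that dependencies determine a valid run order, and that a cycle leaves no valid order, which is why the graph must be acyclic.

```python
from graphlib import CycleError, TopologicalSorter

# Hypothetical pipeline: each task maps to the set of tasks it depends on
pipeline = {
    'preprocess': set(),
    'train': {'preprocess'},
    'evaluate': {'train'},
}
order = list(TopologicalSorter(pipeline).static_order())
print(order)  # ['preprocess', 'train', 'evaluate']

# Making 'preprocess' depend on 'evaluate' closes a loop: no valid order exists
cyclic = dict(pipeline, preprocess={'evaluate'})
try:
    list(TopologicalSorter(cyclic).static_order())
    has_cycle = False
except CycleError:
    has_cycle = True
print(has_cycle)  # True
```

In Airflow, the same dependency is declared between operators with the bitshift syntax, for example `preprocess_task >> train_task`, where `train_task` would be a hypothetical second PythonOperator.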

In [None]:
# Let's first install Apache Airflow

!pip install apache-airflow

Collecting apache-airflow
  Downloading apache_airflow-2.10.5-py3-none-any.whl.metadata (45 kB)
Collecting alembic<2.0,>=1.13.1 (from apache-airflow)
  Downloading alembic-1.15.1-py3-none-any.whl.metadata (7.2 kB)
Collecting argcomplete>=1.10 (from apache-airflow)
  Downloading argcomplete-3.6.0-py3-none-any.whl.metadata (16 kB)
Collecting asgiref>=2.3.0 (from apache-airflow)
  Downloading asgiref-3.8.1-py3-none-any.whl.metadata (9.3 kB)
Collecting colorlog>=6.8.2 (from apache-airflow)
  Downloading colorlog-6.9.0-py3-none-any.whl.metadata (10 kB)
Collecting configupdater>=3.1.1 (from apache-airflow)
  Downloading ConfigUpdater-3.2-py2.py3-none-any.whl.metadata (10 kB)
Collecting connexion<3.0,>=2.14.2 (from connexion[flask]<3.0,>=2.14.2->apache-airf

In [None]:
# We will define the DAG and task to build the pipeline
from datetime import datetime

import pandas as pd
from airflow import DAG
from airflow.operators.python import PythonOperator
from sklearn.preprocessing import MinMaxScaler

# define the data preprocessing function
# (Airflow parses a DAG file on its own, so the file needs its own imports)
def preprocess_data():
    file_path = '/content/screentime_analysis (1).csv'
    data = pd.read_csv(file_path)

    # extract temporal features from Date, then drop the raw column
    data['Date'] = pd.to_datetime(data['Date'])
    data['DayOfWeek'] = data['Date'].dt.dayofweek
    data['Month'] = data['Date'].dt.month

    data = data.drop(columns=['Date'])

    # one-hot encode the App column
    data = pd.get_dummies(data, columns=['App'], drop_first=True)

    # scale the count columns to the [0, 1] range
    scaler = MinMaxScaler()
    data[['Notifications', 'Times Opened']] = scaler.fit_transform(data[['Notifications', 'Times Opened']])

    preprocessed_path = 'preprocessed_screentime_analysis.csv'
    data.to_csv(preprocessed_path, index=False)
    print(f"Preprocessed data saved to {preprocessed_path}")

# define the DAG; `schedule` replaces `schedule_interval`, deprecated since Airflow 2.4
dag = DAG(
    dag_id='data_preprocessing',
    schedule='@daily',
    start_date=datetime(2025, 1, 1),
    catchup=False,
)

# define the task
preprocess_task = PythonOperator(
    task_id='preprocess',
    python_callable=preprocess_data,
    dag=dag,
)

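- The above code defines a Directed Acyclic Graph with a single task that preprocesses the screentime data.

- The preprocess_data function loads the dataset, extracts temporal features (DayOfWeek and Month) from the Date column and then drops Date, encodes the App column using one-hot encoding, and scales the numerical features (Notifications and Times Opened) using MinMaxScaler. Unlike the notebook steps above, it does not create the Previous_Day_Usage and Notifications_x_TimesOpened features.

- Finally, the function saves the processed data to a new CSV file.

- The DAG runs this task once a day, and catchup=False stops Airflow from backfilling a run for every day since start_date. For the schedule to take effect, the file must be placed in Airflow's DAGs folder with the scheduler running; defining the DAG in a notebook cell does not schedule anything by itself.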
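Because `preprocess_data` hard-codes its input and output paths, it can only be checked by running the whole task. The sketch below assumes a hypothetical `preprocess_file(input_path, output_path)` variant with the same steps. It shows how the callable can be tested on a tiny CSV without a scheduler. The rows are made up for illustration.

```python
import os
import tempfile

import pandas as pd
from sklearn.preprocessing import MinMaxScaler


def preprocess_file(input_path, output_path):
    """Hypothetical variant of preprocess_data() that takes its paths as arguments."""
    data = pd.read_csv(input_path)

    # same steps as the DAG task: date features, drop Date, one-hot App, scale counts
    data['Date'] = pd.to_datetime(data['Date'])
    data['DayOfWeek'] = data['Date'].dt.dayofweek
    data['Month'] = data['Date'].dt.month
    data = data.drop(columns=['Date'])
    data = pd.get_dummies(data, columns=['App'], drop_first=True)

    cols = ['Notifications', 'Times Opened']
    data[cols] = MinMaxScaler().fit_transform(data[cols])

    data.to_csv(output_path, index=False)
    return output_path


# Smoke test on a tiny made-up CSV inside a temporary directory
with tempfile.TemporaryDirectory() as tmp:
    src = os.path.join(tmp, 'input.csv')
    dst = os.path.join(tmp, 'output.csv')
    pd.DataFrame({
        'Date': ['2024-08-07', '2024-08-08', '2024-08-26'],
        'App': ['Instagram', 'Instagram', 'WhatsApp'],
        'Usage (minutes)': [81, 90, 112],
        'Notifications': [24, 30, 33],
        'Times Opened': [57, 53, 17],
    }).to_csv(src, index=False)

    preprocess_file(src, dst)
    result = pd.read_csv(dst)

print(result.columns.tolist())
```

In the DAG, such a function could be wired up through PythonOperator's `op_kwargs`, for example `PythonOperator(task_id='preprocess', python_callable=preprocess_file, op_kwargs={'input_path': '/content/screentime_analysis (1).csv', 'output_path': 'preprocessed_screentime_analysis.csv'}, dag=dag)`.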