# Make sure Airflow is installed:
# pip install apache-airflow

Let’s start building an MLOps pipeline using Apache Airflow with the necessary data preprocessing steps:

In [1]:
import pandas as pd
from sklearn.preprocessing import MinMaxScaler

# load the dataset
data = pd.read_csv('C://Users//ajays//Downloads//screentime_analysis.csv')

# check for missing values and duplicates
print(data.isnull().sum())
print(data.duplicated().sum())

# convert Date column to datetime and extract features
data['Date'] = pd.to_datetime(data['Date'])
data['DayOfWeek'] = data['Date'].dt.dayofweek
data['Month'] = data['Date'].dt.month

# encode the categorical 'App' column using one-hot encoding
data = pd.get_dummies(data, columns=['App'], drop_first=True)

# scale numerical features using MinMaxScaler
scaler = MinMaxScaler()
data[['Notifications', 'Times Opened']] = scaler.fit_transform(data[['Notifications', 'Times Opened']])

# feature engineering
data['Previous_Day_Usage'] = data['Usage (minutes)'].shift(1)
data['Notifications_x_TimesOpened'] = data['Notifications'] * data['Times Opened']

# save the preprocessed data to a file
data.to_csv('preprocessed_screentime_analysis.csv', index=False)

Date               0
App                0
Usage (minutes)    0
Notifications      0
Times Opened       0
dtype: int64
0
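
A note on the Previous_Day_Usage feature: shift(1) copies the value from the previous row, so the first row has no predecessor and becomes NaN. It also ignores which app a row belongs to. The sketch below uses a small made-up table (the dates and values are hypothetical, not taken from the dataset) to contrast the row-wise lag with a per-app lag, assuming each app has one row per day. In the notebook, a per-app lag would have to be computed before the App column is one-hot encoded.

```python
import pandas as pd

# Hypothetical toy data: two apps over three days (values invented for illustration)
toy = pd.DataFrame({
    'Date': pd.to_datetime(['2024-08-01', '2024-08-01', '2024-08-02',
                            '2024-08-02', '2024-08-03', '2024-08-03']),
    'App': ['Instagram', 'Safari', 'Instagram', 'Safari', 'Instagram', 'Safari'],
    'Usage (minutes)': [60, 10, 70, 20, 80, 30],
})

# Row-wise lag, as in the cell above: the previous row, whichever app it belongs to
toy['Previous_Row_Usage'] = toy['Usage (minutes)'].shift(1)

# Per-app lag: order each app's rows by date, then shift within each app
toy = toy.sort_values(['App', 'Date'])
toy['Previous_Day_Usage'] = toy.groupby('App')['Usage (minutes)'].shift(1)

# Restore the original row order for easier comparison
print(toy.sort_index())
```

With the per-app version, each app's first day is NaN, so the number of missing values equals the number of apps rather than one.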


With the data preprocessed, we can train a Random Forest model to predict app usage:

In [2]:
import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestRegressor
from sklearn.metrics import mean_absolute_error

# Assuming 'data' is your DataFrame

# Step 1: Check for missing values
print("Missing values in each column:")
print(data.isnull().sum())

# Step 2: Handle missing values by filling them with the mean of each numeric column
data = data.fillna(data.mean(numeric_only=True))

# Step 3: Select the numeric columns (including the boolean one-hot columns)
# select_dtypes is the public API; _get_numeric_data is a private pandas method
numeric_data = data.select_dtypes(include=['number', 'bool'])

# Step 4: Check for infinite values in numeric columns
print("Infinite values in each numeric column:")
print(np.isinf(numeric_data).sum())

# Step 5: Replace infinite values with NaN, then fill them with the column mean
data[numeric_data.columns] = numeric_data.replace([np.inf, -np.inf], np.nan)
data = data.fillna(data.mean(numeric_only=True))

# Step 6: Split data into features (X) and target variable (y)
# Ensure 'Usage (minutes)' and 'Date' are valid column names in your dataset
X = data.drop(columns=['Usage (minutes)', 'Date'])
y = data['Usage (minutes)']

# Step 7: Train-test split
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

# Step 8: Train the RandomForestRegressor model
model = RandomForestRegressor(random_state=42)
model.fit(X_train, y_train)

# Step 9: Evaluate the model
predictions = model.predict(X_test)
mae = mean_absolute_error(y_test, predictions)
print(f'Mean Absolute Error: {mae}')

Missing values in each column:
Date                           0
Usage (minutes)                0
Notifications                  0
Times Opened                   0
DayOfWeek                      0
Month                          0
App_Facebook                   0
App_Instagram                  0
App_LinkedIn                   0
App_Netflix                    0
App_Safari                     0
App_WhatsApp                   0
App_X                          0
Previous_Day_Usage             1
Notifications_x_TimesOpened    0
dtype: int64
Infinite values in each numeric column:
Usage (minutes)                0
Notifications                  0
Times Opened                   0
DayOfWeek                      0
Month                          0
App_Facebook                   0
App_Instagram                  0
App_LinkedIn                   0
App_Netflix                    0
App_Safari                     0
App_WhatsApp                   0
App_X                          0
Previous_Day_Usage       
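
A mean absolute error is easier to interpret next to a naive baseline. The following is a minimal sketch, not part of the original notebook: it uses synthetic data as a stand-in for X and y (the coefficients and noise level are invented) and compares the Random Forest against scikit-learn's DummyRegressor, which always predicts the mean of the training target.

```python
import numpy as np
from sklearn.dummy import DummyRegressor
from sklearn.ensemble import RandomForestRegressor
from sklearn.metrics import mean_absolute_error
from sklearn.model_selection import train_test_split

# Synthetic stand-in for the features and target (made-up data, illustration only)
rng = np.random.default_rng(42)
X = rng.uniform(0, 1, size=(200, 3))
y = 100 * X[:, 0] + 20 * X[:, 1] + rng.normal(0, 5, size=200)

X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, random_state=42)

# Baseline: always predict the mean of the training target
baseline = DummyRegressor(strategy='mean').fit(X_train, y_train)
baseline_mae = mean_absolute_error(y_test, baseline.predict(X_test))

# Same model and settings as in the cell above
model = RandomForestRegressor(random_state=42).fit(X_train, y_train)
model_mae = mean_absolute_error(y_test, model.predict(X_test))

print(f'Baseline MAE: {baseline_mae:.2f}')
print(f'Random Forest MAE: {model_mae:.2f}')
```

On the real dataset, the same comparison would reuse the X_train, X_test, y_train, and y_test variables from the training cell. A model whose MAE is not clearly below the baseline has learned little from the features.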

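With preprocessing and training working in the notebook, we can wrap the preprocessing step in an Airflow DAG that runs it once a day:

In [6]: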
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime
import pandas as pd
from sklearn.preprocessing import MinMaxScaler

# Define the data preprocessing function
def preprocess_data():
    # Load the data
    file_path = 'screentime_analysis.csv'
    data = pd.read_csv(file_path)

    # Convert 'Date' column to datetime and extract features
    data['Date'] = pd.to_datetime(data['Date'])
    data['DayOfWeek'] = data['Date'].dt.dayofweek
    data['Month'] = data['Date'].dt.month

    # Drop the original 'Date' column
    data = data.drop(columns=['Date'])

    # One-hot encode the 'App' column
    data = pd.get_dummies(data, columns=['App'], drop_first=True)

    # Normalize 'Notifications' and 'Times Opened' columns
    scaler = MinMaxScaler()
    data[['Notifications', 'Times Opened']] = scaler.fit_transform(data[['Notifications', 'Times Opened']])

    # Save the preprocessed data to a new file
    preprocessed_path = 'preprocessed_screentime_analysis.csv'
    data.to_csv(preprocessed_path, index=False)
    print(f"Preprocessed data saved to {preprocessed_path}")

# Define the DAG
dag = DAG(
    dag_id='data_preprocessing',
    schedule='@daily',  # 'schedule' replaces 'schedule_interval' in Airflow 2.4+
    start_date=datetime(2025, 1, 1),
    catchup=False,
    description='A DAG for preprocessing screentime analysis data',
)

# Define the task
preprocess_task = PythonOperator(
    task_id='preprocess',
    python_callable=preprocess_data,
    dag=dag,
)

# Set task dependencies (if you add more tasks later)
# preprocess_task >> another_task
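
As one possible "another_task", the training step from earlier could be turned into a second callable. The sketch below is an assumption about how that might look, not the author's pipeline: train_model and the model_path file name are hypothetical, and the function expects the file written by preprocess_data. The Airflow wiring is shown as comments so that the snippet runs without Airflow installed.

```python
import joblib
import pandas as pd
from sklearn.ensemble import RandomForestRegressor
from sklearn.metrics import mean_absolute_error
from sklearn.model_selection import train_test_split


def train_model(preprocessed_path='preprocessed_screentime_analysis.csv',
                model_path='screentime_model.joblib'):  # hypothetical file name
    # Load the file written by preprocess_data
    data = pd.read_csv(preprocessed_path)

    # Split into features and target
    X = data.drop(columns=['Usage (minutes)'])
    y = data['Usage (minutes)']
    X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=0.2, random_state=42)

    # Train and evaluate the model
    model = RandomForestRegressor(random_state=42)
    model.fit(X_train, y_train)
    mae = float(mean_absolute_error(y_test, model.predict(X_test)))
    print(f'Mean Absolute Error: {mae}')

    # Persist the fitted model so that later steps can load it
    joblib.dump(model, model_path)
    return mae


# Wiring into the DAG defined above (commented out so this runs without Airflow):
# train_task = PythonOperator(
#     task_id='train',
#     python_callable=train_model,
#     dag=dag,
# )
# preprocess_task >> train_task
```

The >> operator makes train_task run only after preprocess_task has succeeded, which matters here because training reads the file that preprocessing writes.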

Testing and Running the Pipeline
The commands in this section are meant to be run in a terminal while your Python environment is active. The webserver and the scheduler are long-running processes, so start each in its own terminal window (or tab). First, initialize the database:

airflow db init

This command initializes the metadata database that Airflow uses to store details about tasks, DAGs, and schedules. On Airflow 2.7 and later, airflow db init is deprecated in favor of airflow db migrate.

Next, start the Airflow webserver:

airflow webserver --port 8080

This starts the Airflow webserver, which hosts the user interface for managing DAGs and monitoring task execution. The default port is 8080, but you can specify a different port if needed.

Finally, start the Airflow scheduler:

airflow scheduler

The scheduler monitors your DAGs, triggers runs according to each DAG's schedule, and ensures that tasks execute in dependency order.

To access the Airflow UI, navigate to http://localhost:8080 in your browser. The data_preprocessing DAG appears there only if its definition is saved as a .py file in Airflow's DAGs folder (by default ~/airflow/dags), so copy the DAG code out of the notebook first. Once the DAG is listed, enable it and trigger it manually to run its tasks. After the run succeeds, check the preprocessed file to confirm that it contains the expected, preprocessed data.
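
Checking the preprocessed file can itself be scripted. The function below is a minimal sketch under stated assumptions: validate_preprocessed is a hypothetical helper, and the checks are inferred from what preprocess_data does (drops Date, one-hot encodes App, adds DayOfWeek and Month, scales two columns to the 0 to 1 range). A small tolerance allows for floating-point rounding in the scaled values.

```python
import pandas as pd


def validate_preprocessed(data, tol=1e-9):
    """Return a list of problems; an empty list means every check passed."""
    problems = []

    # Columns that preprocess_data removes or replaces
    for col in ('Date', 'App'):
        if col in data.columns:
            problems.append(f'unexpected column: {col}')

    # Date features that preprocess_data adds
    for col in ('DayOfWeek', 'Month'):
        if col not in data.columns:
            problems.append(f'missing column: {col}')

    # Min-max scaled columns should lie in [0, 1]
    for col in ('Notifications', 'Times Opened'):
        if col in data.columns and not data[col].between(-tol, 1 + tol).all():
            problems.append(f'values outside [0, 1] in column: {col}')

    if data.isnull().values.any():
        problems.append('missing values present')

    return problems


# Made-up sample that mimics the expected layout of the preprocessed file
sample = pd.DataFrame({
    'Usage (minutes)': [10, 20],
    'Notifications': [0.0, 1.0],
    'Times Opened': [0.0, 1.0],
    'DayOfWeek': [3, 4],
    'Month': [8, 8],
})
print(validate_preprocessed(sample))
```

Against the real output, the call would be validate_preprocessed(pd.read_csv('preprocessed_screentime_analysis.csv')).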

Summary
Building an MLOps pipeline with Apache Airflow streamlines the end-to-end process of data preprocessing, model training, and deployment. Automating these steps as DAG tasks makes machine learning workflows more efficient, scalable, and reproducible.