<a href="https://colab.research.google.com/github/LINA-LY/AI/blob/main/MLOps_Pipeline_Case_Study.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

Building an MLOps Pipeline using Apache Airflow

In [None]:
import pandas as pd
from sklearn.preprocessing import MinMaxScaler

# Load the raw screen-time dataset
data = pd.read_csv('/content/screentime_analysis.csv')

# Report missing values per column, then the count of duplicated rows
for report in (data.isnull().sum(), data.duplicated().sum()):
    print(report)

# Parse the 'Date' column as datetime and derive calendar features from it
data['Date'] = pd.to_datetime(data['Date'])
data = data.assign(
    DayOfWeek=data['Date'].dt.dayofweek,
    Month=data['Date'].dt.month,
)

# One-hot encode the categorical 'App' column (first level dropped)
data = pd.get_dummies(data, columns=['App'], drop_first=True)

# Min-max scale the two count features
scaled_columns = ['Notifications', 'Times Opened']
scaler = MinMaxScaler()
data[scaled_columns] = scaler.fit_transform(data[scaled_columns])

# Feature engineering:
#   - usage value of the preceding row (the first row has no predecessor, so it is NaN)
#   - interaction between the scaled notification and open counts
data['Previous_Day_Usage'] = data['Usage (minutes)'].shift(1)
data['Notifications_x_TimesOpened'] = data['Notifications'].mul(data['Times Opened'])

# Persist the preprocessed frame without the index column
data.to_csv('preprocessed_screentime_analysis.csv', index=False)

Date               0
App                0
Usage (minutes)    0
Notifications      0
Times Opened       0
dtype: int64
0


**Training the Model**

In [None]:
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestRegressor
from sklearn.metrics import mean_absolute_error

# Target is the usage column; every other column except 'Date' is a feature
y = data['Usage (minutes)']
X = data.drop(['Usage (minutes)', 'Date'], axis=1)

# Hold out 20% of the rows for evaluation (fixed seed for reproducibility)
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, random_state=42
)

# Fit a random-forest regressor on the training split
model = RandomForestRegressor(random_state=42).fit(X_train, y_train)

# Score the held-out split with mean absolute error
predictions = model.predict(X_test)
mae = mean_absolute_error(y_test, predictions)
print(f'Mean Absolute Error: {mae}')

Mean Absolute Error: 15.398500000000002


**Next, we define the Airflow DAG and its preprocessing task to build the pipeline:**

In [None]:
pip install apache-airflow



In [None]:
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime

#define the data preprocessing function
def proprecess_data():
  """Preprocess the raw screen-time CSV and save the result to disk.

  Steps:
    1. Read '/content/screentime_analysis.csv'.
    2. Parse 'Date' as datetime, derive 'DayOfWeek' and 'Month', then drop 'Date'.
    3. One-hot encode 'App' (first level dropped).
    4. Min-max scale 'Notifications' and 'Times Opened'.
    5. Write the frame to '/content/preprocessed_screentime_analysis.csv'.

  Takes no arguments and returns None. `pd` and `MinMaxScaler` must already
  be in scope: they are imported in an earlier cell, not in this one.
  """
  file_path = '/content/screentime_analysis.csv'
  data = pd.read_csv(file_path)

  # The parsed values must be stored back into 'Date' itself: the .dt
  # accessor used below only works on a datetime-typed column.
  data['Date'] = pd.to_datetime(data['Date'])
  data['DayOfWeek'] = data['Date'].dt.dayofweek
  data['Month'] = data['Date'].dt.month

  # The raw timestamp is no longer needed once the calendar features exist
  data = data.drop(columns=['Date'])

  data = pd.get_dummies(data, columns=['App'], drop_first=True)

  scaler = MinMaxScaler()
  data[['Notifications', 'Times Opened']] = scaler.fit_transform(data[['Notifications', 'Times Opened']])

  preprocessed_path = '/content/preprocessed_screentime_analysis.csv'
  data.to_csv(preprocessed_path, index=False)
  print(f"preprocessed data saved to {preprocessed_path}")


#define the DAG and register the preprocessing task on it
# `schedule` is used instead of `schedule_interval`, which Airflow has deprecated.
# The DAG runs daily from 2025-01-01; catchup=False skips back-filling past runs.
with DAG(
    dag_id='data_preprecessing',
    schedule='@daily',
    start_date=datetime(2025,1,1),
    catchup=False,
) as dag:
    # Single task: run the preprocessing callable defined in this cell
    preprocess_task = PythonOperator(
        task_id='preprocess',
        python_callable=proprecess_data,
    )

  dag = DAG(


In [None]:
!pip install apache-airflow

Collecting apache-airflow
  Downloading apache_airflow-2.10.5-py3-none-any.whl.metadata (45 kB)
[?25l     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/45.4 kB[0m [31m?[0m eta [36m-:--:--[0m[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m45.4/45.4 kB[0m [31m1.6 MB/s[0m eta [36m0:00:00[0m
[?25hCollecting alembic<2.0,>=1.13.1 (from apache-airflow)
  Downloading alembic-1.15.2-py3-none-any.whl.metadata (7.3 kB)
Collecting argcomplete>=1.10 (from apache-airflow)
  Downloading argcomplete-3.6.2-py3-none-any.whl.metadata (16 kB)
Collecting asgiref>=2.3.0 (from apache-airflow)
  Downloading asgiref-3.8.1-py3-none-any.whl.metadata (9.3 kB)
Collecting colorlog>=6.8.2 (from apache-airflow)
  Downloading colorlog-6.9.0-py3-none-any.whl.metadata (10 kB)
Collecting configupdater>=3.1.1 (from apache-airflow)
  Downloading ConfigUpdater-3.2-py2.py3-none-any.whl.metadata (10 kB)
Collecting connexion<3.0,>=2.14.2 (from connexion[flask]<3.0,>=2.14.2->apache-airf

In [None]:
!airflow db init

DB: sqlite:////root/airflow/airflow.db
[[34m2025-04-13T15:37:47.390+0000[0m] {[34mmigration.py:[0m207} INFO[0m - Context impl [1mSQLiteImpl[22m.[0m
[[34m2025-04-13T15:37:47.392+0000[0m] {[34mmigration.py:[0m210} INFO[0m - Will assume [1mnon-transactional[22m DDL.[0m
INFO  [alembic.runtime.migration] Context impl SQLiteImpl.
INFO  [alembic.runtime.migration] Will assume non-transactional DDL.
INFO  [alembic.runtime.migration] Running stamp_revision  -> 5f2621c13b39
WARNI [airflow.models.crypto] empty cryptography key - values will not be stored encrypted.
Initialization done


In [None]:
# Start the Airflow web UI on port 8080.
# NOTE(review): this appears to run in the foreground (the server log streams
# into the cell output), so later cells will not execute until this one is
# interrupted — confirm, and consider launching it in the background.
!airflow webserver --port 8080

[[34m2025-04-13T15:38:47.674+0000[0m] {[34mconfiguration.py:[0m2112} INFO[0m - Creating new FAB webserver config file in: [1m/root/airflow/webserver_config.py[22m[0m
  ____________       _____________
 ____    |__( )_________  __/__  /________      __
____  /| |_  /__  ___/_  /_ __  /_  __ \_ | /| / /
___  ___ |  / _  /   _  __/ _  / / /_/ /_ |/ |/ /
 _/_/  |_/_/  /_/    /_/    /_/  \____/____/|__/
Running the Gunicorn Server with:
Workers: 4 sync
Host: 0.0.0.0:8080
Timeout: 120
Logfiles: - -
Access Logformat: 
[[34m2025-04-13T15:38:52.588+0000[0m] {[34moverride.py:[0m1526} INFO[0m - Inserted Role: [1mAdmin[22m[0m
[[34m2025-04-13T15:38:52.658+0000[0m] {[34moverride.py:[0m1930} INFO[0m - Created Permission View: [1mcan edit on Passwords[22m[0m
[[34m2025-04-13T15:38:52.668+0000[0m] {[34moverride.py:[0m1981} INFO[0m - Added Permission [1mcan edit on Passwords[22m to role [1mAdmin[22m[0m
[[34m2025-04-13T15:38:52.685+0000[0m] {[34moverride.py:[0m1930} I

In [None]:
# Start the Airflow scheduler so that registered DAGs get picked up and run.
# NOTE(review): like the webserver cell, this appears to run in the foreground
# and keep the cell busy until interrupted — confirm.
!airflow scheduler

  ____________       _____________
 ____    |__( )_________  __/__  /________      __
____  /| |_  /__  ___/_  /_ __  /_  __ \_ | /| / /
___  ___ |  / _  /   _  __/ _  / / /_/ /_ |/ |/ /
 _/_/  |_/_/  /_/    /_/    /_/  \____/____/|__/
[[34m2025-04-13T15:41:14.712+0000[0m] {[34mutils.py:[0m162} INFO[0m - NumExpr defaulting to 2 threads.[0m
[[34m2025-04-13T15:41:14.879+0000[0m] {[34mexecutor_loader.py:[0m258} INFO[0m - Loaded executor: [1mSequentialExecutor[22m[0m
[2025-04-13 15:41:15 +0000] [2179] [INFO] Starting gunicorn 23.0.0
[2025-04-13 15:41:15 +0000] [2179] [INFO] Listening at: http://[::]:8793 (2179)
[2025-04-13 15:41:15 +0000] [2179] [INFO] Using worker: sync
[[34m2025-04-13T15:41:15.040+0000[0m] {[34mscheduler_job_runner.py:[0m950} INFO[0m - Starting the scheduler[0m
[[34m2025-04-13T15:41:15.041+0000[0m] {[34mscheduler_job_runner.py:[0m957} INFO[0m - Processing each file at most -1 times[0m
[2025-04-13 15:41:15 +0000] [2180] [INFO] Booting worker with