

# MLOps Pipeline using Apache Spark


Installation and import of the necessary requirements

In [3]:
!pip install pyspark



In [4]:
from pyspark.sql import SparkSession


In [5]:
from pyspark.sql.functions import col, when, sum,to_date, to_timestamp,dayofweek,month,lag, col, expr

from pyspark.ml.feature import StringIndexer, OneHotEncoder, MinMaxScaler, VectorAssembler

from pyspark.sql.window import Window


Initializing the Spark session and loading our data into Spark.

In [6]:

# Start (or reuse) the Spark session used by the rest of the notebook.
spark = (
    SparkSession.builder
    .appName("ScreenTimeAnalysis")
    .getOrCreate()
)

# Load the raw CSV: the first row is the header and column types are inferred.
# NOTE(review): the path assumes Google Drive is already mounted at
# /content/drive; no mount cell is visible in this notebook.
data = spark.read.csv(
    "/content/drive/MyDrive/Data Sets/screentime_analysis.csv",
    header=True,
    inferSchema=True,
)

# Preview the loaded rows.
data.show()


+----------+---------+---------------+-------------+------------+
|      Date|      App|Usage (minutes)|Notifications|Times Opened|
+----------+---------+---------------+-------------+------------+
|2024-08-07|Instagram|             81|           24|          57|
|2024-08-08|Instagram|             90|           30|          53|
|2024-08-26|Instagram|            112|           33|          17|
|2024-08-22|Instagram|             82|           11|          38|
|2024-08-12|Instagram|             59|           47|          16|
|2024-08-28|Instagram|             50|           42|          26|
|2024-08-26|Instagram|             51|           58|          41|
|2024-08-27|Instagram|             71|           69|          30|
|2024-08-15|Instagram|             91|           71|          61|
|2024-08-30|Instagram|             80|           64|          66|
|2024-08-25|Instagram|             32|           60|          21|
|2024-08-21|Instagram|            102|           48|          32|
|2024-08-0

# Data preprocessing and feature engineering

Checking for null values

In [7]:
# Check for null values: one output column per input column, holding the
# number of nulls found in it. `sum` and `col` here are the
# pyspark.sql.functions versions imported above, not the Python builtins.
#
# DataFrame.show() returns None, so the counts DataFrame is kept in
# `null_counts` and displayed in a separate step.
null_counts = data.select(
    [sum(col(c).isNull().cast("int")).alias(c) for c in data.columns]
)

null_counts.show()


+----+---+---------------+-------------+------------+
|Date|App|Usage (minutes)|Notifications|Times Opened|
+----+---+---------------+-------------+------------+
|   0|  0|              0|            0|           0|
+----+---+---------------+-------------+------------+



Checking for duplicates

In [8]:
# The number of duplicate records is the difference between the total row
# count and the row count after dropDuplicates().
total_rows = data.count()
distinct_rows = data.dropDuplicates().count()
print(f"dublicate records in our data set are: {total_rows - distinct_rows}")

dublicate records in our data set are: 0


In [9]:
# Convert the Date column to a Spark timestamp, replacing the original column.
date_as_timestamp = to_timestamp(col("Date"))
data = data.withColumn("Date", date_as_timestamp)

# Confirm the new column type.
data.printSchema()

root
 |-- Date: timestamp (nullable = true)
 |-- App: string (nullable = true)
 |-- Usage (minutes): integer (nullable = true)
 |-- Notifications: integer (nullable = true)
 |-- Times Opened: integer (nullable = true)



In [10]:
# Derive the day of the week from the timestamp.
# Spark numbers days 1 = Sunday ... 7 = Saturday (2024-08-07, a Wednesday, maps to 4).
day_of_week = dayofweek(col("Date"))
data = data.withColumn("DayOfWeek", day_of_week)

data.show()


+-------------------+---------+---------------+-------------+------------+---------+
|               Date|      App|Usage (minutes)|Notifications|Times Opened|DayOfWeek|
+-------------------+---------+---------------+-------------+------------+---------+
|2024-08-07 00:00:00|Instagram|             81|           24|          57|        4|
|2024-08-08 00:00:00|Instagram|             90|           30|          53|        5|
|2024-08-26 00:00:00|Instagram|            112|           33|          17|        2|
|2024-08-22 00:00:00|Instagram|             82|           11|          38|        5|
|2024-08-12 00:00:00|Instagram|             59|           47|          16|        2|
|2024-08-28 00:00:00|Instagram|             50|           42|          26|        4|
|2024-08-26 00:00:00|Instagram|             51|           58|          41|        2|
|2024-08-27 00:00:00|Instagram|             71|           69|          30|        3|
|2024-08-15 00:00:00|Instagram|             91|           71|    

In [11]:
# Derive the calendar month (1-12) from the timestamp.
month_of_year = month(col("Date"))
data = data.withColumn("Month", month_of_year)

data.show()


+-------------------+---------+---------------+-------------+------------+---------+-----+
|               Date|      App|Usage (minutes)|Notifications|Times Opened|DayOfWeek|Month|
+-------------------+---------+---------------+-------------+------------+---------+-----+
|2024-08-07 00:00:00|Instagram|             81|           24|          57|        4|    8|
|2024-08-08 00:00:00|Instagram|             90|           30|          53|        5|    8|
|2024-08-26 00:00:00|Instagram|            112|           33|          17|        2|    8|
|2024-08-22 00:00:00|Instagram|             82|           11|          38|        5|    8|
|2024-08-12 00:00:00|Instagram|             59|           47|          16|        2|    8|
|2024-08-28 00:00:00|Instagram|             50|           42|          26|        4|    8|
|2024-08-26 00:00:00|Instagram|             51|           58|          41|        2|    8|
|2024-08-27 00:00:00|Instagram|             71|           69|          30|        3|    8|

In [12]:
# performing one hot encoding

indexer = StringIndexer(inputCol="App", outputCol="AppIndex")

data = indexer.fit(data).transform(data)

data.show(5)

+-------------------+---------+---------------+-------------+------------+---------+-----+--------+
|               Date|      App|Usage (minutes)|Notifications|Times Opened|DayOfWeek|Month|AppIndex|
+-------------------+---------+---------------+-------------+------------+---------+-----+--------+
|2024-08-07 00:00:00|Instagram|             81|           24|          57|        4|    8|     2.0|
|2024-08-08 00:00:00|Instagram|             90|           30|          53|        5|    8|     2.0|
|2024-08-26 00:00:00|Instagram|            112|           33|          17|        2|    8|     2.0|
|2024-08-22 00:00:00|Instagram|             82|           11|          38|        5|    8|     2.0|
|2024-08-12 00:00:00|Instagram|             59|           47|          16|        2|    8|     2.0|
+-------------------+---------+---------------+-------------+------------+---------+-----+--------+
only showing top 5 rows



In [13]:
# One-hot encode the app index into a sparse vector column.
# dropLast=True leaves the last category out of the vector, so that category
# is represented by an all-zero vector.
encoder = OneHotEncoder(
    inputCol="AppIndex",
    outputCol="AppEncoded",
    dropLast=True,
)
encoder_model = encoder.fit(data)

# Keep only the encoded vector; drop the raw app name and its numeric index.
data = encoder_model.transform(data).drop("App", "AppIndex")

data.show(10)

+-------------------+---------------+-------------+------------+---------+-----+-------------+
|               Date|Usage (minutes)|Notifications|Times Opened|DayOfWeek|Month|   AppEncoded|
+-------------------+---------------+-------------+------------+---------+-----+-------------+
|2024-08-07 00:00:00|             81|           24|          57|        4|    8|(7,[2],[1.0])|
|2024-08-08 00:00:00|             90|           30|          53|        5|    8|(7,[2],[1.0])|
|2024-08-26 00:00:00|            112|           33|          17|        2|    8|(7,[2],[1.0])|
|2024-08-22 00:00:00|             82|           11|          38|        5|    8|(7,[2],[1.0])|
|2024-08-12 00:00:00|             59|           47|          16|        2|    8|(7,[2],[1.0])|
|2024-08-28 00:00:00|             50|           42|          26|        4|    8|(7,[2],[1.0])|
|2024-08-26 00:00:00|             51|           58|          41|        2|    8|(7,[2],[1.0])|
|2024-08-27 00:00:00|             71|           69

In [14]:
# Combine the two numeric columns into a single vector column, "features",
# which is the input format the scaler expects.
numeric_inputs = ["Notifications", "Times Opened"]
assembler = VectorAssembler(
    inputCols=numeric_inputs,
    outputCol="features",
)

data = assembler.transform(data)

data.show(10)

+-------------------+---------------+-------------+------------+---------+-----+-------------+-----------+
|               Date|Usage (minutes)|Notifications|Times Opened|DayOfWeek|Month|   AppEncoded|   features|
+-------------------+---------------+-------------+------------+---------+-----+-------------+-----------+
|2024-08-07 00:00:00|             81|           24|          57|        4|    8|(7,[2],[1.0])|[24.0,57.0]|
|2024-08-08 00:00:00|             90|           30|          53|        5|    8|(7,[2],[1.0])|[30.0,53.0]|
|2024-08-26 00:00:00|            112|           33|          17|        2|    8|(7,[2],[1.0])|[33.0,17.0]|
|2024-08-22 00:00:00|             82|           11|          38|        5|    8|(7,[2],[1.0])|[11.0,38.0]|
|2024-08-12 00:00:00|             59|           47|          16|        2|    8|(7,[2],[1.0])|[47.0,16.0]|
|2024-08-28 00:00:00|             50|           42|          26|        4|    8|(7,[2],[1.0])|[42.0,26.0]|
|2024-08-26 00:00:00|             51|

In [15]:
# Min-max scale every component of the "features" vector into
# "scaled_features".
# NOTE(review): the scaler is fit on the full dataset, before the train/test
# split performed later, so test rows influence the scaling statistics.
scaler = MinMaxScaler(
    inputCol="features",
    outputCol="scaled_features",
)

# Learn the per-component min/max, then apply the scaling.
scaler_model = scaler.fit(data)
data = scaler_model.transform(data)

data.show(10)

+-------------------+---------------+-------------+------------+---------+-----+-------------+-----------+--------------------+
|               Date|Usage (minutes)|Notifications|Times Opened|DayOfWeek|Month|   AppEncoded|   features|     scaled_features|
+-------------------+---------------+-------------+------------+---------+-----+-------------+-----------+--------------------+
|2024-08-07 00:00:00|             81|           24|          57|        4|    8|(7,[2],[1.0])|[24.0,57.0]|[0.16326530612244...|
|2024-08-08 00:00:00|             90|           30|          53|        5|    8|(7,[2],[1.0])|[30.0,53.0]|[0.20408163265306...|
|2024-08-26 00:00:00|            112|           33|          17|        2|    8|(7,[2],[1.0])|[33.0,17.0]|[0.22448979591836...|
|2024-08-22 00:00:00|             82|           11|          38|        5|    8|(7,[2],[1.0])|[11.0,38.0]|[0.07482993197278...|
|2024-08-12 00:00:00|             59|           47|          16|        2|    8|(7,[2],[1.0])|[47.0,16.0

In [16]:
# Window over the whole dataset, ordered by Date only.
# NOTE(review): there is no partitionBy, so Spark evaluates this window over
# all rows as a single partition. Several rows share the same Date and their
# relative order is not defined by this ordering, so lag(1) yields the
# previous *row* (possibly from the same day, as the output shows) rather
# than strictly the previous day's usage.
window_spec = Window.orderBy("Date")
previous_usage = lag("Usage (minutes)", 1).over(window_spec)

data = (
    data
    # Usage value of the preceding row in the Date-ordered window.
    .withColumn("Previous_Day_Usage", previous_usage)
    # Interaction feature: notifications multiplied by times opened.
    .withColumn("Notifications_x_TimesOpened", col("Notifications") * col("Times Opened"))
)

# Preview only the columns involved in the two new features.
preview_columns = [
    "Date",
    "Usage (minutes)",
    "Previous_Day_Usage",
    "Notifications",
    "Times Opened",
    "Notifications_x_TimesOpened",
]
data.select(*preview_columns).show(10)


+-------------------+---------------+------------------+-------------+------------+---------------------------+
|               Date|Usage (minutes)|Previous_Day_Usage|Notifications|Times Opened|Notifications_x_TimesOpened|
+-------------------+---------------+------------------+-------------+------------+---------------------------+
|2024-08-01 00:00:00|             39|              NULL|           11|          13|                        143|
|2024-08-01 00:00:00|             41|                39|            9|          14|                        126|
|2024-08-01 00:00:00|             30|                41|           27|           9|                        243|
|2024-08-01 00:00:00|             17|                30|           41|          19|                        779|
|2024-08-01 00:00:00|              8|                17|            1|          14|                         14|
|2024-08-01 00:00:00|             20|                 8|            3|           6|                     

In [17]:
# Preview the first 10 rows with every column, including the new features.
data.show(n=10)

+-------------------+---------------+-------------+------------+---------+-----+-------------+-----------+--------------------+------------------+---------------------------+
|               Date|Usage (minutes)|Notifications|Times Opened|DayOfWeek|Month|   AppEncoded|   features|     scaled_features|Previous_Day_Usage|Notifications_x_TimesOpened|
+-------------------+---------------+-------------+------------+---------+-----+-------------+-----------+--------------------+------------------+---------------------------+
|2024-08-01 00:00:00|             39|           11|          13|        5|    8|    (7,[],[])|[11.0,13.0]|[0.07482993197278...|              NULL|                        143|
|2024-08-01 00:00:00|             41|            9|          14|        5|    8|    (7,[],[])| [9.0,14.0]|[0.06122448979591...|                39|                        126|
|2024-08-01 00:00:00|             30|           27|           9|        5|    8|    (7,[],[])| [27.0,9.0]|[0.18367346938775..

In [18]:
# As expected, the first row of `Previous_Day_Usage` is null because it has
# no preceding row for lag() to read from.
# Replace that null with 0 and keep every other value unchanged.
previous_usage_col = col("Previous_Day_Usage")

data = data.withColumn(
    "Previous_Day_Usage",
    when(previous_usage_col.isNull(), 0).otherwise(previous_usage_col),
)

data.show()


+-------------------+---------------+-------------+------------+---------+-----+-------------+-----------+--------------------+------------------+---------------------------+
|               Date|Usage (minutes)|Notifications|Times Opened|DayOfWeek|Month|   AppEncoded|   features|     scaled_features|Previous_Day_Usage|Notifications_x_TimesOpened|
+-------------------+---------------+-------------+------------+---------+-----+-------------+-----------+--------------------+------------------+---------------------------+
|2024-08-01 00:00:00|             39|           11|          13|        5|    8|    (7,[],[])|[11.0,13.0]|[0.07482993197278...|                 0|                        143|
|2024-08-01 00:00:00|             41|            9|          14|        5|    8|    (7,[],[])| [9.0,14.0]|[0.06122448979591...|                39|                        126|
|2024-08-01 00:00:00|             30|           27|           9|        5|    8|    (7,[],[])| [27.0,9.0]|[0.18367346938775..

In [19]:
# Check whether any null values were introduced during preprocessing:
# one output column per input column, holding its null count.
#
# DataFrame.show() returns None, so the counts DataFrame is kept in
# `null_counts` and displayed in a separate step.
null_counts = data.select(
    [sum(col(c).isNull().cast("int")).alias(c) for c in data.columns]
)

null_counts.show()


+----+---------------+-------------+------------+---------+-----+----------+--------+---------------+------------------+---------------------------+
|Date|Usage (minutes)|Notifications|Times Opened|DayOfWeek|Month|AppEncoded|features|scaled_features|Previous_Day_Usage|Notifications_x_TimesOpened|
+----+---------------+-------------+------------+---------+-----+----------+--------+---------------+------------------+---------------------------+
|   0|              0|            0|           0|        0|    0|         0|       0|              0|                 0|                          0|
+----+---------------+-------------+------------+---------+-----+----------+--------+---------------+------------------+---------------------------+



Saving the processed data in Parquet format to ensure that the result of this preprocessing is stored.
Parquet preserves data types, including AppEncoded, so it's the best choice.

In [20]:
# Persist the processed DataFrame as Parquet, replacing any previous export.
processed_writer = data.write.mode("overwrite")
processed_writer.parquet("/content/drive/MyDrive/Data Sets/processed_screentime_analysis.parquet")


# Model Training

Importing the necessary libraries

In [21]:
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml import Pipeline

Now prepare the parameters and train the model; the process consists of 10 steps.


In [22]:
# Step 1: Choose the feature columns (every column except the label and Date).
# NOTE(review): this keeps the raw "Notifications" / "Times Opened" columns
# together with the "features" and "scaled_features" vectors built from them,
# so the same information reaches the model three times.
label_col = "Usage (minutes)"
non_feature_cols = (label_col, "Date")
feature_cols = [name for name in data.columns if name not in non_feature_cols]

# Step 2: Assemble the chosen columns into a single vector column
assembler = VectorAssembler(
    inputCols=feature_cols,
    outputCol="out_features",
)

# Step 3: 80/20 train/test split (seeded so the split is reproducible)
train_data, test_data = data.randomSplit([0.8, 0.2], seed=42)

# Step 4: Random Forest regressor predicting the usage minutes
rf = RandomForestRegressor(
    featuresCol="out_features",
    labelCol=label_col,
    numTrees=100,
    seed=42,
)

# Step 5: Chain the stages into a Pipeline (Assembler → Model)
pipeline = Pipeline(stages=[assembler, rf])

# Step 6: Fit the pipeline on the training split
model = pipeline.fit(train_data)

# Step 7: Score the held-out test split
predictions = model.transform(test_data)

# Step 8: Evaluate with Mean Absolute Error (MAE)
evaluator = RegressionEvaluator(
    labelCol=label_col,
    predictionCol="prediction",
    metricName="mae",
)
mae = evaluator.evaluate(predictions)

# Step 9: Report the MAE
print(f"Mean Absolute Error: {mae}")

# Step 10: Show a few actual vs. predicted values
predictions.select(label_col, "prediction").show(5)

Mean Absolute Error: 9.388130378117081
+---------------+------------------+
|Usage (minutes)|        prediction|
+---------------+------------------+
|             10|18.085812854109147|
|             30|25.525878605983245|
|             41|19.480783358299743|
|             13|18.106343040535545|
|             32|27.735439188776827|
+---------------+------------------+
only showing top 5 rows



Saving the model to the drive so it can be used in deployment

In [23]:
# Persist the fitted pipeline model for later deployment.
# A plain save() raises an error when the target path already exists, which
# makes re-running the notebook fail; overwrite() replaces the previous export,
# matching the overwrite mode used for the Parquet output.
model.write().overwrite().save("/content/drive/MyDrive/Data Sets/screentime_rf_model")
