In [1]:
# Step 1: Install Java and PySpark
!apt-get update
!apt-get install openjdk-11-jdk-headless -qq > /dev/null
!pip install -q pyspark

0% [Working]            Hit:1 https://cloud.r-project.org/bin/linux/ubuntu jammy-cran40/ InRelease
Hit:2 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu2204/x86_64  InRelease
Hit:3 http://security.ubuntu.com/ubuntu jammy-security InRelease
Hit:4 http://archive.ubuntu.com/ubuntu jammy InRelease
Hit:5 https://r2u.stat.illinois.edu/ubuntu jammy InRelease
Hit:6 http://archive.ubuntu.com/ubuntu jammy-updates InRelease
Hit:7 https://ppa.launchpadcontent.net/deadsnakes/ppa/ubuntu jammy InRelease
Hit:8 http://archive.ubuntu.com/ubuntu jammy-backports InRelease
Hit:9 https://ppa.launchpadcontent.net/graphics-drivers/ppa/ubuntu jammy InRelease
Hit:10 https://ppa.launchpadcontent.net/ubuntugis/ppa/ubuntu jammy InRelease
Reading package lists... Done
W: Skipping acquire of configured file 'main/source/Sources' as repository 'https://r2u.stat.illinois.edu/ubuntu jammy InRelease' does not seem to provide it (sources.list entry misspelt?)


In [2]:
# Step 2: Set JAVA_HOME environment variable
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"

In [3]:
# Step 3: Create a SparkSession
from pyspark.sql import SparkSession
spark = SparkSession.builder \
    .appName("Colab Spark Example") \
    .getOrCreate()

In [4]:
# Step 4: Verify Spark session
print("Spark Session Initialized")
print("Spark Version:", spark.version)

Spark Session Initialized
Spark Version: 3.5.1


In [7]:
# Step 5: Create a sample CSV file
sample_data = """id,name,department,salary
1,Alice,Engineering,70000
2,Bob,Sales,50000
3,Charlie,HR,45000
4,David,Engineering,80000
5,Eva,Marketing,60000
"""
with open("sample.csv", "w") as f:
    f.write(sample_data)

In [8]:
# Step 6: Read CSV using Spark
df = spark.read.option("header", "true").csv("sample.csv")
df.show(5)

df.printSchema()

+---+-------+-----------+------+
| id|   name| department|salary|
+---+-------+-----------+------+
|  1|  Alice|Engineering| 70000|
|  2|    Bob|      Sales| 50000|
|  3|Charlie|         HR| 45000|
|  4|  David|Engineering| 80000|
|  5|    Eva|  Marketing| 60000|
+---+-------+-----------+------+

root
 |-- id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- department: string (nullable = true)
 |-- salary: string (nullable = true)



In [11]:
# 3️⃣ Define Data Paths
base_path = "/content"

users_path = f"{base_path}/users.csv"
movies_path = f"{base_path}/movies.csv"
ratings_path = f"{base_path}/ratings.csv"
watch_history_path = f"{base_path}/watch_history.csv"
search_logs_path = f"{base_path}/search_logs.csv"

In [12]:
# 4️⃣ Load CSVs as Spark DataFrames
users_df = spark.read.option("header", True).csv(users_path)
movies_df = spark.read.option("header", True).csv(movies_path)
ratings_df = spark.read.option("header", True).csv(ratings_path)
watch_history_df = spark.read.option("header", True).csv(watch_history_path)
search_logs_df = spark.read.option("header", True).csv(search_logs_path)

In [13]:
# 5️⃣ Display Sample Records
print("Users:")
users_df.show(5)

print("Movies:")
movies_df.show(5)

print("Ratings:")
ratings_df.show(5)

print("Watch History:")
watch_history_df.show(5)

print("Search Logs:")
search_logs_df.show(5)

Users:
+-------+-------+---+---------------+
|user_id|   name|age|       location|
+-------+-------+---+---------------+
|      1|  Kevin| 56|  Nicholsonport|
|      2|Cassidy| 46|      Josephton|
|      3|  Kelly| 32|West Sherriside|
|      4| Samuel| 60|    East Andrew|
|      5|  Tracy| 25|    Ryanchester|
+-------+-------+---+---------------+
only showing top 5 rows

Movies:
+--------+--------------+------+------------+
|movie_id|         title| genre|release_year|
+--------+--------------+------+------------+
|       1|Process garden|Sci-Fi|        1994|
|       2|   Too defense|Action|        2023|
|       3|  Detail plant|Action|        1987|
|       4|    See couple|Sci-Fi|        1989|
|       5|  Front really| Drama|        1984|
+--------+--------------+------+------------+
only showing top 5 rows

Ratings:
+-------+--------+------+-------------------+
|user_id|movie_id|rating|          timestamp|
+-------+--------+------+-------------------+
|   9429|    4455|     3|2023-07

# Netflix Data Cleaning and Preparation with PySpark

In [19]:
from pyspark.sql.functions import rand, when, col

# Add a random number column and map to gender
users_df = users_df.withColumn("random_val", rand())

users_df = users_df.withColumn(
    "gender",
    when(col("random_val") < 0.4, "Male")
    .when(col("random_val") < 0.8, "Female")
    .otherwise("Other")
).drop("random_val")  # Drop helper column


In [20]:
users_df.printSchema()
users_df.show(5)

root
 |-- user_id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- age: string (nullable = false)
 |-- location: string (nullable = true)
 |-- gender: string (nullable = false)

+-------+-------+---+---------------+------+
|user_id|   name|age|       location|gender|
+-------+-------+---+---------------+------+
|      1|  Kevin| 56|  Nicholsonport|Female|
|      2|Cassidy| 46|      Josephton|Female|
|      3|  Kelly| 32|West Sherriside|Female|
|      4| Samuel| 60|    East Andrew| Other|
|      5|  Tracy| 25|    Ryanchester|  Male|
+-------+-------+---+---------------+------+
only showing top 5 rows



In [21]:
# ----------------------------
# Step 1: Handle Missing Values
# ----------------------------

# Fill missing age with average age
avg_age = users_df.selectExpr("avg(age)").first()[0]
users_df = users_df.fillna({"age": int(avg_age)})

# Fill missing gender with 'Unknown'
users_df = users_df.fillna({"gender": "Unknown"})

# Drop records with missing critical values in watch history or ratings
watch_history_df = watch_history_df.dropna(subset=["user_id", "movie_id"])
ratings_df = ratings_df.dropna(subset=["user_id", "movie_id", "rating"])

In [24]:
# ----------------------------
# Step 2: Data Type Conversions
# ----------------------------
from pyspark.sql.types import TimestampType
# Convert timestamp strings to TimestampType
# Rename 'watch_time' to 'timestamp' and convert to TimestampType
watch_history_df = watch_history_df \
    .withColumn("timestamp", col("watch_time").cast(TimestampType())) \
    .drop("watch_time")
ratings_df = ratings_df.withColumn("timestamp", col("timestamp").cast(TimestampType()))
search_logs_df = search_logs_df.withColumn("timestamp", col("timestamp").cast(TimestampType()))

In [27]:
# Normalize genres into array (if comma-separated)
from pyspark.sql.functions import split, col

movies_df = movies_df.withColumn("primary_genre", split(col("genre"), "\|")[0])

In [28]:
movies_df.head(5)

[Row(movie_id='1', title='Process garden', genre='Sci-Fi', release_year='1994', primary_genre='Sci-Fi'),
 Row(movie_id='2', title='Too defense', genre='Action', release_year='2023', primary_genre='Action'),
 Row(movie_id='3', title='Detail plant', genre='Action', release_year='1987', primary_genre='Action'),
 Row(movie_id='4', title='See couple', genre='Sci-Fi', release_year='1989', primary_genre='Sci-Fi'),
 Row(movie_id='5', title='Front really', genre='Drama', release_year='1984', primary_genre='Drama')]

In [29]:
# ----------------------------
# Step 4: Remove Duplicates
# ----------------------------

users_df = users_df.dropDuplicates()
movies_df = movies_df.dropDuplicates()
ratings_df = ratings_df.dropDuplicates()
watch_history_df = watch_history_df.dropDuplicates()
search_logs_df = search_logs_df.dropDuplicates()

In [31]:
# ----------------------------
# Step 5: Save Cleaned Data (Optional)
# ----------------------------
users_df.write.csv(f"{base_path}/cleaned_users.csv", header=True, mode="overwrite")
movies_df.write.csv(f"{base_path}/cleaned_movies.csv", header=True, mode="overwrite")
ratings_df.write.csv(f"{base_path}/cleaned_ratings.csv", header=True, mode="overwrite")
watch_history_df.write.csv(f"{base_path}/cleaned_watch_history.csv", header=True, mode="overwrite")
search_logs_df.write.csv(f"{base_path}/cleaned_search_logs.csv", header=True, mode="overwrite")

In [32]:
# ----------------------------
# Preview Cleaned Data
# ----------------------------

print("Users:")
users_df.show(5)

print("Movies:")
movies_df.show(5)

print("Ratings:")
ratings_df.show(5)

print("Watch History:")
watch_history_df.show(5)

print("Search Logs:")
search_logs_df.show(5)

Users:
+-------+--------+---+---------------+------+
|user_id|    name|age|       location|gender|
+-------+--------+---+---------------+------+
|     44|  Steven| 21|    Port Andrew| Other|
|     99|  Stacey| 18|Port Nathanstad|Female|
|    246|   Bryce| 24|    New Timothy|  Male|
|    330|Brittany| 36|   South Steven|Female|
|    706| Crystal| 45|      Emmaville|  Male|
+-------+--------+---+---------------+------+
only showing top 5 rows

Movies:
+--------+--------------------+-----------+------------+-------------+
|movie_id|               title|      genre|release_year|primary_genre|
+--------+--------------------+-----------+------------+-------------+
|     510|    Guy network wide|Documentary|        2006|  Documentary|
|     757|Condition live re...|     Action|        1987|       Action|
|    1289|Cultural all best...|    Romance|        1993|      Romance|
|    1506|Offer fall genera...|     Action|        1994|       Action|
|    1780|                 Age|Documentary|      

Feature Engineering

In [34]:
# Read cleaned datasets
users_df = spark.read.csv(f"{base_path}/cleaned_users.csv", header=True, inferSchema=True)
movies_df = spark.read.csv(f"{base_path}/cleaned_movies.csv", header=True, inferSchema=True)
watch_history_df = spark.read.csv(f"{base_path}/cleaned_watch_history.csv", header=True, inferSchema=True)
ratings_df = spark.read.csv(f"{base_path}/cleaned_ratings.csv", header=True, inferSchema=True)

In [45]:
from pyspark.sql.functions import avg, count, col, split, explode, when, lit, round, concat_ws

In [41]:
# ✅ Feature 1: Average Rating Per User
user_avg_rating_df = ratings_df.groupBy("user_id") \
    .agg(avg("rating").alias("avg_user_rating"))

In [40]:
# ✅ Feature 2: Average Rating Per Movie
movie_avg_rating_df = ratings_df.groupBy("movie_id") \
    .agg(avg("rating").alias("avg_movie_rating"))

# ✅ Feature 3: Total Number of Ratings Per Movie
movie_rating_count_df = ratings_df.groupBy("movie_id") \
    .agg(count("rating").alias("num_ratings"))

# ✅ Feature 4: Watch Count Per Movie
watch_count_df = watch_history_df.groupBy("movie_id") \
    .agg(count("timestamp").alias("watch_count"))

# ✅ Feature 5: Explode genres for genre-level aggregation
movies_exploded_df = movies_df.withColumn("genre_array", split(col("genre"), "\\|"))
movies_exploded_df = movies_exploded_df.withColumn("genre", explode(col("genre_array")))

# ✅ Feature 6: Number of Movies Watched Per User
user_watch_count_df = watch_history_df.groupBy("user_id") \
    .agg(count("movie_id").alias("movies_watched"))

# ✅ Feature 7: Rating Deviation (difference from movie avg)
rating_with_movie_avg = ratings_df.join(movie_avg_rating_df, on="movie_id", how="left")
rating_with_movie_avg = rating_with_movie_avg.withColumn(
    "rating_deviation",
    col("rating") - col("avg_movie_rating")
)

In [43]:
movies_exploded_df.head()

Row(movie_id=510, title='Guy network wide', genre='Documentary', release_year=2006, primary_genre='Documentary', genre_array=['Documentary'])

In [46]:
# ✅ Save the engineered features as CSV for downstream processing
user_avg_rating_df.write.csv(f"{base_path}/features/user_avg_rating.csv", header=True, mode="overwrite")
movie_avg_rating_df.write.csv(f"{base_path}/features/movie_avg_rating.csv", header=True, mode="overwrite")
movie_rating_count_df.write.csv(f"{base_path}/features/movie_rating_count.csv", header=True, mode="overwrite")
watch_count_df.write.csv(f"{base_path}/features/movie_watch_count.csv", header=True, mode="overwrite")
user_watch_count_df.write.csv(f"{base_path}/features/user_watch_count.csv", header=True, mode="overwrite")
rating_with_movie_avg.write.csv(f"{base_path}/features/rating_with_deviation.csv", header=True, mode="overwrite")
# ✅ Fix array column before CSV write
movies_exploded_to_save = movies_exploded_df.withColumn("genres_joined", concat_ws("|", "genre_array")) \
                                            .drop("genre_array")
movies_exploded_to_save.write.csv(f"{base_path}/features/movies_exploded_genres.csv", header=True, mode="overwrite")
print("✅ Feature Engineering Complete.")

✅ Feature Engineering Complete.


Machine Learning

In [47]:
# 🔍 What is ALS (Alternating Least Squares)?
# ALS (Alternating Least Squares) is a collaborative filtering algorithm used primarily for recommender systems—like Netflix, Amazon, Spotify, etc.

# 📌 ALS in Simple Terms:
# ALS works by trying to learn latent factors for users and items (e.g., movies), so it can predict how likely a user is to like a particular item they haven’t interacted with yet.
# ✅ Prepare Dataset for ALS Model
# We'll use the rating_with_deviation_df for training ALS
# ✅ Load Feature Data
user_watch_count_df = spark.read.csv(f"{base_path}/features/user_watch_count.csv", header=True, inferSchema=True)
rating_with_deviation_df = spark.read.csv(f"{base_path}/features/rating_with_deviation.csv", header=True, inferSchema=True)
movies_exploded_genres_df = spark.read.csv(f"{base_path}/features/movies_exploded_genres.csv", header=True, inferSchema=True)

print("✅ Feature datasets loaded")

als_df = rating_with_deviation_df.select(
    col("user_id").cast("integer"),
    col("movie_id").cast("integer"),
    col("rating").cast("float")
).dropna()

✅ Feature datasets loaded


In [48]:
# ✅ Train-Test Split
(training_data, test_data) = als_df.randomSplit([0.8, 0.2], seed=42)

In [50]:
# ✅ ALS Model
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator

als = ALS(
    maxIter=10,
    regParam=0.1,
    userCol="user_id",
    itemCol="movie_id",
    ratingCol="rating",
    coldStartStrategy="drop",
    nonnegative=True
)

In [51]:
# Train model
model = als.fit(training_data)
print("✅ ALS model trained")

✅ ALS model trained


In [53]:
# ✅ Evaluate Model
predictions = model.transform(test_data)
evaluator = RegressionEvaluator(
    metricName="rmse",
    labelCol="rating",
    predictionCol="prediction"
)
rmse = evaluator.evaluate(predictions)
print(f"✅ Root-mean-square error = {rmse:.4f}")

✅ Root-mean-square error = 1.7602


In [58]:
# ✅ Save model if needed
model.save(f"{base_path}/models/als_model")

print("🎉 Model training complete!")

Py4JJavaError: An error occurred while calling o767.save.
: java.io.IOException: Path /content/models/als_model already exists. To overwrite it, please use write.overwrite().save(path) for Scala and use write().overwrite().save(path) for Java and Python.
	at org.apache.spark.ml.util.FileSystemOverwrite.handleOverwrite(ReadWrite.scala:683)
	at org.apache.spark.ml.util.MLWriter.save(ReadWrite.scala:167)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:829)


In [65]:
from pyspark.sql.functions import explode, col
from pyspark.ml.recommendation import ALSModel

# ✅ Load trained ALS model
als_model = ALSModel.load(f"{base_path}/models/als_model")


# ✅ Generate top 10 recommendations for each user
user_recommendations_df = als_model.recommendForAllUsers(10)

# ✅ Flatten the nested 'recommendations' column
recommendations_flat_df = user_recommendations_df \
    .withColumn("rec", explode("recommendations")) \
    .select(
        col("user_id"),
        col("rec.movie_id").alias("movie_id"),
        col("rec.rating").alias("predicted_rating")
    )

# ✅ Save as CSV
recommendations_flat_df.write.csv(f"{base_path}/model/recommendations.csv", header=True, mode="overwrite")

print("✅ Recommendations file written to /model/recommendations.csv")


✅ Recommendations file written to /model/recommendations.csv


# Data Model Preparation & Export

In [66]:
# 📂 Load Cleaned and Feature Data
users_df = spark.read.csv(f"{base_path}/cleaned_users.csv", header=True, inferSchema=True)
movies_df = spark.read.csv(f"{base_path}/cleaned_movies.csv", header=True, inferSchema=True)
ratings_df = spark.read.csv(f"{base_path}/cleaned_ratings.csv", header=True, inferSchema=True)
recommendations_df = spark.read.csv(f"{base_path}/model/recommendations.csv", header=True, inferSchema=True)

In [68]:
recommendations_df.head(5)

[Row(user_id=1, movie_id=1194, predicted_rating=5.8309617),
 Row(user_id=1, movie_id=221, predicted_rating=5.8091965),
 Row(user_id=1, movie_id=4156, predicted_rating=5.7953186),
 Row(user_id=1, movie_id=4122, predicted_rating=5.7914705),
 Row(user_id=1, movie_id=4810, predicted_rating=5.7662034)]

In [69]:
# Load Feature Files
user_watch_count_df = spark.read.csv(f"{base_path}/features/user_watch_count.csv", header=True, inferSchema=True)
rating_with_deviation_df = spark.read.csv(f"{base_path}/features/rating_with_deviation.csv", header=True, inferSchema=True)


In [70]:
# ✅ Join & Prepare Final Data Model
final_model_df = recommendations_df \
    .join(users_df, on="user_id", how="left") \
    .join(movies_df, on="movie_id", how="left") \
    .join(user_watch_count_df, on="user_id", how="left") \
    .join(rating_with_deviation_df, on=["user_id", "movie_id"], how="left")


In [71]:
final_model_df.head(5)

[Row(user_id=1, movie_id=1194, predicted_rating=5.8309617, name='Kevin', age=56, location='Nicholsonport', gender='Female', title='Company young', genre='Comedy', release_year=2002, primary_genre='Comedy', movies_watched=9, rating=None, timestamp=None, avg_movie_rating=None, rating_deviation=None),
 Row(user_id=1, movie_id=221, predicted_rating=5.8091965, name='Kevin', age=56, location='Nicholsonport', gender='Female', title='Report remain miss', genre='Action', release_year=1980, primary_genre='Action', movies_watched=9, rating=None, timestamp=None, avg_movie_rating=None, rating_deviation=None),
 Row(user_id=1, movie_id=4156, predicted_rating=5.7953186, name='Kevin', age=56, location='Nicholsonport', gender='Female', title='Ground loss', genre='Romance', release_year=1988, primary_genre='Romance', movies_watched=9, rating=None, timestamp=None, avg_movie_rating=None, rating_deviation=None),
 Row(user_id=1, movie_id=4122, predicted_rating=5.7914705, name='Kevin', age=56, location='Nicho

In [75]:
# ✅ Reorder & Select Columns for Export
# ✅ Reorder & Select Columns for Export
export_df = final_model_df.select(
    "user_id", "name", "age", "gender", "location",
    "movie_id", "title", "genre", "release_year",
    "predicted_rating", "avg_movie_rating", "rating_deviation", "movies_watched"
)
# 📆 Save to CSV for Power BI or Front-End Ingestion
export_path = f"{base_path}/export/final_recommendation_model.csv"
export_df.write.csv(export_path, header=True, mode="overwrite")

print("✅ Data model exported successfully to:", export_path)

✅ Data model exported successfully to: /content/export/final_recommendation_model.csv


In [76]:
export_df.head(5)

[Row(user_id=1, name='Kevin', age=56, gender='Female', location='Nicholsonport', movie_id=1194, title='Company young', genre='Comedy', release_year=2002, predicted_rating=5.8309617, avg_movie_rating=None, rating_deviation=None, movies_watched=9),
 Row(user_id=1, name='Kevin', age=56, gender='Female', location='Nicholsonport', movie_id=221, title='Report remain miss', genre='Action', release_year=1980, predicted_rating=5.8091965, avg_movie_rating=None, rating_deviation=None, movies_watched=9),
 Row(user_id=1, name='Kevin', age=56, gender='Female', location='Nicholsonport', movie_id=4156, title='Ground loss', genre='Romance', release_year=1988, predicted_rating=5.7953186, avg_movie_rating=None, rating_deviation=None, movies_watched=9),
 Row(user_id=1, name='Kevin', age=56, gender='Female', location='Nicholsonport', movie_id=4122, title='Minute positive outside yet', genre='Drama', release_year=1990, predicted_rating=5.7914705, avg_movie_rating=None, rating_deviation=None, movies_watched=

Power BI / Dashboard Integration Preparation

In [77]:
# 📂 Load Exported Recommendation Data
export_path = "/content/export/final_recommendation_model.csv"
dashboard_df = spark.read.csv(export_path, header=True, inferSchema=True)

# ✅ Perform Sample Aggregations for Dashboard
from pyspark.sql.functions import avg, countDistinct

# 🎯 Average predicted rating per genre
genre_avg_df = dashboard_df.groupBy("genre").agg(avg("predicted_rating").alias("avg_predicted_rating"))

# 👥 Top 10 most active users (by movies_watched)
top_users_df = dashboard_df.select("user_id", "name", "movies_watched") \
    .dropDuplicates(["user_id"]) \
    .orderBy("movies_watched", ascending=False) \
    .limit(10)

# 🎬 Top 10 recommended movies (highest predicted rating)
top_movies_df = dashboard_df.select("movie_id", "title", "predicted_rating") \
    .orderBy("predicted_rating", ascending=False) \
    .limit(10)

# 🌍 Distribution by location
location_distribution_df = dashboard_df.groupBy("location").agg(countDistinct("user_id").alias("unique_users"))

# 📁 Export Aggregations for Dashboard
genre_avg_df.write.csv("/content/export/dashboard_genre_avg.csv", header=True, mode="overwrite")
top_users_df.write.csv("/content/export/dashboard_top_users.csv", header=True, mode="overwrite")
top_movies_df.write.csv("/content/export/dashboard_top_movies.csv", header=True, mode="overwrite")
location_distribution_df.write.csv("/content/export/dashboard_location_distribution.csv", header=True, mode="overwrite")

print("✅ Dashboard data files exported successfully.")

✅ Dashboard data files exported successfully.
