In [None]:
import tensorflow_datasets as tfds
import pandas as pd
import numpy as np
from sklearn.preprocessing import MinMaxScaler
from google.colab import drive
from pyspark.sql import SparkSession
from pyspark.sql.types import IntegerType, DoubleType
from pyspark.sql.functions import col, explode
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator

In [None]:
# Load MovieLens 1M dataset
dataset = tfds.load("movielens/1m-ratings", split="train", as_supervised=False)

# Convert to Pandas DataFrame
df = tfds.as_dataframe(dataset)



Downloading and preparing dataset Unknown size (download: Unknown size, generated: Unknown size, total: Unknown size) to /root/tensorflow_datasets/movielens/1m-ratings/0.1.1...


Dl Completed...: 0 url [00:00, ? url/s]

Dl Size...: 0 MiB [00:00, ? MiB/s]

Extraction completed...: 0 file [00:00, ? file/s]

Generating splits...:   0%|          | 0/1 [00:00<?, ? splits/s]

Generating train examples...: 0 examples [00:00, ? examples/s]

Shuffling /root/tensorflow_datasets/movielens/1m-ratings/incomplete.7KCUI5_0.1.1/movielens-train.tfrecord*...:…

Dataset movielens downloaded and prepared to /root/tensorflow_datasets/movielens/1m-ratings/0.1.1. Subsequent calls will reuse this data.


In [None]:
# Decode binary fields to strings and convert to integers
df["user_id"] = df["user_id"].apply(lambda x: int(x.decode("utf-8")))
df["movie_id"] = df["movie_id"].apply(lambda x: int(x.decode("utf-8")))

df["movie_genres"] = df["movie_genres"].astype(str)
df["movie_title"] = df["movie_title"].astype(str)


In [None]:
df = df.dropna()  # Remove missing values

NameError: name 'df' is not defined

In [None]:
scaler = MinMaxScaler()
df["scaled_rating"] = scaler.fit_transform(df[["user_rating"]])


In [None]:
user_counts = df.groupby("user_id").size()
active_users = user_counts[user_counts >= 5].index  # Keep users with at least 5 ratings
df = df[df["user_id"].isin(active_users)]

In [None]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
df = pd.read_csv('/content/drive/My Drive/Apache Spark Project/movielens_dataset.csv')
df.head(10)

Unnamed: 0,bucketized_user_age,movie_genres,movie_id,movie_title,timestamp,user_gender,user_id,user_occupation_label,user_occupation_text,user_rating,user_zip_code,scaled_rating
0,35.0,[0 7],3107,Backdraft (1991),977432193,True,130,18,b'technician/engineer',5.0,b'50021',1.0
1,25.0,[7],2114,"Outsiders, The (1983)",965932967,False,3829,0,b'academic/educator',4.0,b'22307',0.75
2,18.0,[ 4 15],256,Junior (1994),1012103552,False,1265,21,b'writer',1.0,b'49321',0.0
3,18.0,[ 0 10],1389,Jaws 3-D (1983),972004605,True,2896,14,b'sales/marketing',5.0,b'60073',1.0
4,18.0,[0],3635,"Spy Who Loved Me, The (1977)",961180111,True,5264,17,b'college/grad student',4.0,b'15217',0.75
5,25.0,[3 4],2042,D2: The Mighty Ducks (1994),962909420,True,4957,1,b'artist',2.0,b'48197',0.25
6,25.0,[4 7],2289,"Player, The (1992)",961040047,True,5294,1,b'artist',4.0,b'60626',0.75
7,35.0,[ 5 7 9 16],3334,Key Largo (1948),974668206,True,2064,0,b'academic/educator',5.0,b'01020',1.0
8,45.0,[10 15],2901,Phantasm (1979),965357450,True,4121,6,b'executive/managerial',3.0,b'08876',0.5
9,25.0,[15],1206,"Clockwork Orange, A (1971)",970522261,True,3010,12,b'programmer',4.0,b'78759',0.75


In [None]:
# Initialize Spark
spark = SparkSession.builder.appName("MovieRecommendation").getOrCreate()

# Convert Pandas DataFrame to Spark DataFrame
spark_df = spark.createDataFrame(df)

# Ensure correct data types
spark_df = spark_df.withColumn("user_id", col("user_id").cast(IntegerType()))
spark_df = spark_df.withColumn("movie_id", col("movie_id").cast(IntegerType()))
spark_df = spark_df.withColumn("scaled_rating", col("scaled_rating").cast(DoubleType()))

# Show schema
spark_df.printSchema()


root
 |-- bucketized_user_age: double (nullable = true)
 |-- movie_genres: string (nullable = true)
 |-- movie_id: integer (nullable = true)
 |-- movie_title: string (nullable = true)
 |-- timestamp: long (nullable = true)
 |-- user_gender: boolean (nullable = true)
 |-- user_id: integer (nullable = true)
 |-- user_occupation_label: long (nullable = true)
 |-- user_occupation_text: string (nullable = true)
 |-- user_rating: double (nullable = true)
 |-- user_zip_code: string (nullable = true)
 |-- scaled_rating: double (nullable = true)



In [None]:
als = ALS(
    userCol="user_id",
    itemCol="movie_id",
    ratingCol="scaled_rating",
    rank=20,          # Number of latent factors
    maxIter=15,       # More iterations for better convergence
    regParam=0.1,     # Adjust regularization
    nonnegative=True, # Ensures only positive ratings
    coldStartStrategy="drop"  # Prevents issues with unseen movies/users
)

# Train ALS model
model = als.fit(spark_df)
model

ALSModel: uid=ALS_25b6b3095b40, rank=20

In [None]:
predictions = model.transform(spark_df)
predictions.show(5)

+-------------------+------------+--------+--------------------+----------+-----------+-------+---------------------+--------------------+-----------+-------------+-------------+----------+
|bucketized_user_age|movie_genres|movie_id|         movie_title| timestamp|user_gender|user_id|user_occupation_label|user_occupation_text|user_rating|user_zip_code|scaled_rating|prediction|
+-------------------+------------+--------+--------------------+----------+-----------+-------+---------------------+--------------------+-----------+-------------+-------------+----------+
|               18.0|         [4]|    2779|Heaven Can Wait (...|1042207567|       true|   5156|                   14|  b'sales/marketing'|        5.0|     b'10024'|          1.0|0.65893596|
|               18.0|     [12 14]|     914| My Fair Lady (1964)| 974736894|      false|   1580|                   17|b'college/grad st...|        3.0|     b'76201'|          0.5| 0.5890366|
|               35.0|     [ 0 18]|    3767|Missing

In [None]:
evaluator = RegressionEvaluator(
    metricName="rmse",
    labelCol="scaled_rating",
    predictionCol="prediction"
)

rmse = evaluator.evaluate(predictions)
print(f"Root Mean Squared Error (RMSE): {rmse}")


Root Mean Squared Error (RMSE): 0.24763627575071684


In [None]:
# Get top 10 recommendations for each user
user_recommendations = model.recommendForAllUsers(10)

# Show recommendations
user_recommendations.show(10, truncate=False)


+-------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|user_id|recommendations                                                                                                                                                                                 |
+-------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|1      |[{3382, 1.2744665}, {557, 1.0122381}, {989, 0.9517216}, {578, 0.9148153}, {3233, 0.90853053}, {787, 0.898469}, {572, 0.8965089}, {2503, 0.891472}, {3172, 0.8857642}, {1830, 0.877764}]         |
|3      |[{3382, 1.2076895}, {557, 0.9591261}, {989, 0.90178674}, {578, 0.86681515}, {3233, 0.86086166}, {787, 0.85132766}, {572, 0.84947157}, {2503, 0.8446972}, {3172, 0.83929}, {1830, 0.

In [None]:
# Explode recommendations (convert list to rows)
recommendations = user_recommendations.withColumn("recommendation", explode(col("recommendations")))
recommendations = recommendations.drop("recommendations")

# Extract movie_id and predicted rating
recommendations = recommendations.select(
    col("user_id"),
    col("recommendation.movie_id").alias("movie_id"),
    col("recommendation.rating").alias("predicted_rating")
)
recommendations.show(25, truncate=False)

+-------+--------+----------------+
|user_id|movie_id|predicted_rating|
+-------+--------+----------------+
|1      |3382    |1.2744665       |
|1      |557     |1.0122381       |
|1      |989     |0.9517216       |
|1      |578     |0.9148153       |
|1      |3233    |0.90853053      |
|1      |787     |0.898469        |
|1      |572     |0.8965089       |
|1      |2503    |0.891472        |
|1      |3172    |0.8857642       |
|1      |1830    |0.877764        |
|3      |3382    |1.2076895       |
|3      |557     |0.9591261       |
|3      |989     |0.90178674      |
|3      |578     |0.86681515      |
|3      |3233    |0.86086166      |
|3      |787     |0.85132766      |
|3      |572     |0.84947157      |
|3      |2503    |0.8446972       |
|3      |3172    |0.83929         |
|3      |1830    |0.83170956      |
|5      |3382    |0.92054486      |
|5      |557     |0.73131555      |
|5      |989     |0.68758935      |
|5      |578     |0.6609295       |
|5      |3233    |0.65638477

In [None]:
# Select movie_id and movie_title in PySpark format
movies_df = spark_df.select("movie_id", "movie_title")
movies_df.show(7)
movies_df.printSchema()

+--------+--------------------+
|movie_id|         movie_title|
+--------+--------------------+
|    3107|    Backdraft (1991)|
|    2114|Outsiders, The (1...|
|     256|       Junior (1994)|
|    1389|     Jaws 3-D (1983)|
|    3635|Spy Who Loved Me,...|
|    2042|D2: The Mighty Du...|
|    2289|  Player, The (1992)|
+--------+--------------------+
only showing top 7 rows

root
 |-- movie_id: integer (nullable = true)
 |-- movie_title: string (nullable = true)



In [None]:
# Join with movie titles to get names instead of IDs
recommendations = recommendations.join(movies_df, on="movie_id", how="left")

# Show recommended movie names for each user
recommendations.select("user_id", "movie_id", "movie_title", "predicted_rating").show(25, truncate=False)

+-------+--------+----------------------------------+----------------+
|user_id|movie_id|movie_title                       |predicted_rating|
+-------+--------+----------------------------------+----------------+
|1      |572     |Foreign Student (1994)            |0.8965089       |
|1      |572     |Foreign Student (1994)            |0.8965089       |
|3      |572     |Foreign Student (1994)            |0.84947157      |
|3      |572     |Foreign Student (1994)            |0.84947157      |
|2      |572     |Foreign Student (1994)            |0.80537343      |
|2      |572     |Foreign Student (1994)            |0.80537343      |
|4      |572     |Foreign Student (1994)            |0.8877477       |
|4      |572     |Foreign Student (1994)            |0.8877477       |
|1      |578     |Hour of the Pig, The (1993)       |0.9148153       |
|1      |578     |Hour of the Pig, The (1993)       |0.9148153       |
|3      |578     |Hour of the Pig, The (1993)       |0.86681515      |
|3    

In [None]:
# Get top N movies for a specific user
user_id = 130
recommendations.filter(f"user_id == {user_id}").show(truncate=False)

+--------+-------+----------------+-----------------------------------------+
|movie_id|user_id|predicted_rating|movie_title                              |
+--------+-------+----------------+-----------------------------------------+
|572     |130    |0.99707603      |Foreign Student (1994)                   |
|572     |130    |0.99707603      |Foreign Student (1994)                   |
|578     |130    |1.0174356       |Hour of the Pig, The (1993)              |
|578     |130    |1.0174356       |Hour of the Pig, The (1993)              |
|787     |130    |0.99925584      |Gate of Heavenly Peace, The (1995)       |
|787     |130    |0.99925584      |Gate of Heavenly Peace, The (1995)       |
|787     |130    |0.99925584      |Gate of Heavenly Peace, The (1995)       |
|989     |130    |1.0584822       |Schlafes Bruder (Brother of Sleep) (1995)|
|3172    |130    |0.98512596      |Ulysses (Ulisse) (1954)                  |
|3382    |130    |1.417452        |Song of Freedom (1936)       

In [None]:
print("Movie Recommendation System - Final Performance")
print(f"Dataset Used: MovieLens 1M")
print(f"Number of Users: {df['user_id'].nunique()}")
print(f"Number of Movies: {df['movie_id'].nunique()}")
print(f"Final RMSE (Error): 0.24")

Movie Recommendation System - Final Performance
Dataset Used: MovieLens 1M
Number of Users: 6040
Number of Movies: 3706
Final RMSE (Error): 0.24
Model Successfully Trained Using Apache Spark ALS!
