# Real-Time Movie Recommendation System Using Apache Spark and PySpark

In [1]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null


In [2]:
!wget -q https://dlcdn.apache.org/spark/spark-3.5.3/spark-3.5.3-bin-hadoop3.tgz
!tar xf  spark-3.5.3-bin-hadoop3.tgz

In [3]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.5.3-bin-hadoop3"


In [4]:
!pip install -q findspark


In [5]:
!pip install pyspark




In [6]:
from pyspark.sql import SparkSession
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.sql.functions import col


In [7]:
import findspark
findspark.init()

from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("Movie Recommendation System") \
    .getOrCreate()

print(spark.version)  # Check the Spark version to confirm it's working


3.5.3


*Download the MovieLens dataset. Use the 100k dataset (ml-latest-small).*

In [8]:
from google.colab import files
files.upload()  # Upload 'ratings.csv'

ratings = spark.read.csv('ratings.csv', header=True, inferSchema=True)
ratings.show(5)  # Preview the dataset


Saving ratings.csv to ratings.csv
+------+-------+------+---------+
|userId|movieId|rating|timestamp|
+------+-------+------+---------+
|     1|      1|   4.0|964982703|
|     1|      3|   4.0|964981247|
|     1|      6|   4.0|964982224|
|     1|     47|   5.0|964983815|
|     1|     50|   5.0|964982931|
+------+-------+------+---------+
only showing top 5 rows



In [9]:
ratings = ratings.na.drop()


In [10]:
train, test = ratings.randomSplit([0.8, 0.2], seed=42)


In [11]:
als = ALS(
    userCol="userId",
    itemCol="movieId",
    ratingCol="rating",
    nonnegative=True,
    implicitPrefs=False,
    rank=10
)
model = als.fit(train)


In [12]:
predictions = model.transform(test)
predictions.show(5)


+------+-------+------+---------+----------+
|userId|movieId|rating|timestamp|prediction|
+------+-------+------+---------+----------+
|     1|      6|   4.0|964982224| 4.3193064|
|     1|    101|   5.0|964980868|  3.977095|
|     1|    151|   5.0|964984041| 4.1479044|
|     1|    231|   5.0|964981179|  3.702023|
|     1|    349|   4.0|964982563| 3.6715374|
+------+-------+------+---------+----------+
only showing top 5 rows



In [14]:
predictions.filter(predictions.prediction.isNull()).show()


+------+-------+------+---------+----------+
|userId|movieId|rating|timestamp|prediction|
+------+-------+------+---------+----------+
+------+-------+------+---------+----------+



In [15]:
cleaned_predictions = predictions.na.drop(subset=["prediction"])


In [16]:
evaluator = RegressionEvaluator(
    metricName="rmse",
    labelCol="rating",
    predictionCol="prediction"
)
rmse = evaluator.evaluate(cleaned_predictions)
print(f"Root Mean Square Error (RMSE): {rmse}")


Root Mean Square Error (RMSE): 0.8752339537767505


In [17]:
user_id = 7  # Change to the user ID you want to test
recommendations = model.recommendForAllUsers(5)
recommendations.filter(col("userId") == user_id).show(truncate=False)


+------+-------------------------------------------------------------------------------------------------+
|userId|recommendations                                                                                  |
+------+-------------------------------------------------------------------------------------------------+
|7     |[{1949, 4.7205644}, {6650, 4.6920013}, {102217, 4.5935316}, {86377, 4.579396}, {3022, 4.5675073}]|
+------+-------------------------------------------------------------------------------------------------+

