In [76]:
# Import libraries
!apt-get install openjdk-8-jdk-headless -qq # Java Run
!pip install pyspark # Importing spark




In [77]:
# We have created spark session
from pyspark.sql import SparkSession # Importing spark session as it creates a dbs

In [78]:
spark = SparkSession.builder \
    .appName("Scalable_Movie_Recommender") \
    .getOrCreate()

spark


In [79]:
ratings = spark.read.csv(
    "ratings.csv",
    header=True,
    inferSchema=True
)

movies = spark.read.csv(
    "movies.csv",
    header=True,
    inferSchema=True
)


In [80]:
ratings.show(5)
ratings.printSchema()
ratings.count()


+------+-------+------+----------+
|userId|movieId|rating| timestamp|
+------+-------+------+----------+
|     1|      2|   3.5|1112486027|
|     1|     29|   3.5|1112484676|
|     1|     32|   3.5|1112484819|
|     1|     47|   3.5|1112484727|
|     1|     50|   3.5|1112484580|
+------+-------+------+----------+
only showing top 5 rows
root
 |-- userId: integer (nullable = true)
 |-- movieId: integer (nullable = true)
 |-- rating: double (nullable = true)
 |-- timestamp: integer (nullable = true)



20000263

In [81]:
movies.show(5)
movies.printSchema()
movies.count()

+-------+--------------------+--------------------+
|movieId|               title|              genres|
+-------+--------------------+--------------------+
|      1|    Toy Story (1995)|Adventure|Animati...|
|      2|      Jumanji (1995)|Adventure|Childre...|
|      3|Grumpier Old Men ...|      Comedy|Romance|
|      4|Waiting to Exhale...|Comedy|Drama|Romance|
|      5|Father of the Bri...|              Comedy|
+-------+--------------------+--------------------+
only showing top 5 rows
root
 |-- movieId: integer (nullable = true)
 |-- title: string (nullable = true)
 |-- genres: string (nullable = true)



27278

In [82]:
ratings_clean = ratings.dropna(subset=["userId", "movieId", "rating"])

In [83]:
from pyspark.sql.functions import count, avg

ratings_clean.select(
    count("rating").alias("num_ratings"),
    count("userId").alias("num_users"),
    count("movieId").alias("num_movies"),
    avg("rating").alias("avg_rating")
).show()

ratings_clean.groupBy("rating").count().orderBy("rating").show()

ratings_clean.groupBy("movieId").count().orderBy("count", ascending=False).show(10)


+-----------+---------+----------+------------------+
|num_ratings|num_users|num_movies|        avg_rating|
+-----------+---------+----------+------------------+
|   20000263| 20000263|  20000263|3.5255285642993797|
+-----------+---------+----------+------------------+

+------+-------+
|rating|  count|
+------+-------+
|   0.5| 239125|
|   1.0| 680732|
|   1.5| 279252|
|   2.0|1430997|
|   2.5| 883398|
|   3.0|4291193|
|   3.5|2200156|
|   4.0|5561926|
|   4.5|1534824|
|   5.0|2898660|
+------+-------+

+-------+-----+
|movieId|count|
+-------+-----+
|    296|67310|
|    356|66172|
|    318|63366|
|    593|63299|
|    480|59715|
|    260|54502|
|    110|53769|
|    589|52244|
|   2571|51334|
|    527|50054|
+-------+-----+
only showing top 10 rows


In [84]:
from pyspark.sql.functions import count

user_counts = ratings_clean.groupBy("userId").count()
movie_counts = ratings_clean.groupBy("movieId").count()

ratings_filtered = ratings_clean \
    .join(user_counts.filter("count >= 5"), "userId") \
    .join(movie_counts.filter("count >= 5"), "movieId")


In [85]:
train, test = ratings_clean.randomSplit([0.8, 0.2], seed=42)
# Now doing train & test data split where 80 % train and 20 % test

In [86]:
from pyspark.sql.functions import avg, lit

# Compute global mean rating from training data
global_avg = train.select(avg("rating")).first()[0]

# Predict global average for all test samples
baseline_preds = test.withColumn("prediction", lit(global_avg))

from pyspark.ml.evaluation import RegressionEvaluator

evaluator = RegressionEvaluator(
    metricName="rmse",
    labelCol="rating",
    predictionCol="prediction"
)

baseline_rmse = evaluator.evaluate(baseline_preds)
print(f"Baseline RMSE (Global Average): {baseline_rmse:.4f}")


Baseline RMSE (Global Average): 1.0523


In [87]:
from pyspark.ml.recommendation import ALS

als = ALS(
    userCol="userId",
    itemCol="movieId",
    ratingCol="rating",
    maxIter=10,
    regParam=0.1,
    rank=10,
    nonnegative=True,
    coldStartStrategy="drop"
)


In [88]:
model = als.fit(train)
# fitting the model into 80% of the train dataset

In [89]:
predictions = model.transform(test)
predictions.show(5)
# We have created the new column after implementing ALS and predicting the ratings.

+------+-------+------+----------+----------+
|userId|movieId|rating| timestamp|prediction|
+------+-------+------+----------+----------+
|     1|     32|   3.5|1112484819| 3.8077385|
|     1|    151|   4.0|1094785734| 3.3544195|
|     1|    253|   4.0|1112484940| 3.6520727|
|     1|    337|   3.5|1094785709| 3.5002086|
|     1|    919|   3.5|1094785621| 3.6985352|
+------+-------+------+----------+----------+
only showing top 5 rows


In [90]:
from pyspark.ml.evaluation import RegressionEvaluator # we have calculate RMSE between ratings

evaluator = RegressionEvaluator(
    metricName="rmse",
    labelCol="rating",
    predictionCol="prediction"
)

rmse = evaluator.evaluate(predictions)
print(f"Root-mean-square error (RMSE) = {rmse:.4f}")


Root-mean-square error (RMSE) = 0.8139


In [91]:
user_recs = model.recommendForAllUsers(5)
user_recs.show(5, truncate=False)


+------+-------------------------------------------------------------------------------------------------------+
|userId|recommendations                                                                                        |
+------+-------------------------------------------------------------------------------------------------------+
|1     |[{126219, 6.939781}, {121029, 5.805707}, {129536, 5.629725}, {117907, 5.297702}, {120821, 5.2549405}]  |
|3     |[{126219, 8.483379}, {121029, 6.4881673}, {129536, 6.3033276}, {117907, 5.9485145}, {98275, 5.8542247}]|
|5     |[{126219, 7.408825}, {121029, 6.608641}, {77736, 6.037557}, {129536, 5.837345}, {129243, 5.554977}]    |
|6     |[{126219, 7.6811395}, {3226, 5.644737}, {121029, 5.338091}, {83435, 5.201769}, {117907, 5.1567435}]    |
|9     |[{126219, 7.568107}, {121029, 5.2128115}, {129536, 4.9689784}, {120821, 4.6102343}, {98275, 4.609886}] |
+------+----------------------------------------------------------------------------------------

In [92]:
from pyspark.sql.functions import explode

recs_exploded = user_recs.select(
    col("userId"),
    explode(col("recommendations")).alias("rec")
)


In [93]:
final_recs = recs_exploded.select(
    col("userId"),
    col("rec.movieId").alias("movieId"),
    col("rec.rating").alias("predicted_rating")
)


In [94]:
final_recs_with_titles = final_recs.join(
    movies,
    on="movieId",
    how="inner"
).select("userId", "movieId", "title", "genres", "predicted_rating")


In [95]:
from pyspark.sql.functions import when, col, round

final_recs_clipped = final_recs_with_titles.withColumn(
    "predicted_rating",
    when(col("predicted_rating") > 5, 5) \
    .when(col("predicted_rating") < 0, 0) \
    .otherwise(col("predicted_rating"))
)

# Optional: round to 2 decimals
final_recs_clipped = final_recs_clipped.withColumn(
    "predicted_rating", round(col("predicted_rating"), 2)
)

# Show final recommendations
final_recs_clipped.show(truncate=False)


+------+-------+----------------------------------------+---------------------+----------------+
|userId|movieId|title                                   |genres               |predicted_rating|
+------+-------+----------------------------------------+---------------------+----------------+
|1     |126219 |Marihuana (1936)                        |Documentary|Drama    |5.0             |
|1     |121029 |No Distance Left to Run (2010)          |Documentary          |5.0             |
|1     |129536 |Code Name Coq Rouge (1989)              |(no genres listed)   |5.0             |
|1     |117907 |My Brother Tom (2001)                   |Drama                |5.0             |
|1     |120821 |The War at Home (1979)                  |Documentary|War      |5.0             |
|3     |126219 |Marihuana (1936)                        |Documentary|Drama    |5.0             |
|3     |121029 |No Distance Left to Run (2010)          |Documentary          |5.0             |
|3     |129536 |Code Name Coq 

In [96]:
spark.stop()