In [4]:
!pip install pyspark redis prometheus-client



In [5]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns

from sklearn.metrics.pairwise import cosine_similarity
from sklearn.feature_extraction.text import TfidfVectorizer

import os
from scipy.sparse import coo_matrix

In [6]:
from pyspark.sql import SparkSession, Row
from pyspark.ml.recommendation import ALS
from pyspark.sql.functions import col, explode

# Initializing Spark session
spark = SparkSession.builder.appName("MovieRecommender").getOrCreate()

In [7]:
def load_data():
      df = spark.read.option("header", True).csv("/content/merged_movie_df.csv")
      df = df.withColumn("userId", col("userId").cast("int")) \
              .withColumn("movieId", col("movieId").cast("int")) \
              .withColumn("rating", col("rating").cast("float"))
      df = df.dropna(subset=["userId", "movieId", "rating"])
      return df

In [8]:
df = load_data()

In [9]:
df.show()

+-------+----------------+--------------------+------+------+------+--------------------+----------+------+--------------------+
|movieId|           title|              genres|imdbId|tmdbId|userId|                 tag| timestamp|rating|           genre_tag|
+-------+----------------+--------------------+------+------+------+--------------------+----------+------+--------------------+
|      1|Toy Story (1995)|Adventure, Animat...|114709| 862.0|   109|            children|1257988285|   4.0|Adventure, Animat...|
|      1|Toy Story (1995)|Adventure, Animat...|114709| 862.0|   109|              Disney|1257988287|   4.0|Adventure, Animat...|
|      1|Toy Story (1995)|Adventure, Animat...|114709| 862.0|   909|           animation|1248249498|   2.0|Adventure, Animat...|
|      1|Toy Story (1995)|Adventure, Animat...|114709| 862.0|   909|            children|1248249511|   2.0|Adventure, Animat...|
|      1|Toy Story (1995)|Adventure, Animat...|114709| 862.0|   909|              Disney|12482494

In [10]:
def train_als_model(df):
    als = ALS(
        userCol="userId",
        itemCol="movieId",
        ratingCol="rating",  #replace with time stamp if you have
        coldStartStrategy="drop",
        nonnegative=True,
        implicitPrefs=False
    )
    model = als.fit(df)
    return model

In [11]:
def load_movies():
    movies_df = spark.read.option("header", True).csv("/content/merged_movie_df.csv") \
        .select(col("movieId").cast("int"), "title") \
        .dropna(subset=["movieId", "title"]).dropDuplicates(["movieId"])
    return movies_df

In [12]:
movies_df = load_movies()
movies_df.show()

+-------+--------------------+
|movieId|               title|
+-------+--------------------+
|      1|    Toy Story (1995)|
|      2|      Jumanji (1995)|
|      3|Grumpier Old Men ...|
|      4|Waiting to Exhale...|
|      5|Father of the Bri...|
|      6|         Heat (1995)|
|      7|      Sabrina (1995)|
|      9| Sudden Death (1995)|
|     10|    GoldenEye (1995)|
|     11|American Presiden...|
|     12|Dracula: Dead and...|
|     13|        Balto (1995)|
|     14|        Nixon (1995)|
|     15|Cutthroat Island ...|
|     16|       Casino (1995)|
|     17|Sense and Sensibi...|
|     18|   Four Rooms (1995)|
|     19|Ace Ventura: When...|
|     20|  Money Train (1995)|
|     21|   Get Shorty (1995)|
+-------+--------------------+
only showing top 20 rows



In [13]:
def get_recommendations_for_user(model, movies_df, user_id, top_n=10):
    user_df = spark.createDataFrame([Row(userId=int(user_id))], schema=["userId"])
    recs = model.recommendForUserSubset(user_df, top_n)
    # Output of recs is (userId, [{movieId,rating}])
    flat_recs = recs.withColumn("rec", explode("recommendations")).select("userId", col("rec.movieId"), col("rec.rating"))
    final_recs = flat_recs.join(movies_df, on="movieId", how="left")
    return final_recs

In [14]:
df = load_data()
model = train_als_model(df)
movies_df = load_movies()

user_id = 109
recommendations = get_recommendations_for_user(model, movies_df, user_id)
recommendations.show(truncate=False)

+-------+------+---------+-------------------------------------------+
|movieId|userId|rating   |title                                      |
+-------+------+---------+-------------------------------------------+
|1861   |109   |7.0972385|Junk Mail (Budbringeren) (1997)            |
|276711 |109   |5.899877 |The Great Warrior Skanderbeg (1953)        |
|128727 |109   |5.685911 |Bizarre (2015)                             |
|79863  |109   |5.5419154|Black Water (2007)                         |
|5605   |109   |5.5345907|Ratcatcher (1999)                          |
|124253 |109   |5.4385486|Sailor of the King (1953)                  |
|213952 |109   |5.4023843|Ciao Ni! (1979)                            |
|227    |109   |5.4013085|Drop Zone (1994)                           |
|248836 |109   |5.3409667|Here Today (2021)                          |
|38376  |109   |5.3202715|Everybody's Fine (Stanno tutti bene) (1990)|
+-------+------+---------+-------------------------------------------+



In [15]:
def get_recommendations_with_explanation(model, movies_df, user_id, top_n=10):
    print(f"\n Creating DataFrame for userId = {user_id}")
    user_df = spark.createDataFrame([Row(userId=int(user_id))], schema=["userId"])
    user_df.show()

    print(f"\n Getting top {top_n} recommendations from ALS model")
    recs = model.recommendForUserSubset(user_df, top_n)
    recs.show(truncate=False)

    # Output of recs is (userId, [{movieId,rating}])

    print("\n Exploding the 'recommendations' array into flat rows")
    flat_recs = recs.withColumn("rec", explode("recommendations")).select("userId", col("rec.movieId"), col("rec.rating"))
    flat_recs.show()

    print("\n Joining with movie titles to get readable output")
    final_recs = flat_recs.join(movies_df, on="movieId", how="left")
    final_recs.show(truncate=False)

    return final_recs


In [16]:
user_id = 109
recommendations = get_recommendations_with_explanation(model, movies_df, user_id)
recommendations.show(truncate=False)


 Creating DataFrame for userId = 109
+------+
|userId|
+------+
|   109|
+------+


 Getting top 10 recommendations from ALS model
+------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|userId|recommendations                                                                                                                                                                                        |
+------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|109   |[{1861, 7.0972385}, {276711, 5.899877}, {128727, 5.685911}, {79863, 5.5419154}, {5605, 5.5345907}, {124253, 5.4385486}, {213952, 5.4023843}, {227, 5.4013085}, {248836, 5.3409667}, {38376, 5.3202715}]|
+------+------------------------

**Saving the model**

In [17]:
model.save("movie_rec_model")

In [33]:
from pyspark.sql import SparkSession, Row
from pyspark.sql.functions import col, explode
from pyspark.ml.recommendation import ALSModel

# Initialize Spark session
spark = SparkSession.builder.appName("MovieRecommendation").getOrCreate()

# Load trained ALS model
model = ALSModel.load("movie_rec_model")

# Load movie titles
movies_df = spark.read.option("header", True).csv("/content/merged_movie_df.csv") \
    .select(col("movieId").cast("int"), "title") \
    .dropna(subset=["movieId", "title"]).dropDuplicates(["movieId"])

# Define the recommendation function
def get_recommendations_for_user(model, movies_df, user_id, top_n=10):
    user_df = spark.createDataFrame([Row(userId=int(user_id))], schema=["userId"])
    recs = model.recommendForUserSubset(user_df, top_n)
    flat_recs = recs.withColumn("rec", explode("recommendations")) \
                    .select("userId", col("rec.movieId"), col("rec.rating"))
    final_recs = flat_recs.join(movies_df, on="movieId", how="left")
    return final_recs

# Example usage:
existing_user_id = 109
recs_existing = get_recommendations_for_user(model, movies_df, user_id=existing_user_id, top_n=10)
recs_existing.show(truncate=False)

# Stop Spark
spark.stop()

+-------+------+---------+-------------------------------------------+
|movieId|userId|rating   |title                                      |
+-------+------+---------+-------------------------------------------+
|1861   |109   |7.0972385|Junk Mail (Budbringeren) (1997)            |
|276711 |109   |5.899877 |The Great Warrior Skanderbeg (1953)        |
|128727 |109   |5.685911 |Bizarre (2015)                             |
|79863  |109   |5.5419154|Black Water (2007)                         |
|5605   |109   |5.5345907|Ratcatcher (1999)                          |
|124253 |109   |5.4385486|Sailor of the King (1953)                  |
|213952 |109   |5.4023843|Ciao Ni! (1979)                            |
|227    |109   |5.4013085|Drop Zone (1994)                           |
|248836 |109   |5.3409667|Here Today (2021)                          |
|38376  |109   |5.3202715|Everybody's Fine (Stanno tutti bene) (1990)|
+-------+------+---------+-------------------------------------------+



**Top rated movies**

In [36]:
from pyspark.sql.functions import avg

# Initialize Spark session
spark = SparkSession.builder.appName("MovieRecommendation").getOrCreate()

# Load the data
df = spark.read.option("header", True).csv("/content/merged_movie_df.csv")
df = df.withColumn("userId", col("userId").cast("int")) \
        .withColumn("movieId", col("movieId").cast("int")) \
        .withColumn("rating", col("rating").cast("float"))
df = df.dropna(subset=["userId", "movieId", "rating"])

# Load movie titles
movies_df = spark.read.option("header", True).csv("/content/merged_movie_df.csv") \
    .select(col("movieId").cast("int"), "title") \
    .dropna(subset=["movieId", "title"]).dropDuplicates(["movieId"])

def get_top_rated_movies(df, movies_df, top_n=10):
    # Average rating per movie
    avg_ratings = df.groupBy("movieId").agg(avg("rating").alias("avg_rating"))

    # Join with movie titles
    top_movies = avg_ratings.join(movies_df, on="movieId", how="left") \
                            .orderBy(col("avg_rating").desc()) \
                            .limit(top_n)
    return top_movies

top_movies = get_top_rated_movies(df, movies_df)
top_movies.show(truncate=False)

+-------+--------------------+---------------------------------+
|movieId|avg_rating          |title                            |
+-------+--------------------+---------------------------------+
|119856 |2.884550934E8       |Hands of Steel (1986)            |
|91054  |1.327509145E8       |Batman (1943)                    |
|4890   |2.8026226324074075E7|Shallow Hal (2001)               |
|41889  |5.0                 |Lili (1953)                      |
|7786   |5.0                 |Genghis Blues (1999)             |
|2697   |5.0                 |My Son the Fanatic (1997)        |
|27636  |5.0                 |Herr Lehmann (2003)              |
|3795   |5.0                 |Five Senses, The (1999)          |
|3352   |5.0                 |Brown's Requiem (1998)           |
|5651   |5.0                 |Incredible Mr. Limpet, The (1964)|
+-------+--------------------+---------------------------------+

