## Making use of Collaborative Filtering and then making recommendation on the Movies Dataset

So I am using The ALS library in Pyspark and my data is already uploaded in the bucket

### Reading the Data

In [1]:
from pyspark.sql import SparkSession

In [2]:
# Initialize Spark session
spark = SparkSession.builder \
    .appName("ALS Recommendation Example") \
    .getOrCreate()


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/04/01 21:42:56 INFO SparkEnv: Registering MapOutputTracker
24/04/01 21:42:56 INFO SparkEnv: Registering BlockManagerMaster
24/04/01 21:42:56 INFO SparkEnv: Registering BlockManagerMasterHeartbeat
24/04/01 21:42:56 INFO SparkEnv: Registering OutputCommitCoordinator


In [3]:
# file from Google Cloud Storage
file_path = "gs://pysparkbucket-jm/IMDB Top 250 Movies.csv"

In [4]:
df = spark.read.csv(file_path, header=True, inferSchema=True)

                                                                                

In [5]:
df.show(10)

+----+--------------------+----+------+--------------------+-----------+--------+--------------------+---------+----------+--------------------+--------------------+--------------------+
|rank|                name|year|rating|               genre|certificate|run_time|             tagline|   budget|box_office|               casts|           directors|             writers|
+----+--------------------+----+------+--------------------+-----------+--------+--------------------+---------+----------+--------------------+--------------------+--------------------+
|   1|The Shawshank Red...|1994|   9.3|               Drama|          R|  2h 22m|Fear can hold you...| 25000000|  28884504|Tim Robbins,Morga...|      Frank Darabont|Stephen King,Fran...|
|   2|       The Godfather|1972|   9.2|         Crime,Drama|          R|  2h 55m|An offer you can'...|  6000000| 250341816|Marlon Brando,Al ...|Francis Ford Coppola|Mario Puzo,Franci...|
|   3|     The Dark Knight|2008|   9.0|  Action,Crime,Drama|     

### Data Preprocessing

In [6]:
# Drop irrelevant columns 
df = df.drop("tagline", "certificate", "budget", "box_office", "casts", "directors", "writers")


In [7]:
# Handle Missing Values 
df = df.dropna()

In [8]:
# Convert 'run_time' column to minutes
from pyspark.sql.functions import regexp_extract, col
hours = regexp_extract(col("run_time"), r"(\d+)h", 1).cast("int") * 60
minutes = regexp_extract(col("run_time"), r"(\d+)m", 1).cast("int")
df = df.withColumn("run_time_minutes", hours + minutes).drop("run_time")

In [9]:
df.show(10)

+----+--------------------+----+------+--------------------+----------------+
|rank|                name|year|rating|               genre|run_time_minutes|
+----+--------------------+----+------+--------------------+----------------+
|   1|The Shawshank Red...|1994|   9.3|               Drama|             142|
|   2|       The Godfather|1972|   9.2|         Crime,Drama|             175|
|   3|     The Dark Knight|2008|   9.0|  Action,Crime,Drama|             152|
|   4|The Godfather Par...|1974|   9.0|         Crime,Drama|             202|
|   5|        12 Angry Men|1957|   9.0|         Crime,Drama|              96|
|   6|    Schindler's List|1993|   9.0|Biography,Drama,H...|             195|
|   7|The Lord of the R...|2003|   9.0|Action,Adventure,...|             201|
|   8|        Pulp Fiction|1994|   8.9|         Crime,Drama|             154|
|   9|The Lord of the R...|2001|   8.8|Action,Adventure,...|             178|
|  10|The Good, the Bad...|1966|   8.8|   Adventure,Western|    

### Creating user item Matrix

In [10]:
# Convert 'name' and 'genre' column to numerical indices
from pyspark.ml.feature import StringIndexer

In [11]:
name_indexer = StringIndexer(inputCol="name", outputCol="user")
genre_indexer = StringIndexer(inputCol="genre", outputCol="item")

In [12]:
df = name_indexer.fit(df).transform(df)
df = genre_indexer.fit(df).transform(df)

                                                                                

In [13]:
# Rename columns
df = df.withColumnRenamed("rating", "rating_float")
df = df.withColumnRenamed("user", "userId")
df = df.withColumnRenamed("item", "movieId")

In [15]:
df.show(10)

+----+--------------------+----+------------+--------------------+----------------+------+-------+
|rank|                name|year|rating_float|               genre|run_time_minutes|userId|movieId|
+----+--------------------+----+------------+--------------------+----------------+------+-------+
|   1|The Shawshank Red...|1994|         9.3|               Drama|             142| 214.0|    0.0|
|   2|       The Godfather|1972|         9.2|         Crime,Drama|             175| 187.0|    1.0|
|   3|     The Dark Knight|2008|         9.0|  Action,Crime,Drama|             152| 179.0|   10.0|
|   4|The Godfather Par...|1974|         9.0|         Crime,Drama|             202| 188.0|    1.0|
|   5|        12 Angry Men|1957|         9.0|         Crime,Drama|              96|   0.0|    1.0|
|   6|    Schindler's List|1993|         9.0|Biography,Drama,H...|             195| 153.0|    3.0|
|   7|The Lord of the R...|2003|         9.0|Action,Adventure,...|             201| 206.0|    7.0|
|   8|    

### Train the ALS model

In [17]:
from pyspark.ml.recommendation import ALS

In [18]:
# Initialize ALS model
als = ALS(maxIter=10, regParam=0.1, userCol="userId", itemCol="movieId", ratingCol="rating_float",
          coldStartStrategy="drop")

In [19]:
# Fit ALS model on the DataFrame
model = als.fit(df)

### Making Recommendation

In [20]:
# Make recommendations for all users
userRecs = model.recommendForAllUsers(10)

In [21]:
# Show recommendations
userRecs.show(truncate=False)



+------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|userId|recommendations                                                                                                                                                         |
+------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|0     |[{1, 8.968684}, {67, 6.996734}, {9, 5.5972605}, {102, 5.2198005}, {35, 4.873327}, {48, 4.788812}, {28, 4.6693187}, {91, 4.6376557}, {60, 4.526513}, {11, 4.4766455}]    |
|1     |[{3, 8.071824}, {49, 5.8443036}, {68, 5.091434}, {8, 4.91893}, {37, 4.815689}, {90, 4.4279466}, {94, 4.4034433}, {19, 3.9874587}, {72, 3.265158}, {87, 2.963067}]       |
|2     |[{31, 8.165876}, {41, 5.9934287}, {64, 5.5782204}, {19, 4.4019814}, {76, 3.6810553}, {94, 3.5962992}, 

                                                                                

## Here I have also added the name column so we can see what movie name the model recommends, in the last output the name was not present so there can be an issue just identifying the movie name 

In [22]:
# Join recommendations with original DataFrame to get movie names
from pyspark.sql.functions import explode

In [23]:
# Explode the 'recommendations' array column
userRecs = userRecs.withColumn("recommendation", explode("recommendations"))
# Join with original DataFrame to get movie names
userRecs_with_names = userRecs.join(df.select("movieId", "name"), userRecs.recommendation.movieId == df.movieId)

In [24]:
# Select relevant columns and show the recommendations
userRecs_with_names.select("userId", "recommendation.movieId", "name", "recommendation.rating").show(truncate=False)




+------+-------+---------------------------+---------+
|userId|movieId|name                       |rating   |
+------+-------+---------------------------+---------+
|0     |1      |The 400 Blows              |8.968684 |
|0     |1      |Cool Hand Luke             |8.968684 |
|0     |1      |La haine                   |8.968684 |
|0     |1      |Casino                     |8.968684 |
|0     |1      |Taxi Driver                |8.968684 |
|0     |1      |To Kill a Mockingbird      |8.968684 |
|0     |1      |Scarface                   |8.968684 |
|0     |1      |Once Upon a Time in America|8.968684 |
|0     |1      |American History X         |8.968684 |
|0     |1      |City of God                |8.968684 |
|0     |1      |Pulp Fiction               |8.968684 |
|0     |1      |12 Angry Men               |8.968684 |
|0     |1      |The Godfather Part II      |8.968684 |
|0     |1      |The Godfather              |8.968684 |
|0     |67     |Grave of the Fireflies     |6.996734 |
|0     |9 

                                                                                

### Predictions 

In [25]:
# Example: Make predictions for specific user-item pairs
predictions = model.transform(df)

In [26]:
predictions.show()

+----+--------------------+----+------------+--------------------+----------------+------+-------+----------+
|rank|                name|year|rating_float|               genre|run_time_minutes|userId|movieId|prediction|
+----+--------------------+----+------------+--------------------+----------------+------+-------+----------+
| 211|               Rocky|1976|         8.1|         Drama,Sport|            NULL| 148.0|   43.0|  8.070729|
| 169|             Warrior|2011|         8.2|  Action,Drama,Sport|             140| 243.0|   50.0|  8.164239|
|  17|          Goodfellas|1990|         8.7|Biography,Crime,D...|             145|  65.0|   13.0|  8.669656|
| 243|             Persona|1966|         8.1|      Drama,Thriller|              83| 133.0|   17.0|  8.071881|
|  80|Once Upon a Time ...|1984|         8.3|         Crime,Drama|             229| 126.0|    1.0|  8.271121|
| 189|In the Name of th...|1993|         8.1|Biography,Crime,D...|             133|  81.0|   13.0|   8.07175|
|  87|    

### RMSE(root mean squared error)

In [27]:
from pyspark.ml.evaluation import RegressionEvaluator

# Evaluate the model by computing the RMSE on the test data
evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating_float", predictionCol="prediction")
rmse = evaluator.evaluate(predictions)
print("Root Mean Squared Error (RMSE) = " + str(rmse))


                                                                                

Root Mean Squared Error (RMSE) = 0.03094736024407895


Do not forget to stop your sparkjob by writing spark.stop()