In [9]:
from pyspark.sql import SparkSession
from pyspark.sql.types import IntegerType, ArrayType
from pyspark.ml.recommendation import ALS
from pyspark.sql.functions import col

# Create a SparkSession
spark = SparkSession.builder \
    .appName("ALS Movie Recommendations") \
    .getOrCreate()

# Load datasets into DataFrames
movies_df = spark.read.csv("movies.csv", header=True)
ratings_df = spark.read.csv("ratings.csv", header=True)

# Convert userId and movieId columns to IntegerType
ratings_df = ratings_df.withColumn("userId", ratings_df["userId"].cast(IntegerType()))
ratings_df = ratings_df.withColumn("movieId", ratings_df["movieId"].cast(IntegerType()))

# Convert rating column to DoubleType
ratings_df = ratings_df.withColumn("rating", ratings_df["rating"].cast(DoubleType()))

# Train ALS recommendation model
als = ALS(maxIter=5, regParam=0.01, userCol="userId", itemCol="movieId", ratingCol="rating",
          coldStartStrategy="drop")
model = als.fit(ratings_df)

# Make recommendations for users
userRecs = model.recommendForAllUsers(10)

# Explode the array of recommendations
userRecs = userRecs.select("userId", col("recommendations.movieId").alias("movieId"))

# Convert the column of arrays to rows
userRecs = userRecs.withColumn("movieId", col("movieId")[0])

# Join recommendations with movie information
recommended_movies = userRecs.join(movies_df, ["movieId"], "left_outer") \
                              .select("userId", "title", "genres")

# Display the results
recommended_movies.show(truncate=False)




+------+----------------------------------------------+----------------------+
|userId|title                                         |genres                |
+------+----------------------------------------------+----------------------+
|1     |Show Me Love (Fucking Åmål) (1998)            |Drama|Romance         |
|2     |Wait Until Dark (1967)                        |Drama|Thriller        |
|3     |Stardust Memories (1980)                      |Comedy|Drama          |
|4     |Tommy (1975)                                  |Musical               |
|5     |Mist, The (2007)                              |Horror|Sci-Fi         |
|6     |Affair to Remember, An (1957)                 |Drama|Romance         |
|7     |Officer and a Gentleman, An (1982)            |Drama|Romance         |
|8     |Stranger Than Paradise (1984)                 |Comedy|Drama          |
|9     |Wait Until Dark (1967)                        |Drama|Thriller        |
|10    |Anna and the King (1999)                    

In [10]:
from pyspark.ml.evaluation import RegressionEvaluator

# Generate predictions
predictions = model.transform(ratings_df)

# Select necessary columns for evaluation
predictions = predictions.select("userId", "movieId", "rating", "prediction")

# Evaluate the model using RMSE
evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating", predictionCol="prediction")
rmse = evaluator.evaluate(predictions)

print("Root Mean Squared Error (RMSE):", rmse)



Root Mean Squared Error (RMSE): 0.5329384619841988


In [11]:
from pyspark.sql.functions import split, explode, trim, col

# Split genres into individual genres
movies_df = movies_df.withColumn("genre", explode(split(trim(col("genres")), "\\|")))

# Count occurrences of each unique genre
genre_counts = movies_df.groupBy("genre").count()

# Display the result
genre_counts.show()


+------------------+-----+
|             genre|count|
+------------------+-----+
|             Crime| 1199|
|           Romance| 1596|
|          Thriller| 1894|
|         Adventure| 1263|
|             Drama| 4361|
|               War|  382|
|       Documentary|  440|
|           Fantasy|  779|
|           Mystery|  573|
|           Musical|  334|
|         Animation|  611|
|         Film-Noir|   87|
|(no genres listed)|   34|
|              IMAX|  158|
|            Horror|  978|
|           Western|  167|
|            Comedy| 3756|
|          Children|  664|
|            Action| 1828|
|            Sci-Fi|  980|
+------------------+-----+



In [12]:
from pyspark.sql.functions import sum

# Join ratings and movies DataFrames
joined_df = ratings_df.join(movies_df, "movieId", "inner")

# Group by movieId and genre to compute total rating for each movie and each genre
total_rating_per_movie = joined_df.groupBy("movieId", "title").agg(sum("rating").alias("total_rating"))
total_rating_per_genre = joined_df.groupBy("genre").agg(sum("rating").alias("total_rating"))

# Display the total rating of each movie
print("Total rating of each movie:")
total_rating_per_movie.show()

# Display the total rating of each genre
print("Total rating of each genre:")
total_rating_per_genre.show()


Total rating of each movie:
+-------+--------------------+------------+
|movieId|               title|total_rating|
+-------+--------------------+------------+
|   2657|Rocky Horror Pict...|       842.0|
|   2076|  Blue Velvet (1986)|       544.5|
|    493|Menace II Society...|       132.0|
|    881|    First Kid (1996)|        21.0|
|    442|Demolition Man (1...|       751.5|
|   6548|  Bad Boys II (2003)|       196.0|
| 141688|       Legend (2015)|        14.0|
|   2171|Next Stop Wonderl...|        79.5|
|   4085|Beverly Hills Cop...|       626.0|
|  45447|Da Vinci Code, Th...|       459.0|
|  38886|Squid and the Wha...|        62.0|
| 142997|Hotel Transylvani...|        23.0|
|   6059| Recruit, The (2003)|       114.0|
|    938|         Gigi (1958)|        19.5|
|  96373|       Broken (2012)|         4.5|
| 134849|   Duck Amuck (1953)|        12.0|
|  97172|Frankenweenie (2012)|        20.0|
| 161966|         Elle (2016)|         4.0|
|   5563|City by the Sea (...|        17.0|
|   

In [13]:
from pyspark.sql.functions import avg, col

# Calculate average rating of all movies
average_rating_all_movies = ratings_df.groupBy().avg("rating").collect()[0][0]

# Calculate average rating of each user
average_rating_per_user = ratings_df.groupBy("userId").avg("rating")

# List top 10 best performing movies
top_performing_movies = total_rating_per_movie.orderBy(col("total_rating").desc()).limit(10)

# List 5 worst performing movies
worst_performing_movies = total_rating_per_movie.orderBy(col("total_rating")).limit(5)

# Display results
print("Average rating of all movies:", average_rating_all_movies)
print("\nAverage rating of each user:")
average_rating_per_user.show()

print("\nTop 10 best performing movies:")
top_performing_movies.show(truncate=False)

print("\n5 worst performing movies:")
worst_performing_movies.show(truncate=False)



Average rating of all movies: 3.501556983616962

Average rating of each user:
+------+------------------+
|userId|       avg(rating)|
+------+------------------+
|   148|3.7395833333333335|
|   463| 3.787878787878788|
|   471|             3.875|
|   496| 3.413793103448276|
|   243| 4.138888888888889|
|   392|               3.2|
|   540|               4.0|
|    31|              3.92|
|   516|3.6923076923076925|
|    85|3.7058823529411766|
|   137| 3.978723404255319|
|   251| 4.869565217391305|
|   451|3.7941176470588234|
|   580| 3.529816513761468|
|    65| 4.029411764705882|
|   458|4.1525423728813555|
|    53|               5.0|
|   255|2.5681818181818183|
|   481| 2.806451612903226|
|   588|              3.25|
+------+------------------+
only showing top 20 rows


Top 10 best performing movies:
+-------+--------------------------------+------------+
|movieId|title                           |total_rating|
+-------+--------------------------------+------------+
|356    |Forrest Gump (1