In [30]:
from pyspark.sql import SparkSession, Window
from pyspark.sql.functions import col, monotonically_increasing_id, row_number
from pyspark.ml.feature import StringIndexer
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator

In [8]:
# Entry point for the DataFrame and ML APIs. getOrCreate() hands back the session
# already running in this kernel if there is one, so re-running the cell is harmless.
spark = (
    SparkSession.builder
    .appName('ALS Recommendation')
    .getOrCreate()
)

In [31]:
# header=True: the first CSV line holds column names.
# inferSchema=True: Spark makes an extra pass over the file to guess column types.
df = spark.read.options(header=True, inferSchema=True).csv("cleaned_imdb_data.csv")

In [10]:
# Bare DataFrame repr: shows column names and the types inferred by the CSV reader.
# NOTE(review): several numeric-looking columns (Released_Year, Runtime, Meta_score,
# No_of_Votes, Gross) come back as strings; only IMDB_Rating is used further down.
df

DataFrame[Poster_Link: string, Series_Title: string, Released_Year: string, Certificate: string, Runtime: string, Genre: string, IMDB_Rating: double, Overview: string, Meta_score: string, Director: string, Star1: string, Star2: string, Star3: string, Star4: string, No_of_Votes: string, Gross: string]

In [34]:
# show() prints the first 20 rows itself and returns None. Combining it with count()
# in a tuple displayed a noisy `(None, 1000)`. Run them as separate statements so the
# row count is the cell's displayed value.
df.show()
df.count()

+-----------+--------------------+-------------+-----------+-------+--------------------+-----------+--------------------+----------+--------------------+------------------+--------------------+------------------+--------------------+-----------+-----------+
|Poster_Link|        Series_Title|Released_Year|Certificate|Runtime|               Genre|IMDB_Rating|            Overview|Meta_score|            Director|             Star1|               Star2|             Star3|               Star4|No_of_Votes|      Gross|
+-----------+--------------------+-------------+-----------+-------+--------------------+-----------+--------------------+----------+--------------------+------------------+--------------------+------------------+--------------------+-----------+-----------+
|       NULL|The Shawshank Red...|         1994|          A|142 min|               Drama|        9.3|Two imprisoned me...|      80.0|      Frank Darabont|       Tim Robbins|      Morgan Freeman|        Bob Gunton|      Will

(None, 1000)

In [35]:
# Synthetic ("fake") user ids: every CSV row becomes its own user with exactly one rating.
#
# NOTE(review): ALS is collaborative filtering and needs users who rated several items.
# With one rating per user, no user in the test split can appear in the training split,
# so coldStartStrategy="drop" discards every test prediction. Plug in a real
# user/movie/rating interaction source before expecting a meaningful evaluation.
#
# row_number() over a single global ordering gives consecutive 0-based integer ids.
# monotonically_increasing_id() on its own is only unique, not consecutive: with
# several partitions it stores the partition index in the upper bits and yields
# values outside the 32-bit integer range that ALS accepts for user ids.
# The un-partitioned window moves all rows into one partition, which is fine for
# this small (1000-row) file.
df = df.withColumn(
    "userId",
    row_number().over(Window.orderBy(monotonically_increasing_id())) - 1,
)

# Keep only the (user, item, rating) triple ALS needs, under shorter column names.
df = (
    df.withColumnRenamed("Series_Title", "movie")
    .withColumnRenamed("IMDB_Rating", "rating")
    .select("userId", "movie", "rating")
)

In [37]:
# Preview the (userId, movie, rating) triples. show() prints and returns None, so
# keep it out of a tuple and let the row count be the displayed value.
df.show()
df.count()

+------+--------------------+------+
|userId|               movie|rating|
+------+--------------------+------+
|     0|The Shawshank Red...|   9.3|
|     1|       The Godfather|   9.2|
|     2|     The Dark Knight|   9.0|
|     3|The Godfather: Pa...|   9.0|
|     4|        12 Angry Men|   9.0|
|     5|The Lord of the R...|   8.9|
|     6|        Pulp Fiction|   8.9|
|     7|    Schindler's List|   8.9|
|     8|           Inception|   8.8|
|     9|          Fight Club|   8.8|
|    10|The Lord of the R...|   8.8|
|    11|        Forrest Gump|   8.8|
|    12|Il buono, il brut...|   8.8|
|    13|The Lord of the R...|   8.7|
|    14|          The Matrix|   8.7|
|    15|          Goodfellas|   8.7|
|    16|Star Wars: Episod...|   8.7|
|    17|One Flew Over the...|   8.7|
|    18|            Hamilton|   8.6|
|    19|        Gisaengchung|   8.6|
+------+--------------------+------+
only showing top 20 rows


(None, 1000)

In [38]:
# ALS needs numeric item ids: map each distinct movie title to an index.
# Identical titles share one movieId. The indexer is fit on the full frame,
# before the train/test split, so both splits use the same mapping.
indexer = StringIndexer(outputCol="movieId", inputCol="movie")
df = (
    indexer.fit(df)
    .transform(df)
    .select("userId", "movieId", "rating")
)

In [40]:
# 80/20 train/test split. Without a seed the split, and therefore every downstream
# count and metric, changes on each run. seed=0 matches the seed already given to ALS,
# making results reproducible for the same input partitioning.
(train_data, test_data) = df.randomSplit([0.8, 0.2], seed=0)

In [42]:
# Row counts of the (train, test) splits, displayed as a tuple.
tuple(split.count() for split in (train_data, test_data))

(787, 213)

In [55]:
# Matrix-factorization recommender over the (userId, movieId, rating) columns.
als = ALS(
    userCol="userId",
    itemCol="movieId",
    ratingCol="rating",
    rank=10,         # number of latent factors
    maxIter=3,       # ALS iterations
    regParam=0.15,   # regularization strength
    # Drop rows whose user or movie was unseen in training instead of predicting NaN.
    coldStartStrategy="drop",
    seed=0,
)

In [56]:
# Learn user and movie latent factors from the training split.
model = als.fit(dataset=train_data)

In [57]:
# Score the raw test split. Because of coldStartStrategy="drop", rows with a user or
# movie unseen during training are removed. A later cell reassigns `predictions`.
predictions = model.transform(dataset=test_data)

In [59]:
# Distinct ids seen during training, used to find cold-start rows in the test split.
train_users = train_data.select("userId").distinct()
train_movies = train_data.select("movieId").distinct()

# left_anti keeps only the test rows that have NO match among the training ids.
test_user_not_in_train = test_data.join(train_users, "userId", "left_anti")
test_movie_not_in_train = test_data.join(train_movies, "movieId", "left_anti")

for message, unseen_rows in (
    ("user not in train:", test_user_not_in_train),
    ("movie not in train:", test_movie_not_in_train),
):
    print(message, unseen_rows.count())

user not in train: 213
movie not in train: 212


In [60]:
# Inner joins keep only test rows whose user AND movie both appeared in training.
test_data_filtered = (
    test_data
    .join(train_users, "userId", "inner")
    .join(train_movies, "movieId", "inner")
)

In [61]:
# Score only the warm-start test rows and preview actual vs. predicted ratings.
predictions = model.transform(dataset=test_data_filtered)
prediction_columns = ["userId", "movieId", "rating", "prediction"]
predictions.select(*prediction_columns).show(10)

+------+-------+------+----------+
|userId|movieId|rating|prediction|
+------+-------+------+----------+
+------+-------+------+----------+



In [62]:
# Evaluate rating predictions with root-mean-squared error.
evaluator = RegressionEvaluator(
    metricName="rmse",
    labelCol="rating",
    predictionCol="prediction",
)

# coldStartStrategy="drop" removes every test row whose user or movie was not seen in
# training. Each synthetic userId owns exactly one rating, so that can be every row
# (compare the "user not in train" count above), and RMSE over an empty DataFrame is
# undefined. Report that case explicitly instead of evaluating.
if predictions.head(1):
    rmse = evaluator.evaluate(predictions)
    print(f"RMSE = {rmse}")
else:
    print("RMSE not computed: no test row has both its user and movie in the training set.")