In [17]:
from pyspark.sql import SparkSession
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.sql.functions import explode


In [2]:
# Obtain the Spark session used by every cell below. getOrCreate() hands
# back an already-running session if one exists instead of starting another.
spark = (
    SparkSession.builder
    .appName("MovieRecommendationApp")
    .getOrCreate()
)


In [3]:
# Locations of the input CSV files, relative to the notebook's working
# directory. Nothing is read in this cell; the paths are only defined here.
movies_path = "./Data_Sets/movies.csv"
ratings_path = "./Data_Sets/ratings.csv"




In [4]:
# Read both CSV files, using the first row of each file as the column names.
# No schema is supplied and schema inference is not enabled, so the columns
# are loaded as strings.
movies = spark.read.option("header", True).csv(movies_path)
ratings = spark.read.option("header", True).csv(ratings_path)

# Preview the first five rows of each dataset
movies.show(5)
ratings.show(5)



+-------+--------------------+--------------------+
|movieId|               title|              genres|
+-------+--------------------+--------------------+
|      1|    Toy Story (1995)|Adventure|Animati...|
|      2|      Jumanji (1995)|Adventure|Childre...|
|      3|Grumpier Old Men ...|      Comedy|Romance|
|      4|Waiting to Exhale...|Comedy|Drama|Romance|
|      5|Father of the Bri...|              Comedy|
+-------+--------------------+--------------------+
only showing top 5 rows

+------+-------+------+---------+
|userId|movieId|rating|timestamp|
+------+-------+------+---------+
|     1|      1|   4.0|964982703|
|     1|      3|   4.0|964981247|
|     1|      6|   4.0|964982224|
|     1|     47|   5.0|964983815|
|     1|     50|   5.0|964982931|
+------+-------+------+---------+
only showing top 5 rows



In [5]:
# Preprocess Data
# Keep only the three columns passed to ALS later and cast them to numeric
# types. `timestamp` is not selected, so it is dropped from `ratings` here.
ratings = ratings.select(
    *[
        ratings[column_name].cast(spark_type)
        for column_name, spark_type in (
            ("userId", "int"),
            ("movieId", "int"),
            ("rating", "float"),
        )
    ]
)


In [6]:
# Preview the result of the cast: only userId, movieId and rating remain.
ratings.show(5)

+------+-------+------+
|userId|movieId|rating|
+------+-------+------+
|     1|      1|   4.0|
|     1|      3|   4.0|
|     1|      6|   4.0|
|     1|     47|   5.0|
|     1|     50|   5.0|
+------+-------+------+
only showing top 5 rows



In [7]:
# Split Dataset
# 80/20 train/test split. The seed is fixed so that re-running the notebook
# from a fresh kernel produces the same split and therefore a comparable
# evaluation metric; without it a different random split is drawn each run.
(training, test) = ratings.randomSplit([0.8, 0.2], seed=42)


In [8]:
# Build ALS Model
# coldStartStrategy="drop" discards rows whose prediction is NaN (users or
# movies that were not present in the training split) so that they do not
# turn the evaluation metric into NaN.
# seed fixes the random initialisation of the latent factors, so fitting the
# same training data again gives reproducible results.
als = ALS(
    maxIter=10,
    regParam=0.1,
    userCol="userId",
    itemCol="movieId",
    ratingCol="rating",
    coldStartStrategy="drop",
    seed=42,
)
model = als.fit(training)


In [9]:
# Evaluate Model
# Compare the model's predicted rating with the actual rating on the test
# split and report the root-mean-square error.
evaluator = RegressionEvaluator(
    labelCol="rating",
    predictionCol="prediction",
    metricName="rmse",
)
predictions = model.transform(test)
rmse = evaluator.evaluate(predictions)
print(f"Root-mean-square error = {rmse}")


Root-mean-square error = 0.8778262294742294


In [12]:
# Generate Recommendations
# Top 10 movie recommendations for every user, with a five-row preview.
user_recs = model.recommendForAllUsers(10)
user_recs.show(5)

# Top 10 user recommendations for every movie, with a five-row preview.
movie_recs = model.recommendForAllItems(10)
movie_recs.show(5)



+------+--------------------+
|userId|     recommendations|
+------+--------------------+
|     1|[{3022, 5.685599}...|
|     2|[{68945, 5.008567...|
|     3|[{70946, 5.089172...|
|     4|[{6818, 5.6524982...|
|     5|[{3022, 4.9692335...|
+------+--------------------+
only showing top 5 rows

+-------+--------------------+
|movieId|     recommendations|
+-------+--------------------+
|     26|[{53, 4.953249}, ...|
|     27|[{53, 4.861505}, ...|
|     28|[{53, 6.0493126},...|
|     31|[{364, 4.614735},...|
|     34|[{536, 5.1348476}...|
+-------+--------------------+
only showing top 5 rows



In [19]:
# Inspect the type of the recommendations object before transforming it
# (the recorded output shows it is a Spark DataFrame).
type(user_recs)

pyspark.sql.dataframe.DataFrame

In [25]:
# Flatten the per-user `recommendations` array: explode() yields one row per
# array element, and "recommendation.*" expands each element's struct fields
# into top-level columns next to userId.
recommendationsDF = (
    user_recs
    .select("userId", explode("recommendations").alias("recommendation"))
    .select("userId", "recommendation.*")
)
recommendationsDF.show(5)

# Save the flattened recommendations as CSV with a header row, overwriting
# whatever a previous run left at the same location.
(
    recommendationsDF.write
    .mode("overwrite")
    .option("header", True)
    .csv("./Data_Sets/recommendations")
)

+------+-------+---------+
|userId|movieId|   rating|
+------+-------+---------+
|     1|   3022| 5.685599|
|     1|   2131|5.6738753|
|     1| 170705|5.6290684|
|     1|    123|5.6072817|
|     1| 106100| 5.561618|
+------+-------+---------+
only showing top 5 rows

