In [10]:
import os
from pyspark.sql import SparkSession
import pyspark.sql.functions as func
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator
import pandas as pd
import matplotlib.pyplot as plt

In [2]:
# Initialize the Spark session (an already-running session is reused if present)
spark = (
    SparkSession.builder
    .appName("MovieLensRec")
    .getOrCreate()
)
# Restrict Spark's console logging to ERROR-level messages
spark.sparkContext.setLogLevel("ERROR")

Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
25/08/10 19:56:03 WARN Utils: Your hostname, dothanhdatle, resolves to a loopback address: 127.0.1.1; using 10.188.43.224 instead (on interface wlo1)
25/08/10 19:56:03 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/08/10 19:56:04 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
# Load the transformed parquet datasets.
# They are expected under <parent of the current working directory>/data/transformed.
base_dir = os.path.dirname(os.getcwd())
data_path = os.path.join(base_dir, "data", "transformed")


def _read_transformed(dataset_name):
    """Read the parquet dataset named `dataset_name` located under `data_path`."""
    return spark.read.parquet(os.path.join(data_path, dataset_name))


movie_df = _read_transformed("movies_clean")
movieByGenre_df = _read_transformed("movies_by_genre")
rating_df = _read_transformed("ratingWithDatetime")
user_df = _read_transformed("usersData")

                                                                                

In [4]:
# Print the schema of every loaded DataFrame, in load order
for _frame in (movie_df, movieByGenre_df, rating_df, user_df):
    _frame.printSchema()

root
 |-- movieID: integer (nullable = true)
 |-- title: string (nullable = true)
 |-- genres: string (nullable = true)
 |-- year: integer (nullable = true)

root
 |-- movieID: integer (nullable = true)
 |-- title: string (nullable = true)
 |-- year: integer (nullable = true)
 |-- genre: string (nullable = true)

root
 |-- userID: integer (nullable = true)
 |-- movieID: integer (nullable = true)
 |-- rating: float (nullable = true)
 |-- timestamp: long (nullable = true)
 |-- datetime: string (nullable = true)
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)

root
 |-- userID: integer (nullable = true)
 |-- gender: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- occupation: integer (nullable = true)
 |-- zipcode: string (nullable = true)
 |-- age_group: string (nullable = true)



In [5]:
# Preview the first 3 rows of every loaded DataFrame without truncating cell values
for _frame in (movie_df, movieByGenre_df, rating_df, user_df):
    _frame.show(n=3, truncate=False)

                                                                                

+-------+--------------------------------------+--------------------------+----+
|movieID|title                                 |genres                    |year|
+-------+--------------------------------------+--------------------------+----+
|169    |Free Willy 2: The Adventure Home      |Adventure|Children's|Drama|1995|
|191    |Scarlet Letter, The                   |Drama                     |1995|
|891    |Halloween: The Curse of Michael Myers |Horror|Thriller           |1995|
+-------+--------------------------------------+--------------------------+----+
only showing top 3 rows
+-------+---------------------------------+----+----------+
|movieID|title                            |year|genre     |
+-------+---------------------------------+----+----------+
|169    |Free Willy 2: The Adventure Home |1995|Adventure |
|169    |Free Willy 2: The Adventure Home |1995|Children's|
|169    |Free Willy 2: The Adventure Home |1995|Drama     |
+-------+---------------------------------+----+-

**Build recommendation model**

In [None]:
# Split the ratings into a training set (~80%) and a test set (~20%).
# A fixed seed is passed so that re-running the notebook on the same input
# yields the same split; without it the split (and every metric computed
# downstream, e.g. the RMSE) changes on each execution.
SPLIT_SEED = 42
(train, test) = rating_df.randomSplit([0.8, 0.2], seed=SPLIT_SEED)

In [None]:
# Train an ALS collaborative-filtering model on the training ratings.
#   userCol / itemCol / ratingCol : columns of `train` holding user ids, movie ids and ratings
#   implicitPrefs=False           : ratings are treated as explicit feedback
#   nonnegative=True              : constrain the learned factors to be non-negative
#   coldStartStrategy="drop"      : drop rows whose prediction is NaN (users/movies unseen
#                                   during training) so they do not turn the metric into NaN
#   seed                          : fixed so factor initialisation is the same across
#                                   kernel restarts, keeping results comparable between runs
ALS_SEED = 42
als = ALS(userCol="userID", itemCol="movieID", ratingCol="rating",
          coldStartStrategy="drop", nonnegative=True, implicitPrefs=False,
          seed=ALS_SEED)

model = als.fit(train)

                                                                                

In [11]:
# Evaluate the model on the held-out ratings: RMSE between the observed
# "rating" column and the model's "prediction" column.
evaluator = RegressionEvaluator(
    metricName="rmse",
    labelCol="rating",
    predictionCol="prediction",
)
predictions = model.transform(test)

rmse = evaluator.evaluate(predictions)
print(f"Root-mean-square error = {rmse}")

[Stage 116:>                                                        (0 + 2) / 2]

Root-mean-square error = 0.8736244296303589


                                                                                

In [12]:
# Produce the top-10 movie recommendations for every user known to the model,
# then preview the first 5 users with the full (untruncated) recommendation lists
userRecs = model.recommendForAllUsers(numItems=10)
userRecs.show(n=5, truncate=False)



+------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|userID|recommendations                                                                                                                                                                            |
+------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|26    |[{572, 4.8338547}, {2309, 4.0876555}, {3314, 4.0826306}, {3233, 3.8834777}, {1851, 3.8290057}, {985, 3.8030872}, {3853, 3.7817454}, {2332, 3.7800689}, {811, 3.7602987}, {3603, 3.7539806}]|
|27    |[{557, 5.2348585}, {2309, 4.801462}, {670, 4.750251}, {923, 4.7052727}, {2019, 4.6911325}, {3030, 4.685462}, {668, 4.6763964}, {3134, 4.6495295}, {912, 4.647382}, {3022, 4.643687}]       |
|28    |[{557, 

                                                                                