1. Intialize Environments

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as f
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

#Create SparkSession
spark = SparkSession.builder.appName("Recom_System").getOrCreate()

#Load Rating data
ratings = (
    spark.read.csv(
        path="/home/jovyan/data-sets/ml-latest-small/ratings.csv",
        sep=",",
        header=True,
        quote='"',
        schema='userId INT, movieId INT, rating DOUBLE, timestamp INT'
    )
    .select("userId", "movieId", "rating")
    .cache()
)

#Load movie data
movies = (spark.read.csv(
    path="/home/jovyan/data-sets/ml-latest-small/movies.csv",
    sep=",",
    header=True,
    quote='"',
    schema="movieId INT, title STRING, genres STRING"
    )
    .withColumn("release_year", f.regexp_extract(f.col("title"),"\s?\((\d{4})\)", 1))
    .withColumn("title", f.regexp_replace(f.col("title"), "\s?\((\d{4})\)", ""))
    .withColumn("genres", f.split(f.col("genres"), "\|"))
    .cache()
          
)
movies.show()
movies.describe().show()
ratings.describe().show()

+-------+--------------------+--------------------+------------+
|movieId|               title|              genres|release_year|
+-------+--------------------+--------------------+------------+
|      1|           Toy Story|[Adventure, Anima...|        1995|
|      2|             Jumanji|[Adventure, Child...|        1995|
|      3|    Grumpier Old Men|   [Comedy, Romance]|        1995|
|      4|   Waiting to Exhale|[Comedy, Drama, R...|        1995|
|      5|Father of the Bri...|            [Comedy]|        1995|
|      6|                Heat|[Action, Crime, T...|        1995|
|      7|             Sabrina|   [Comedy, Romance]|        1995|
|      8|        Tom and Huck|[Adventure, Child...|        1995|
|      9|        Sudden Death|            [Action]|        1995|
|     10|           GoldenEye|[Action, Adventur...|        1995|
|     11|American Presiden...|[Comedy, Drama, R...|        1995|
|     12|Dracula: Dead and...|    [Comedy, Horror]|        1995|
|     13|               B

2. Traning with ALS

In [3]:
als = ALS(
    userCol="userId",
    itemCol="movieId",
    ratingCol="rating"
)

evaluator = RegressionEvaluator(
    metricName="rmse", labelCol="rating", predictionCol="prediction"
)

parameter_grid = (
    ParamGridBuilder()
    .addGrid(als.rank, [1, 5, 10])
    .addGrid(als.maxIter, [20])
    .addGrid(als.regParam, [0.05, 0.1])
    .build()
)

crossvalidator = CrossValidator(
    estimator=als,
    estimatorParamMaps=parameter_grid,
    evaluator=evaluator,
    numFolds=2
)

(training_data, validation_data) = ratings.randomSplit([8.0, 2.0])
crossval_model = crossvalidator.fit(training_data)
model = crossval_model.bestModel
predictions = model.transform(validation_data).na.drop()
print(f"rmse for best model ({model}):{evaluator.evaluate(predictions)}")

rmse for best model (ALS_0664da4d226c):0.9600765492616707


In [4]:
predictions.toPandas()

Unnamed: 0,userId,movieId,rating,prediction
0,182,471,4.5,3.489158
1,217,471,2.0,3.013054
2,312,471,4.0,3.650676
3,411,471,4.0,3.396613
4,44,833,2.0,2.686844
5,608,833,0.5,2.435403
6,463,1088,3.5,3.535184
7,177,1088,3.5,3.316696
8,41,1088,1.5,2.925025
9,68,1088,3.5,3.183466


3. Make Recommendation

In [22]:
#Method1

USERID = 519

rec_all_users = model.recommendForAllUsers(5).cache()
print(rec_all_users.count())
rec_all_users.show(20,False)

recommendation_for_user1 = (
    rec_all_users.filter(f"userId == {USERID}")
    .withColumn("rec", f.explode("recommendations"))
    .select(
        "userId",
        f.col("rec").movieId.alias("movieId"),
        f.col("rec").rating.alias("rating")
    )
    .join(movies, ["movieId"])
    .orderBy("rating", ascending=False)
    .select("movieId", "title", "release_year")
)

recommendation_for_user1.show(5,False)
recommendation_for_user1.printSchema()

610
+------+-------------------------------------------------------------------------------------------------+
|userId|recommendations                                                                                  |
+------+-------------------------------------------------------------------------------------------------+
|471   |[[5746, 8.037052], [5764, 7.2333465], [7899, 7.2333465], [136850, 7.0670753], [40491, 6.965575]] |
|463   |[[5746, 8.39893], [5764, 7.5590363], [7899, 7.5590363], [136850, 7.385278], [40491, 7.279208]]   |
|496   |[[5746, 7.8109956], [5764, 7.029896], [7899, 7.029896], [136850, 6.868301], [40491, 6.769656]]   |
|148   |[[5746, 7.871516], [5764, 7.0843644], [7899, 7.0843644], [136850, 6.921518], [40491, 6.8221083]] |
|540   |[[5746, 9.31408], [5764, 8.382672], [7899, 8.382672], [136850, 8.189981], [40491, 8.072353]]     |
|392   |[[5746, 7.649399], [5764, 6.884459], [7899, 6.884459], [136850, 6.7262073], [40491, 6.629603]]   |
|243   |[[5746, 10.020263], [5764

In [28]:
#Method2

USERID = 519

subset = ratings.filter(f"userId == {USERID}").select("userId").distinct()
rec_subset = model.recommendForUserSubset(subset, 5)
rec_subset.show(1,False)

recommendation_for_user2 = (
    rec_subset
    .withColumn("rec", f.explode("recommendations"))
    .select(
        "userId",
        f.col("rec").movieId.alias("movieId"),
        f.col("rec").rating.alias("rating")
    )
    .join(movies, ["movieId"])
    .orderBy("rating", ascending=False)
    .select("movieId", "title", "release_year")
)

recommendation_for_user2.show(5,False)


+------+---------------------------------------------------------------------------------------------+
|userId|recommendations                                                                              |
+------+---------------------------------------------------------------------------------------------+
|519   |[[5746, 9.778925], [5764, 8.801032], [7899, 8.801032], [136850, 8.598724], [40491, 8.475226]]|
+------+---------------------------------------------------------------------------------------------+

+-------+-----------------------------------------------------------------+------------+
|movieId|title                                                            |release_year|
+-------+-----------------------------------------------------------------+------------+
|5746   |Galaxy of Terror (Quest)                                         |1981        |
|7899   |Master of the Flying Guillotine (Du bi quan wang da po xue di zi)|1975        |
|5764   |Looker                        

In [35]:
#Method3
USERID = 519

movies_to_be_rated = (
    ratings
    .filter(f"userId == {USERID}")
    .select("movieId").distinct()
    .withColumn("userId", f.lit(USERID))
)


movies_to_be_rated.show()

user_movie_predictions = model.transform(movies_to_be_rated)
user_movie_predictions.show()
user_movie_predictions.printSchema()

+-------+------+
|movieId|userId|
+-------+------+
|   6378|   519|
|   4025|   519|
|  33679|   519|
|  88163|   519|
|   2329|   519|
|  59369|   519|
| 104211|   519|
|   1968|   519|
| 140110|   519|
|  51935|   519|
|   3717|   519|
|    318|   519|
|   3408|   519|
|   4447|   519|
|  69406|   519|
|  34162|   519|
|   5989|   519|
|   1704|   519|
|    356|   519|
|   4963|   519|
+-------+------+
only showing top 20 rows

+-------+------+----------+
|movieId|userId|prediction|
+-------+------+----------+
|   6378|   519| 4.3246827|
|   4025|   519| 3.5989845|
|  33679|   519|  4.003306|
|  88163|   519| 4.8164916|
|   2329|   519| 4.9330707|
|  59369|   519|  4.193957|
| 104211|   519| 4.3248234|
|   1968|   519| 4.5387316|
| 140110|   519|  4.388717|
|  51935|   519|  4.602032|
|   3717|   519| 3.8007061|
|    318|   519| 5.0600343|
|   3408|   519| 4.3277626|
|   4447|   519| 3.8321538|
|  69406|   519| 3.8201365|
|  34162|   519| 4.2376747|
|   5989|   519|  4.609609|
|   17

In [40]:
recommendation_for_user3 = (
    user_movie_predictions
    .orderBy("prediction", ascending=False)
    .limit(5)
    .join(movies, ["movieId"])
    .select("movieId", "title", "release_year")
)

recommendation_for_user3.show(5, False)

+-------+-------------------------+------------+
|movieId|title                    |release_year|
+-------+-------------------------+------------+
|318    |Shawshank Redemption, The|1994        |
|356    |Forrest Gump             |1994        |
|1704   |Good Will Hunting        |1997        |
|2329   |American History X       |1998        |
|88163  |Crazy, Stupid, Love.     |2011        |
+-------+-------------------------+------------+

