## RECOMMENDATION SYSTEM USING PYSPARK
In this Noteebook We shall be using pyspark to model a recommendation system .

Refer to the notebook for the explored and prepared data.

In [7]:
# import necessary libraries
import pyspark
from pyspark.sql import SparkSession

# instantiate SparkSession object
spark = SparkSession.builder.master("local").getOrCreate()



In [14]:
# Loading the csv file we created in the Notebook
data = spark.read.format('csv').option('header','true').load('movie-rate.csv')

In [15]:
# calling the .show() method to preview the data
data.show()

+-------+----------------+--------------------+------+------+
|movieId|           title|              genres|userId|rating|
+-------+----------------+--------------------+------+------+
|      1|Toy Story (1995)|Adventure|Animati...|     1|   4.0|
|      1|Toy Story (1995)|Adventure|Animati...|     5|   4.0|
|      1|Toy Story (1995)|Adventure|Animati...|     7|   4.5|
|      1|Toy Story (1995)|Adventure|Animati...|    15|   2.5|
|      1|Toy Story (1995)|Adventure|Animati...|    17|   4.5|
|      1|Toy Story (1995)|Adventure|Animati...|    18|   3.5|
|      1|Toy Story (1995)|Adventure|Animati...|    19|   4.0|
|      1|Toy Story (1995)|Adventure|Animati...|    21|   3.5|
|      1|Toy Story (1995)|Adventure|Animati...|    27|   3.0|
|      1|Toy Story (1995)|Adventure|Animati...|    31|   5.0|
|      1|Toy Story (1995)|Adventure|Animati...|    32|   3.0|
|      1|Toy Story (1995)|Adventure|Animati...|    33|   3.0|
|      1|Toy Story (1995)|Adventure|Animati...|    40|   5.0|
|      1

In [16]:
data.printSchema()

root
 |-- movieId: string (nullable = true)
 |-- title: string (nullable = true)
 |-- genres: string (nullable = true)
 |-- userId: string (nullable = true)
 |-- rating: string (nullable = true)



In [18]:
# Let's cast our numeric data back to numeric 
data= data.withColumn('userId',data['userId'].cast('int'))
data = data.withColumn('movieId',data['movieId'].cast('int'))
data = data.withColumn('rating',data['rating'].cast('float'))

In [19]:
data.printSchema()

root
 |-- movieId: integer (nullable = true)
 |-- title: string (nullable = true)
 |-- genres: string (nullable = true)
 |-- userId: integer (nullable = true)
 |-- rating: float (nullable = true)



In [21]:
# There we go back to normal
# On to modelling
# Using ALS 

from pyspark.ml.recommendation import ALS

In [22]:
# Instantiate the ALS class
als = ALS(
    userCol='userId',
    itemCol='movieId',
    ratingCol='rating',
    nonnegative=True,
    coldStartStrategy="drop"
)

In [23]:
# Splitting the Data
train , test = data.randomSplit(weights=[0.75,0.25],seed=42)

In [24]:
# Fitting the train data
model = als.fit(train)

In [25]:
#  time to evaluate after we fitted the train data
from pyspark.ml.evaluation import RegressionEvaluator

predictions = model.transform(test)

In [26]:
# Evaluating using "RMSE"
evaluator = RegressionEvaluator(metricName='rmse',labelCol='rating',predictionCol='prediction')
rmse = evaluator.evaluate(predictions)
print(f'RMSE: {rmse}')

RMSE: 0.8813433891456554


### Cross Evaluating to find the optimal parameters

In [28]:
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

# defining seed
seed = 42
# initialize the als model
als = ALS(
    userCol='userId',
    itemCol='movieId',
    ratingCol='rating',
    nonnegative=True,
    coldStartStrategy="drop"
)

# Building ParamGridBuilder
params = ParamGridBuilder() \
    .addGrid(als.rank, [10, 30, 50]) \
    .addGrid(als.maxIter, [5, 10, 15]) \
    .addGrid(als.regParam, [0.01, 0.1, 1.0]) \
    .build()

# evaluator ("RMSE")
evaluator = RegressionEvaluator(metricName='rmse',labelCol='rating',predictionCol='prediction')

# Cross-evaluation
cross_val = CrossValidator(estimator=als,
                            estimatorParamMaps=params,
                            evaluator=evaluator,
                            numFolds=5,
                            seed=seed,
                            parallelism=4)

# Train-test-split
train , test = data.randomSplit(weights=[0.75,0.25],seed=seed)

model = cross_val.fit(train)

In [29]:
# best Model predictions
best_model = model.bestModel
predictions = best_model.transform(test)
rmse = evaluator.evaluate(predictions)

print(f"Root Mean Squared Error (RMSE) on test data of the Best Model: {rmse}")

Root Mean Squared Error (RMSE) on test data of the Best Model: 0.8750608881572383


In [58]:
# GENERATE TOP 10 RECOMMENDATION FOR USERS
rec_users = best_model.recommendForAllUsers(10)

In [61]:
rec_users.show(5)

+------+--------------------+
|userId|     recommendations|
+------+--------------------+
|   471|[[7121, 4.714729]...|
|   463|[[69524, 4.733878...|
|   496|[[306, 4.526734],...|
|   148|[[98491, 4.681185...|
|   540|[[7748, 5.1192875...|
+------+--------------------+
only showing top 5 rows



In [62]:
rec_items = best_model.recommendForAllItems(10)
rec_items.show(5)

+-------+--------------------+
|movieId|     recommendations|
+-------+--------------------+
|   1580|[[53, 4.730987], ...|
|   4900|[[43, 4.4274774],...|
|   6620|[[53, 4.7439375],...|
|   7340|[[562, 3.9343483]...|
|  32460|[[53, 5.5077505],...|
+-------+--------------------+
only showing top 5 rows



In [63]:
data.show(5)

+-------+----------------+--------------------+------+------+
|movieId|           title|              genres|userId|rating|
+-------+----------------+--------------------+------+------+
|      1|Toy Story (1995)|Adventure|Animati...|     1|   4.0|
|      1|Toy Story (1995)|Adventure|Animati...|     5|   4.0|
|      1|Toy Story (1995)|Adventure|Animati...|     7|   4.5|
|      1|Toy Story (1995)|Adventure|Animati...|    15|   2.5|
|      1|Toy Story (1995)|Adventure|Animati...|    17|   4.5|
+-------+----------------+--------------------+------+------+
only showing top 5 rows



#### Recommendation Function for a specific user

In [241]:
def recommend_for_specific_user(user , df , users_df):
    u_df = users_df.toPandas()
    rate = df.toPandas() 

    movie_ids = []
    movies_and_rates = []
    for i in u_df[u_df['userId'] == user]['recommendations']:
        for x in i:
            movie_ids.append(x[0])
            movies_and_rates.append(x[1])
            
    titles = []
    for x in movie_ids:
        if x in rate['movieId']:
            titles.append(rate[rate['movieId'] == x]['title'].drop_duplicates().values[0])
    return  list(zip(titles,movies_and_rates)) 
    

In [242]:
# Example of predicted movies ratings according to the preferences of user 20
recommend_for_specific_user(20,data,rec_users,rec_items)

[("Adam's Rib (1949)", 5.128778457641602),
 ('Beautiful Thing (1996)', 4.945532321929932),
 ('Chorus Line, A (1985)', 4.893959999084473),
 ('Crossing Delancey (1988)', 4.880897045135498),
 ('Lady Jane (1986)', 4.880897045135498),
 ('Guys and Dolls (1955)', 4.853848934173584),
 ('Wallace & Gromit: The Best of Aardman Animation (1996)', 4.832089900970459),
 ('Six Degrees of Separation (1993)', 4.82675313949585),
 ('Love and Death (1975)', 4.824601650238037),
 ('Saving Face (2004)', 4.81075382232666)]