# **Recommender Systems**

# **Code**

**Building an RS from scratch using the ALS method in PySpark**

Set Up Spark in Colab

In [None]:
!apt-get update

In [None]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
# older Spark releases are served from archive.apache.org rather than downloads.apache.org
!wget -q https://archive.apache.org/dist/spark/spark-2.4.7/spark-2.4.7-bin-hadoop2.7.tgz
!tar xf spark-2.4.7-bin-hadoop2.7.tgz
!pip install -q findspark

In [None]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.4.7-bin-hadoop2.7"

In [None]:
import findspark
findspark.init()

# Data Info

The dataset we are going to use is a subset of the well-known
open-source MovieLens dataset. It contains 100,000 records with
three columns (userId, title, rating). We will train our
recommender model on 75% of the data and test it on the remaining
25% of the user ratings.

# Step 1: Create the SparkSession Object

In [None]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('rc').getOrCreate()

# Step 2: Read the Dataset

In [None]:
df = spark.read.csv('movie_ratings_df.csv',inferSchema=True,header= True)


# Step 3: Exploratory Data Analysis


We explore the dataset by viewing a few rows, validating its shape,
and counting the number of ratings per user and per movie.

In [None]:
#shape of the dataset
print((df.count(),len(df.columns)))

(100000, 3)


Next, we check the datatypes of the columns to see whether any of
them need to be cast.

In [None]:
df.printSchema()

root
 |-- userId: integer (nullable = true)
 |-- title: string (nullable = true)
 |-- rating: integer (nullable = true)



An important requirement when building an RS with PySpark is that the
user and item IDs must be numeric. We will therefore convert the movie
titles to numerical values later, in the feature engineering step.

For now, we view a few rows of the dataframe, using the rand function
to shuffle the records into random order.

In [None]:
# explicit imports avoid shadowing Python builtins such as sum, min and max
from pyspark.sql.functions import col, lit, rand
df.orderBy(rand()).show(10)

+------+--------------------+------+
|userId|               title|rating|
+------+--------------------+------+
|   293|Star Trek III: Th...|     2|
|   829|Mighty Aphrodite ...|     4|
|   660|      Titanic (1997)|     4|
|   239|Gone with the Win...|     3|
|   838|Four Weddings and...|     4|
|   279|So I Married an A...|     3|
|   146|Leave It to Beave...|     1|
|   279|  Major Payne (1994)|     4|
|   303|City of Lost Chil...|     4|
|   180|      Contact (1997)|     5|
+------+--------------------+------+
only showing top 10 rows



In [None]:
df.groupBy('userId').count().orderBy('count',ascending=False).show(10)

+------+-----+
|userId|count|
+------+-----+
|   405|  737|
|   655|  685|
|    13|  636|
|   450|  540|
|   276|  518|
|   416|  493|
|   537|  490|
|   303|  484|
|   234|  480|
|   393|  448|
+------+-----+
only showing top 10 rows



In [None]:
df.groupBy('userId').count().orderBy('count',ascending=True).show(10)

+------+-----+
|userId|count|
+------+-----+
|   732|   20|
|   631|   20|
|   636|   20|
|   926|   20|
|    93|   20|
|   596|   20|
|   572|   20|
|    34|   20|
|   300|   20|
|   685|   20|
+------+-----+
only showing top 10 rows



The user with the highest number of records has rated 737 movies, and
each user has rated at least 20 movies.

In [None]:
df.groupBy('title').count().orderBy('count',ascending=False).show(10)

+--------------------+-----+
|               title|count|
+--------------------+-----+
|    Star Wars (1977)|  583|
|      Contact (1997)|  509|
|        Fargo (1996)|  508|
|Return of the Jed...|  507|
|    Liar Liar (1997)|  485|
|English Patient, ...|  481|
|       Scream (1996)|  478|
|    Toy Story (1995)|  452|
|Air Force One (1997)|  431|
|Independence Day ...|  429|
+--------------------+-----+
only showing top 10 rows



In [None]:
df.groupBy('title').count().orderBy('count',ascending=True).show(10)

+--------------------+-----+
|               title|count|
+--------------------+-----+
|Aiqing wansui (1994)|    1|
|Next Step, The (1...|    1|
|Leopard Son, The ...|    1|
|Modern Affair, A ...|    1|
|    Fear, The (1995)|    1|
| Mad Dog Time (1996)|    1|
|Lashou shentan (1...|    1|
|Vie est belle, La...|    1|
|JLG/JLG - autopor...|    1|
|       Target (1995)|    1|
+--------------------+-----+
only showing top 10 rows



The movie with the highest number of ratings is Star Wars (1977), and each movie has been rated by at least one user.

# Step 4: Feature Engineering

We now convert the movie title column from categorical to numerical
values using StringIndexer. We import StringIndexer and IndexToString
from the PySpark library.

In [None]:
from pyspark.sql.functions import col, lit, rand
from pyspark.ml.feature import StringIndexer, IndexToString

We create the StringIndexer object by specifying the input and output
columns. We then fit it on the dataframe and apply it to the movie
title column to create a new dataframe with numerical values.

In [None]:
stringIndexer = StringIndexer(inputCol='title', outputCol='title_new')
# fit the indexer on the dataframe to learn the title-to-index mapping
model = stringIndexer.fit(df)
# create a new dataframe with the indexed title column added
indexed = model.transform(df)

In [None]:
indexed.orderBy(rand()).show(10)

+------+--------------------+------+---------+
|userId|               title|rating|title_new|
+------+--------------------+------+---------+
|    71| Blade Runner (1982)|     5|     52.0|
|   716|  Bob Roberts (1992)|     5|    397.0|
|   580|Austin Powers: In...|     5|    241.0|
|   547|    Game, The (1997)|     4|     64.0|
|   429|Renaissance Man (...|     3|    669.0|
|   924|       Scream (1996)|     3|      6.0|
|   554|Mission: Impossib...|     4|     22.0|
|   524|Shanghai Triad (Y...|     5|    850.0|
|    83|   Braveheart (1995)|     5|     37.0|
|   275|James and the Gia...|     3|    260.0|
+------+--------------------+------+---------+
only showing top 10 rows
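
As a rough illustration of what the indexer does, the sketch below mimics StringIndexer's default ordering (the most frequent label gets index 0.0) in plain Python. The helpers `fit_labels` and `to_index` are hypothetical, the data is a toy list, and the alphabetical tie-breaking is an assumption; this is not Spark's implementation.

```python
from collections import Counter

def fit_labels(values):
    """Order labels by descending frequency (ties broken alphabetically, an assumption)."""
    counts = Counter(values)
    return sorted(counts, key=lambda label: (-counts[label], label))

def to_index(labels):
    """Map each label to its position in the ordered list, as a float like title_new."""
    return {label: float(i) for i, label in enumerate(labels)}

# Toy data, not the real ratings file
titles = ["Star Wars (1977)"] * 3 + ["Fargo (1996)"] * 2 + ["Target (1995)"]
labels = fit_labels(titles)
index = to_index(labels)
print(index)

# The inverse lookup (what IndexToString does with model.labels) is a list access
print(labels[int(index["Fargo (1996)"])])
```

This ordering is consistent with the output above: Scream (1996) is seventh by rating count and has index 6.0.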



In [None]:
indexed.printSchema()

root
 |-- userId: integer (nullable = true)
 |-- title: string (nullable = true)
 |-- rating: integer (nullable = true)
 |-- title_new: double (nullable = false)



# Step 5: Splitting the Dataset

In [None]:
# We split it into a 75 to 25 ratio to train the model and test its accuracy
train_df,test_df = indexed.randomSplit([0.75,0.25])


In [None]:
train_df.count()

75044

In [None]:
test_df.count()

24956
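
The counts are close to, but not exactly, 75,000 and 25,000 because randomSplit assigns each row at random instead of cutting the data at a fixed position. The plain-Python sketch below illustrates the idea under that assumption; `random_split` is a hypothetical helper, not Spark's code.

```python
import random

def random_split(rows, train_fraction=0.75, seed=42):
    """Send each row to train with probability train_fraction, else to test."""
    rng = random.Random(seed)
    train, test = [], []
    for row in rows:
        (train if rng.random() < train_fraction else test).append(row)
    return train, test

rows = list(range(100_000))  # stand-in for the 100,000 rating records
train, test = random_split(rows)
print(len(train), len(test))  # sizes are near 75,000 / 25,000, not exact
```

Fixing the seed makes the split repeatable. randomSplit likewise accepts an optional `seed` argument, which helps when results need to be reproduced across runs.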

# Step 6: Build and Train Recommender Model

We import the ALS class from the PySpark ML library and build the
model on the training dataset. Several hyperparameters can be tuned
to improve the model's performance. Two important ones are
nonnegative=True, which constrains the learned factors to be
non-negative so that predicted ratings are never negative, and
coldStartStrategy='drop', which drops rows whose prediction would be NaN.

In [None]:
from pyspark.ml.recommendation import ALS
rec = ALS(maxIter=10,regParam=0.01,userCol='userId',itemCol='title_new',ratingCol='rating',nonnegative=True,coldStartStrategy="drop")
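
To see why coldStartStrategy matters: a random split can place a user or movie only in the test set, and the model has no factors for an ID it never saw, so the prediction is NaN. A single NaN then makes the RMSE NaN. The sketch below shows this in plain Python; the `predict` helper and the toy factor values are hypothetical.

```python
import math

# Toy learned factors: only IDs seen during training have an entry
user_factors = {1: [1.0, 0.5], 2: [0.2, 1.5]}
item_factors = {10: [2.0, 1.0], 11: [0.5, 2.0]}

def predict(user, item):
    """Dot product of the two factor vectors, or NaN when either ID is unseen."""
    if user not in user_factors or item not in item_factors:
        return math.nan
    return sum(u * v for u, v in zip(user_factors[user], item_factors[item]))

def rmse(pairs):
    """Root mean squared error over (rating, prediction) pairs."""
    return math.sqrt(sum((r - p) ** 2 for r, p in pairs) / len(pairs))

# (user, item, rating); movie 12 never appeared in training
test_rows = [(1, 10, 3), (2, 11, 3), (1, 12, 4)]
scored = [(r, predict(u, i)) for u, i, r in test_rows]

print(rmse(scored))  # nan
kept = [(r, p) for r, p in scored if not math.isnan(p)]  # what 'drop' amounts to
print(rmse(kept))
```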

In [None]:
rec_model=rec.fit(train_df)
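
What fit does can be sketched in a few lines of NumPy: ALS holds the item factors fixed and solves a small regularized least-squares problem for each user, then swaps roles, and repeats. This is an illustrative sketch under simplifying assumptions (toy data, plain ridge regularization, no non-negativity constraint). The function names are hypothetical, and Spark's distributed implementation differs in detail.

```python
import numpy as np

def objective(ratings, U, V, reg):
    """Squared error on observed ratings plus the ridge penalty on all factors."""
    err = sum((r - U[u] @ V[i]) ** 2 for u, i, r in ratings)
    penalty = sum(x @ x for x in U.values()) + sum(x @ x for x in V.values())
    return err + reg * penalty

def solve_side(fixed, observed, rank, reg):
    """Ridge-regress each row's ratings on the fixed factors of the other side."""
    solved = {}
    for key, pairs in observed.items():
        A = np.array([fixed[other] for other, _ in pairs])
        y = np.array([r for _, r in pairs], dtype=float)
        solved[key] = np.linalg.solve(A.T @ A + reg * np.eye(rank), A.T @ y)
    return solved

def als(ratings, rank=2, reg=0.1, n_iter=10, seed=0):
    rng = np.random.default_rng(seed)
    by_user, by_item = {}, {}
    for u, i, r in ratings:
        by_user.setdefault(u, []).append((i, r))
        by_item.setdefault(i, []).append((u, r))
    U = {u: rng.random(rank) for u in by_user}
    V = {i: rng.random(rank) for i in by_item}
    history = [objective(ratings, U, V, reg)]
    for _ in range(n_iter):
        U = solve_side(V, by_user, rank, reg)   # item factors held fixed
        V = solve_side(U, by_item, rank, reg)   # user factors held fixed
        history.append(objective(ratings, U, V, reg))
    return U, V, history

# Toy (userId, item index, rating) triples, not the real dataset
toy = [(1, 0, 5), (1, 1, 3), (2, 0, 4), (2, 2, 1), (3, 1, 2), (3, 2, 5)]
U, V, history = als(toy)
print(history[0], history[-1])
```

Because each half-step solves its least-squares problem exactly, the objective never increases from one iteration to the next, which is the property that makes the alternating scheme attractive.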

# Step 7: Predictions and Evaluation on Test Data

We now check the performance of the model on unseen test data. We use
the transform function to make predictions on the test data and
RegressionEvaluator to compute the RMSE of the model on the test data.

In [None]:
predicted_rating = rec_model.transform(test_df)
predicted_rating.printSchema()

root
 |-- userId: integer (nullable = true)
 |-- title: string (nullable = true)
 |-- rating: integer (nullable = true)
 |-- title_new: double (nullable = false)
 |-- prediction: float (nullable = false)



In [None]:
predicted_rating.orderBy(rand()).show(10)

+------+--------------------+------+---------+----------+
|userId|               title|rating|title_new|prediction|
+------+--------------------+------+---------+----------+
|    65|When We Were King...|     4|    649.0| 3.4903214|
|   429|When Harry Met Sa...|     4|     45.0| 3.6515372|
|   181| Phantom, The (1996)|     1|    418.0|  1.572057|
|   543|Treasure of the S...|     4|    420.0|  4.101422|
|   593|      Sabrina (1995)|     3|    128.0| 3.3797655|
|   354|    Quiz Show (1994)|     3|    151.0|  3.739589|
|   276|Nutty Professor, ...|     4|    179.0| 2.7088554|
|   373|        Balto (1995)|     4|    986.0| 1.0898387|
|   332|Heaven's Prisoner...|     4|    833.0|  2.590468|
|   806|     Die Hard (1988)|     5|     73.0|  3.697603|
+------+--------------------+------+---------+----------+
only showing top 10 rows



In [None]:
from pyspark.ml.evaluation import RegressionEvaluator
evaluator=RegressionEvaluator(metricName='rmse',predictionCol='prediction',labelCol='rating')

In [None]:
rmse=evaluator.evaluate(predicted_rating)

In [None]:
print(rmse)

1.0225595165259223
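
The metric itself is simple: RMSE is the square root of the mean squared difference between actual and predicted ratings. As a sketch, the snippet below applies the formula to the ten sample rows printed earlier in this step. Because it uses only ten rows, its result differs from the full test-set value. The `rmse` helper is illustrative, not part of PySpark.

```python
import math

def rmse(actual, predicted):
    """Root mean squared error between two equal-length sequences."""
    squared = [(a - p) ** 2 for a, p in zip(actual, predicted)]
    return math.sqrt(sum(squared) / len(squared))

# (rating, prediction) pairs copied from the ten sample rows shown earlier
sample = [
    (4, 3.4903214), (4, 3.6515372), (1, 1.572057), (4, 4.101422), (3, 3.3797655),
    (3, 3.739589), (4, 2.7088554), (4, 1.0898387), (4, 2.590468), (5, 3.697603),
]
actual, predicted = zip(*sample)
print(rmse(actual, predicted))
```

On a 1 to 5 rating scale, a test RMSE of about 1.02 means the predictions are typically off by roughly one star.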


# Step 8: Recommend Top Movies That Active User Might Like

We now recommend top movies that a user has not seen and might like.
The first step is to create a list of unique movies in the dataframe.

In [None]:
unique_movies=indexed.select('title_new').distinct()

In [None]:
#number of unique movies
unique_movies.count()

1664

In [None]:
#assigning alias name 'a' to unique movies df
a = unique_movies.alias('a')

In [None]:

user_id=85

In [None]:
#creating another dataframe which contains already watched movie by active user 
watched_movies=indexed.filter(indexed['userId'] == user_id).select('title_new').distinct()

In [None]:
#number of movies already rated 
watched_movies.count()

287

In [None]:
#assigning alias name 'b' to watched movies df
b=watched_movies.alias('b')

We now left-join the two tables. Rows where the right side is null
correspond to movies the active user has not rated, and these are the
candidates for recommendation.

In [None]:
total_movies = a.join(b, a.title_new == b.title_new,how='left')

In [None]:
total_movies.show(20,False)

+---------+---------+
|title_new|title_new|
+---------+---------+
|558.0    |null     |
|305.0    |305.0    |
|299.0    |null     |
|596.0    |null     |
|769.0    |null     |
|934.0    |null     |
|496.0    |496.0    |
|1051.0   |null     |
|692.0    |null     |
|810.0    |null     |
|720.0    |null     |
|782.0    |null     |
|184.0    |184.0    |
|147.0    |147.0    |
|576.0    |null     |
|170.0    |null     |
|1369.0   |null     |
|1587.0   |null     |
|169.0    |null     |
|608.0    |null     |
+---------+---------+
only showing top 20 rows



In [None]:
#selecting movies which active user is yet to rate or watch
remaining_movies=total_movies.where(col("b.title_new").isNull()).select(a.title_new).distinct()

In [None]:
#number of movies user is yet to rate 
remaining_movies.count()

1377
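
The left join followed by a null filter is an anti-join: it keeps the rows of `a` that have no match in `b`. The sketch below reproduces that logic in plain Python on a few indices taken from the joined output above. The counts are consistent with it, since 1664 - 287 = 1377. PySpark's join also supports how='left_anti', which returns the same rows in one step.

```python
all_movies = [558.0, 305.0, 299.0, 596.0, 496.0]   # small stand-in for unique_movies
watched = {305.0, 496.0}                            # small stand-in for watched_movies

# Left join: pair every movie with its match in `watched`, or None when absent
joined = [(m, m if m in watched else None) for m in all_movies]

# Keep only rows whose right side is null, then project the left column
remaining = [left for left, right in joined if right is None]
print(remaining)  # [558.0, 299.0, 596.0]
```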

In [None]:
#adding new column with the userId of the active user to remaining movies df
remaining_movies=remaining_movies.withColumn("userId",lit(int(user_id)))

In [None]:
remaining_movies.show(10,False)

+---------+------+
|title_new|userId|
+---------+------+
|558.0    |85    |
|299.0    |85    |
|596.0    |85    |
|769.0    |85    |
|934.0    |85    |
|1051.0   |85    |
|692.0    |85    |
|810.0    |85    |
|720.0    |85    |
|782.0    |85    |
+---------+------+
only showing top 10 rows



Finally, we make predictions on the remaining-movies dataset for the
active user with the recommender model built earlier, and keep only
the few top recommendations with the highest predicted ratings.

In [None]:
recommendations=rec_model.transform(remaining_movies).orderBy('prediction',ascending=False)

In [None]:
recommendations.show(5)

+---------+------+----------+
|title_new|userId|prediction|
+---------+------+----------+
|   1271.0|    85|  5.007507|
|    747.0|    85| 4.9752927|
|   1286.0|    85| 4.9036617|
|    695.0|    85| 4.7419667|
|    870.0|    85| 4.7412076|
+---------+------+----------+
only showing top 5 rows
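
Each predicted rating is the dot product of a user's factor vector and a movie's factor vector, which is why a prediction such as 5.007507 can exceed the 5-star scale: nonnegative=True constrains the factors, not the range of their product. The sketch below ranks unseen movies for one user with toy factors; all names and numbers are hypothetical.

```python
import heapq

user_vector = [1.2, 0.8]          # toy factors for the active user
movie_vectors = {                 # toy factors for movies the user has not rated
    "A": [2.5, 2.6], "B": [1.0, 1.0], "C": [3.0, 0.5], "D": [0.2, 0.1],
}

def score(u, v):
    """Predicted rating: dot product of user and movie factors."""
    return sum(a * b for a, b in zip(u, v))

predictions = {title: score(user_vector, vec) for title, vec in movie_vectors.items()}

# Top 2 by predicted rating, analogous to orderBy(..., ascending=False) plus show(2)
top = heapq.nlargest(2, predictions.items(), key=lambda kv: kv[1])
print(top)
```

If predictions must stay within the rating scale for display, clipping them to the range 1 to 5 after scoring is one simple option.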



In [None]:
#converting title_new values back to movie titles
movie_title = IndexToString(inputCol="title_new", outputCol="title",labels=model.labels)

final_recommendations=movie_title.transform(recommendations)

In [None]:
final_recommendations.show(10,False)

+---------+------+----------+------------------------------------------------------+
|title_new|userId|prediction|title                                                 |
+---------+------+----------+------------------------------------------------------+
|1271.0   |85    |5.007507  |Whole Wide World, The (1996)                          |
|747.0    |85    |4.9752927 |Thin Blue Line, The (1988)                            |
|1286.0   |85    |4.9036617 |Mina Tannenbaum (1994)                                |
|695.0    |85    |4.7419667 |Some Folks Call It a Sling Blade (1993)               |
|870.0    |85    |4.7412076 |Microcosmos: Le peuple de l'herbe (1996)              |
|961.0    |85    |4.7284765 |Amateur (1994)                                        |
|482.0    |85    |4.693382  |Wallace & Gromit: The Best of Aardman Animation (1996)|
|1164.0   |85    |4.650778  |Murder, My Sweet (1944)                               |
|1354.0   |85    |4.637894  |Crossfire (1947)                    