In [1]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('Recommender System').getOrCreate()

### Reading the Dataset

In [6]:
df = spark.read.csv('./static/movie_ratings_df.csv', inferSchema=True, header=True)
df.show(10, True) # Pulls the fist ten rows of the dataset

+------+------------+------+
|userId|       title|rating|
+------+------------+------+
|   196|Kolya (1996)|     3|
|    63|Kolya (1996)|     3|
|   226|Kolya (1996)|     5|
|   154|Kolya (1996)|     3|
|   306|Kolya (1996)|     5|
|   296|Kolya (1996)|     4|
|    34|Kolya (1996)|     5|
|   271|Kolya (1996)|     4|
|   201|Kolya (1996)|     4|
|   209|Kolya (1996)|     4|
+------+------------+------+
only showing top 10 rows



### Data size in terms of rows and records

In [3]:
print((df.count(), len(df.columns)))

(100000, 3)


Our Dataset contains 100,000 records with only three columns

Data Schema

In [4]:
df.printSchema()

root
 |-- userId: integer (nullable = true)
 |-- title: string (nullable = true)
 |-- rating: integer (nullable = true)



#### Two of the columns are Numerical while title column is categorical

#### Top users by number of moviers rated

In [10]:
df.groupBy('userId').count().orderBy('count', ascending=False).show(10, False)

+------+-----+
|userId|count|
+------+-----+
|405   |737  |
|655   |685  |
|13    |636  |
|450   |540  |
|276   |518  |
|416   |493  |
|537   |490  |
|303   |484  |
|234   |480  |
|393   |448  |
+------+-----+
only showing top 10 rows



#### Bottom users by number of moviers rated

In [11]:
df.groupBy('userId').count().orderBy('count', ascending=True).show(10, False)

+------+-----+
|userId|count|
+------+-----+
|732   |20   |
|631   |20   |
|572   |20   |
|685   |20   |
|93    |20   |
|636   |20   |
|34    |20   |
|596   |20   |
|926   |20   |
|300   |20   |
+------+-----+
only showing top 10 rows



## Feature Engineering

In [12]:
from pyspark.sql.functions import *
from pyspark.ml.feature import StringIndexer, IndexToString

#### So now we create the stringindexer object by mentioning the input column and output column.

In [17]:
stringIndexer = StringIndexer(inputCol='title',outputCol='New_Title')
model = stringIndexer.fit(df)
indexed = model.transform(df)
indexed.show(10, False)

+------+------------+------+---------+
|userId|title       |rating|New_Title|
+------+------------+------+---------+
|196   |Kolya (1996)|3     |287.0    |
|63    |Kolya (1996)|3     |287.0    |
|226   |Kolya (1996)|5     |287.0    |
|154   |Kolya (1996)|3     |287.0    |
|306   |Kolya (1996)|5     |287.0    |
|296   |Kolya (1996)|4     |287.0    |
|34    |Kolya (1996)|5     |287.0    |
|271   |Kolya (1996)|4     |287.0    |
|201   |Kolya (1996)|4     |287.0    |
|209   |Kolya (1996)|4     |287.0    |
+------+------------+------+---------+
only showing top 10 rows



In [22]:
print (df.groupBy('title').count().orderBy('count',ascending=False).show(10,False), indexed.groupBy('New_Title').count().orderBy('count',ascending=False).show(10,False))

+-----------------------------+-----+
|title                        |count|
+-----------------------------+-----+
|Star Wars (1977)             |583  |
|Contact (1997)               |509  |
|Fargo (1996)                 |508  |
|Return of the Jedi (1983)    |507  |
|Liar Liar (1997)             |485  |
|English Patient, The (1996)  |481  |
|Scream (1996)                |478  |
|Toy Story (1995)             |452  |
|Air Force One (1997)         |431  |
|Independence Day (ID4) (1996)|429  |
+-----------------------------+-----+
only showing top 10 rows

+---------+-----+
|New_Title|count|
+---------+-----+
|0.0      |583  |
|1.0      |509  |
|2.0      |508  |
|3.0      |507  |
|4.0      |485  |
|5.0      |481  |
|6.0      |478  |
|7.0      |452  |
|8.0      |431  |
|9.0      |429  |
+---------+-----+
only showing top 10 rows

None None


## Pliting The Data Set
We split it into a 80 to 20 ratio to train the model and test its accuracy

In [38]:
train,test=indexed.randomSplit([0.8,0.2])
print("Training set", train.count())
print("Testing set", test.count())

Training set 80190
Testing set 19810


##  Building and Training the Model

In [36]:
from pyspark.ml.recommendation import ALS
rec=ALS(maxIter=10,regParam=0.01,userCol='userId',itemCol='New_Title',ratingCol='rating',nonnegative=True)
rec_model = rec.fit(train)

### Performance evaluation on our test data
Here we will chack the performance of our model on unseen data.

In [40]:
predict_ratings = rec_model.transform(test)
predict_ratings.printSchema()

root
 |-- userId: integer (nullable = true)
 |-- title: string (nullable = true)
 |-- rating: integer (nullable = true)
 |-- New_Title: double (nullable = true)
 |-- prediction: float (nullable = true)



In [41]:
predict_ratings.show(10)

+------+--------------------+------+---------+----------+
|userId|               title|rating|New_Title|prediction|
+------+--------------------+------+---------+----------+
|   458|That Thing You Do...|     3|    148.0| 2.7512252|
|    53|That Thing You Do...|     3|    148.0| 3.4581232|
|   193|That Thing You Do...|     4|    148.0| 4.0641327|
|   223|That Thing You Do...|     4|    148.0| 3.1970763|
|    93|That Thing You Do...|     4|    148.0| 4.9698334|
|   152|That Thing You Do...|     3|    148.0| 4.1981716|
|   896|That Thing You Do...|     3|    148.0| 2.7994106|
|   280|That Thing You Do...|     3|    148.0| 4.1902285|
|   692|That Thing You Do...|     3|    148.0| 3.2819183|
|   432|That Thing You Do...|     4|    148.0| 3.2117474|
+------+--------------------+------+---------+----------+
only showing top 10 rows



In [46]:
from pyspark.ml.evaluation import RegressionEvaluator
evaluator = RegressionEvaluator(metricName = 'rmse', predictionCol = 'prediction', labelCol = 'rating')
rmse = evaluator.evaluate(predict_ratings)
print(rmse)

nan


##### The rmse is non we can equate this to zero meaning there is no error.

### Recommend Top Movies That Active User Might Like
The first step is to create a list of unique movies in the dataframe.

In [48]:
unique_movies=indexed.select('New_Title').distinct()
unique_movies.count() ## Total numbe rof individual movies in our new dataframe

1664

In [55]:
a = unique_movies.alias('a')

#### We can select any user within the dataset for which we need to recommend other movies. In our case, we go ahead with userId = 96.

#### We will filter the movies that this active user has already rated or seen.

In [59]:
user_id=96
watched_movies=indexed.filter(indexed['userId'] == user_id).select('New_Title').distinct()
watched_movies.count() # Number of movies the user has watched

56

In [60]:
b=watched_movies.alias('b')

#### So, there are total of 56 unique movies out of 1,664 movies that this active user has already rated. So, we would want to recommend movies from the remaining 1608 movies.

In [64]:
total_movies = a.join(b, a.New_Title == b.New_Title,how='left')
total_movies.show()

+---------+---------+
|New_Title|New_Title|
+---------+---------+
|    558.0|     null|
|    305.0|     null|
|    299.0|     null|
|    596.0|     null|
|    769.0|     null|
|    934.0|     null|
|    496.0|     null|
|   1051.0|     null|
|    692.0|     null|
|    810.0|     null|
|    720.0|     null|
|    782.0|     null|
|    184.0|     null|
|    147.0|     null|
|    576.0|     null|
|    170.0|     null|
|   1369.0|     null|
|   1587.0|     null|
|    169.0|     null|
|    608.0|     null|
+---------+---------+
only showing top 20 rows



In [67]:
remaining_movies=total_movies.where(col("b.New_Title").isNull()).select(a.New_Title).distinct()
remaining_movies.count()

1608

In [71]:
remaining_movies=remaining_movies.withColumn("userId",lit(int(user_id)))
remaining_movies.show(10, False)

+---------+------+
|New_Title|userId|
+---------+------+
|558.0    |96    |
|305.0    |96    |
|299.0    |96    |
|596.0    |96    |
|769.0    |96    |
|934.0    |96    |
|496.0    |96    |
|1051.0   |96    |
|692.0    |96    |
|810.0    |96    |
+---------+------+
only showing top 10 rows



### Finally, we can now make the predictions on this remaining movie’s dataset for the active user using the recommender model that we built earlier. We filter only a few top recommendations that have the highest predicted ratings.

In [73]:
recommendations = rec_model.transform(remaining_movies).orderBy('prediction', ascending = False)
recommendations.show(100, False)

+---------+------+----------+
|New_Title|userId|prediction|
+---------+------+----------+
|1652.0   |96    |NaN       |
|1540.0   |96    |NaN       |
|1597.0   |96    |NaN       |
|1547.0   |96    |NaN       |
|1632.0   |96    |NaN       |
|1556.0   |96    |NaN       |
|1646.0   |96    |NaN       |
|1623.0   |96    |NaN       |
|1601.0   |96    |NaN       |
|1658.0   |96    |NaN       |
|1522.0   |96    |NaN       |
|1577.0   |96    |NaN       |
|1570.0   |96    |NaN       |
|1586.0   |96    |NaN       |
|1616.0   |96    |NaN       |
|1606.0   |96    |NaN       |
|1534.0   |96    |NaN       |
|1611.0   |96    |NaN       |
|1536.0   |96    |NaN       |
|1618.0   |96    |NaN       |
|1346.0   |96    |9.69666   |
|1188.0   |96    |8.464907  |
|1301.0   |96    |8.186031  |
|826.0    |96    |7.5327663 |
|804.0    |96    |7.235327  |
|1343.0   |96    |7.1318555 |
|837.0    |96    |7.0773735 |
|986.0    |96    |7.0262904 |
|869.0    |96    |6.985313  |
|1154.0   |96    |6.9561777 |
|680.0    

### Let us add the movie title to the recommendations

In [75]:
movie_title = IndexToString(inputCol="New_Title",outputCol="title",labels=model.labels)
final_recommendations=movie_title.transform(recommendations)
final_recommendations.show(100,False)

+---------+------+----------+---------------------------------------------------------+
|New_Title|userId|prediction|title                                                    |
+---------+------+----------+---------------------------------------------------------+
|1547.0   |96    |NaN       |Baton Rouge (1988)                                       |
|1597.0   |96    |NaN       |Etz Hadomim Tafus (Under the Domin Tree) (1994)          |
|1556.0   |96    |NaN       |Mirage (1995)                                            |
|1646.0   |96    |NaN       |Shadows (Cienie) (1988)                                  |
|1601.0   |96    |NaN       |Normal Life (1996)                                       |
|1632.0   |96    |NaN       |Death in Brunswick (1991)                                |
|1652.0   |96    |NaN       |Big One, The (1997)                                      |
|1658.0   |96    |NaN       |� k�ldum klaka (Cold Fever) (1994)                       |
|1606.0   |96    |NaN       |Kil

### Thats it guys
### A simple collaborative filtering based recommender system in PySpark using the ALS method to recommend movies to the users

### Bye