<a href="https://colab.research.google.com/github/gtoge/Recommendation_System/blob/master/Recomendation_System.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# RECOMMENDATION SYSTEM
- We will build a recommender system (RS) using Spark's MLlib library.
- We will predict the rating that a given user would assign to an item.
- We will use a collaborative-filtering-based recommender.



## Creating a Colab environment
- We will be using Google Colab to run PySpark.
- We need to install the following dependencies:
  - Apache Spark
  - Hadoop
  - Java
  - findspark


### Apache Spark and Hadoop
- Installing OpenJDK 8, Spark 2.4.5 (prebuilt for Hadoop 2.7), and findspark

In [0]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://www-us.apache.org/dist/spark/spark-2.4.5/spark-2.4.5-bin-hadoop2.7.tgz
!tar xf spark-2.4.5-bin-hadoop2.7.tgz
!pip install -q findspark

### Setting Up the Environment Path

In [0]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.4.5-bin-hadoop2.7"

### Testing Installation
- We will use a local Spark session.

In [0]:
import findspark
findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()

### Mounting Google Drive

In [0]:
# Mount Google Drive so the dataset is accessible from the notebook
from google.colab import drive
drive.mount('/content/gdrive')


Mounted at /content/gdrive


In [0]:
# Change to my working directory
%cd /content/gdrive/My Drive/Recomendation_System

/content/gdrive/My Drive/Recomendation_System


In [0]:
pwd

'/content/gdrive/My Drive/Recomendation_System'

### Creating a SparkSession Object

In [0]:
# Get a SparkSession handle; getOrCreate() reuses the session started above if it is still active
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('rc').getOrCreate()

### Reading the dataset

In [0]:
# Import only the column functions used in this notebook (a wildcard import would shadow Python builtins such as sum and max)
from pyspark.sql.functions import col, lit, rand
# Load the dataset into a Spark DataFrame
df = spark.read.csv('/content/gdrive/My Drive/Recomendation_System/movie_ratings_df.csv', inferSchema=True, header=True)

## Exploratory Data Analysis

In [0]:
#Getting the shape of the data 
print((df.count(),len(df.columns)))

(100000, 3)


In [0]:
#display columns and columns type in dataframe
df.printSchema()

root
 |-- userId: integer (nullable = true)
 |-- title: string (nullable = true)
 |-- rating: integer (nullable = true)



In [0]:
#display few rows of the dataframe randomly
df.orderBy(rand()).show(10,False)

+------+-------------------------------------+------+
|userId|title                                |rating|
+------+-------------------------------------+------+
|268   |Bob Roberts (1992)                   |4     |
|454   |Real Genius (1985)                   |2     |
|474   |City Hall (1996)                     |3     |
|798   |Wizard of Oz, The (1939)             |4     |
|276   |Young Poisoner's Handbook, The (1995)|4     |
|25    |Nell (1994)                          |4     |
|87    |Heat (1995)                          |3     |
|178   |Highlander (1986)                    |4     |
|881   |Graduate, The (1967)                 |3     |
|336   |Beautiful Girls (1996)               |5     |
+------+-------------------------------------+------+
only showing top 10 rows



In [0]:
# users with the most ratings
df.groupBy('userId').count().orderBy('count',ascending=False).show(10,False)

+------+-----+
|userId|count|
+------+-----+
|405   |737  |
|655   |685  |
|13    |636  |
|450   |540  |
|276   |518  |
|416   |493  |
|537   |490  |
|303   |484  |
|234   |480  |
|393   |448  |
+------+-----+
only showing top 10 rows



In [0]:
# users with the fewest ratings
df.groupBy('userId').count().orderBy('count',ascending=True).show(10,False)

+------+-----+
|userId|count|
+------+-----+
|732   |20   |
|636   |20   |
|572   |20   |
|93    |20   |
|631   |20   |
|596   |20   |
|300   |20   |
|926   |20   |
|685   |20   |
|34    |20   |
+------+-----+
only showing top 10 rows



In [0]:
#number of times a movie has been rated 
df.groupBy('title').count().orderBy('count',ascending=False).show(10,False)

+-----------------------------+-----+
|title                        |count|
+-----------------------------+-----+
|Star Wars (1977)             |583  |
|Contact (1997)               |509  |
|Fargo (1996)                 |508  |
|Return of the Jedi (1983)    |507  |
|Liar Liar (1997)             |485  |
|English Patient, The (1996)  |481  |
|Scream (1996)                |478  |
|Toy Story (1995)             |452  |
|Air Force One (1997)         |431  |
|Independence Day (ID4) (1996)|429  |
+-----------------------------+-----+
only showing top 10 rows



In [0]:
# movies with the fewest ratings
df.groupBy('title').count().orderBy('count',ascending=True).show(10,False)

+-----------------------------------------+-----+
|title                                    |count|
+-----------------------------------------+-----+
|Aiqing wansui (1994)                     |1    |
|Next Step, The (1995)                    |1    |
|Leopard Son, The (1996)                  |1    |
|Fear, The (1995)                         |1    |
|Mad Dog Time (1996)                      |1    |
|Target (1995)                            |1    |
|Lashou shentan (1992)                    |1    |
|Vie est belle, La (Life is Rosey) (1987) |1    |
|Modern Affair, A (1995)                  |1    |
|JLG/JLG - autoportrait de décembre (1994)|1    |
+-----------------------------------------+-----+
only showing top 10 rows



## Feature Engineering
- We convert categorical data (the movie titles) to numerical values.
- We will use StringIndexer.

In [0]:
#import String indexer to convert string values to numeric values
from pyspark.ml.feature import StringIndexer,IndexToString

In [0]:
#creating string indexer to convert the movie title column values into numerical values
stringIndexer = StringIndexer(inputCol="title", outputCol="title_new")

In [0]:
#applying stringindexer object on dataframe movie title column
model = stringIndexer.fit(df)

In [0]:
#creating new dataframe with transformed values
indexed = model.transform(df)

In [0]:
#validate the numerical title values
indexed.show(10)

+------+------------+------+---------+
|userId|       title|rating|title_new|
+------+------------+------+---------+
|   196|Kolya (1996)|     3|    287.0|
|    63|Kolya (1996)|     3|    287.0|
|   226|Kolya (1996)|     5|    287.0|
|   154|Kolya (1996)|     3|    287.0|
|   306|Kolya (1996)|     5|    287.0|
|   296|Kolya (1996)|     4|    287.0|
|    34|Kolya (1996)|     5|    287.0|
|   271|Kolya (1996)|     4|    287.0|
|   201|Kolya (1996)|     4|    287.0|
|   209|Kolya (1996)|     4|    287.0|
+------+------------+------+---------+
only showing top 10 rows



In [0]:
#number of times each numerical movie title has been rated 
indexed.groupBy('title_new').count().orderBy('count',ascending=False).show(10,False)

+---------+-----+
|title_new|count|
+---------+-----+
|0.0      |583  |
|1.0      |509  |
|2.0      |508  |
|3.0      |507  |
|4.0      |485  |
|5.0      |481  |
|6.0      |478  |
|7.0      |452  |
|8.0      |431  |
|9.0      |429  |
+---------+-----+
only showing top 10 rows
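The indices 0.0 through 9.0 carry the same counts as the ten most-rated titles listed in the exploratory analysis because StringIndexer, by default, orders labels by descending frequency, so the most frequent title receives index 0. A minimal pure-Python sketch of that ordering follows. `index_by_frequency` and the sample titles are hypothetical, and the alphabetical tie-break is an assumption of this sketch, not something this notebook's Spark version is known to guarantee.

```python
from collections import Counter

def index_by_frequency(values):
    """Map each distinct value to a float index, most frequent value first."""
    counts = Counter(values)
    # Sort by descending count; break ties alphabetically (an assumption of this sketch).
    ordered = sorted(counts, key=lambda v: (-counts[v], v))
    return {value: float(i) for i, value in enumerate(ordered)}

# Hypothetical miniature "title" column
titles = [
    "Star Wars (1977)", "Fargo (1996)", "Star Wars (1977)",
    "Contact (1997)", "Star Wars (1977)", "Fargo (1996)",
]
title_index = index_by_frequency(titles)
print(title_index)
```

The resulting indices are identifiers, not quantities: ALS only uses them to look up the latent factors for an item.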



## Splitting Dataset

In [0]:
#split the data into training and test datasets
train,test=indexed.randomSplit([0.75,0.25])

In [0]:
#count number of records in train set
train.count()

75157

In [0]:
#count number of records in test set
test.count()

24843
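The counts are close to, but not exactly, 75,000 and 25,000 because `randomSplit` assigns rows at random instead of cutting at a fixed position, and without a `seed` argument the split changes from run to run. The sketch below imitates that behaviour in plain Python, assuming each row is assigned independently. `random_split` is a hypothetical helper, not Spark's implementation.

```python
import random

def random_split(rows, weights, seed=None):
    """Assign each row independently to one of len(weights) splits."""
    rng = random.Random(seed)
    total = sum(weights)
    # Cumulative boundaries in [0, 1], e.g. [0.75, 1.0] for weights [0.75, 0.25]
    bounds, running = [], 0.0
    for w in weights:
        running += w / total
        bounds.append(running)
    bounds[-1] = 1.0  # guard against floating-point round-off
    splits = [[] for _ in weights]
    for row in rows:
        draw = rng.random()  # uniform in [0, 1)
        for i, bound in enumerate(bounds):
            if draw < bound:
                splits[i].append(row)
                break
    return splits

rows = list(range(100000))
train_rows, test_rows = random_split(rows, [0.75, 0.25], seed=42)
print(len(train_rows), len(test_rows))
```

In the notebook itself, the analogous change would be `indexed.randomSplit([0.75,0.25], seed=42)`; the seed value is arbitrary and only serves to make the split repeatable.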

## Building and Training Model
- Using ALS Function
- Setting Hyperparameters

In [0]:
#import ALS recommender function from pyspark ml library
from pyspark.ml.recommendation import ALS


In [0]:
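# Configure the ALS recommender (it is fitted on the training dataset in the next cell)
# nonnegative=True constrains the latent factors to be non-negative, which avoids negative predicted ratings
# coldStartStrategy="drop" drops rows whose prediction would be NaN (users or items not seen during training)
rec = ALS(maxIter=10, regParam=0.01, userCol='userId', itemCol='title_new', ratingCol='rating',
          nonnegative=True, coldStartStrategy="drop")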

In [0]:
#fit the model on train set
rec_model=rec.fit(train)
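
ALS (alternating least squares) learns a vector of latent factors for every user and every item, and predicts a rating as the dot product of the two vectors. It alternates between two steps: hold the item factors fixed and solve a regularized least-squares problem for each user, then hold the user factors fixed and solve for each item. The sketch below shows the idea with a single latent factor and plain L2 regularization, where each step has a closed-form solution. It is an illustration only: `als_rank1`, `objective`, and the tiny rating list are hypothetical, and Spark's ALS uses several factors per user and item and may scale the regularization term differently.

```python
def objective(ratings, u, v, reg):
    """Squared error on observed ratings plus an L2 penalty on all factors."""
    error = sum((r - u[user] * v[item]) ** 2 for user, item, r in ratings)
    penalty = reg * (sum(x * x for x in u) + sum(x * x for x in v))
    return error + penalty

def als_rank1(ratings, n_users, n_items, reg=0.01, iters=10):
    """ratings: list of (user, item, rating) triples with 0-based ids."""
    u = [1.0] * n_users  # one latent factor per user
    v = [1.0] * n_items  # one latent factor per item
    history = []
    for _ in range(iters):
        # Item factors fixed: each user factor is a 1-D ridge regression
        num, den = [0.0] * n_users, [reg] * n_users
        for user, item, r in ratings:
            num[user] += r * v[item]
            den[user] += v[item] ** 2
        u = [n / d for n, d in zip(num, den)]
        # User factors fixed: solve for each item factor the same way
        num, den = [0.0] * n_items, [reg] * n_items
        for user, item, r in ratings:
            num[item] += r * u[user]
            den[item] += u[user] ** 2
        v = [n / d for n, d in zip(num, den)]
        history.append(objective(ratings, u, v, reg))
    return u, v, history

# Hypothetical ratings: (user, item, rating)
ratings = [(0, 0, 5), (0, 1, 4), (1, 0, 4), (1, 2, 2), (2, 1, 5), (2, 2, 3)]
u, v, history = als_rank1(ratings, n_users=3, n_items=3)
# Predicted rating for a pair that was never observed: user 0, item 2
print(u[0] * v[2])
```

Because each step minimizes the objective exactly over one set of factors, the recorded objective never increases from one iteration to the next.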

## Prediction and Evaluation on Test Data

In [0]:
#making predictions on test set 
prediction=rec_model.transform(test)

In [0]:
#columns in predicted ratings dataframe
prediction.printSchema()

root
 |-- userId: integer (nullable = true)
 |-- title: string (nullable = true)
 |-- rating: integer (nullable = true)
 |-- title_new: double (nullable = false)
 |-- prediction: float (nullable = false)



In [0]:
#predicted vs actual ratings for test set 
prediction.orderBy(rand()).show(10)

+------+--------------------+------+---------+----------+
|userId|               title|rating|title_new|prediction|
+------+--------------------+------+---------+----------+
|   214| Pulp Fiction (1994)|     5|     12.0| 3.6739855|
|   102|Fire Down Below (...|     2|    653.0| 2.1945543|
|    59| Blade Runner (1982)|     5|     52.0|  4.881231|
|   230|Twelve Monkeys (1...|     3|     13.0|  3.045692|
|   538|Nobody's Fool (1994)|     3|    635.0| 3.7750916|
|   584|      Titanic (1997)|     5|     20.0| 5.0793176|
|   318|    Pinocchio (1940)|     3|    331.0|  3.693245|
|   901|     Maverick (1994)|     5|    247.0| 4.7107105|
|   406|Immortal Beloved ...|     3|    525.0| 2.3319488|
|   505|Air Force One (1997)|     4|      8.0| 3.9634209|
+------+--------------------+------+---------+----------+
only showing top 10 rows



In [0]:
#importing Regression Evaluator to measure RMSE
from pyspark.ml.evaluation import RegressionEvaluator

In [0]:
#create a RegressionEvaluator object for measuring RMSE (named 'evaluator' to avoid shadowing Python's built-in eval)
evaluator=RegressionEvaluator(metricName='rmse',predictionCol='prediction',labelCol='rating')

In [0]:
#apply the evaluator to the predictions dataframe to calculate RMSE
rmse=evaluator.evaluate(prediction)

In [0]:
#print the RMSE
print("RMSE: %f" % rmse)

RMSE: 1.021138
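
RMSE is the square root of the mean squared difference between the actual and predicted ratings, so a value near 1.02 means predictions are off by roughly one rating point on average. A minimal sketch of the computation in plain Python follows; the `rmse` helper is hypothetical, and the three (rating, prediction) pairs are copied from the sample of predictions shown earlier.

```python
import math

def rmse(actual, predicted):
    """Root mean squared error between two equally long sequences."""
    if len(actual) != len(predicted) or not actual:
        raise ValueError("inputs must be non-empty and of equal length")
    squared = [(a - p) ** 2 for a, p in zip(actual, predicted)]
    return math.sqrt(sum(squared) / len(squared))

# Three (rating, prediction) pairs from the sample output
actual = [5, 2, 5]
predicted = [3.6739855, 2.1945543, 4.881231]
print("RMSE: %f" % rmse(actual, predicted))
```

Since errors are squared before averaging, a few large misses raise RMSE more than many small ones.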


## Recommending Top Movies That Active Users Might Like

In [0]:
#create dataset of all distinct movies 
unique_movies=indexed.select('title_new').distinct()

In [0]:
#number of unique movies
unique_movies.count()

1664

In [0]:
#assigning alias name 'a' to unique movies df
a = unique_movies.alias('a')

In [0]:
user_id=85

In [0]:
#creating another dataframe which contains already watched movie by active user 
watched_movies=indexed.filter(indexed['userId'] == user_id).select('title_new').distinct()

In [0]:
#number of movies already rated 
watched_movies.count()

287

In [0]:
#assigning alias name 'b' to watched movies df
b=watched_movies.alias('b')

In [0]:
#joining both tables on left join 
total_movies = a.join(b, a.title_new == b.title_new,how='left')

In [0]:
total_movies.show(10,False)

+---------+---------+
|title_new|title_new|
+---------+---------+
|558.0    |null     |
|305.0    |305.0    |
|299.0    |null     |
|596.0    |null     |
|769.0    |null     |
|934.0    |null     |
|496.0    |496.0    |
|1051.0   |null     |
|692.0    |null     |
|810.0    |null     |
+---------+---------+
only showing top 10 rows



In [0]:
#selecting movies which active user is yet to rate or watch
remaining_movies=total_movies.where(col("b.title_new").isNull()).select(a.title_new).distinct()

In [0]:
#number of movies user is yet to rate 
remaining_movies.count()

1377
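
The left join followed by the `isNull` filter is an anti-join: it keeps the rows of `a` that have no match in `b`. Spark also accepts `how='left_anti'` as a join type, which expresses the same thing in one step. The set-based sketch below shows the logic in plain Python; `anti_join` is a hypothetical helper and the index sets are stand-ins sized to match the counts in this section.

```python
def anti_join(left, right):
    """Return the elements of `left` that have no match in `right`."""
    return {item for item in left if item not in right}

# Hypothetical stand-ins: 1664 movie indices, of which the user has rated 287
all_movies = {float(i) for i in range(1664)}
watched = {float(i) for i in range(287)}

remaining = anti_join(all_movies, watched)
print(len(remaining))  # 1664 - 287 = 1377
```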

In [0]:
#adding a userId column holding the active user's id to the remaining movies df
remaining_movies=remaining_movies.withColumn("userId",lit(int(user_id)))

In [0]:
remaining_movies.show(10,False)

+---------+------+
|title_new|userId|
+---------+------+
|558.0    |85    |
|299.0    |85    |
|596.0    |85    |
|769.0    |85    |
|934.0    |85    |
|1051.0   |85    |
|692.0    |85    |
|810.0    |85    |
|720.0    |85    |
|782.0    |85    |
+---------+------+
only showing top 10 rows



In [0]:
#scoring the remaining movies with the ALS model and sorting by predicted rating, highest first
recommendations=rec_model.transform(remaining_movies).orderBy('prediction',ascending=False)

In [0]:
recommendations.show(5,False)

+---------+------+----------+
|title_new|userId|prediction|
+---------+------+----------+
|1286.0   |85    |5.6669097 |
|1369.0   |85    |5.0321894 |
|1271.0   |85    |4.667848  |
|914.0    |85    |4.645869  |
|263.0    |85    |4.6170864 |
+---------+------+----------+
only showing top 5 rows



In [0]:
#converting title_new values back to movie titles
movie_title = IndexToString(inputCol="title_new", outputCol="title",labels=model.labels)

final_recommendations=movie_title.transform(recommendations)

In [0]:
final_recommendations.show(10,False)

+---------+------+----------+----------------------------+
|title_new|userId|prediction|title                       |
+---------+------+----------+----------------------------+
|1286.0   |85    |5.6669097 |Mina Tannenbaum (1994)      |
|1369.0   |85    |5.0321894 |Harlem (1993)               |
|1271.0   |85    |4.667848  |Whole Wide World, The (1996)|
|914.0    |85    |4.645869  |Top Hat (1935)              |
|263.0    |85    |4.6170864 |12 Angry Men (1957)         |
|1195.0   |85    |4.559059  |Pather Panchali (1955)      |
|901.0    |85    |4.5414906 |Three Caballeros, The (1945)|
|1470.0   |85    |4.4813557 |Some Mother's Son (1996)    |
|1598.0   |85    |4.4738894 |Spanish Prisoner, The (1997)|
|1501.0   |85    |4.4738894 |Butcher Boy, The (1998)     |
+---------+------+----------+----------------------------+
only showing top 10 rows



In [0]:
#create function to recommend top 'n' movies to any particular user
def top_movies(user_id,n):
    """
    Display the top 'n' movies that the user has not rated yet but might like.
    """
    #assigning alias name 'a' to unique movies df
    a = unique_movies.alias('a')

    #creating another dataframe which contains the movies already rated by the active user
    watched_movies=indexed.filter(indexed['userId'] == user_id).select('title_new').distinct()

    #assigning alias name 'b' to watched movies df
    b=watched_movies.alias('b')

    #left-joining all movies with the watched movies
    total_movies = a.join(b, a.title_new == b.title_new,how='left')

    #selecting movies which the active user is yet to rate or watch
    remaining_movies=total_movies.where(col("b.title_new").isNull()).select(a.title_new).distinct()

    #adding a userId column holding the active user's id to the remaining movies df
    remaining_movies=remaining_movies.withColumn("userId",lit(int(user_id)))

    #scoring the remaining movies with the ALS model and keeping only the top 'n'
    recommendations=rec_model.transform(remaining_movies).orderBy('prediction',ascending=False).limit(n)

    #adding a column of movie titles to the recommendations
    movie_title = IndexToString(inputCol="title_new", outputCol="title",labels=model.labels)
    final_recommendations=movie_title.transform(recommendations)

    #display the recommendations for the active user (show() prints the table and returns None)
    final_recommendations.show(n,False)

In [0]:
top_movies(85,10)

+---------+------+----------+----------------------------+
|title_new|userId|prediction|title                       |
+---------+------+----------+----------------------------+
|1286.0   |85    |5.6669097 |Mina Tannenbaum (1994)      |
|1369.0   |85    |5.0321894 |Harlem (1993)               |
|1271.0   |85    |4.667848  |Whole Wide World, The (1996)|
|914.0    |85    |4.645869  |Top Hat (1935)              |
|263.0    |85    |4.6170864 |12 Angry Men (1957)         |
|1195.0   |85    |4.559059  |Pather Panchali (1955)      |
|901.0    |85    |4.5414906 |Three Caballeros, The (1945)|
|1470.0   |85    |4.4813557 |Some Mother's Son (1996)    |
|1598.0   |85    |4.4738894 |Spanish Prisoner, The (1997)|
|1386.0   |85    |4.4738894 |Underneath, The (1995)      |
+---------+------+----------+----------------------------+
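
The last row here differs from the last row of the earlier `final_recommendations` output: several titles share the predicted rating 4.4738894, and sorting on `prediction` alone leaves the order of tied rows unspecified. One possible remedy is a secondary sort key. The sketch below shows the idea in plain Python; `top_n` is a hypothetical helper, and the rows are copied from the two outputs.

```python
def top_n(rows, n):
    """rows: (title, prediction) pairs. Highest prediction first; ties broken by title."""
    return sorted(rows, key=lambda row: (-row[1], row[0]))[:n]

rows = [
    ("Underneath, The (1995)", 4.4738894),
    ("Some Mother's Son (1996)", 4.4813557),
    ("Butcher Boy, The (1998)", 4.4738894),
    ("Spanish Prisoner, The (1997)", 4.4738894),
]
for title, prediction in top_n(rows, 3):
    print(prediction, title)
```

In PySpark, the analogous change would be to order by both columns, for example `orderBy(col('prediction').desc(), col('title_new'))`, so that repeated calls return the same rows.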

