<a href="https://colab.research.google.com/github/BhaskarCS/Springboard/blob/master/Movie_Recommendations.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
# Setup PySpark

!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://www-eu.apache.org/dist/spark/spark-2.4.7/spark-2.4.7-bin-hadoop2.7.tgz
!tar xf spark-2.4.7-bin-hadoop2.7.tgz
!pip install -q findspark

import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.4.7-bin-hadoop2.7"

import findspark
findspark.init()

In [2]:
# import the SparkConfiguration and SparkContext
from pyspark import SparkConf, SparkContext

# if we wanted to change any configuration settings for this session only we would define them here
conf = SparkConf()

# Create the SparkContext, or reuse the one already running in this kernel.
# A plain SparkContext(conf=conf) raises ValueError ("Cannot run multiple SparkContexts
# at once") when this cell is re-run. Note: if a context already exists, `conf` is not
# applied to it.
sc = SparkContext.getOrCreate(conf=conf)

# show the current configuration settings (getConf() is the public accessor; _conf is private)
sc.getConf().getAll()

[('spark.driver.port', '44809'),
 ('spark.rdd.compress', 'True'),
 ('spark.serializer.objectStreamReset', '100'),
 ('spark.master', 'local[*]'),
 ('spark.executor.id', 'driver'),
 ('spark.submit.deployMode', 'client'),
 ('spark.driver.host', '2218559182aa'),
 ('spark.ui.showConsoleProgress', 'true'),
 ('spark.app.name', 'pyspark-shell'),
 ('spark.app.id', 'local-1603325221672')]

In [3]:
# List the active SparkContext configuration via the public getConf() API
# (returns a copy of the configuration; sc._conf is a private attribute)
sc.getConf().getAll()

[('spark.driver.port', '44809'),
 ('spark.rdd.compress', 'True'),
 ('spark.serializer.objectStreamReset', '100'),
 ('spark.master', 'local[*]'),
 ('spark.executor.id', 'driver'),
 ('spark.submit.deployMode', 'client'),
 ('spark.driver.host', '2218559182aa'),
 ('spark.ui.showConsoleProgress', 'true'),
 ('spark.app.name', 'pyspark-shell'),
 ('spark.app.id', 'local-1603325221672')]

In [4]:
# DataFrame entry point plus the ALS recommender estimator used below
from pyspark.ml.recommendation import ALS
from pyspark.sql import SparkSession

# getOrCreate() picks up the already-running SparkContext instead of starting a new one
session_builder = SparkSession.builder
spark = session_builder.getOrCreate()

In [5]:
# Mount Google Drive so the MovieLens files stored there are readable from this runtime
from google.colab import drive

DRIVE_MOUNT_POINT = '/content/gdrive'
drive.mount(DRIVE_MOUNT_POINT)

Mounted at /content/gdrive


In [6]:
# Load the ratings CSV: its first line is a header, and Spark infers each column's type
RATINGS_CSV = "/content/gdrive/My Drive/Colab Notebooks/input/movielens/ratings.csv"
data = spark.read.csv(RATINGS_CSV, header=True, inferSchema=True)

In [7]:
# print the schema of the dataframe to confirm what inferSchema produced:
# userId/movieId should come out as integer and rating as double before being fed to ALS
data.printSchema()

root
 |-- userId: integer (nullable = true)
 |-- movieId: integer (nullable = true)
 |-- rating: double (nullable = true)
 |-- timestamp: integer (nullable = true)



In [8]:
# The timestamp is not one of the user/item/rating columns ALS reads, so discard it
# and re-check the resulting schema.
unused_columns = ["timestamp"]
data = data.drop(*unused_columns)
data.printSchema()

root
 |-- userId: integer (nullable = true)
 |-- movieId: integer (nullable = true)
 |-- rating: double (nullable = true)



In [9]:
# How many partitions has Spark split the DataFrame into?
num_partitions = data.rdd.getNumPartitions()
num_partitions

6

In [10]:
# Peek at the first row; DataFrame.head(n) delegates to take(n) and returns a list of Row objects
data.head(1)

[Row(userId=1, movieId=110, rating=1.0)]

## Model Version 1: ALS with default hyperparameters

In [11]:
# Baseline: ALS with Spark's default hyperparameters. The "drop" cold-start strategy
# discards prediction rows that come out NaN (users/movies unseen in training), so the
# evaluation metric is computed only over real predictions.
# Column names are passed to the constructor, matching the estimator built for Model Version 2.
als_v1 = ALS(userCol="userId", itemCol="movieId", ratingCol="rating",
             coldStartStrategy="drop")

# split data into train and test sets with 80:20 proportions (seeded for reproducibility)
(train, test) = data.randomSplit([0.8, 0.2], seed=10)

# train is reused here and by the hyperparameter search later, so cache it to avoid
# re-reading and re-splitting the CSV each time
train.cache()

# Fit the estimator on the training set. The estimator (als_v1) and the fitted
# ALSModel (model_v1) are kept under separate names instead of rebinding one name.
model_v1 = als_v1.fit(train)

# calculate predictions by using the fitted model to transform the test set
predictions = model_v1.transform(test)

predictions.printSchema()

root
 |-- userId: integer (nullable = true)
 |-- movieId: integer (nullable = true)
 |-- rating: double (nullable = true)
 |-- prediction: float (nullable = false)



In [12]:
from pyspark.ml.evaluation import RegressionEvaluator

# Mean absolute error between the model's "prediction" column and the true "rating"
evaluator = RegressionEvaluator(labelCol="rating", predictionCol="prediction", metricName="mae")

# score Model Version 1's test-set predictions
mae = evaluator.evaluate(predictions)

### Model Version 1: Mean Absolute Error

In [13]:
print(f'The Mean Absolute Error is {mae:.3f}')

The Mean Absolute Error is 0.634


## Model Version 2: ALS tuned with TrainValidationSplit (rank × regParam grid)

In [14]:
from pyspark.ml.tuning import ParamGridBuilder, TrainValidationSplit

# A fresh ALS estimator for the grid search: same columns and "drop" cold-start
# handling as the baseline, with rank/regParam left for the search to vary
als = ALS(
    userCol="userId",
    itemCol="movieId",
    ratingCol="rating",
    coldStartStrategy="drop",
)

In [15]:
# Search 3 ranks x 3 regularization strengths = 9 parameter combinations
RANKS = [5, 10, 15]
REG_PARAMS = [1, 0.1, 0.01]
paramGrid = (
    ParamGridBuilder()
    .addGrid(als.rank, RANKS)
    .addGrid(als.regParam, REG_PARAMS)
    .build()
)

In [16]:
# Grid-search ALS over paramGrid. For each combination, TrainValidationSplit trains on 80%
# of the dataset passed to fit() and scores MAE on the remaining 20%. fit() is called on
# `train`, so overall this is ~64% train / 16% validation / 20% untouched test.
# seed makes the internal train/validation split reproducible (same seed as randomSplit).
trainValSplit = TrainValidationSplit(
    estimator=als,
    estimatorParamMaps=paramGrid,
    evaluator=RegressionEvaluator(metricName="mae", predictionCol="prediction", labelCol="rating"),
    trainRatio=0.8,
    parallelism=4,  # evaluate up to 4 parameter combinations concurrently
    seed=10,
)

In [17]:
# Run the grid search on the training data only; test stays untouched until the final evaluation
model = trainValSplit.fit(dataset=train)

In [18]:
# Retrieve the best model: the ALS model refit on the full fit() dataset with the
# parameter combination that scored best on the validation split.
bestModel = model.bestModel

# Show which grid point won. Model Version 2's test MAE can match Model Version 1's,
# and displaying the chosen rank/regParam makes it visible whether tuning changed anything.
val_metrics = model.validationMetrics
pick_best = max if model.getEvaluator().isLargerBetter() else min  # MAE: lower is better
best_params = model.getEstimatorParamMaps()[val_metrics.index(pick_best(val_metrics))]
{param.name: value for param, value in best_params.items()}

In [19]:
# Score the held-out test set with the tuned model
predictions = bestModel.transform(dataset=test)

In [20]:
# Evaluate the tuned model's test predictions with the same MAE evaluator used for Model Version 1
mae = evaluator.evaluate(dataset=predictions)
print(f'The Mean Absolute Error is {mae:.3f}')

The Mean Absolute Error is 0.634
