In [0]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://www-eu.apache.org/dist/spark/spark-2.4.4/spark-2.4.4-bin-hadoop2.7.tgz
!tar xf spark-2.4.4-bin-hadoop2.7.tgz
!pip install -q findspark

Creating Environment

In [0]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.4.4-bin-hadoop2.7"

In [0]:
#Creating spark session
import findspark
findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()

In [0]:
from pyspark.sql.types import *

In [0]:
#Loading spark Context
sc = spark.sparkContext

In [6]:
from google.colab import drive
drive.mount('/content/drive')

Go to this URL in a browser: https://accounts.google.com/o/oauth2/auth?client_id=947318989803-6bn6qk8qdgf4n4g3pfee6491hc0brc4i.apps.googleusercontent.com&redirect_uri=urn%3Aietf%3Awg%3Aoauth%3A2.0%3Aoob&scope=email%20https%3A%2F%2Fwww.googleapis.com%2Fauth%2Fdocs.test%20https%3A%2F%2Fwww.googleapis.com%2Fauth%2Fdrive%20https%3A%2F%2Fwww.googleapis.com%2Fauth%2Fdrive.photos.readonly%20https%3A%2F%2Fwww.googleapis.com%2Fauth%2Fpeopleapi.readonly&response_type=code

Enter your authorization code:
··········
Mounted at /content/drive


Reading training data

In [0]:
import pandas as pd 
data=pd.read_csv('/content/drive/My Drive/training.csv')

Loading traing data in spark dataframe

In [8]:
data_df = spark.read \
    .format("csv") \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .load("/content/drive/My Drive/training.csv")
data_df.withColumn('reviewerID', data_df.reviewerID.cast(IntegerType()))
data_df.withColumn('asin', data_df.asin.cast(IntegerType()))
data_df.withColumn('overall', data_df.overall.cast(FloatType()))

DataFrame[asin: int, overall: float, reviewerID: int]

In [9]:
#data_df = data_df.drop('helpful','reviewText','reviewTime','reviewerName','summary','unixReviewTime')
data_df.show(5)

+-----+-------+----------+
| asin|overall|reviewerID|
+-----+-------+----------+
|52021|      4|     15012|
|42867|      5|     20330|
| 9168|      5|     62907|
|26051|      4|     11778|
|30061|      4|     63717|
+-----+-------+----------+
only showing top 5 rows



In [0]:
trainingData = data_df

Preprocessing of data

In [0]:
dataSet = data[['asin', 'reviewerID', 'overall']]
users = data.reviewerID.unique()
userMeanRatings = {}
#Mean centering of overall column
for uid in users:
    avg = dataSet.loc[dataSet.reviewerID == uid, 'overall'].mean()
    dataSet.loc[dataSet.reviewerID == uid, 'overall'] -= avg
    userMeanRatings[uid] = avg
#dataSet.to_csv(preprocessedFile, index=False)


In [0]:
#Saving preprocessed data in csv file
dataSet.to_csv('preprocessed.csv',index=False)
!cp preprocessed.csv /content/drive/My\ Drive/

In [0]:

temp_df=spark.createDataFrame(dataSet)
df = spark.createDataFrame(dataSet)

In [0]:
trainingData=df

In [15]:
trainingData.show(5)

+-----+----------+-------------------+
| asin|reviewerID|            overall|
+-----+----------+-------------------+
|52021|     15012|               -0.5|
|42867|     20330|                0.0|
| 9168|     62907|                0.0|
|26051|     11778|-0.8913043478260869|
|30061|     63717|                0.0|
+-----+----------+-------------------+
only showing top 5 rows



ALS Model to train

In [0]:
#Training model ALS
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
als=ALS(maxIter=100, regParam=0.01,rank=150,userCol="reviewerID",itemCol="asin",ratingCol="overall",coldStartStrategy="drop",nonnegative=True)
model=als.fit(trainingData)

Loading Test Data

In [17]:
ratings_df = spark.read \
    .format("csv") \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .load("/content/drive/My Drive/test_with_asin_reviewerID.csv")
ratings_df.show(5)

+----------+-----+
|reviewerID| asin|
+----------+-----+
|     57436|28105|
|     57436|  965|
|     18624|31199|
|     32196|39244|
|     32196|25050|
+----------+-----+
only showing top 5 rows



Applying model on Test Data

In [18]:
testData=ratings_df['reviewerID','asin']
testData.withColumn('reviewerID', testData['reviewerID'].cast(IntegerType()))
testData.withColumn('asin', testData['asin'].cast(IntegerType()))
#Evaluation parameter=RMSE
evaluator=RegressionEvaluator(metricName="rmse",labelCol="overall",predictionCol="prediction")
predictions=model.transform(testData)
pdPredictions  = predictions.toPandas()
#Inverse mean centering
for uid, mean in userMeanRatings.items():
    pdPredictions.loc[pdPredictions.reviewerID == uid, 'prediction'] += mean
pdPredictions.head()


Unnamed: 0,reviewerID,asin,prediction
0,27033,148,3.250056
1,63670,148,4.0
2,11146,463,3.964887
3,40607,463,3.826146
4,51277,463,4.023309


In [19]:
pdPredictions.head()

Unnamed: 0,reviewerID,asin,prediction
0,27033,148,3.250056
1,63670,148,4.0
2,11146,463,3.964887
3,40607,463,3.826146
4,51277,463,4.023309


In [20]:
pdPredictions['key'] = pdPredictions.reviewerID.astype(str)+'-' + pdPredictions.asin.astype(str)
pdPredictions.head()


Unnamed: 0,reviewerID,asin,prediction,key
0,27033,148,3.250056,27033-148
1,63670,148,4.0,63670-148
2,11146,463,3.964887,11146-463
3,40607,463,3.826146,40607-463
4,51277,463,4.023309,51277-463


Transforming data in the {key,overall} pair

In [21]:
#Creating a new dataframe for reviewID-asin,overall 
new_data=pdPredictions[['key','prediction']].copy()
new_data.rename(columns = {'prediction':'overall'}, inplace = True)
# new_data.overall = new_data.overall.astype(float)
new_data.head()

Unnamed: 0,key,overall
0,27033-148,3.250056
1,63670-148,4.0
2,11146-463,3.964887
3,40607-463,3.826146
4,51277-463,4.023309


Storing data in csv

In [0]:
new_data.to_csv('kaggle_submission.csv',index=False)
!cp kaggle_submission.csv /content/drive/My\ Drive/