In [1]:
"""
Collaborative Filtering ALS Recommender System using Spark MLlib adapted from
the Spark Summit 2014 Recommender System training example.

Developed By: Pranav Masariya
Supervisor: Dr. Magdalini Eirinaki
"""

import os
import numpy as np
from pyspark.sql import SparkSession
from pyspark import SparkContext

from pyspark.mllib.recommendation import ALS
import math
import pyspark.sql
from pyspark.sql.functions import udf
from pyspark.sql.types import *
from pyspark.ml.evaluation import RegressionEvaluator

In [2]:
# Obtain the SparkSession for this notebook (an existing one is reused,
# otherwise a new one is created) under the application name "Recom".
spark = (
    SparkSession.builder
    .appName("Recom")
    .config("spark.recom.demo", "1")
    .getOrCreate()
)

In [3]:
# Training ratings: CSV with a header row; column types are inferred by Spark.
ratings_df = (
    spark.read
    .format("csv")
    .options(header="true", inferSchema="true")
    .load("traing_surprise.csv")
)

In [4]:
# Preview the first five rows of the training ratings to check the parsed columns.
ratings_df.show(5)

+---+-----+----------+-------+
|_c0| asin|reviewerID|overall|
+---+-----+----------+-------+
|  0|52021|     15012|      4|
|  1|42867|     20330|      5|
|  2| 9168|     62907|      5|
|  3|26051|     11778|      4|
|  4|30061|     63717|      4|
+---+-----+----------+-------+
only showing top 5 rows



In [7]:
# Total number of rows loaded into ratings_df.
ratings_df.count()

687833

In [8]:
# Remove the "_c0" column (it looks like a row index carried over from the
# CSV: 0, 1, 2, ... in the preview) and re-check the remaining columns.
ratings_df = ratings_df.drop("_c0")
ratings_df.show(5)

+-----+----------+-------+
| asin|reviewerID|overall|
+-----+----------+-------+
|52021|     15012|      4|
|42867|     20330|      5|
| 9168|     62907|      5|
|26051|     11778|      4|
|30061|     63717|      4|
+-----+----------+-------+
only showing top 5 rows



In [32]:
# Test-set ratings: CSV with a header row; column types are inferred by Spark.
testing_df = (
    spark.read
    .format("csv")
    .options(header="true", inferSchema="true")
    .load("py_test_set.csv")
)

In [33]:
# Validation-set ratings: CSV with a header row; column types are inferred by Spark.
validation_df = (
    spark.read
    .format("csv")
    .options(header="true", inferSchema="true")
    .load("py_validation_set.csv")
)

In [34]:
# Total number of rows loaded into validation_df.
validation_df.count()

294786

In [35]:
# Remove the "_c0" column from the test set and preview the result.
testing_df = testing_df.drop("_c0")
testing_df.show(5)

+-----+----------+-------+
| asin|reviewerID|overall|
+-----+----------+-------+
|53990|     51353|      4|
|16865|     24035|      4|
|56330|     44344|      5|
|37794|     68137|      4|
| 1268|     16332|      4|
+-----+----------+-------+
only showing top 5 rows



In [37]:
# Hold out the third part (weight 0.2) of a 0.6/0.2/0.2 random split of
# testing_df; it is the split the rank search is scored on.
# - A fixed seed makes the split repeatable for the same input data.
# - Indexing the result instead of unpacking into `_` avoids rebinding `_`,
#   which IPython uses for the previous cell's output.
validationData = testing_df.randomSplit([0.6, 0.2, 0.2], seed=5)[2]


In [60]:
 # Remove the "_c0" column from validation_df.
 validation_df = validation_df.drop("_c0")

In [61]:

# validationData without its "overall" rating column; this is the frame whose
# rows are handed to predictAll during the rank search.
validation_for_predict = validationData.drop("overall")


In [62]:
# Preview the held-out split (show() prints the top 20 rows by default).
validationData.show()

+----+----------+-------+
|asin|reviewerID|overall|
+----+----------+-------+
|   2|     18103|      5|
|   2|     40593|      3|
|   6|     66740|      5|
|   8|     58308|      4|
|   9|      4360|      5|
|   9|     34774|      1|
|  10|     36501|      3|
|  20|      2192|      5|
|  20|     26125|      5|
|  20|     33173|      5|
|  20|     45923|      4|
|  23|     14930|      4|
|  23|     54666|      4|
|  24|     37710|      5|
|  25|      7665|      5|
|  26|      8388|      5|
|  27|       670|      4|
|  27|     39357|      5|
|  36|     43423|      5|
|  39|      1435|      5|
+----+----------+-------+
only showing top 20 rows



In [63]:
# Preview the held-out split with the rating column removed.
validation_for_predict.show()

+----+----------+
|asin|reviewerID|
+----+----------+
|   2|     18103|
|   2|     40593|
|   6|     66740|
|   8|     58308|
|   9|      4360|
|   9|     34774|
|  10|     36501|
|  20|      2192|
|  20|     26125|
|  20|     33173|
|  20|     45923|
|  23|     14930|
|  23|     54666|
|  24|     37710|
|  25|      7665|
|  26|      8388|
|  27|       670|
|  27|     39357|
|  36|     43423|
|  39|      1435|
+----+----------+
only showing top 20 rows



In [64]:
# ALS hyperparameters consumed by the rank-search cell.
seed = 5  # Random seed for the initial matrix factorization model. A value of None will use system time as the seed.
iterations = 10  # passed to ALS.train as `iterations`
regularization_parameter = 0.1  # passed to ALS.train as `lambda_`; worth re-running with other values, e.g. 0.01
ranks = [4, 8, 12]  # candidate ranks (number of latent features) to compare

In [65]:
# Rank search: train one ALS model per candidate rank, score it by RMSE on the
# held-out validationData split, and keep the best-scoring model.
min_error = float("inf")  # no magic upper bound on the RMSE
best_rank = None
best_model = None

for rank in ranks:
    model = ALS.train(ratings_df, rank, seed=seed, iterations=iterations,
                      lambda_=regularization_parameter)

    # Key both sides by the (first, second) id pair so they can be joined.
    predictions = model.predictAll(validation_for_predict.rdd).map(lambda r: ((r[0], r[1]), r[2]))
    rates_and_preds = validationData.rdd.map(lambda r: ((int(r[0]), int(r[1])), float(r[2]))).join(predictions)
    # After the join each value is (actual, predicted).
    error = math.sqrt(rates_and_preds.map(lambda r: (r[1][0] - r[1][1])**2).mean()) # RMSE Error

    print ('For rank',rank, "the RMSE is ", error)
    if error < min_error:
        min_error = error
        best_rank = rank
        best_model = model

if best_model is None:
    # Reached when `ranks` is empty or no candidate produced a comparable RMSE
    # (e.g. NaN); fail with a clear message instead of a NameError.
    raise ValueError("No ALS model could be selected; check `ranks` and the validation data.")

# Later cells predict with `model`, so bind it to the best-scoring model
# rather than whichever rank happened to be trained last.
model = best_model

print ("The best model was trained with rank", best_rank)

For rank 4 the RMSE is  1.1859088283230408
For rank 8 the RMSE is  0.705891975645383
For rank 12 the RMSE is  0.5942899896999723
The best model was trained with rank 12


In [66]:
# Preview validation_df (loaded from py_validation_set.csv).
validation_df.show()

+-----+----------+-------+
| asin|reviewerID|overall|
+-----+----------+-------+
|53990|     51353|      5|
|16865|     24035|      5|
|56330|     44344|      5|
|37794|     68137|      5|
| 1268|     16332|      5|
|17755|     12362|      5|
|49764|     66410|      5|
|25200|     49450|      5|
|14099|     40115|      5|
|21067|     11999|      5|
|39364|     28660|      5|
|24739|     13206|      5|
|36963|     41622|      5|
|21986|     19975|      5|
|35972|     21052|      5|
|56761|     53401|      5|
|61080|     25504|      5|
| 6394|     61153|      5|
| 2755|     42778|      5|
| 2307|      4386|      5|
+-----+----------+-------+
only showing top 20 rows



In [67]:
# Frame to score: validation_df minus the rating column, and minus "_c0" in
# case it is still present.
test_for_predict = validation_df.drop("overall", "_c0")

In [68]:
# Preview the rows that will be scored.
test_for_predict.show()

+-----+----------+
| asin|reviewerID|
+-----+----------+
|53990|     51353|
|16865|     24035|
|56330|     44344|
|37794|     68137|
| 1268|     16332|
|17755|     12362|
|49764|     66410|
|25200|     49450|
|14099|     40115|
|21067|     11999|
|39364|     28660|
|24739|     13206|
|36963|     41622|
|21986|     19975|
|35972|     21052|
|56761|     53401|
|61080|     25504|
| 6394|     61153|
| 2755|     42778|
| 2307|      4386|
+-----+----------+
only showing top 20 rows



In [69]:
# Number of rows to score. The trailing "294745" was written after `//`, which
# is floor division in Python (not a comment), so the original expression
# evaluated count() // 294745 instead of showing the count.
test_for_predict.count()  # 294745 is the value recorded for prediction.count()

294786

In [70]:
# NOTE(review): repeats the validation_df preview from an earlier cell.
validation_df.show()

+-----+----------+-------+
| asin|reviewerID|overall|
+-----+----------+-------+
|53990|     51353|      5|
|16865|     24035|      5|
|56330|     44344|      5|
|37794|     68137|      5|
| 1268|     16332|      5|
|17755|     12362|      5|
|49764|     66410|      5|
|25200|     49450|      5|
|14099|     40115|      5|
|21067|     11999|      5|
|39364|     28660|      5|
|24739|     13206|      5|
|36963|     41622|      5|
|21986|     19975|      5|
|35972|     21052|      5|
|56761|     53401|      5|
|61080|     25504|      5|
| 6394|     61153|      5|
| 2755|     42778|      5|
| 2307|      4386|      5|
+-----+----------+-------+
only showing top 20 rows



In [71]:
# Score every row of test_for_predict with `model`.
# NOTE(review): `model` is whatever the rank-search cell left bound to that
# name -- confirm it is the best-rank model.
# NOTE(review): the recorded outputs show fewer predictions (294745) than
# input rows (294786); presumably pairs containing an id unseen in training
# are dropped -- verify.
prediction = model.predictAll(test_for_predict.rdd)

In [72]:
# Number of predictions returned by predictAll.
prediction.count()

294745

In [23]:
# Inspect three raw prediction records.
prediction.take(3)

[Rating(user=28105, product=57436, rating=3.782193531242547),
 Rating(user=965, product=57436, rating=2.99500068547825),
 Rating(user=31199, product=18624, rating=4.838887675062123)]

In [24]:
from pyspark.sql.types import Row

def f(x):
    """Convert one prediction record into a dict keyed by the dataset's column names.

    Args:
        x: An indexable record with at least three fields, read positionally
            as (asin, reviewerID, overall). In this notebook it is applied to
            the elements of ``prediction``.

    Returns:
        dict: ``{"asin": x[0], "reviewerID": x[1], "overall": x[2]}``.
    """
    # Deliberately no print(): this runs once per record inside an RDD map,
    # so a debug print would emit one line for every prediction.
    return {"asin": x[0], "reviewerID": x[1], "overall": x[2]}

In [25]:
# Turn each prediction into a Row built from f()'s dict, then into a DataFrame.
# NOTE(review): the recorded test_df.show() output lists the columns as
# asin, overall, reviewerID rather than in f()'s key order -- select columns
# by name downstream instead of relying on position.
test_df = prediction.map(lambda x: Row(**f(x))).toDF()

In [76]:
# Number of rows in the predictions DataFrame.
test_df.count()

291098

In [73]:
# Preview the predictions DataFrame.
test_df.show()


+-----+------------------+----------+
| asin|           overall|reviewerID|
+-----+------------------+----------+
|28105| 3.782193531242547|     57436|
|  965|  2.99500068547825|     57436|
|31199| 4.838887675062123|     18624|
|39244|3.9429087363878317|     32196|
|25050|4.5552665501405745|     32196|
|54586| 4.941920299052484|     32196|
|38991| 4.932972505086246|     32196|
|52203|3.4665720642780817|     32196|
|34452|  3.25926269832587|     54040|
|36776|  3.95045163897624|     54040|
|15096| 4.944374899144345|     54040|
|31941| 4.784060993519634|     54040|
| 3973| 3.099652667890167|     54040|
|24127| 3.200659853613787|     54040|
|58963| 4.909371624415071|     54040|
| 6419| 3.656857803138702|     54040|
|60443| 2.902866109563715|     54040|
| 9690|0.5905607385864561|      7608|
|20983|2.8481622320792366|      7608|
|16376| 3.603550941274074|     18500|
+-----+------------------+----------+
only showing top 20 rows



In [75]:
# Write the predictions as CSV to the "baseline" output path, coalesced to a
# single partition first.
# NOTE(review): no header or save-mode options are set, so the writer's
# defaults apply -- check them before re-running this cell or consuming the output.
test_df.coalesce(1).write.csv("baseline")