In [1]:
#LINK TO AMAZON DATASET: https://nijianmo.github.io/amazon/index.html#sample-metadata

In [2]:
import numpy as np
import pandas as pd

In [3]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructField, DoubleType, IntegerType
from pyspark.sql import functions as F


spark = SparkSession.builder.getOrCreate()
spark.conf.set('spark.sql.repl.eagerEval.enabled', True)

In [4]:
df = spark.read.csv('/home/luca/Downloads/ratings_Movies_and_TV.csv')

In [5]:
df.rdd.id()

14

In [6]:
df.schema

StructType(List(StructField(_c0,StringType,true),StructField(_c1,StringType,true),StructField(_c2,StringType,true),StructField(_c3,StringType,true)))

In [7]:
df.tail(10)

[Row(_c0='A17W587EH23J0Q', _c1='B00LT1JHLW', _c2='5.0', _c3='1405641600'),
 Row(_c0='A3E4Q2YOYCKXON', _c1='B00LT1JHLW', _c2='5.0', _c3='1405987200'),
 Row(_c0='A1U1UNV1RLCKRL', _c1='B00LT1JHLW', _c2='3.0', _c3='1406073600'),
 Row(_c0='A14THKG1X8861X', _c1='B00LT1JHLW', _c2='5.0', _c3='1405555200'),
 Row(_c0='A3DE438TF1A958', _c1='B00LT1JHLW', _c2='5.0', _c3='1405728000'),
 Row(_c0='AHCV1RTGY3PJ8', _c1='B00LT1JHLW', _c2='5.0', _c3='1405641600'),
 Row(_c0='A2RWCXDMANY0LW', _c1='B00LT1JHLW', _c2='5.0', _c3='1405987200'),
 Row(_c0='A3V9PIFRME2XCW', _c1='B00LT1JHLW', _c2='5.0', _c3='1405900800'),
 Row(_c0='A3ROPC55BE2OM9', _c1='B00LT1JHLW', _c2='5.0', _c3='1405728000'),
 Row(_c0='A2ARBNMH5Q5YM1', _c1='B00LVGP8EA', _c2='5.0', _c3='1405641600')]

In [8]:
#Changing column names - https://stackoverflow.com/questions/34077353/how-to-change-dataframe-column-names-in-pyspark

df = df.selectExpr("_c0 as ReviewerID", "_c1 as ProductID", "_c2 as Rating", 
                   "_c3 as unixReviewTime")

In [9]:
#Showing First 20 Rows of Dataset. Items of interest for preprocessing are ReviewerID and ProductID.

df.show()

+--------------+----------+------+--------------+
|    ReviewerID| ProductID|Rating|unixReviewTime|
+--------------+----------+------+--------------+
|A3R5OBKS7OM2IR|0000143502|   5.0|    1358380800|
|A3R5OBKS7OM2IR|0000143529|   5.0|    1380672000|
| AH3QC2PC1VTGP|0000143561|   2.0|    1216252800|
|A3LKP6WPMP9UKX|0000143588|   5.0|    1236902400|
| AVIY68KEPQ5ZD|0000143588|   5.0|    1232236800|
|A1CV1WROP5KTTW|0000589012|   5.0|    1309651200|
| AP57WZ2X4G0AA|0000589012|   2.0|    1366675200|
|A3NMBJ2LCRCATT|0000589012|   5.0|    1393804800|
| A5Y15SAOMX6XA|0000589012|   2.0|    1307404800|
|A3P671HJ32TCSF|0000589012|   5.0|    1393718400|
|A3VCKTRD24BG7K|0000589012|   5.0|    1378425600|
| ANF0AGIV0JCH2|0000589012|   5.0|    1308182400|
|A3LDEBLV6MVUBE|0000589012|   5.0|    1208995200|
|A1R2XZWQ6NM5M1|0000589012|   5.0|    1224979200|
|A36L1XGA5AQIJY|0000589012|   1.0|    1393113600|
|A2HWI21H23GDS4|0000589012|   4.0|    1338681600|
|A1DNYFL3RSXRMO|0000589012|   5.0|    1208908800|


In [10]:
# MUST FIT INTEGER RANGE: -2147483648 to 2147483647 - https://spark.apache.org/docs/latest/sql-ref-datatypes.html

df.count()

4607047

In [11]:
# Change the input integer to change the size of the dataset for analysis
new_df = df.take(70000)

In [12]:
new_df = spark.createDataFrame(new_df)

In [13]:
#Items of interest are ReviewerID and ProductID

new_df.show()

+--------------+----------+------+--------------+
|    ReviewerID| ProductID|Rating|unixReviewTime|
+--------------+----------+------+--------------+
|A3R5OBKS7OM2IR|0000143502|   5.0|    1358380800|
|A3R5OBKS7OM2IR|0000143529|   5.0|    1380672000|
| AH3QC2PC1VTGP|0000143561|   2.0|    1216252800|
|A3LKP6WPMP9UKX|0000143588|   5.0|    1236902400|
| AVIY68KEPQ5ZD|0000143588|   5.0|    1232236800|
|A1CV1WROP5KTTW|0000589012|   5.0|    1309651200|
| AP57WZ2X4G0AA|0000589012|   2.0|    1366675200|
|A3NMBJ2LCRCATT|0000589012|   5.0|    1393804800|
| A5Y15SAOMX6XA|0000589012|   2.0|    1307404800|
|A3P671HJ32TCSF|0000589012|   5.0|    1393718400|
|A3VCKTRD24BG7K|0000589012|   5.0|    1378425600|
| ANF0AGIV0JCH2|0000589012|   5.0|    1308182400|
|A3LDEBLV6MVUBE|0000589012|   5.0|    1208995200|
|A1R2XZWQ6NM5M1|0000589012|   5.0|    1224979200|
|A36L1XGA5AQIJY|0000589012|   1.0|    1393113600|
|A2HWI21H23GDS4|0000589012|   4.0|    1338681600|
|A1DNYFL3RSXRMO|0000589012|   5.0|    1208908800|


In [14]:
new_df.dtypes

[('ReviewerID', 'string'),
 ('ProductID', 'string'),
 ('Rating', 'string'),
 ('unixReviewTime', 'string')]

In [15]:
new_df = new_df.withColumn("ReviewerID", new_df["ReviewerID"].cast(IntegerType()))
new_df = new_df.withColumn("ProductID", new_df["ProductID"].cast(IntegerType()))
new_df = new_df.withColumn("Rating", new_df["Rating"].cast(DoubleType()))
new_df = new_df.withColumn("unixReviewTime", new_df["unixReviewTime"].cast(IntegerType()))

new_df.show()

+----------+---------+------+--------------+
|ReviewerID|ProductID|Rating|unixReviewTime|
+----------+---------+------+--------------+
|      null|   143502|   5.0|    1358380800|
|      null|   143529|   5.0|    1380672000|
|      null|   143561|   2.0|    1216252800|
|      null|   143588|   5.0|    1236902400|
|      null|   143588|   5.0|    1232236800|
|      null|   589012|   5.0|    1309651200|
|      null|   589012|   2.0|    1366675200|
|      null|   589012|   5.0|    1393804800|
|      null|   589012|   2.0|    1307404800|
|      null|   589012|   5.0|    1393718400|
|      null|   589012|   5.0|    1378425600|
|      null|   589012|   5.0|    1308182400|
|      null|   589012|   5.0|    1208995200|
|      null|   589012|   5.0|    1224979200|
|      null|   589012|   1.0|    1393113600|
|      null|   589012|   4.0|    1338681600|
|      null|   589012|   5.0|    1208908800|
|      null|   589012|   1.0|    1218412800|
|      null|   589012|   5.0|    1322956800|
|      nul

In [16]:
# Sourced from: https://stackoverflow.com/questions/44153575/fill-na-with-random-numbers-in-pyspark

new_df = new_df.withColumn('ReviewerID', F.coalesce(F.col('ReviewerID'), 
                                                (F.round(F.rand()*70000)))).collect()
new_df = spark.createDataFrame(new_df)

In [17]:
new_df.show()

+----------+---------+------+--------------+
|ReviewerID|ProductID|Rating|unixReviewTime|
+----------+---------+------+--------------+
|    5880.0|   143502|   5.0|    1358380800|
|   34719.0|   143529|   5.0|    1380672000|
|   57419.0|   143561|   2.0|    1216252800|
|   11312.0|   143588|   5.0|    1236902400|
|   26280.0|   143588|   5.0|    1232236800|
|   19783.0|   589012|   5.0|    1309651200|
|   27083.0|   589012|   2.0|    1366675200|
|   48798.0|   589012|   5.0|    1393804800|
|   49507.0|   589012|   2.0|    1307404800|
|     198.0|   589012|   5.0|    1393718400|
|   52916.0|   589012|   5.0|    1378425600|
|   40612.0|   589012|   5.0|    1308182400|
|    7960.0|   589012|   5.0|    1208995200|
|   10541.0|   589012|   5.0|    1224979200|
|   18935.0|   589012|   1.0|    1393113600|
|   53927.0|   589012|   4.0|    1338681600|
|   35594.0|   589012|   5.0|    1208908800|
|   54822.0|   589012|   1.0|    1218412800|
|   39938.0|   589012|   5.0|    1322956800|
|   68044.

In [18]:
new_df = new_df.withColumn("ReviewerID", new_df["ReviewerID"].cast(IntegerType()))
new_df = new_df.withColumn("ProductID", new_df["ProductID"].cast(IntegerType()))
new_df = new_df.withColumn("Rating", new_df["Rating"].cast(DoubleType()))
new_df = new_df.withColumn("unixReviewTime", new_df["unixReviewTime"].cast(IntegerType()))

In [19]:
new_df.dtypes

[('ReviewerID', 'int'),
 ('ProductID', 'int'),
 ('Rating', 'double'),
 ('unixReviewTime', 'int')]

In [20]:
new_df.show()

+----------+---------+------+--------------+
|ReviewerID|ProductID|Rating|unixReviewTime|
+----------+---------+------+--------------+
|      5880|   143502|   5.0|    1358380800|
|     34719|   143529|   5.0|    1380672000|
|     57419|   143561|   2.0|    1216252800|
|     11312|   143588|   5.0|    1236902400|
|     26280|   143588|   5.0|    1232236800|
|     19783|   589012|   5.0|    1309651200|
|     27083|   589012|   2.0|    1366675200|
|     48798|   589012|   5.0|    1393804800|
|     49507|   589012|   2.0|    1307404800|
|       198|   589012|   5.0|    1393718400|
|     52916|   589012|   5.0|    1378425600|
|     40612|   589012|   5.0|    1308182400|
|      7960|   589012|   5.0|    1208995200|
|     10541|   589012|   5.0|    1224979200|
|     18935|   589012|   1.0|    1393113600|
|     53927|   589012|   4.0|    1338681600|
|     35594|   589012|   5.0|    1208908800|
|     54822|   589012|   1.0|    1218412800|
|     39938|   589012|   5.0|    1322956800|
|     6804

In [21]:
del(df)

In [22]:
# COLLABORATIVE FILTERING 

from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.ml.tuning import TrainValidationSplit, ParamGridBuilder

In [23]:
(training, test) = new_df.randomSplit([0.8, 0.2])

In [24]:
training.filter("ReviewerID is NULL").show()
training.filter("ProductID is NULL").show()
training.filter("Rating is NULL").show()
training.filter("UnixReviewTime is NULL").show()

+----------+---------+------+--------------+
|ReviewerID|ProductID|Rating|unixReviewTime|
+----------+---------+------+--------------+
+----------+---------+------+--------------+

+----------+---------+------+--------------+
|ReviewerID|ProductID|Rating|unixReviewTime|
+----------+---------+------+--------------+
|       299|     null|   5.0|    1335657600|
|       613|     null|   3.0|    1330041600|
|       620|     null|   4.0|    1375574400|
|      1399|     null|   5.0|    1371686400|
|      1863|     null|   5.0|    1224806400|
|      1889|     null|   5.0|    1244419200|
|      2280|     null|   5.0|    1233705600|
|      2406|     null|   5.0|    1222300800|
|      2927|     null|   5.0|    1272240000|
|      3531|     null|   5.0|    1222473600|
|      3936|     null|   5.0|    1379030400|
|      3990|     null|   5.0|    1235001600|
|      4146|     null|   5.0|    1272585600|
|      4470|     null|   4.0|    1294099200|
|      4881|     null|   5.0|    1323216000|
|      50

In [25]:
test.filter("ReviewerID is NULL").show()
test.filter("ProductID is NULL").show()
test.filter("Rating is NULL").show()
test.filter("unixReviewTime is NULL").show()

+----------+---------+------+--------------+
|ReviewerID|ProductID|Rating|unixReviewTime|
+----------+---------+------+--------------+
+----------+---------+------+--------------+

+----------+---------+------+--------------+
|ReviewerID|ProductID|Rating|unixReviewTime|
+----------+---------+------+--------------+
|      2299|     null|   4.0|    1271635200|
|      5555|     null|   5.0|    1272672000|
|      7518|     null|   5.0|    1225843200|
|      9142|     null|   5.0|    1222300800|
|      9410|     null|   5.0|    1222387200|
|      9436|     null|   5.0|    1008547200|
|     10502|     null|   5.0|    1404259200|
|     12043|     null|   5.0|    1235001600|
|     12762|     null|   5.0|    1336780800|
|     12899|     null|   5.0|    1398384000|
|     15592|     null|   4.0|    1324598400|
|     16231|     null|   5.0|    1260316800|
|     16780|     null|   5.0|    1222300800|
|     16823|     null|   5.0|    1390003200|
|     18244|     null|   5.0|    1333584000|
|     211

In [26]:
training = training.dropna()
test = test.dropna()

In [27]:
als = ALS(maxIter=5, regParam=0.1, userCol="ReviewerID", itemCol="ProductID", ratingCol="Rating",
          coldStartStrategy="drop")
model = als.fit(training)

In [28]:
param_grid = ParamGridBuilder()\
.addGrid(als.rank, [12, 13, 14])\
.addGrid(als.maxIter, [18, 19, 20])\
.addGrid(als.regParam, [.17, .18, .19])\
.build()

In [29]:
evaluator = RegressionEvaluator(metricName="rmse", labelCol="Rating", 
                                predictionCol="prediction")

In [30]:
tvs = TrainValidationSplit(estimator=als, estimatorParamMaps=param_grid,
                          evaluator=evaluator)

In [31]:
model = tvs.fit(training)

In [32]:
best_model = model.bestModel

In [33]:
predictions = best_model.transform(test)
rmse = evaluator.evaluate(predictions)

In [34]:
print("RMSE = " + str(rmse))
print("**BEST MODEL**")
print("Rank: ", best_model.rank)
print("MaxIter: ", best_model._java_obj.parent().getMaxIter())
print("RegParam: ", best_model._java_obj.parent().getRegParam())

RMSE = 1.9129475852576983
**BEST MODEL**
Rank:  12
MaxIter:  20
RegParam:  0.19


In [35]:
display(predictions.sort("ReviewerID", "Rating"))

ReviewerID,ProductID,Rating,unixReviewTime,prediction
2,767853946,5.0,1404432000,2.8639176
4,767830520,5.0,1078876800,3.52406
7,767810856,4.0,1383782400,3.7566168
19,767832485,4.0,957312000,3.4473715
22,767805712,5.0,1179360000,3.736113
31,767806808,5.0,996969600,3.558663
42,780619765,5.0,1340496000,0.79268634
47,767802497,5.0,1360540800,2.8590975
50,767809246,5.0,1075248000,3.7837179
50,767802470,5.0,1392249600,3.9071658


In [42]:
predictions_df = predictions.toPandas()

predictions_df.to_csv('/home/luca/Dropbox/predictions.csv')