# Spark Recommendation

## Dataset

Dataset used : Amazon Kindle Store Reviews -170MB (http://jmcauley.ucsd.edu/data/amazon/)

## Spark Init

In [1]:
import findspark
findspark.init()

In [2]:
from pyspark.sql import SparkSession

In [3]:
spark = SparkSession.builder.appName("simple recomendation").config("spark.executor.memory","8G").config("spark.driver.memory", "8G").getOrCreate()

In [4]:
print(spark)

<pyspark.sql.session.SparkSession object at 0x0000022B7FE8DA20>


## Load Dataset

In [5]:
#since the data has no header, let's define it here
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DoubleType

schemas = StructType([
    StructField("user_id", StringType(), True),
    StructField("item_id", StringType(), True),
    StructField("rating", DoubleType(), True),
    StructField("timestamp",IntegerType(),True)])

In [6]:
df = spark.read.csv("D:\Repos\Resource\Kindle-Store-Reviews\kindle-store.csv", header=False, schema=schemas)

In [7]:
df.schema

StructType(List(StructField(user_id,StringType,true),StructField(item_id,StringType,true),StructField(rating,DoubleType,true),StructField(timestamp,IntegerType,true)))

In [8]:
df.count()

3205467

In [9]:
df.show()

+--------------+----------+------+----------+
|       user_id|   item_id|rating| timestamp|
+--------------+----------+------+----------+
|A2GZ9GFZV1LWB0|1603420304|   4.0|1405209600|
|A1K7VSUDCVAPW8|1603420304|   3.0|1282176000|
|A35J5XRE5ZT6H2|1603420304|   4.0|1365206400|
|A3DGZNFSMNWSX5|1603420304|   4.0|1285632000|
|A2CVDQ6H36L4VL|1603420304|   5.0|1342396800|
|A3U0EV8PXX6G2O|1603420304|   4.0|1383004800|
|A18KZLAOO17CH7|1603420304|   5.0|1315958400|
|A1RYEDOAVTVC97|1603420304|   4.0|1253491200|
| AW4GYRDGPXSHC|1603420304|   5.0|1284336000|
|A3RM23R5S3OBO5|1603420304|   5.0|1323216000|
| A5T79U2IMPYTY|1603420304|   3.0|1367280000|
|A109D193EJ98V2|1603420304|   5.0|1325808000|
|A3VX4QRMRMG143|1603420304|   4.0|1370995200|
|A3QAL688R9KZT4|1603420304|   5.0|1324512000|
|A36TM5HAM32TMO|1603420304|   5.0|1320019200|
| AL69F6UZDPHFZ|1603420304|   5.0|1381881600|
|A2WOB3XO3160CX|1603420304|   4.0|1356912000|
|A22ZT1X0VUYNIV|1603420304|   5.0|1281916800|
| AQFYGWUOGWBXY|1603420304|   5.0|

In [10]:
df.filter(df.user_id=='A18KZLAOO17CH7').show()

+--------------+----------+------+----------+
|       user_id|   item_id|rating| timestamp|
+--------------+----------+------+----------+
|A18KZLAOO17CH7|1603420304|   5.0|1315958400|
|A18KZLAOO17CH7|B00BAVVC6O|   5.0|1371686400|
|A18KZLAOO17CH7|B00F12H1IQ|   5.0|1388620800|
|A18KZLAOO17CH7|B00F12H1MM|   5.0|1388620800|
|A18KZLAOO17CH7|B00FHQI2KM|   5.0|1380931200|
+--------------+----------+------+----------+



## Creating Model

In [11]:
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS

In [12]:
# since user col need to be an int value, we need to change our 'string' ids to int
from pyspark.ml.feature import IndexToString, StringIndexer

In [13]:
# convert user_id (String) to Float by using StringIndexer
stringindexer = StringIndexer(inputCol='user_id',outputCol='user_id_int')
stringindexer.setHandleInvalid("keep")
model = stringindexer.fit(df)
indexed = model.transform(df)

In [14]:
# convert item_id (String) to Float by using StringIndexer
stringindexer_item = StringIndexer(inputCol='item_id',outputCol='item_id_int')
stringindexer_item.setHandleInvalid("keep") 
model = stringindexer_item.fit(indexed)
indexed = model.transform(indexed)

In [15]:
indexed.show()

+--------------+----------+------+----------+-----------+-----------+
|       user_id|   item_id|rating| timestamp|user_id_int|item_id_int|
+--------------+----------+------+----------+-----------+-----------+
|A2GZ9GFZV1LWB0|1603420304|   4.0|1405209600|  1397483.0|     4102.0|
|A1K7VSUDCVAPW8|1603420304|   3.0|1282176000|  1360487.0|     4102.0|
|A35J5XRE5ZT6H2|1603420304|   4.0|1365206400|  1016904.0|     4102.0|
|A3DGZNFSMNWSX5|1603420304|   4.0|1285632000|   533144.0|     4102.0|
|A2CVDQ6H36L4VL|1603420304|   5.0|1342396800|   997297.0|     4102.0|
|A3U0EV8PXX6G2O|1603420304|   4.0|1383004800|   760453.0|     4102.0|
|A18KZLAOO17CH7|1603420304|   5.0|1315958400|    77896.0|     4102.0|
|A1RYEDOAVTVC97|1603420304|   4.0|1253491200|   669260.0|     4102.0|
| AW4GYRDGPXSHC|1603420304|   5.0|1284336000|   436551.0|     4102.0|
|A3RM23R5S3OBO5|1603420304|   5.0|1323216000|  1056593.0|     4102.0|
| A5T79U2IMPYTY|1603420304|   3.0|1367280000|   472655.0|     4102.0|
|A109D193EJ98V2|1603

In [16]:
indexed.filter(indexed.user_id_int==77896.0).show()

+--------------+----------+------+----------+-----------+-----------+
|       user_id|   item_id|rating| timestamp|user_id_int|item_id_int|
+--------------+----------+------+----------+-----------+-----------+
|A18KZLAOO17CH7|1603420304|   5.0|1315958400|    77896.0|     4102.0|
|A18KZLAOO17CH7|B00BAVVC6O|   5.0|1371686400|    77896.0|     5678.0|
|A18KZLAOO17CH7|B00F12H1IQ|   5.0|1388620800|    77896.0|   193578.0|
|A18KZLAOO17CH7|B00F12H1MM|   5.0|1388620800|    77896.0|   195243.0|
|A18KZLAOO17CH7|B00FHQI2KM|   5.0|1380931200|    77896.0|     3415.0|
+--------------+----------+------+----------+-----------+-----------+



In [17]:
indexed.filter(indexed.user_id=='A18KZLAOO17CH7').show()

+--------------+----------+------+----------+-----------+-----------+
|       user_id|   item_id|rating| timestamp|user_id_int|item_id_int|
+--------------+----------+------+----------+-----------+-----------+
|A18KZLAOO17CH7|1603420304|   5.0|1315958400|    77896.0|     4102.0|
|A18KZLAOO17CH7|B00BAVVC6O|   5.0|1371686400|    77896.0|     5678.0|
|A18KZLAOO17CH7|B00F12H1IQ|   5.0|1388620800|    77896.0|   193578.0|
|A18KZLAOO17CH7|B00F12H1MM|   5.0|1388620800|    77896.0|   195243.0|
|A18KZLAOO17CH7|B00FHQI2KM|   5.0|1380931200|    77896.0|     3415.0|
+--------------+----------+------+----------+-----------+-----------+



In [18]:
indexed = indexed.selectExpr(['user_id_int as uid','item_id_int as iid','rating as rating'])

In [19]:
indexed.show(2)

+---------+------+------+
|      uid|   iid|rating|
+---------+------+------+
|1397483.0|4102.0|   4.0|
|1360487.0|4102.0|   3.0|
+---------+------+------+
only showing top 2 rows



In [20]:
# total 3205467 rows
(training, test) = indexed.randomSplit([0.8, 0.2])

In [21]:
# training
als = ALS(maxIter=5, regParam=0.01, userCol="uid", itemCol="iid", ratingCol="rating",
          coldStartStrategy="drop")
model = als.fit(training)

In [22]:
# Evaluate the model by computing the RMSE on the test data
predictions = model.transform(test.select(['uid','iid','rating']))
evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating",
                                predictionCol="prediction")

In [23]:
predictions.show()

+--------+-----+------+------------+
|     uid|  iid|rating|  prediction|
+--------+-----+------+------------+
|144011.0|148.0|   2.0|   0.1391947|
| 39621.0|148.0|   3.0|  0.19290876|
|276947.0|148.0|   5.0| -0.09074353|
| 10210.0|148.0|   2.0|  0.29808566|
| 93572.0|148.0|   4.0| -0.43418026|
|275605.0|148.0|   3.0|-0.105560526|
| 69878.0|148.0|   2.0|  0.61052364|
|210414.0|148.0|   5.0|  -0.5430999|
|265943.0|148.0|   1.0|  -0.5144456|
|327477.0|148.0|   5.0|  0.25743455|
|129602.0|148.0|   3.0|   0.9436585|
| 12199.0|148.0|   4.0|  -2.5957139|
|226318.0|148.0|   5.0|  0.78879833|
|377273.0|148.0|   2.0|  0.07713748|
|299252.0|148.0|   4.0|  0.37340653|
|226919.0|148.0|   1.0| -0.06115124|
|186500.0|148.0|   4.0|   1.1710051|
|348541.0|148.0|   4.0|  0.18466339|
|305797.0|148.0|   3.0|-0.014746324|
|281713.0|148.0|   1.0|    2.875606|
+--------+-----+------+------------+
only showing top 20 rows



In [24]:
rmse = evaluator.evaluate(predictions)
print("Root-mean-square error = " + str(rmse))

Root-mean-square error = 5.84537221194402


In [25]:
# Generate top 10 kindle recommendations for each user
userRecs = model.recommendForAllUsers(10)
# Generate top 10 user recommendations for each kindle
kindleRecs = model.recommendForAllItems(10)

In [26]:
userRecs.show()

+----+--------------------+
| uid|     recommendations|
+----+--------------------+
| 148|[[56765, 16.52278...|
| 463|[[94913, 11.43423...|
| 471|[[120684, 14.5425...|
| 496|[[49901, 15.42053...|
| 833|[[79767, 14.08148...|
|1088|[[50654, 18.24583...|
|1238|[[51039, 19.92655...|
|1342|[[75118, 16.27201...|
|1580|[[128666, 17.167]...|
|1591|[[52704, 22.09821...|
|1645|[[60021, 18.23156...|
|1829|[[75544, 18.52557...|
|1959|[[80190, 22.79922...|
|2122|[[72421, 22.06683...|
|2142|[[62463, 11.35798...|
|2366|[[77577, 14.39145...|
|2659|[[37696, 17.07533...|
|2866|[[52881, 31.15724...|
|3175|[[56425, 25.29191...|
|3749|[[26374, 14.47831...|
+----+--------------------+
only showing top 20 rows



In [27]:
kindleRecs.show()

+----+--------------------+
| iid|     recommendations|
+----+--------------------+
| 148|[[20306, 15.22369...|
| 463|[[48859, 25.33829...|
| 471|[[88895, 19.14617...|
| 496|[[350625, 27.6201...|
| 833|[[54895, 22.03339...|
|1088|[[50558, 31.99250...|
|1238|[[34092, 43.57119...|
|1342|[[47408, 34.96792...|
|1580|[[125090, 34.1654...|
|1591|[[40294, 37.99627...|
|1645|[[163099, 46.7077...|
|1829|[[65620, 25.52935...|
|1959|[[28731, 35.70830...|
|2122|[[60187, 21.38083...|
|2142|[[54735, 40.59636...|
|2366|[[42712, 31.12254...|
|2659|[[40294, 51.71845...|
|2866|[[19133, 48.26461...|
|3175|[[16528, 48.2196]...|
|3749|[[163099, 35.6878...|
+----+--------------------+
only showing top 20 rows

