In [1]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

In [2]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

from pyspark.ml import Pipeline
from pyspark.sql.functions import *
from pyspark.sql.types import *


from pyspark.sql import Row

from pyspark.ml.recommendation import ALS, ALSModel
from pyspark.ml.tuning import TrainValidationSplit, ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import RegressionEvaluator


In [3]:
dvd = spark.read \
    .option("quote", "\"")  \
    .option("escape", "\"") \
    .option("ignoreLeadingWhiteSpace",True) \
    .csv("/user/klaurens/project/project/amazon_reviews_us_Digital_Video_Download_v1_00.tsv",inferSchema=True,header=True, sep='\t' )

In [4]:
dvd.count()

4057147

In [5]:
dvd.printSchema()

root
 |-- marketplace: string (nullable = true)
 |-- customer_id: integer (nullable = true)
 |-- review_id: string (nullable = true)
 |-- product_id: string (nullable = true)
 |-- product_parent: integer (nullable = true)
 |-- product_title: string (nullable = true)
 |-- product_category: string (nullable = true)
 |-- star_rating: integer (nullable = true)
 |-- helpful_votes: integer (nullable = true)
 |-- total_votes: integer (nullable = true)
 |-- vine: string (nullable = true)
 |-- verified_purchase: string (nullable = true)
 |-- review_headline: string (nullable = true)
 |-- review_body: string (nullable = true)
 |-- review_date: timestamp (nullable = true)



In [5]:
dvd.select("customer_id", "product_id", "star_rating").show(1)

+-----------+----------+-----------+
|customer_id|product_id|star_rating|
+-----------+----------+-----------+
|   12190288|B00AYB1482|          5|
+-----------+----------+-----------+
only showing top 1 row



In [14]:
new_product_id = sqlContext.createDataFrame(dvd.rdd.map(lambda x: x[3]).zipWithIndex(), \
        StructType([StructField("product_id", StringType(), True),StructField("productID", IntegerType(), True)]))

new_product_id.show(2)

+----------+---------+
|product_id|productID|
+----------+---------+
|B00AYB1482|        0|
|B00KQD28OM|        1|
+----------+---------+
only showing top 2 rows



In [16]:
dvd.printSchema()

root
 |-- marketplace: string (nullable = true)
 |-- customer_id: integer (nullable = true)
 |-- review_id: string (nullable = true)
 |-- product_id: string (nullable = true)
 |-- product_parent: integer (nullable = true)
 |-- product_title: string (nullable = true)
 |-- product_category: string (nullable = true)
 |-- star_rating: integer (nullable = true)
 |-- helpful_votes: integer (nullable = true)
 |-- total_votes: integer (nullable = true)
 |-- vine: string (nullable = true)
 |-- verified_purchase: string (nullable = true)
 |-- review_headline: string (nullable = true)
 |-- review_body: string (nullable = true)
 |-- review_date: timestamp (nullable = true)



In [6]:
from pyspark.ml.feature import StringIndexer

In [7]:
indexer = StringIndexer(inputCol="product_id", outputCol="productID")
df_trans = indexer.fit(dvd).transform(dvd)
df_trans.show(2)

+-----------+-----------+--------------+----------+--------------+--------------------+--------------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-------------------+---------+
|marketplace|customer_id|     review_id|product_id|product_parent|       product_title|    product_category|star_rating|helpful_votes|total_votes|vine|verified_purchase|     review_headline|         review_body|        review_date|productID|
+-----------+-----------+--------------+----------+--------------+--------------------+--------------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-------------------+---------+
|         US|   12190288|R3FU16928EP5TC|B00AYB1482|     668895143|Enlightened: Seas...|Digital_Video_Dow...|          5|            0|          0|   N|                Y|I loved it and I ...|I loved it and I ...|2015-08-31 00:00:00|    411.0|
|         US|   30549954|R1IZHHS

In [8]:
df_trans.select(F.countDistinct("product_id")).show()

+--------------------------+
|count(DISTINCT product_id)|
+--------------------------+
|                    166748|
+--------------------------+



In [9]:
df_trans.select(F.countDistinct("productID")).show()

+-------------------------+
|count(DISTINCT productID)|
+-------------------------+
|                   166748|
+-------------------------+



In [33]:
df_trans.select('customer_id','productID','star_rating').show(5)

+-----------+---------+-----------+
|customer_id|productID|star_rating|
+-----------+---------+-----------+
|   12190288|    411.0|          5|
|   30549954|   1203.0|          5|
|   52895410|  58539.0|          4|
|   27072354|    291.0|          5|
|   26939022|  11079.0|          5|
+-----------+---------+-----------+
only showing top 5 rows



In [34]:
df_trans.printSchema()

root
 |-- marketplace: string (nullable = true)
 |-- customer_id: integer (nullable = true)
 |-- review_id: string (nullable = true)
 |-- product_id: string (nullable = true)
 |-- product_parent: integer (nullable = true)
 |-- product_title: string (nullable = true)
 |-- product_category: string (nullable = true)
 |-- star_rating: integer (nullable = true)
 |-- helpful_votes: integer (nullable = true)
 |-- total_votes: integer (nullable = true)
 |-- vine: string (nullable = true)
 |-- verified_purchase: string (nullable = true)
 |-- review_headline: string (nullable = true)
 |-- review_body: string (nullable = true)
 |-- review_date: timestamp (nullable = true)
 |-- productID: double (nullable = false)



In [10]:
df_trans = df_trans.withColumn("productIDint", df_trans['productID'].cast('integer'))

In [11]:
df_trans.printSchema()

root
 |-- marketplace: string (nullable = true)
 |-- customer_id: integer (nullable = true)
 |-- review_id: string (nullable = true)
 |-- product_id: string (nullable = true)
 |-- product_parent: integer (nullable = true)
 |-- product_title: string (nullable = true)
 |-- product_category: string (nullable = true)
 |-- star_rating: integer (nullable = true)
 |-- helpful_votes: integer (nullable = true)
 |-- total_votes: integer (nullable = true)
 |-- vine: string (nullable = true)
 |-- verified_purchase: string (nullable = true)
 |-- review_headline: string (nullable = true)
 |-- review_body: string (nullable = true)
 |-- review_date: timestamp (nullable = true)
 |-- productID: double (nullable = false)
 |-- productIDint: integer (nullable = true)



In [37]:
df_trans.select(F.countDistinct("productIDint")).show()

+----------------------------+
|count(DISTINCT productIDint)|
+----------------------------+
|                      166748|
+----------------------------+



In [38]:
df_trans.select('customer_id','productIDint','star_rating').show(5)

+-----------+------------+-----------+
|customer_id|productIDint|star_rating|
+-----------+------------+-----------+
|   12190288|         411|          5|
|   30549954|        1203|          5|
|   52895410|       58539|          4|
|   27072354|         291|          5|
|   26939022|       11079|          5|
+-----------+------------+-----------+
only showing top 5 rows



In [43]:
df_trans.select("customer_id").distinct().count()

2038128

In [44]:
df_trans.select("productIDint").distinct().count()

166748

In [41]:
# calculate the sparsity of the matrix

def get_mat_sparsity(ratings):
    # Count the total number of ratings in the dataset
    count_nonzero = ratings.select("star_rating").count()
    
    print(count_nonzero)
    # Count the number of distinct userIds and distinct movieIds
    total_elements = ratings.select("customer_id").distinct().count() * ratings.select("productIDint").distinct().count()
    print(total_elements)
    # Divide the numerator by the denominator
    sparsity = (1.0 - (count_nonzero *1.0)/total_elements)*100
    print("The ratings dataframe is ", "%.2f" % sparsity + "% sparse.")

In [42]:
get_mat_sparsity(df_trans)

4057147
339853767744
The ratings dataframe is  100.00% sparse.


### Collaborative Filtering

In [12]:
(train, test) = df_trans.randomSplit([0.8, 0.2], seed = 0)

In [13]:
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS

In [14]:
als = ALS(userCol="customer_id", itemCol="productIDint", ratingCol="star_rating",coldStartStrategy="drop")

param_grid = ParamGridBuilder().addGrid(
    als.rank,
    [10, 30, 50],
).addGrid(
    als.regParam,
    [0.1, 0.5, 1],
).build()

evaluator = RegressionEvaluator(metricName="rmse", labelCol="star_rating",
                                predictionCol="prediction")
cv = CrossValidator(estimator=als, estimatorParamMaps=param_grid, evaluator=evaluator, numFolds=3, seed=123)
cv_als_model = cv.fit(train)

In [16]:
cv_als_model.bestModel

ALS_b9e3218485e9

In [17]:
predictions = cv_als_model.bestModel.transform(test)
rmse = evaluator.evaluate(predictions)

In [17]:
# grid search cv
# Rank is very important: bigger rank = larger dimension = cpature more info

In [18]:
rmse

1.4209542212422317

In [19]:
# Print "Rank"
print("  Rank:", cv_als_model.bestModel._java_obj.parent().getRank())
# Print "MaxIter"
print("  MaxIter:", cv_als_model.bestModel._java_obj.parent().getMaxIter())
# Print "RegParam"
print("  RegParam:", cv_als_model.bestModel._java_obj.parent().getRegParam())

  Rank: 50
  MaxIter: 10
  RegParam: 0.5


In [20]:
recommendations = cv_als_model.bestModel.recommendForAllUsers(5)
recommendations.show()

+-----------+--------------------+
|customer_id|     recommendations|
+-----------+--------------------+
|      10206|[[140578, 6.92995...|
|      12940|[[140578, 2.23327...|
|      13832|[[140578, 4.27202...|
|      18051|[[140578, 6.15860...|
|      28759|[[140578, 6.10432...|
|      44822|[[140578, 6.61784...|
|      61051|[[61817, 4.383597...|
|      69637|[[140578, 5.88298...|
|      76493|[[140578, 4.95292...|
|      78120|[[140578, 5.49418...|
|      91446|[[140578, 6.05820...|
|     102960|[[140578, 5.08396...|
|     109613|[[153328, 5.95451...|
|     134748|[[140578, 6.27201...|
|     156366|[[140578, 6.21699...|
|     169588|[[140578, 5.09119...|
|     171452|[[140578, 5.71582...|
|     184096|[[140578, 6.11790...|
|     185494|[[140578, 5.42780...|
|     224663|[[140578, 5.65666...|
+-----------+--------------------+
only showing top 20 rows



In [21]:
nrecommendations = recommendations\
    .withColumn("ls", explode("recommendations"))\
    .select('customer_id', col("ls.productIDint"), col("ls.rating"))

In [22]:
nrecommendations_user = nrecommendations.filter('customer_id = 10206')

In [23]:
nrecommendations_user.show(1)

+-----------+------------+--------+
|customer_id|productIDint|  rating|
+-----------+------------+--------+
|      10206|      140578|6.929952|
+-----------+------------+--------+
only showing top 1 row



In [32]:
df_trans_user = df_trans.filter('customer_id = 10206')