### Invoke SparkContext

In [1]:
from pyspark import SparkContext, SparkConf
conf = SparkConf().setAppName('Spark project')
sc = SparkContext(conf=conf)
sc

In [2]:
from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("Spark Project") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

In [3]:
spark.sparkContext

### Load data

In [4]:
from pyspark import SQLContext
reviews = spark.read.options(header=True).csv("gs://bdpp-project-bucket/amazon_reviews_us_Mobile_Electronics_v1_00.tsv", sep="\t")

### See structure of data

In [5]:
reviews.printSchema()

root
 |-- marketplace: string (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- review_id: string (nullable = true)
 |-- product_id: string (nullable = true)
 |-- product_parent: string (nullable = true)
 |-- product_title: string (nullable = true)
 |-- product_category: string (nullable = true)
 |-- star_rating: string (nullable = true)
 |-- helpful_votes: string (nullable = true)
 |-- total_votes: string (nullable = true)
 |-- vine: string (nullable = true)
 |-- verified_purchase: string (nullable = true)
 |-- review_headline: string (nullable = true)
 |-- review_body: string (nullable = true)
 |-- review_date: string (nullable = true)



### Display data

In [6]:
from pyspark.sql.functions import *

reviews.show()

+-----------+-----------+--------------+----------+--------------+--------------------+------------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-----------+
|marketplace|customer_id|     review_id|product_id|product_parent|       product_title|  product_category|star_rating|helpful_votes|total_votes|vine|verified_purchase|     review_headline|         review_body|review_date|
+-----------+-----------+--------------+----------+--------------+--------------------+------------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-----------+
|         US|   20422322| R8MEA6IGAHO0B|B00MC4CED8|     217304173|BlackVue DR600GW-PMP|Mobile_Electronics|          5|            0|          0|   N|                Y|         Very Happy!|As advertised. Ev...| 2015-08-31|
|         US|   40835037|R31LOQ8JGLPRLK|B00OQMFG1Q|     137313254|GENSSI GSM / GPS ...|Mobile_Electronics|      

### Show distribution between ratings and how many reviews each customer has written

In [7]:
reviews.groupBy('star_rating').count().orderBy(desc('count')).show()
reviews.groupBy('customer_id').count().orderBy(desc('count')).show()

+-----------+-----+
|star_rating|count|
+-----------+-----+
|          5|52255|
|          4|18088|
|          1|17587|
|          3| 9734|
|          2| 7311|
+-----------+-----+

+-----------+-----+
|customer_id|count|
+-----------+-----+
|   15127646|   25|
|   29514513|   21|
|   12259799|   18|
|   43856165|   10|
|   52460215|   10|
|   19803990|   10|
|   34408569|   10|
|   53037408|    9|
|   49675502|    9|
|   45070473|    8|
|   53090839|    8|
|   50027179|    8|
|   51666042|    8|
|   32038204|    8|
|   38773014|    7|
|    7394955|    7|
|   17957446|    7|
|   51346302|    7|
|   34645354|    7|
|   44834233|    7|
+-----------+-----+
only showing top 20 rows



### See that there are more product ID's than product titles. ID should therefore be more reliable than title

In [8]:
reviews.select('product_title').distinct().count()

24770

In [9]:
reviews.select('product_id').distinct().count()

25801

### Drop attributes of no importance

In [10]:
reviews = reviews.select(['customer_id', 'review_id', 'product_id', 'product_title', 'star_rating', 'verified_purchase'])
reviews.show()

+-----------+--------------+----------+--------------------+-----------+-----------------+
|customer_id|     review_id|product_id|       product_title|star_rating|verified_purchase|
+-----------+--------------+----------+--------------------+-----------+-----------------+
|   20422322| R8MEA6IGAHO0B|B00MC4CED8|BlackVue DR600GW-PMP|          5|                Y|
|   40835037|R31LOQ8JGLPRLK|B00OQMFG1Q|GENSSI GSM / GPS ...|          5|                Y|
|   51469641|R2Y0MM9YE6OP3P|B00QERR5CY|iXCC Multi pack L...|          5|                Y|
|    4332923| RRB9C05HDOD4O|B00QUFTPV4|abcGoodefg® FBI C...|          4|                Y|
|   44855305|R26I2RI1GFV8QG|B0067XVNTG|Generic Car Dashb...|          2|                Y|
|    7846966| RY8DDL22YG4R5|B00KA6CCVY|Aweek® Air Acoust...|          3|                Y|
|   21299354|R2AT2426ZHFUHH|B00MJCDPM2|Sentey LS-4460 B-...|          3|                Y|
|   28902968|R3RRXU2R23NMQ9|B00ET5AWBY|iPad Car Headrest...|          5|                Y|

### Drop purchases that are not verified

In [11]:
reviews = reviews.filter(reviews['verified_purchase'] == 'Y')

### See if there are any missing values

In [12]:
reviews.select([count(when(col(column).isNull(), 1)).alias(column) for column in reviews.columns]).show()

+-----------+---------+----------+-------------+-----------+-----------------+
|customer_id|review_id|product_id|product_title|star_rating|verified_purchase|
+-----------+---------+----------+-------------+-----------+-----------------+
|          0|        0|         0|            0|          0|                0|
+-----------+---------+----------+-------------+-----------+-----------------+



### String indexing with pipelines

In [13]:
from pyspark.ml.feature import StringIndexer
from pyspark.ml import Pipeline

# String index product_id.
prod_indexer = StringIndexer(inputCol='product_id', outputCol='product_id_num')

# String index customer_id.
cust_indexer = StringIndexer(inputCol='customer_id', outputCol='customer_id_num')

# String index star_rating.
star_indexer = StringIndexer(inputCol='star_rating', outputCol='star_rating_num')


pipe = Pipeline(stages = [prod_indexer, cust_indexer, star_indexer])

reviews_enc = pipe.fit(reviews).transform(reviews)
reviews_enc.show()

+-----------+--------------+----------+--------------------+-----------+-----------------+--------------+---------------+---------------+
|customer_id|     review_id|product_id|       product_title|star_rating|verified_purchase|product_id_num|customer_id_num|star_rating_num|
+-----------+--------------+----------+--------------------+-----------+-----------------+--------------+---------------+---------------+
|   20422322| R8MEA6IGAHO0B|B00MC4CED8|BlackVue DR600GW-PMP|          5|                Y|        2819.0|        74624.0|            0.0|
|   40835037|R31LOQ8JGLPRLK|B00OQMFG1Q|GENSSI GSM / GPS ...|          5|                Y|       12526.0|        32056.0|            0.0|
|   51469641|R2Y0MM9YE6OP3P|B00QERR5CY|iXCC Multi pack L...|          5|                Y|          12.0|        23246.0|            0.0|
|    4332923| RRB9C05HDOD4O|B00QUFTPV4|abcGoodefg® FBI C...|          4|                Y|        3110.0|        19854.0|            1.0|
|   44855305|R26I2RI1GFV8QG|B0067X

In [14]:
reviews_enc_only = reviews_enc.select('customer_id_num', 'product_id_num', 'star_rating_num')
reviews_enc_only.show()

+---------------+--------------+---------------+
|customer_id_num|product_id_num|star_rating_num|
+---------------+--------------+---------------+
|        74624.0|        2819.0|            0.0|
|        32056.0|       12526.0|            0.0|
|        23246.0|          12.0|            0.0|
|        19854.0|        3110.0|            1.0|
|          153.0|           5.0|            4.0|
|        29243.0|         961.0|            3.0|
|        18760.0|          81.0|            3.0|
|        47830.0|        1037.0|            0.0|
|        53289.0|       21230.0|            1.0|
|         3150.0|        3804.0|            0.0|
|        52090.0|        1310.0|            0.0|
|        59634.0|        2939.0|            3.0|
|        26120.0|         152.0|            0.0|
|        60022.0|       11492.0|            2.0|
|        66049.0|       10590.0|            4.0|
|        63856.0|         222.0|            0.0|
|        31337.0|        4847.0|            0.0|
|        30903.0|   

### Upload the dataset to the cloud

In [15]:
reviews_enc_only.coalesce(1).write.options(header=True).csv("gs://bdpp-project-bucket/outputs/reviews_encoded_only.csv")

### Train ALS model

In [16]:
from pyspark.ml.recommendation import ALS
# Split into train and test data.
training, test = reviews_enc.randomSplit([0.8, 0.2])

als = ALS(userCol='customer_id_num', itemCol='product_id_num', ratingCol='star_rating_num', \
          implicitPrefs=True , coldStartStrategy="drop", nonnegative=True)
trained_model = als.fit(training)

### Test ALS model

In [17]:
from pyspark.ml.evaluation import RegressionEvaluator
predictions = trained_model.transform(test)

evaluator = RegressionEvaluator(metricName='rmse', labelCol='star_rating_num', predictionCol='prediction')
rmse = evaluator.evaluate(predictions)
print('RMSE on test data: ' + str(rmse))

RMSE on test data: 1.56042009351


### Write RMSE result to GCP bucket

In [None]:
from google.cloud import storage

client = storage.Client()
bucket = client.bucket('bdpp-project-bucket')

blob = bucket.get_blob('outputs/rmse.txt')

blob.upload_from_string('RMSE on test data: ' + str(rmse))

### Cross-validation

In [None]:
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

params = ParamGridBuilder().addGrid(als.maxIter, [5, 10, 15])\
                            .addGrid(als.regParam, [0.001, 0.01, 0.1])\
                            .addGrid(als.rank, [1, 5, 10, 20]).build()
cv = CrossValidator(estimator=als, estimatorParamMaps=params, evaluator=evaluator, numFolds=4)
cv_model = cv.fit(training)
predictions = cv_model.transform(test)
rmse_cross = evaluator.evaluate(predictions)

print('RMSE score from cross-validation: ' + str(rmse_cross))

RMSE score from cross-validation: 1.56043494982


### Extract the best hyperparameters from the cross-validation

In [28]:
import numpy as np

best_parameters = cv_model.getEstimatorParamMaps()[np.argmax(cv_model.avgMetrics)]

best_maxIter = best_parameters.items()[0][1]
best_regParam = best_parameters.items()[1][1]
best_rank = best_parameters.items()[2][1]

### Write cross-validation score and best hyperparameters to GCP bucket

In [30]:
cross_rmse_blob = bucket.get_blob('outputs/rmse_cross.txt')
cross_rmse_blob.upload_from_string('RMSE score from cross-validation: ' + str(rmse) + '\n\nBest parameters: \n' \
                                   + 'Best number of max iterations: ' + str(best_maxIter) + '\nBest regularization parameter: ' + str(best_regParam)\
                                   + '\nBest rank value: ' + str(best_rank))

### Upload to the cloud the predictions vs actual values

In [None]:
predictions.select('customer_id', 'product_title', 'star_rating', 'star_rating_num', 'prediction').coalesce(1).write.options(header=True).csv("gs://bdpp-project-bucket/outputs/predictions.csv")

### Class customer which can add a new user to the data set and make recommendations for that customer.

In [None]:
import random

class Customer():
    def __init__(self):
        self.id = None
        self.purchase = {'product': None, 'rating': None}
        self.recommendations = None
        self.recommended_products = None
        self.customer_id_num = None
        self.dataset = None
        self.dataset_num = None
        
    def buyProduct(self, product, rating):
        self.purchase.update({'product': product, 'rating': rating})
        
    
    def IDToNum(self, the_dataset):
        cust_indexer = StringIndexer(inputCol='customer_id',outputCol='customer_id_num')
        prod_indexer = StringIndexer(inputCol='product_id', outputCol='product_id_num')
        star_indexer = StringIndexer(inputCol='star_rating', outputCol='star_rating_num')

        pipe = Pipeline(stages=[cust_indexer, prod_indexer, star_indexer])

        return pipe.fit(the_dataset).transform(the_dataset)

        
    def insertUser(self, the_dataset):
        temp_df = the_dataset.select(['customer_id', 'product_title', 'product_id', 'star_rating'])
        user_id_rows = the_dataset.select(['customer_id']).collect()
        user_ids = [int(customer_id[0].encode('ascii')) for customer_id in user_id_rows]
        temp = user_ids[0]      # An ID that already exists in the data set.
        while temp in user_ids:
            temp = random.randrange(1, 100000)
        self.id = str(temp)

        # Find out the product_id of the bought product and fetch the first product_id returned.
        product_id = self.getProductID(temp_df, self.purchase.get('product'))
        # Add new user to data set.
        new_row = spark.createDataFrame([(self.id, self.purchase.get('product'), product_id, self.purchase.get('rating'))])
        # Append the new row.
        self.dataset = temp_df.union(new_row)

        self.dataset_num = self.IDToNum(self.dataset)
        
        self.customer_id_num = self.dataset_num.select(['customer_id_num']).filter(self.dataset_num['customer_id'] == self.id).collect()[0][0]
    
    def trainModel(self, n_recommendations):
        als = ALS(maxIter = best_maxIter, regParam = best_regParam, rank = best_rank,\
                  userCol='customer_id_num', itemCol='product_id_num', ratingCol='star_rating_num', \
                  implicitPrefs=True , coldStartStrategy="drop", nonnegative=True)
        self.recommendations = als.fit(self.dataset_num).recommendForAllUsers(n_recommendations)
        
    
    def queryRecommendations(self):
        recommendations_list = []
        
        # Index of new user.
        idx_new_user = [product_id_num[0] for product_id_num in self.recommendations.select(['customer_id_num'])\
                        .collect()].index(self.customer_id_num)
        recommendations = [recommendation for recommendation in self.recommendations.select('recommendations')\
                           .filter(self.recommendations['customer_id_num'] == self.customer_id_num).collect()[0][0]]
        
        # Get title of every recommended product.
        for index, recommendation in enumerate(recommendations):
            recommendations_list.append(\
                                self.dataset_num.select(['product_title']).filter(self.dataset_num['product_id_num'] == recommendation[0]).collect()[0][0]\
                                       .encode('ascii'))

        self.recommended_products = recommendations_list
    
    def getProductID(self, the_dataset, product):
        return the_dataset.select(['product_id']).filter(the_dataset['product_title'] == product).collect()[0][0]
    
    def printRecommendations(self):
        string_to_print = 'Based on your purchase of ' + self.purchase.get('product') + ', you might also like:\n'
        print('Based on your purchase of ' + self.purchase.get('product') + ', you might also like:')
        for number, recommendation in enumerate(self.recommended_products, 1):
            string_to_print += (str(number) + '): ' + recommendation + '\n')
            print(str(number) + '): ' + recommendation)
        
        # Send output to the GCP bucket.
        final_blob = bucket.get_blob('outputs/recommendations.txt')
        final_blob.upload_from_string(string_to_print)

In [None]:
new_customer = Customer()
new_customer.buyProduct(product='2-Port USB Car Charger Adapter', rating='5')
new_customer.insertUser(reviews_enc)
new_customer.trainModel(5)
new_customer.queryRecommendations()
new_customer.printRecommendations()

Based on your purchase of 2-Port USB Car Charger Adapter, you might also like:
1): iXCC Lightning Cable 3ft, iPhone charger, for iPhone X, 8, 8 Plus, 7, 7 Plus, 6s, 6s Plus, 6, 6 Plus, SE 5s 5c 5, iPad Air 2 Pro, iPad mini 2 3 4, iPad 4th Gen [Apple MFi Certified](Black and White)
2): New Trent Easypak 7000mAh Portable Triple USB Port External Battery Charger/Power Pack for Smartphones, Tablets and more (w/built-in USB cable)
3): iXCC Multi pack Lightning cable
4): Bluetooth Receiver, Breett Bluetooth 4.1 Receiver, Multipoint Connection Bluetooth Audio Music Receiver with 3.5mm AUX Port Hands Free Calling for Car Stereo/Home Stereo/Headphone et
5): eForCity Leather Case for Barnes and Noble Nook / Nook Color, Black
