<a href="https://colab.research.google.com/github/ShannonElise86/ShannonElise86.gihub.io/blob/master/Final_Capstone_Model_and_Recommendations.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Recommender Model



> ***Overview***

In this notebook, I train a recommender model with the Alternating Least Squares (ALS) implementation from Apache Spark. After tuning, the model with the lowest prediction error is used to give the top ten recommended products for each user.

***Challenges and Limitations***



> Ideally, this model would be used to generate recommendations from a dataset containing millions of reviews. However, without access to a distributed computing framework, I am limited to a smaller dataset using my personal laptop. 



> Additionally, I use Spark's ParamGridBuilder to generate multiple combinations of parameters to test the model. Unfortunately, the available memory cannot handle more than a few combinations at once. To accommodate this, I run the parameter grid several times, each with a small set of values, to tune the model.







In [0]:
#Install Java and Apache Spark
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://www-us.apache.org/dist/spark/spark-2.4.5/spark-2.4.5-bin-hadoop2.7.tgz
!tar xf spark-2.4.5-bin-hadoop2.7.tgz

In [6]:
# Install Spark-related dependencies for Python
!pip install -q findspark
!pip install pyspark



In [0]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.4.5-bin-hadoop2.7"

In [8]:
from google.colab import drive
drive.mount('/content/gdrive')

Drive already mounted at /content/gdrive; to attempt to forcibly remount, call drive.mount("/content/gdrive", force_remount=True).


In [0]:
import json
import gzip
import pandas as pd

from pyspark import SparkContext
from pyspark.sql import SparkSession, SQLContext
from pyspark.sql.functions import isnan, when, count, col

import numpy as np
import functools
%matplotlib inline

from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

In [0]:
DATA_NAME = '/content/gdrive/My Drive/lux_beauty_index.json'
APP_NAME = "Amazon Clothing Recommender"
SPARK_URL = "local[*]"
MAX_MEMORY = "6g"

In [0]:
# `sc` is a SparkSession; the driver memory only needs to be configured once
sc = (SparkSession.builder.appName(APP_NAME)
      .config('spark.driver.memory', MAX_MEMORY)
      .master(SPARK_URL).getOrCreate())
sc.sparkContext.setCheckpointDir('/tmp')
sqlContext = SQLContext(sc.sparkContext, sc)

In [0]:
df = sqlContext.read.json(DATA_NAME)

In [13]:
print(f"Dataset shape is {df.count():d} rows by {len(df.columns):d} columns.")

Dataset shape is 27823 rows by 6 columns.


In [14]:
df.printSchema()

root
 |-- productID: string (nullable = true)
 |-- product_index: long (nullable = true)
 |-- rating: double (nullable = true)
 |-- reviewerID: string (nullable = true)
 |-- reviewer_index: long (nullable = true)
 |-- title: string (nullable = true)



In [15]:
df.show()

+----------+-------------+------+--------------+--------------+--------------------+
| productID|product_index|rating|    reviewerID|reviewer_index|               title|
+----------+-------------+------+--------------+--------------+--------------------+
|B000142FVW|          236|   4.0|A3DZFW2XL5S6XK|          1338|OPI Nail Lacquer,...|
|B0001435D4|          643|   1.0|A2LAFL37V088EU|            24|OPI Nail Lacquer ...|
|B00014GT8W|          353|   3.0|A2V92F5R7MLCVI|           326|Dermablend Cover ...|
|B00014GT8W|          353|   4.0| AS44QEHT3KSPK|           330|Dermablend Cover ...|
|B00021DXVA|          264|   5.0|A19UTUEBWKIZFT|          1947|VINCENT LONGO Wat...|
|B00021DXVA|          264|   4.0|A13KW5I9IHQ039|          3382|VINCENT LONGO Wat...|
|B00025WYZC|          786|   4.0|A2FNA5903D9E6Y|           653|Crabtree & Evelyn...|
|B0002RI2PG|           86|   5.0| AQCYEZYS51OBC|          3782|Dermablend Loose ...|
|B0002RI2PG|           86|   4.0|A2ZJPDH0YP90SD|          2773|De

### Checking the Sparsity

The ALS model builds a matrix of reviewers and products and fills in the ratings that were given. Most cells in this matrix are empty, which makes it a sparse matrix. If the matrix is too sparse, it is difficult to get accurate predictions. A baseline goal for the collaborative filtering method is a matrix that is at least 0.05% filled.

In [0]:
numerator = df.count()

In [0]:
total_reviewers = df.select('reviewer_index').distinct().count()
total_products = df.select('product_index').distinct().count()

In [0]:
denominator = total_reviewers * total_products

In [19]:
#Percent of the matrix that is filled (the density); 100 minus this value is the sparsity
sparsity = ((numerator*1.0/denominator)*100)
print('Percent of the matrix filled:')
print(sparsity)

Percent of the matrix filled:
0.4730196224173525


With 0.47% of the matrix filled in, we're off to a good start!

## Alternating Least Squares 

To begin selecting the best model, I build a baseline model using the default rank, maxIter, and regParam values. Then, to tune the model, I use a parameter grid with cross-validation to try combinations of parameters.
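
To make the idea behind ALS more concrete, here is a minimal pure-Python sketch. It is not Spark's implementation: it assumes a rank of 1 so that each least-squares step has a closed form, it runs on a small hypothetical ratings dictionary, and `reg` only stands in for `regParam`. The names `ratings`, `als_rank1`, and `rmse` are illustrative.

```python
# Toy ratings: (user, item) -> rating. Hypothetical data, not from the notebook's dataset.
ratings = {
    (0, 0): 5.0, (0, 1): 4.0,
    (1, 0): 4.0, (1, 2): 2.0,
    (2, 1): 5.0, (2, 2): 3.0,
}
n_users, n_items = 3, 3
reg = 0.1  # plays the role of regParam in this sketch


def rmse(u, v):
    """Root mean squared error over the observed ratings only."""
    errors = [(r - u[i] * v[j]) ** 2 for (i, j), r in ratings.items()]
    return (sum(errors) / len(errors)) ** 0.5


def als_rank1(n_iter=10):
    """Alternate between solving for user factors and item factors (rank 1)."""
    u = [1.0] * n_users
    v = [1.0] * n_items
    history = [rmse(u, v)]
    for _ in range(n_iter):
        # Hold the item factors fixed and solve a ridge regression per user.
        for i in range(n_users):
            num = sum(r * v[j] for (a, j), r in ratings.items() if a == i)
            den = reg + sum(v[j] ** 2 for (a, j), _r in ratings.items() if a == i)
            u[i] = num / den
        # Hold the user factors fixed and solve a ridge regression per item.
        for j in range(n_items):
            num = sum(r * u[i] for (i, b), r in ratings.items() if b == j)
            den = reg + sum(u[i] ** 2 for (i, b), _r in ratings.items() if b == j)
            v[j] = num / den
        history.append(rmse(u, v))
    return u, v, history


u, v, history = als_rank1()
print("RMSE before:", history[0], "after:", history[-1])
```

With a higher rank, each step becomes a small regularized linear system per user or per item rather than a single division. Spark's implementation differs in its details, so the numbers from this sketch are not comparable to the notebook's results.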

In [0]:
#Split the data
(training_data, test_data) = df.randomSplit([0.8, 0.2], seed=123)

In [0]:
#Build a baseline model with the default rank, maxIter, and regParam
als_model = ALS(userCol='reviewer_index', itemCol='product_index', ratingCol='rating', 
                rank=10, maxIter=10, regParam= 0.1, 
                coldStartStrategy='drop', nonnegative=True, implicitPrefs=False
)

In [0]:
#Fit the model
base_model = als_model.fit(training_data)

In [23]:
#Get predictions
predictions = base_model.transform(test_data)
predictions.show()

+----------+-------------+------+--------------+--------------+--------------------+----------+
| productID|product_index|rating|    reviewerID|reviewer_index|               title|prediction|
+----------+-------------+------+--------------+--------------+--------------------+----------+
|B00UKOAYQQ|          148|   4.0|A3R5GTYQ50QVMD|          1395|Jack Black Gel Po...| 3.5303087|
|B00UKOAYQQ|          148|   5.0|A1QBOC76MIOJYP|          1615|Jack Black Gel Po...| 3.7227428|
|B00UKOAYQQ|          148|   5.0| AVU1ILDDYW301|           434|Jack Black Gel Po...| 4.2082872|
|B00UKOAYQQ|          148|   3.0| AYNRALJ4X1COS|          2860|Jack Black Gel Po...| 3.3061848|
|B00UKOAYQQ|          148|   5.0|A3NHUQ33CFH3VM|          3110|Jack Black Gel Po...| 4.6977525|
|B00E4WWS46|          463|   4.0| A7WLP2ROPFTUQ|          1964|blinc Eyeliner Pe...| 3.6517034|
|B00U1DGFNW|          471|   4.0| AM83WJG9BE2EP|           165|Vichy Pureté Ther...|  4.061397|
|B00U1DGFNW|          471|   5.0| ALYZJ7

In [24]:
# Define an evaluator that scores predictions by RMSE
evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating", predictionCol="prediction")

# Confirm the evaluator's settings
print(evaluator.getMetricName())
print(evaluator.getLabelCol())
print(evaluator.getPredictionCol())

rmse
rating
prediction


In [25]:
# Evaluate the "predictions" dataframe
RMSE = evaluator.evaluate(predictions)

# Print the RMSE
print(RMSE)

1.0561215355390317
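
RMSE, the metric reported above, squares each error before averaging, so a few large misses raise it more than they raise the plain average error. The sketch below compares RMSE with the mean absolute error (MAE) on a handful of hypothetical (actual, predicted) pairs. These pairs are made up for illustration and are not taken from the notebook's predictions.

```python
import math

# Hypothetical (actual, predicted) rating pairs, for illustration only.
pairs = [(5.0, 4.5), (4.0, 4.2), (1.0, 3.0), (3.0, 3.1), (5.0, 4.8)]


def rmse(pairs):
    """Square root of the mean squared error."""
    return math.sqrt(sum((a - p) ** 2 for a, p in pairs) / len(pairs))


def mae(pairs):
    """Mean of the absolute errors."""
    return sum(abs(a - p) for a, p in pairs) / len(pairs)


print("RMSE:", rmse(pairs))
print("MAE: ", mae(pairs))
```

In this toy data, the single 2-star miss pulls the RMSE well above the MAE. This is one reason to read an RMSE near 1 as a typical error size rather than as an exact average error.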


### Tuning the Model: Round 1

In [0]:
#build the model
als_model = ALS(userCol='reviewer_index', itemCol='product_index', ratingCol='rating',  
                coldStartStrategy='drop', nonnegative=True, implicitPrefs=False
)

In [0]:
#The parameter grid gives different combinations of parameters to test the model
param_grid = ParamGridBuilder() \
              .addGrid(als_model.rank, [10, 15]) \
              .addGrid(als_model.maxIter, [10, 15, 20]) \
              .addGrid(als_model.regParam, [.1, 0.5]) \
              .build()

In [28]:
#Evaluate the predictions by using RMSE
evaluator = RegressionEvaluator(metricName='rmse', labelCol='rating', predictionCol='prediction')
print('Number of models to be tested: ', len(param_grid))

Number of models to be tested:  12


In [29]:
#Cross validation
cv = CrossValidator(estimator=als_model, estimatorParamMaps=param_grid, evaluator=evaluator, numFolds=5)
print(cv)

CrossValidator_cdbc536a4383
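
To see why even a small grid gets expensive, it helps to count the model fits. The sketch below rebuilds the Round 1 combinations with `itertools.product` and multiplies by the number of folds. It is plain Python that only mirrors the grid values above and does not call Spark; the names `grid` and `fits_during_cv` are illustrative.

```python
from itertools import product

# Candidate values copied from the Round 1 grid.
grid = {
    "rank": [10, 15],
    "maxIter": [10, 15, 20],
    "regParam": [0.1, 0.5],
}
num_folds = 5

# Every combination of one value per parameter, as a list of dicts.
combinations = [dict(zip(grid, values)) for values in product(*grid.values())]

# During cross-validation, each combination is fit once per fold.
fits_during_cv = len(combinations) * num_folds

print(len(combinations), "combinations,", fits_during_cv, "fits during cross-validation")
```

So 12 combinations with 5 folds means 60 ALS fits, before the final refit of the best combination on the full training set. That count is why the grid is kept small in each round.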


In [0]:
#Fit the model
model = cv.fit(training_data)

In [0]:
#Extract the model trained with the best combination of parameters
best_model = model.bestModel

In [0]:
test_predictions = best_model.transform(test_data)
# Calculate the RMSE of test_predictions
RMSE_1 = evaluator.evaluate(test_predictions)

In [33]:
print('RMSE_1 = ' + str(RMSE_1))
print('***Best Model***')
print('Rank: ', (best_model.rank))
print('MaxIter: ', (best_model._java_obj.parent().getMaxIter()))
print('RegParam: ', (best_model._java_obj.parent().getRegParam()))

RMSE_1 = 1.0442751762512295
***Best Model***
Rank:  15
MaxIter:  20
RegParam:  0.1


### Tuning the Model: Round 2

Since the available memory cannot handle more than about 12 combinations at a time, I repeated the process with higher rank values, keeping the same maxIter and regParam candidates, to try to lower the RMSE.

In [0]:
param_grid = ParamGridBuilder() \
              .addGrid(als_model.rank, [20, 25]) \
              .addGrid(als_model.maxIter, [10, 15, 20]) \
              .addGrid(als_model.regParam, [.1, .5]) \
              .build()

In [35]:
evaluator = RegressionEvaluator(metricName='rmse', labelCol='rating', predictionCol='prediction')
print('Number of models to be tested: ', len(param_grid))

Number of models to be tested:  12


In [0]:
cv = CrossValidator(estimator=als_model, estimatorParamMaps=param_grid, evaluator=evaluator, numFolds=5)

In [0]:
model = cv.fit(training_data)

In [0]:
best_model = model.bestModel

In [0]:
test_predictions = best_model.transform(test_data)
RMSE_2 = evaluator.evaluate(test_predictions)

In [40]:
print('RMSE_2 = ' + str(RMSE_2))
print('***Best Model***')
print('Rank: ', (best_model.rank))
print('MaxIter: ', (best_model._java_obj.parent().getMaxIter()))
print('RegParam: ', (best_model._java_obj.parent().getRegParam()))

RMSE_2 = 1.0372083563619778
***Best Model***
Rank:  25
MaxIter:  20
RegParam:  0.1


### Tuning the Model: Round 3

At this point, the RMSE has come down to about 1.037. In each round so far, the largest rank and maxIter values in the grid have performed best. Following this pattern, I increased the rank and maxIter candidates to see whether the RMSE drops again. I also fixed regParam at 0.1, since it was selected over 0.5 in both previous rounds.

In [0]:
param_grid = ParamGridBuilder() \
              .addGrid(als_model.rank, [30, 35, 40]) \
              .addGrid(als_model.maxIter, [20, 25, 30]) \
              .addGrid(als_model.regParam, [.1]) \
              .build()

In [42]:
evaluator = RegressionEvaluator(metricName='rmse', labelCol='rating', predictionCol='prediction')
print('Number of models to be tested: ', len(param_grid))

Number of models to be tested:  9


In [0]:
cv = CrossValidator(estimator=als_model, estimatorParamMaps=param_grid, evaluator=evaluator, numFolds=5)

In [0]:
model = cv.fit(training_data)

In [0]:
best_model = model.bestModel

In [0]:
test_predictions = best_model.transform(test_data)

RMSE_3 = evaluator.evaluate(test_predictions)

In [47]:
print('RMSE_3 = ' + str(RMSE_3))
print('***Best Model***')
print('Rank: ', (best_model.rank))
print('MaxIter: ', (best_model._java_obj.parent().getMaxIter()))

RMSE_3 = 1.0267772891445814
***Best Model***
Rank:  40
MaxIter:  20


### Tuning the Model: One More Round

Just to see if we can get the RMSE any lower.

In [0]:
param_grid = ParamGridBuilder() \
              .addGrid(als_model.rank, [50, 55 ]) \
              .addGrid(als_model.maxIter, [20, 50, 100]) \
              .addGrid(als_model.regParam, [.1]) \
              .build()

In [49]:
evaluator = RegressionEvaluator(metricName='rmse', labelCol='rating', predictionCol='prediction')
print('Number of models to be tested: ', len(param_grid))

Number of models to be tested:  6


In [0]:
cv = CrossValidator(estimator=als_model, estimatorParamMaps=param_grid, evaluator=evaluator, numFolds=5)

In [0]:
model = cv.fit(training_data)

In [0]:
best_model = model.bestModel

In [0]:
test_predictions = best_model.transform(test_data)

RMSE_4 = evaluator.evaluate(test_predictions)

In [54]:
print('RMSE_4 = ' + str(RMSE_4))
print('***Best Model***')
print('Rank: ', (best_model.rank))
print('MaxIter: ', (best_model._java_obj.parent().getMaxIter()))

RMSE_4 = 1.0261489390009348
***Best Model***
Rank:  55
MaxIter:  20


## Model Analysis


> After several rounds of tuning the ALS parameters, the most successful combination is a rank of 55 and a maxIter of 20. The regParam stayed consistent at the default of 0.1. This combination yields an RMSE of approximately 1.026 for product rating predictions, compared with about 1.056 for the baseline model. In other words, the model's predicted rating is typically off by about 1 star. On a 5-star scale, predicting 4 stars for a product the user would rate 5 is a pretty good guess. It also means the model could predict a 2-star rating when the user would actually give a 1, and in that case the prediction is not great. However, the recommender system only takes the highest predictions, so predicting 1 star too high at the low end of the scale might not be that bad.


> If I had access to a larger amount of data and the ability to process it, I believe this model would achieve a lower RMSE. This would be the benefit of using a distributed computing framework and easier access to data.


> Using the best model I found, I then got the top recommendations for each user. 





## Get Recommendations for All Users



> The recommendForAllUsers method returns, for each user, the n products with the highest predicted ratings.
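
Conceptually, a predicted rating in a factorization model is the dot product of a user's factor vector and a product's factor vector, and the top n are the products with the highest scores. Here is a rough single-user sketch in plain Python. The rank-2 factors are hypothetical rather than values from the fitted model, and `top_n` is an illustrative helper, not a Spark method.

```python
import heapq

# Hypothetical learned factors (rank 2); real values would come from the fitted model.
user_factors = {1580: [1.2, 0.4]}
item_factors = {
    594: [3.0, 1.0],
    665: [2.5, 2.0],
    1491: [3.5, 1.5],
    219: [1.0, 0.5],
}


def top_n(user, n):
    """Return the n (item, score) pairs with the highest predicted rating."""
    u = user_factors[user]
    scores = {
        item: sum(a * b for a, b in zip(u, f))
        for item, f in item_factors.items()
    }
    return heapq.nlargest(n, scores.items(), key=lambda kv: kv[1])


recs = top_n(1580, 2)
print(recs)
```

Because the score is an unconstrained dot product, nothing forces it to stay within the 1 to 5 rating scale, which is consistent with some predictions in the output below exceeding 5.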


In [55]:
#Get the top 10 recommendations for all users
user_recs = best_model.recommendForAllUsers(10)
user_recs.show()

+--------------+--------------------+
|reviewer_index|     recommendations|
+--------------+--------------------+
|          1580|[[1491, 4.8221908...|
|           471|[[594, 5.239127],...|
|          1591|[[594, 5.60205], ...|
|          1342|[[388, 5.2888317]...|
|          2122|[[594, 5.0728745]...|
|          2142|[[594, 5.113259],...|
|           463|[[87, 4.9177847],...|
|           833|[[666, 5.0613637]...|
|          3794|[[898, 4.829689],...|
|          1645|[[641, 5.1238627]...|
|          3175|[[594, 5.122922],...|
|           496|[[1068, 4.817307]...|
|          2366|[[236, 4.9172726]...|
|          2866|[[277, 1.6357831]...|
|           148|[[496, 4.63317], ...|
|          1088|[[875, 5.2042265]...|
|          1238|[[424, 2.3528478]...|
|          1829|[[1261, 4.7498364...|
|          1959|[[424, 5.883332],...|
|          2659|[[424, 1.1764729]...|
+--------------+--------------------+
only showing top 20 rows



## Improve Readability


The recommendations are not in a user-friendly format when first extracted: each row holds a nested array of product and rating pairs. To see the recommendations more clearly, I used the explode function with LATERAL VIEW in Spark SQL.
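
As a plain-Python analogy for what the explode step does, the sketch below flattens nested recommendation lists into one row per (reviewer, product) pair. The rows are illustrative stand-ins shaped like `user_recs`, with rounded scores, and the names `nested` and `flat` are hypothetical.

```python
# Illustrative rows shaped like user_recs: one row per reviewer with nested recommendations.
nested = [
    {"reviewer_index": 1580, "recommendations": [(1491, 4.82), (665, 4.71)]},
    {"reviewer_index": 471, "recommendations": [(594, 5.24)]},
]

# "Explode": emit one output row for each element of the nested list.
flat = [
    {"reviewer_index": row["reviewer_index"], "product_index": p, "prediction": s}
    for row in nested
    for p, s in row["recommendations"]
]

for row in flat:
    print(row)
```

The same flattening is also available through the DataFrame API via `pyspark.sql.functions.explode`, for anyone who prefers to avoid a temporary SQL view.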

In [0]:
#Register the recommendations as a SQL view (safe to re-run)
user_recs.createOrReplaceTempView('temp')

In [0]:
clean_recs = sqlContext.sql("SELECT reviewer_index,\
                                    products_and_ratings.product_index AS product_index,\
                                     products_and_ratings.rating AS prediction\
                              FROM temp\
                              LATERAL VIEW explode(recommendations) exploded_table\
                              AS products_and_ratings")

In [58]:
clean_recs.show()

+--------------+-------------+----------+
|reviewer_index|product_index|prediction|
+--------------+-------------+----------+
|          1580|         1491| 4.8221908|
|          1580|          665| 4.7136793|
|          1580|         1102|   4.71349|
|          1580|         1506|  4.538516|
|          1580|         1380| 4.5076046|
|          1580|          594|  4.501891|
|          1580|          219|  4.482654|
|          1580|          560| 4.4688725|
|          1580|         1370|  4.467898|
|          1580|          661|  4.451627|
|           471|          594|  5.239127|
|           471|          577|  5.188219|
|           471|          530| 5.1852994|
|           471|          253|  5.164042|
|           471|          388| 5.1612434|
|           471|          910|  5.137744|
|           471|         1058|  5.125388|
|           471|          875|   5.12529|
|           471|          711| 5.0952144|
|           471|          784|  5.092416|
+--------------+-------------+----

The resulting dataframe now lists a single recommendation per row. The final steps are to add on the title of the products and convert the indices back to the original IDs.

In [59]:
#Original dataframe containing all columns
df.show()

+----------+-------------+------+--------------+--------------+--------------------+
| productID|product_index|rating|    reviewerID|reviewer_index|               title|
+----------+-------------+------+--------------+--------------+--------------------+
|B000142FVW|          236|   4.0|A3DZFW2XL5S6XK|          1338|OPI Nail Lacquer,...|
|B0001435D4|          643|   1.0|A2LAFL37V088EU|            24|OPI Nail Lacquer ...|
|B00014GT8W|          353|   3.0|A2V92F5R7MLCVI|           326|Dermablend Cover ...|
|B00014GT8W|          353|   4.0| AS44QEHT3KSPK|           330|Dermablend Cover ...|
|B00021DXVA|          264|   5.0|A19UTUEBWKIZFT|          1947|VINCENT LONGO Wat...|
|B00021DXVA|          264|   4.0|A13KW5I9IHQ039|          3382|VINCENT LONGO Wat...|
|B00025WYZC|          786|   4.0|A2FNA5903D9E6Y|           653|Crabtree & Evelyn...|
|B0002RI2PG|           86|   5.0| AQCYEZYS51OBC|          3782|Dermablend Loose ...|
|B0002RI2PG|           86|   4.0|A2ZJPDH0YP90SD|          2773|De

In [60]:
#Make a list of unique products with ID and index
product_info = df.select('productId', 'product_index', 'title')
unique_products = product_info.dropDuplicates()
unique_products.show()

+----------+-------------+--------------------+
| productId|product_index|               title|
+----------+-------------+--------------------+
|B01ELU4DSE|          216|18.21 Man Made Po...|
|B00D7E26H6|          101|Laura Biagiotti E...|
|B000C1W4TC|          836|Rochas Tocade Eau...|
|B003L8RVZ0|          818|Mario Badescu Vit...|
|B00FU7L9N0|          239|VINCENT LONGO Crè...|
|B0017SY63Q|         1260|Borghese Bagno di...|
|B00EFG0XLQ|           41|Antica Farmacista...|
|B000YJ3UEK|         1355|RUSK Designer Col...|
|B0065I8MC4|         1322|     theBalm Manizer|
|B00IBDCYVQ|          674|Obagi360 HydraFac...|
|B00004U9V2|         1005|Crabtree & Evelyn...|
|B014OSNTJC|          896|T3 - Whirl Conver...|
|B000ULF8Q0|         1240|Epicuren Discover...|
|B0010E85TS|          595|Juice Beauty Oil-...|
|B00205CJWK|          185|Mustela Bathtime ...|
|B00M0V3B1C|           29|Dermablend Smooth...|
|B000068DWY|           85|Calvin Klein ck o...|
|B003RJ165Y|         1430|IMAGE Skincare

In [61]:
#Add titles and productID to recommendations
recs_with_titles = clean_recs.join(unique_products, on='product_index', how='left')
recs_with_titles = recs_with_titles.drop('product_index')
recs_with_titles.show()

+--------------+----------+----------+--------------------+
|reviewer_index|prediction| productId|               title|
+--------------+----------+----------+--------------------+
|          1580| 4.8221908|B0062CIEKI|VINCENT LONGO Dew...|
|          1580| 4.7136793|B01A61XUU4|FOREO LUNA 2 Pers...|
|          1580|   4.71349|B00Q7BBDXY|FHI Heat Stylus T...|
|          1580|  4.538516|B000VOHQW0|John Varvatos Vin...|
|          1580| 4.5076046|B004SUY46E|AHAVA Dead Sea De...|
|          1580|  4.501891|B00113RO5O|NEOVA Power Defen...|
|          1580|  4.482654|B003CR4KRM|JACK BLACK – Oil-...|
|          1580| 4.4688725|B019QSJMFU|Julep It's Whippe...|
|          1580|  4.467898|B0182349TY|Julep It's Whippe...|
|          1580|  4.451627|B000W8QWXO|Bioelements Stres...|
|           471|  5.239127|B00113RO5O|NEOVA Power Defen...|
|           471|  5.188219|B000VNIZGM|Paul Mitchell Sup...|
|           471| 5.1852994|B0002Y5JEG|Jurlique Moisture...|
|           471|  5.164042|B0015Z532O|Ma

In [62]:
ratings_df = df.select('productID','reviewer_index','reviewerID','rating')
ratings_df.show()

+----------+--------------+--------------+------+
| productID|reviewer_index|    reviewerID|rating|
+----------+--------------+--------------+------+
|B000142FVW|          1338|A3DZFW2XL5S6XK|   4.0|
|B0001435D4|            24|A2LAFL37V088EU|   1.0|
|B00014GT8W|           326|A2V92F5R7MLCVI|   3.0|
|B00014GT8W|           330| AS44QEHT3KSPK|   4.0|
|B00021DXVA|          1947|A19UTUEBWKIZFT|   5.0|
|B00021DXVA|          3382|A13KW5I9IHQ039|   4.0|
|B00025WYZC|           653|A2FNA5903D9E6Y|   4.0|
|B0002RI2PG|          3782| AQCYEZYS51OBC|   5.0|
|B0002RI2PG|          2773|A2ZJPDH0YP90SD|   4.0|
|B00063M3EC|          3157|A1SG1DJ72ISCRU|   2.0|
|B0006PJRRG|           786|A3HBZYCHTIGP5S|   2.0|
|B0006PLMFQ|          2854|A23H193P451P74|   3.0|
|B0006PLMFQ|           785| A7KDJ0MVYYRA0|   5.0|
|B0006PLMFQ|          1582|A17JGEYNQRYMHE|   5.0|
|B0006PLMFQ|          2442|A2P652CXM82IH6|   4.0|
|B0006PLMFQ|          2798|A34ZOFAJWL9UAM|   5.0|
|B0007M11XQ|           175| AV37OT7S682UW|   4.0|


At this point, the recommendations do not account for the fact that a reviewer may have already purchased and rated the product. To address this, I joined the original review dataframe to the recommendations. If the rating is null, then the reviewer has not previously rated the product. 
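
The join-then-filter logic can be sketched in plain Python as a dictionary lookup followed by a filter on missing values. The rows below are illustrative, with rounded predictions, and the names `existing_ratings`, `joined`, and `unrated` are hypothetical.

```python
# Illustrative rows: (reviewer_index, productID, prediction).
recommendations = [
    (1580, "B0062CIEKI", 4.82),
    (1580, "B000VOHQW0", 4.54),
    (471, "B00113RO5O", 5.24),
]

# Ratings already on record: (reviewer_index, productID) -> rating.
existing_ratings = {(1580, "B0062CIEKI"): 5.0}

# Left join: attach the existing rating, or None when there is none.
joined = [
    (user, product, prediction, existing_ratings.get((user, product)))
    for user, product, prediction in recommendations
]

# Keep only the rows where the reviewer has not rated the product.
unrated = [row for row in joined if row[3] is None]

for row in unrated:
    print(row)
```

In PySpark, a join with `how='left_anti'` would drop the already-rated rows in one step. The left join plus null filter keeps the existing rating visible for inspection before filtering.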

In [63]:
recommendation_list = recs_with_titles.join(ratings_df, on=['productID', 'reviewer_index'], how='left_outer')
recommendation_list = recommendation_list.drop('reviewerID')
recommendation_list.show()

+----------+--------------+----------+--------------------+------+
| productId|reviewer_index|prediction|               title|rating|
+----------+--------------+----------+--------------------+------+
|B0062CIEKI|          1580| 4.8221908|VINCENT LONGO Dew...|   5.0|
|B01A61XUU4|          1580| 4.7136793|FOREO LUNA 2 Pers...|   5.0|
|B00Q7BBDXY|          1580|   4.71349|FHI Heat Stylus T...|   5.0|
|B000VOHQW0|          1580|  4.538516|John Varvatos Vin...|  null|
|B004SUY46E|          1580| 4.5076046|AHAVA Dead Sea De...|  null|
|B00113RO5O|          1580|  4.501891|NEOVA Power Defen...|  null|
|B003CR4KRM|          1580|  4.482654|JACK BLACK – Oil-...|  null|
|B019QSJMFU|          1580| 4.4688725|Julep It's Whippe...|  null|
|B0182349TY|          1580|  4.467898|Julep It's Whippe...|  null|
|B000W8QWXO|          1580|  4.451627|Bioelements Stres...|  null|
|B00113RO5O|           471|  5.239127|NEOVA Power Defen...|  null|
|B000VNIZGM|           471|  5.188219|Paul Mitchell Sup...|  n

In [64]:
updated_rec_list = recommendation_list.where(col('rating').isNull())
updated_rec_list.show()

+----------+--------------+----------+--------------------+------+
| productId|reviewer_index|prediction|               title|rating|
+----------+--------------+----------+--------------------+------+
|B000VOHQW0|          1580|  4.538516|John Varvatos Vin...|  null|
|B004SUY46E|          1580| 4.5076046|AHAVA Dead Sea De...|  null|
|B00113RO5O|          1580|  4.501891|NEOVA Power Defen...|  null|
|B003CR4KRM|          1580|  4.482654|JACK BLACK – Oil-...|  null|
|B019QSJMFU|          1580| 4.4688725|Julep It's Whippe...|  null|
|B0182349TY|          1580|  4.467898|Julep It's Whippe...|  null|
|B000W8QWXO|          1580|  4.451627|Bioelements Stres...|  null|
|B00113RO5O|           471|  5.239127|NEOVA Power Defen...|  null|
|B000VNIZGM|           471|  5.188219|Paul Mitchell Sup...|  null|
|B0002Y5JEG|           471| 5.1852994|Jurlique Moisture...|  null|
|B0015Z532O|           471|  5.164042|Mario Badescu The...|  null|
|B00CSN0ZJI|           471| 5.1612434|MONTBLANC Presenc...|  n

In [65]:
#Make a list of unique Reviewers with ID and index
reviewerID_info = df.select('reviewerId', 'reviewer_index')
unique_reviewers = reviewerID_info.dropDuplicates()
unique_reviewers.show()

+--------------+--------------+
|    reviewerId|reviewer_index|
+--------------+--------------+
|A3TJ9NCZKS46LL|          1259|
|A16AXQ11SZA8SQ|          3460|
|A11OTLEDSW8ZXD|          3373|
|A23VNS0WLAYNT9|          2225|
|A17E3NBADNTI3X|          2561|
|A1Q4M82OFOIN59|           745|
|A3DZSPA2Y48NMG|           449|
|A3RV9I6P9O0T50|          3117|
|A1OMXVXXP07F05|          1328|
|A3PL5EW82A4QKW|          2275|
| AVE1SXX3A7B24|           680|
| AX9PVIIHKST28|            88|
|A1JUUPJVBDOE3O|          2179|
| A1UKA7HUAQAMY|          1500|
| ADVSW4W1HHYF7|          1437|
|A19FRW264WZTGP|           719|
|A1IFZDSZE0I08V|          1069|
|A24FOXO7AZ7V87|          1845|
|A37QX2520O8Z4X|          2099|
| A88H8HDPWVVMG|          3663|
+--------------+--------------+
only showing top 20 rows



In [66]:
#Add reviewerID to recommendations
final_rec_list = updated_rec_list.join(unique_reviewers, on='reviewer_index', how='left')
final_rec_list = final_rec_list.drop('reviewer_index')
final_rec_list.select('reviewerId', 'productId', 'title', 'prediction').show(truncate=False)

+--------------+----------+------------------------------------------------------------------------------------------------------------------------------------------------------------+----------+
|reviewerId    |productId |title                                                                                                                                                       |prediction|
+--------------+----------+------------------------------------------------------------------------------------------------------------------------------------------------------------+----------+
|A2V98TS4QDDCG9|B000VOHQW0|John Varvatos Vintage Eau de Toilette Spray, 4.2 fl. Oz. mens cologne                                                                                       |4.538516  |
|A2V98TS4QDDCG9|B004SUY46E|AHAVA Dead Sea Dermud Intensive Moisturizers                                                                                                                |4.5076046 |
|A2V98TS4QDDCG9|B001

## Final Thoughts

Now we have an organized dataframe that contains, for each user, the top recommended products that they have not previously rated. I personally love finding new beauty products to try. Now, to go find something awesome on Amazon!