In [1]:
import pandas as pd
from pyspark.sql import SparkSession
from pyspark import SparkContext
from pyspark import SparkConf
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.sql import Row
from pyspark.sql.functions import monotonically_increasing_id
from pyspark.ml.feature import StringIndexer
from pyspark.ml import Pipeline
from pyspark.sql.functions import col
from pyspark.sql import SparkSession


In [3]:
# Load Amazon Review Dataset of Digital Music Purchases as DataFrame
amz = pd.read_csv('amazon_reviews_us_Digital_Music_Purchase_v1_00.tsv', sep='\t', error_bad_lines=False)

b'Skipping line 1686: expected 15 fields, saw 22\nSkipping line 23469: expected 15 fields, saw 22\nSkipping line 25225: expected 15 fields, saw 22\nSkipping line 48778: expected 15 fields, saw 22\nSkipping line 54061: expected 15 fields, saw 22\n'
b'Skipping line 66479: expected 15 fields, saw 22\nSkipping line 85019: expected 15 fields, saw 22\nSkipping line 102383: expected 15 fields, saw 22\nSkipping line 108349: expected 15 fields, saw 22\nSkipping line 111834: expected 15 fields, saw 22\nSkipping line 122189: expected 15 fields, saw 22\nSkipping line 124293: expected 15 fields, saw 22\n'
b'Skipping line 137493: expected 15 fields, saw 22\nSkipping line 138671: expected 15 fields, saw 22\nSkipping line 147213: expected 15 fields, saw 22\nSkipping line 156030: expected 15 fields, saw 22\nSkipping line 159108: expected 15 fields, saw 22\nSkipping line 162860: expected 15 fields, saw 22\nSkipping line 166640: expected 15 fields, saw 22\nSkipping line 174287: expected 15 fields, saw 22

In [None]:
# Preview DataFrame
amz.head()

In [4]:
# Slice DataFrame for Necessary Columns
cols = ['customer_id', 'product_id', 'product_title', 'star_rating']
amz_data = amz[cols]
amz_data

In [18]:
# Sample DataSet
# https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.sample.html
amz_sample = amz_data.sample(frac = .15, random_state = 123)

In [19]:
amz_sample

Unnamed: 0,customer_id,product_id,product_title,star_rating
287116,32347684,B00JHBGFN4,Doses & Mimosas [Explicit],5.0
1026268,36524721,B009MIA2WM,The White Room,5.0
929842,13574121,B006ZDU8EE,Somebody That I Used To Know [feat. Kimbra],4.0
1614412,18348450,B0034EIVKK,I’m New Here,3.0
854189,45899968,B0089QLT4I,Sexify (Album Version),5.0
...,...,...,...,...
1350624,23156980,B001AQR5CE,Hanukkah Swings,5.0
307620,47992230,B00T5755TA,Beyoncé's Top Songs,5.0
603761,37667090,B00E5ZFDN0,Greater Than (Live),5.0
143913,42875977,B008E62MHG,The Touch Of Your Hand,5.0


In [None]:
# Connect to SparkSession
spark = SparkSession.builder.appName('PySpark ALS Recommender System').getOrCreate()

In [20]:
# Load Amazon DataFrame to Spark
amz_spark = spark.createDataFrame(amz_sample)

In [12]:
amz_spark.show()

+-----------+----------+--------------------+-----------+
|customer_id|product_id|       product_title|star_rating|
+-----------+----------+--------------------+-----------+
|   36264374|B002O4H6OI|          Simple Man|        4.0|
|   10681359|B006ZDU7QI|      Making Mirrors|        5.0|
|   13989297|B002IIXIU6|Country Pie (Stud...|        5.0|
|   13364862|B00YQXKF70|Black Rose [Expli...|        5.0|
|   51299750|B002PZ5K2G|             Make Me|        5.0|
|   44518341|B00BK5BR2Y|     The Party Troll|        5.0|
|   50348291|B00CNGV878|The Dance (Garth ...|        5.0|
|   21844443|B001DRG8HS|               Split|        4.0|
|   44993663|B008U73LI8|Isaiah 2:2, Josep...|        1.0|
|   29306231|B001BKCTAM|   Kiss My Irish Ass|        5.0|
|   47089919|B00DGI0RUG|Bully (Original Mix)|        5.0|
|   15089637|B001NZ75UY|Ridin' (Album Ver...|        5.0|
|   48426096|B006N9AQXI|   Am I The Only One|        5.0|
|   37924955|B00123LQWK|How Deep The Fath...|        5.0|
|   33332777|B

In [21]:
# Assign Every Unique Product ID, Customer ID, and Product Title an Index - Model Needs Numeric IDs
indexer = [StringIndexer(inputCol=column, outputCol=column+"_index") for column in list(set(amz_spark.columns)-set(['star_rating'])) ]
pipeline = Pipeline(stages=indexer)
transformed = pipeline.fit(amz_spark).transform(amz_spark)
transformed.show()

+-----------+----------+--------------------+-----------+-----------------+----------------+-------------------+
|customer_id|product_id|       product_title|star_rating|customer_id_index|product_id_index|product_title_index|
+-----------+----------+--------------------+-----------+-----------------+----------------+-------------------+
|   32347684|B00JHBGFN4|Doses & Mimosas [...|        5.0|           8941.0|        150767.0|            55771.0|
|   36524721|B009MIA2WM|      The White Room|        5.0|         121849.0|        126693.0|           119998.0|
|   13574121|B006ZDU8EE|Somebody That I U...|        4.0|          48526.0|            70.0|               47.0|
|   18348450|B0034EIVKK|        I’m New Here|        3.0|           4265.0|          6562.0|             9368.0|
|   45899968|B0089QLT4I|Sexify (Album Ver...|        5.0|         151665.0|        120759.0|            28609.0|
|   49693727|B001BLMBK4|            Capiche?|        5.0|         164253.0|         62469.0|    

In [22]:
# Split DataFrame into Training and Test Sets - 80:20 Ratio
# Need to add Seed here
(training, test) = transformed.randomSplit([0.8, 0.2])

In [None]:
# ALS Model - 2 Iterations, RegParam 0.01
als = ALS(maxIter=2, regParam=0.01, userCol="customer_id_index", itemCol="product_id_index", ratingCol="star_rating",
          coldStartStrategy="drop", nonnegative=True)

In [None]:
# ALS Model - 2 Iterations, RegParam 0.09
als = ALS(maxIter=2, regParam=0.09, userCol="customer_id_index", itemCol="product_id_index", ratingCol="star_rating",
          coldStartStrategy="drop", nonnegative=True)

In [23]:
# ALS Model - 5 Iterations, RegParam 0.01
als = ALS(maxIter=5, regParam=0.09, userCol="customer_id_index", itemCol="product_id_index", ratingCol="star_rating",
          coldStartStrategy="drop", nonnegative=True)

In [24]:
# Fit ALS Model to Training Set
als_spark_model = als.fit(training)

In [25]:
# Evaluate Model with RMSE
evaluator = RegressionEvaluator(metricName="rmse",labelCol="star_rating",predictionCol="prediction")
predictions = als_spark_model.transform(test)
rmse = evaluator.evaluate(predictions)
print("Root-mean-square error = " + str(rmse))

RMSE=2.744569246560837


In [None]:
# Create Pandas DataFrame to User Recommendations
user_recs = als_spark_model.recommendForAllUsers(5).toPandas()

In [31]:
# Create Pandas DataFrame to Product Recommendations
product_recs = als_spark_model.recommendForAllItems(5).toPandas()

In [38]:
product_recs.head()

Unnamed: 0,product_id_index,recommendations
0,148,"[(55116, 5.388706207275391), (21126, 5.3887062..."
1,463,"[(114084, 8.646773338317871), (27553, 8.229619..."
2,471,"[(161512, 10.749791145324707), (113283, 10.749..."
3,496,"[(27553, 11.78837776184082), (101957, 11.57784..."
4,833,"[(136768, 12.800902366638184), (65299, 11.8965..."


In [None]:
"""
Next Steps, Extract Lookuptables from Spark
    customer_id_index -> customer_id
    product_id_index -> product_id -> product_title

Join Lookup Tables to Recommendation Outputs and Show Recommendations by Name
"""

In [37]:
product_recs['recommendations'][0][]

Row(customer_id_index=21126, rating=5.388706207275391)

In [None]:
# Tried creating a function, never tried
def get_recs(model, rec_type=rec_type, no_recs=no_recs):
    if rec_type = 'user':
        return model.recommendForAllUsers(no_recs)
    elif rec_type = 'item':
        return model.recommendForAllItems(no_recs)

In [None]:
spark.stop()