# Spark Recommender System

Spark offers a built in ALS recommendation system that we will use to compare against our LightFM model. Unfortunately it does not handle the cold start problem we aimed at solving for, but it does a good job of using collaborative techniques to make a prediction. 

The algorithm uses alternating least squares as a metric for optimizing predictions of a users preference for an item. How it does it is by taking the original matrix of users and product ratings R, which in our case is the number of times a user bought an item for a given product, and factorizes it into two matrices U and P. When U and P are multiplied back together the empty ratings will be replaced with an estimation. The alternating part of ALS comes from the way the algorithm minimizes least squares error. It will alternate between matrices U and P by fixing one matrix and optimizing for the other and then repeating this process a designated number of times to minimize least squared error. The resulting matrix will have ratings filled in for each product which we can sample from to do a comparison with our LightFM model. We will tune our model by estimating our parameters and iterating until an optimal root mean squared error is achieved. 

In [1]:
# import necessary modules
import os
import shutil
import pyspark as ps
from pyspark.ml import Pipeline, Transformer
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS, ALSModel
from pyspark.sql import Row
from pyspark.sql.types import DoubleType

In [2]:
# create spark context
spark = (ps.sql.SparkSession.builder
        .appName("sandbox")
        .getOrCreate()
        )
sc = spark.sparkContext
print(spark.version)

3.0.0


## Read in Data

We will use user clusters as part of user features for our model here. 

In [3]:
# source data from prior step
data_dir = os.path.join("modified_data", "")
file = os.path.join(data_dir, "item_features_clustered.csv")

# options are specified to read in data without error
df_user = spark.read.format("csv")\
               .option("multiline", "true")\
               .option("quote", '"')\
               .option("header", "true")\
               .option("escape", "\\")\
               .option("escape", '"')\
               .load(file)

In [4]:
# source data from prior step
data_dir = os.path.join("modified_data", "")
file = os.path.join(data_dir, "item_features.csv")

# options are specified to read in data without error
df_item = spark.read.format("csv")\
               .option("multiline", "true")\
               .option("quote", '"')\
               .option("header", "true")\
               .option("escape", "\\")\
               .option("escape", '"')\
               .load(file)

## Create use and item feature matrices

In [5]:
user_features = df_user.select(df_user['customer_unique_id'], 
                               df_user['product_id'], 
                               df_user['product_category_count'])

In [6]:
user_features.show(4)

+--------------------+--------------------+----------------------+
|  customer_unique_id|          product_id|product_category_count|
+--------------------+--------------------+----------------------+
|7c396fd4830fd0422...|87285b34884572647...|                     1|
|7c396fd4830fd0422...|9abb00920aae319ef...|                     1|
|e781fdcc107d13d86...|87285b34884572647...|                     1|
|3a51803cc0d012c3b...|87285b34884572647...|                     1|
+--------------------+--------------------+----------------------+
only showing top 4 rows



In [7]:
user_features = user_features.sort("customer_unique_id")

In [8]:
item_features = df_item.select(df_item['product_id'], 
                               df_item['product_category_name'], 
                               df_item['avg_price_binned'])

In [9]:
item_features.show(4)

+--------------------+---------------------+----------------+
|          product_id|product_category_name|avg_price_binned|
+--------------------+---------------------+----------------+
|372645c7439f9661f...|       bed_bath_table|   (74.9, 135.0]|
|5099f7000472b634f...|        health_beauty|   (0.849, 39.9]|
|64b488de448a5324c...|           stationery|    (39.9, 74.9]|
|2345a354a6f203360...|            telephony|   (0.849, 39.9]|
+--------------------+---------------------+----------------+
only showing top 4 rows



## Index user and product ids

In [10]:
from pyspark.ml.feature import StringIndexer

# create object of StringIndexer class and specify input and output column
SI_customer = StringIndexer(inputCol='customer_unique_id',outputCol='customer_index')
SI_product = StringIndexer(inputCol='product_id',outputCol='product_index')

# transform the data
user_features = SI_customer.fit(user_features).transform(user_features)
user_features = SI_product.fit(user_features).transform(user_features)
item_features = SI_product.fit(item_features).transform(item_features)

# view the transformed data
user_features.select('customer_unique_id', 'customer_index', 'product_id', 'product_index').show(10)
item_features.select('product_id', 'product_index').show(10)

+--------------------+--------------+--------------------+-------------+
|  customer_unique_id|customer_index|          product_id|product_index|
+--------------------+--------------+--------------------+-------------+
|0000366f3b9a7992b...|       11614.0|372645c7439f9661f...|        380.0|
|0000b849f77a49e4a...|       11615.0|5099f7000472b634f...|       2737.0|
|0000f46a3911fa3c0...|       11616.0|64b488de448a5324c...|       4156.0|
|0000f6ccb0745a6a4...|       11617.0|2345a354a6f203360...|       6611.0|
|0004aac84e0df4da2...|       11618.0|c72e18b3fe2739b8d...|      28356.0|
|0004bd2a26a76fe21...|       11619.0|25cf184645f3fae66...|       6625.0|
|00050ab1314c0e55a...|       11620.0|8cefe1c6f2304e7e6...|       2384.0|
|00053a61a98854899...|        2805.0|62984ea1bba7fcea1...|       5331.0|
|00053a61a98854899...|        2805.0|58727e154e8e85d84...|       1357.0|
|0005e1862207bf6cc...|       11621.0|e24f73b7631ee3fbb...|        801.0|
+--------------------+--------------+--------------

In [11]:
from pyspark.sql.types import IntegerType
# convert columns to integer types
user_features = user_features.withColumn("product_category_count",
                                        user_features["product_category_count"].cast(IntegerType()))

## Model Training

In [12]:
# split 80-20
(training, test) = user_features.randomSplit([0.8, 0.2])

In [13]:
# train the recommender with als
als_alg = ALS(maxIter=5, 
              regParam=0.01, 
              userCol='customer_index', 
              itemCol="product_index", 
              ratingCol='product_category_count',
              coldStartStrategy='drop', 
              seed = 3)

model=als_alg.fit(training)

# evaluate with the holdout set
predictions = model.transform(test)
evaluator = RegressionEvaluator(metricName='rmse',
                                labelCol='product_category_count',
                                predictionCol='prediction')
rmse = evaluator.evaluate(predictions)

print("Root-mean-squared-error = " + str(round(rmse, 3)))

Root-mean-squared-error = 0.273


## Generate user and product recommendations

These can be sampled from to output predictions for specific users.

In [14]:
# generate top 5 product recommendations for user
user_recs = model.recommendForAllUsers(5)
user_recs.show(4)

+--------------+--------------------+
|customer_index|     recommendations|
+--------------+--------------------+
|           148|[[507, 6.7362304]...|
|           463|[[956, 5.002464],...|
|           471|[[901, 8.583217],...|
|           496|[[352, 19.671946]...|
+--------------+--------------------+
only showing top 4 rows



In [15]:
product_recs = model.recommendForAllItems(10)
product_recs.show()

+-------------+--------------------+
|product_index|     recommendations|
+-------------+--------------------+
|          148|[[348, 10.787726]...|
|          463|[[447, 11.842729]...|
|          471|[[100, 5.606615],...|
|          496|[[93, 9.08362], [...|
|          833|[[708, 13.14601],...|
|         1088|[[348, 8.86548], ...|
|         1238|[[93, 8.416886], ...|
|         1342|[[227, 10.962105]...|
|         1580|[[348, 8.376414],...|
|         1591|[[348, 11.388984]...|
|         1645|[[98, 7.9219284],...|
|         1829|[[708, 8.222495],...|
|         1959|[[93, 9.122136], ...|
|         2122|[[386, 7.2687736]...|
|         2142|[[62, 5.9342213],...|
|         2366|[[348, 9.065077],...|
|         2659|[[348, 13.379322]...|
|         2866|[[348, 12.944078]...|
|         3175|[[55, 9.250828], ...|
|         3749|[[55, 5.104551], ...|
+-------------+--------------------+
only showing top 20 rows



In [16]:
users = user_features.select(als_alg.getUserCol()).distinct().limit(3)
user_subset_recs = model.recommendForUserSubset(users, 10)
user_subset_recs.show(n=4)

+--------------+--------------------+
|customer_index|     recommendations|
+--------------+--------------------+
|         11757|[[1757, 2.2729402...|
|           558|[[596, 27.327358]...|
|          2815|[[366, 2.5956264]...|
+--------------+--------------------+



### Parameter Tuning

In [18]:
# Import the required functions
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

In [19]:
# train the recommender with als
als = ALS(maxIter=5, 
              regParam=0.01, 
              userCol='customer_index', 
              itemCol="product_index", 
              ratingCol='product_category_count',
              coldStartStrategy='drop')

In [20]:
# Import the requisite packages
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import RegressionEvaluator

# Add hyperparameters and their respective values to param_grid
param_grid = ParamGridBuilder() \
            .addGrid(als.rank, [10, 50, 100, 150]) \
            .addGrid(als.regParam, [.01, .05, .1, .15]) \
            .build()

In [21]:
evaluator = RegressionEvaluator(
           metricName="rmse", 
           labelCol="product_category_count", 
           predictionCol="prediction") 
print ("Num models to be tested: ", len(param_grid))

Num models to be tested:  16


In [22]:
# Build cross validation using CrossValidator
cv = CrossValidator(estimator=als, estimatorParamMaps=param_grid, evaluator=evaluator, numFolds=5)

In [23]:
#Fit cross validator to the 'train' dataset
model = cv.fit(training)
#Extract best model from the cv model above
best_model = model.bestModel
# View the predictions
test_predictions = best_model.transform(test)
RMSE = evaluator.evaluate(test_predictions)
print(RMSE)

0.25907724967729984


In [24]:
print("**Best Model**")
# Print "Rank"
print("  Rank:", best_model._java_obj.parent().getRank())
# Print "MaxIter"
print("  MaxIter:", best_model._java_obj.parent().getMaxIter())
# Print "RegParam"
print("  RegParam:", best_model._java_obj.parent().getRegParam())

**Best Model**
  Rank: 150
  MaxIter: 5
  RegParam: 0.05


In [25]:
# Generate n Recommendations for all users
recommendations = best_model.recommendForAllUsers(5)
recommendations.show()

+--------------+--------------------+
|customer_index|     recommendations|
+--------------+--------------------+
|           148|[[1358, 5.983632]...|
|           463|[[586, 5.0032835]...|
|           471|[[353, 4.9666333]...|
|           496|[[14701, 3.994572...|
|           833|[[5976, 3.981684]...|
|          1088|[[8835, 4.0529366...|
|          1238|[[19280, 1.956733...|
|          1342|[[3733, 2.9766014...|
|          1580|[[2116, 2.9813814...|
|          1591|[[30130, 3.586678...|
|          1645|[[8578, 2.9761343...|
|          1829|[[8550, 2.9761338...|
|          1959|[[23256, 3.067747...|
|          2122|[[460, 1.9372059]...|
|          2142|[[13069, 3.092081...|
|          2366|[[23114, 3.457238...|
|          2659|[[1923, 2.969991]...|
|          2866|[[678, 3.32295], ...|
|          3175|[[3776, 1.9789387...|
|          3749|[[5388, 1.3674556...|
+--------------+--------------------+
only showing top 20 rows



### Re-run model for all users

In [26]:
# train the recommender with als
als_alg = ALS(rank=best_model._java_obj.parent().getRank(),
              maxIter=best_model._java_obj.parent().getMaxIter(), 
              regParam=best_model._java_obj.parent().getRegParam(), 
              userCol='customer_index', 
              itemCol="product_index", 
              ratingCol='product_category_count',
              coldStartStrategy='drop')

final_model=als_alg.fit(user_features)

In [27]:
# generate top_n product recommendations for user
nrecommend = 5
user_recs = final_model.recommendForAllUsers(nrecommend)
user_recs.show(4)

+--------------+--------------------+
|customer_index|     recommendations|
+--------------+--------------------+
|           148|[[1358, 5.9854245...|
|           463|[[586, 5.0025682]...|
|           471|[[353, 4.9697766]...|
|           496|[[14701, 3.999740...|
+--------------+--------------------+
only showing top 4 rows



In [28]:
recs = user_recs.toPandas()

## Recommender Function

In [29]:
# Generate pandas df for accessing products in recommender function
products = item_features.toPandas()

In [32]:
user_features_df = user_features.toPandas()

In [33]:
def user_recommendations(user_id, top_n = 3):
    
    if top_n > nrecommend:
        print("Please select up to {} items to recommend".format(nrecommend))
        return; 
    
    prior_purchases = user_features_df[user_features_df['customer_unique_id'] == user_id]\
                                                                                        ['product_id'].unique()
    num_items = len(prior_purchases)
    
    if num_items < 3:
        items = num_items
    else:
        items = 3
    
    print("User: {}\n".format(user_id))
    print("Known positives: ")
    for n in range(items):
        known_like_product = user_features_df[user_features_df['customer_unique_id'] == user_id]\
                                                            ['product_id'].unique()[n]
        known_like_category = products[products['product_id'] == known_like_product]\
                                                            ['product_category_name'].unique()[0]
    
        print("\t", known_like_product)
        print("\t", known_like_category, "\n")
    
    
    customer_index = user_features_df[user_features_df['customer_unique_id'] == user_id]\
                                                            ['customer_index'].unique()[0]
    print("Top {} Recommendations: \n".format(top_n))
    rec_products = []
    
    for n in range(top_n):
        
        rec_products.append(list(recs[recs['customer_index'] == customer_index]['recommendations'])[0][n][0])
        
        print("{}.\n".format(n+1), products[products['product_index'] == rec_products[n]]\
                                                  [['product_id', 'product_category_name']].iloc[0][0])
        
        print(products[products['product_index'] == rec_products[n]]\
                                                  [['product_id', 'product_category_name']].iloc[0][1])

__Test for customer_id = 'c8ed31310fc440a3f8031b177f9842c3'__

In [34]:
user_recommendations('c8ed31310fc440a3f8031b177f9842c3', top_n=5)

User: c8ed31310fc440a3f8031b177f9842c3

Known positives: 
	 1065e0ebef073787a7bf691924c60eeb
	 construction_tools_construction 

	 0cf2faf9749f53924cea652a09d8e327
	 construction_tools_construction 

	 309dd69eb83cea38c51709d62befe1a4
	 construction_tools_construction 

Top 5 Recommendations: 

1.
 15b1f9b06d0e709552d7d8638387e09b
furniture_decor
2.
 7189fb70393a0b87189f93f19655f8db
toys
3.
 3e7ec3672e5549ba74cf635752bfc70b
furniture_decor
4.
 14ad6805c263d8d758d648f46a06570e
baby
5.
 329c661807f085964b1877bfeca6ff73
furniture_decor


__Test for customer_id = '698e1cf81d01a3d389d96145f7fa6df8'__

In [35]:
user_recommendations('698e1cf81d01a3d389d96145f7fa6df8', top_n=5)

User: 698e1cf81d01a3d389d96145f7fa6df8

Known positives: 
	 9571759451b1d780ee7c15012ea109d4
	 auto 

Top 5 Recommendations: 

1.
 0a4f9f421af66d2ea061fbb8883419f7
health_beauty
2.
 fdd84aefb08c8f8225e0b8c97429d53b
health_beauty
3.
 12485f9cdebb6ca179826ede539554ad
air_conditioning
4.
 616042729c11849827291496b18e9ec5
sports_leisure
5.
 7a5c07212703b5f01ee199d29a29a587
cool_stuff


__Test for customer_id = '89be58cbdd6ef318e3ed93fdb22be178'__

In [36]:
user_recommendations('89be58cbdd6ef318e3ed93fdb22be178', top_n=5)

User: 89be58cbdd6ef318e3ed93fdb22be178

Known positives: 
	 3fdb534dccf5bc9ab0406944b913787d
	 diapers_and_hygiene 

Top 5 Recommendations: 

1.
 779dd392d4fbe5ca656bf3ceabecbf0b
construction_tools_construction
2.
 bdcf6a834e8faa30dac3886c7a58e92e
health_beauty
3.
 91b08d34d0ba4db44da2dc382867ba49
telephony
4.
 1b8ee158f59c098470fad33f39660964
furniture_living_room
5.
 d9339c5714743c460a9470730f79f6c5
computers_accessories


## Summary of Results

It's clear after producing a simple recommendation system with matrix factorization using only users prior purchase history that this dataset simply does not have the data necessary to give accurate results. Many of the attempted recommendations produced results that are clearly not relevant for the user. As can be seen in the suggestions above, many of the top items are from categories very different from the original purchase. 

Comparing these same recommendations with LightFM shows how well hybrid recommenders can do for data sets like this one, with very few return users. 