# Recommender Engine using Spark ALS

This project uses the "Alternating Least Squares (ALS)" algorithm from Spark to build a recommendation engine for musical instruments.

The data comes from actual Amazon reviews. It can be downloaded from the link below:

http://deepyeti.ucsd.edu/jianmo/amazon/index.html

You can swap in any of the other review datasets available there (appliances, books, software, etc.) and the code will return the corresponding recommendations.

You need to download both these datasets:
<br>1) Reviews data
<br>2) Meta data (has the product descriptions)

#### Import the required libraries

In [28]:
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql import DataFrame

from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS, ALSModel

from pyspark.ml.feature import StringIndexer
from pyspark.ml import Pipeline
from pyspark.sql.functions import col, explode
from pyspark.ml.tuning import ParamGridBuilder, TrainValidationSplit

#### Create a spark session

In [2]:
spark = SparkSession.builder.appName("Recommender").getOrCreate()

#### Read in the Reviews data and the Meta data for the product details

In [3]:
df = spark.read.json(r'E:\Machine Learning\Data\Musical Instruments Review\Musical_Instruments.json')
df_meta = spark.read.json(r'E:\Machine Learning\Data\Musical Instruments Review\meta_Musical_Instruments.json')

Check the columns in the reviews data

In [4]:
print(df.columns)

['asin', 'image', 'overall', 'reviewText', 'reviewTime', 'reviewerID', 'reviewerName', 'style', 'summary', 'unixReviewTime', 'verified', 'vote']


Check the columns in the meta data

In [5]:
print(df_meta.columns)

['also_buy', 'also_view', 'asin', 'brand', 'category', 'date', 'description', 'details', 'feature', 'fit', 'image', 'main_cat', 'price', 'rank', 'similar_item', 'tech1', 'tech2', 'title']


Select only the UserID, ProductID and the Rating columns from the Reviews data

In [6]:
df_reviews = df.select(['reviewerID','asin','overall'])
df_reviews.show(5)

+--------------+----------+-------+
|    reviewerID|      asin|overall|
+--------------+----------+-------+
| AXHY24HWOF184|0470536454|    5.0|
|A29OWR79AM796H|0470536454|    4.0|
| AUPWU27A7X5F6|0470536454|    5.0|
|A1N69A47D4JO6K|0470536454|    4.0|
| AHTIQUMVCGBFJ|0470536454|    5.0|
+--------------+----------+-------+
only showing top 5 rows



Select only the "distinct" ProductID (asin) and the Product Name (title) from the product meta data

In [7]:
df_products = df_meta.select(['asin','title']).distinct()
df_products.show(5, truncate=False)

+----------+--------------------------------------------------------------------------------+
|asin      |title                                                                           |
+----------+--------------------------------------------------------------------------------+
|B00006343B|Casio GSDX Deluxe Keyboard Stand                                                |
|B00007E7Q6|Gemini XL500II Phonograph Turntable                                             |
|B0002CBRCQ|Yamaha PSR-295 Portatone 61-Key Touch-Sensitive Musical Keyboard                |
|B0002CZVOQ|Schecter Stiletto Elite-4 Electric Bass (4 String, Black Cherry)                |
|B0002D02F8|Seymour Duncan Antiquity For Strat Texas Hot Custom-Bridge Pickup (added output)|
+----------+--------------------------------------------------------------------------------+
only showing top 5 rows



#### Get a feel for the data volumes - total users, products etc.

Create temporary views for running SQL queries

In [57]:
df_reviews.createOrReplaceTempView("Reviews")
df_products.createOrReplaceTempView("Products")

In [58]:
stats = spark.sql("SELECT COUNT(asin) as total_rows, \
                          COUNT(DISTINCT(asin)) as asin, \
                          COUNT(DISTINCT(reviewerID)) as reviewerID \
                          FROM Reviews").collect()[0]

print("Total products : {}   Total users : {}   Total rows : {}".format(stats['asin'], stats['reviewerID'], stats['total_rows']))

Total products : 112222   Total users : 903330   Total rows : 1512530


## Data Preparation

#### Select a smaller dataset to test the approach

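The musical instruments dataset has over 1.5 million reviews, which is too much for a local Spark instance to handle comfortably. For this project we therefore restrict ourselves to roughly 10% of the data. Note that limit(150000) takes 150,000 rows as they are read from the file rather than a random sample.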

In [10]:
df_reviews_small = df_reviews.limit(150000)

#### Convert text columns to numeric

In [11]:
indexer = [StringIndexer(inputCol = column, outputCol = column + "_index") for column in ['reviewerID','asin']]

pipeline = Pipeline(stages = indexer)

df_reviews_transformed = pipeline.fit(df_reviews_small).transform(df_reviews_small)
df_reviews_transformed.show(10)

+--------------+----------+-------+----------------+----------+
|    reviewerID|      asin|overall|reviewerID_index|asin_index|
+--------------+----------+-------+----------------+----------+
| AXHY24HWOF184|0470536454|    5.0|        113338.0|     137.0|
|A29OWR79AM796H|0470536454|    4.0|         52763.0|     137.0|
| AUPWU27A7X5F6|0470536454|    5.0|        111456.0|     137.0|
|A1N69A47D4JO6K|0470536454|    4.0|         37395.0|     137.0|
| AHTIQUMVCGBFJ|0470536454|    5.0|        102420.0|     137.0|
|A1J8LQ7HVLR9GU|0470536454|    5.0|         34605.0|     137.0|
| ABVTZ63S6GOWF|0470536454|    5.0|         98348.0|     137.0|
|A2HX9NFBXGSWRW|0470536454|    4.0|         58239.0|     137.0|
| AP1TQR64HQRCI|0470536454|    4.0|        107555.0|     137.0|
| A37FC9MED20AO|0470536454|    5.0|         75792.0|     137.0|
+--------------+----------+-------+----------------+----------+
only showing top 10 rows
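
The indices above are not in row order because StringIndexer, by default, orders labels by descending frequency, so the most frequent value gets index 0. The plain-Python sketch below mimics that behaviour on a toy list. The helper name `frequency_index` and the sample IDs are made up for illustration, and the alphabetical tie-break is an assumption that matches recent Spark versions.

```python
from collections import Counter

def frequency_index(labels):
    """Map each distinct label to a float index, most frequent label first."""
    counts = Counter(labels)
    # Sort by descending count; ties are broken alphabetically (assumed tie-break).
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: float(position) for position, label in enumerate(ordered)}

# Toy product IDs (hypothetical sample, not dataset statistics).
asins = ["B0002", "0470536454", "B0002", "B0001", "B0002", "B0001"]
asin_index = frequency_index(asins)
print(asin_index)
```

A practical consequence is that a low index means a frequently reviewed product or a frequent reviewer, which can be handy when eyeballing the prediction tables later on.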



#### Split into training and test data

In [12]:
(train, test) = df_reviews_transformed.randomSplit([0.8, 0.2])
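
The split above is unseeded, so every run produces a different train/test partition. That matters if the model is later reloaded from disk: it may then be scored against a "test" set that overlaps the rows it was trained on. The sketch below shows the idea of a seeded split in plain Python. The function `random_split` is a hypothetical stand-in, assuming that each row is assigned by one independent random draw, which is also why the resulting sizes are only approximately 80/20.

```python
import random

def random_split(rows, train_weight=0.8, seed=42):
    """Assign each row to train or test with one independent random draw."""
    rng = random.Random(seed)  # a fixed seed makes the split repeatable
    train_rows, test_rows = [], []
    for row in rows:
        (train_rows if rng.random() < train_weight else test_rows).append(row)
    return train_rows, test_rows

rows = list(range(1000))  # stand-in for the review rows
train_a, test_a = random_split(rows)
train_b, test_b = random_split(rows)
print(len(train_a), len(test_a))  # sizes are only approximately 800 and 200
print(train_a == train_b)         # same seed, same split
```

In the notebook itself this would correspond to passing the optional seed argument, for example randomSplit([0.8, 0.2], seed=42); the value 42 is arbitrary.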

#### Create the ALS model and specify the parameters for hyperparameter tuning

In [13]:
als = ALS(userCol="reviewerID_index",
          itemCol="asin_index",
          ratingCol="overall",
          coldStartStrategy="drop",
          nonnegative=True)

params = ParamGridBuilder()\
         .addGrid(als.maxIter, [5,10,15])\
         .addGrid(als.rank, [10, 15, 20])\
         .addGrid(als.regParam, [0.1, 0.4])\
         .build()

evaluator = RegressionEvaluator(labelCol = "overall",
                                predictionCol = "prediction",
                                metricName = "rmse")

crossVal = TrainValidationSplit(estimator = als,
                                estimatorParamMaps = params,
                                evaluator=evaluator,
                                trainRatio=0.8)
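
To get a feel for how much work the tuning step involves, the grid above can be enumerated by hand. The sketch below is plain Python rather than Spark code; the dictionary simply copies the candidate values from the ParamGridBuilder call. As far as the setup above goes, TrainValidationSplit scores each combination once on a single 80/20 train/validation split (it is not k-fold cross-validation, despite the variable name crossVal).

```python
from itertools import product

# Candidate values copied from the ParamGridBuilder call above.
grid = {
    "maxIter": [5, 10, 15],
    "rank": [10, 15, 20],
    "regParam": [0.1, 0.4],
}

# Every combination of one value per parameter is one candidate model.
combinations = [dict(zip(grid, values)) for values in product(*grid.values())]
print(len(combinations))  # 3 * 3 * 2 = 18
```

Eighteen ALS fits, some of them with 15 iterations and rank 20, explain why the fitting step is slow on a local machine. Trimming the grid is the easiest way to shorten it.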

#### Fit the training data to the model

This step will take a long time as it evaluates multiple models and selects the best one. So feel free to step away for a 15-20 min coffee break while this finishes :)

In [15]:
model = crossVal.fit(train)

#### Save the model

In [32]:
best_model = model.bestModel
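# overwrite() lets this cell be re-run without failing when the folder already exists
best_model.write().overwrite().save('ALS_best_model')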

#### Load the model

Once you have run and saved the best model, you can skip the fitting step and reload the model from here the next time you run the code.

In [14]:
best_model = ALSModel.load('ALS_best_model')

#### Generate the predictions

In [15]:
train_pred = best_model.transform(train)
test_pred = best_model.transform(test)

In [16]:
train_pred.show(10)

+--------------------+----------+-------+----------------+----------+----------+
|          reviewerID|      asin|overall|reviewerID_index|asin_index|prediction|
+--------------------+----------+-------+----------------+----------+----------+
|A0005916MHK9RK69491E|0739079891|    5.0|         21435.0|      27.0|  4.481688|
|A0044312CECEQYZIPS3B|B000068O59|    4.0|         21436.0|     118.0| 3.6588285|
|A0096681Y127OL1H8W3U|B0002CZUDS|    5.0|            77.0|     143.0|  4.621826|
|A0096681Y127OL1H8W3U|B0002CZWKE|    5.0|            77.0|     227.0|  4.582752|
|A0096681Y127OL1H8W3U|B0002D06IG|    5.0|            77.0|     411.0| 4.3432307|
|A0096681Y127OL1H8W3U|B0002E2NEK|    5.0|            77.0|     106.0| 4.5630455|
|A0096681Y127OL1H8W3U|B0002E2NFO|    5.0|            77.0|     196.0| 4.2879457|
|A0096681Y127OL1H8W3U|B0002E52AW|    5.0|            77.0|     715.0|  4.460223|
|A0096681Y127OL1H8W3U|B0002F77PO|    4.0|            77.0|     944.0| 4.1705194|
|A0096681Y127OL1H8W3U|B0002H

In [17]:
test_pred.show(10)

+--------------------+----------+-------+----------------+----------+----------+
|          reviewerID|      asin|overall|reviewerID_index|asin_index|prediction|
+--------------------+----------+-------+----------------+----------+----------+
|A0072193KFP6LUHKEXLT|B000068O35|    4.0|         21437.0|      31.0|  3.636929|
|A0096681Y127OL1H8W3U|B0002D0NJ8|    5.0|            77.0|      81.0|   4.56794|
|A0096681Y127OL1H8W3U|B0002D0QW2|    5.0|            77.0|     631.0| 4.4112897|
|A0096681Y127OL1H8W3U|B0002GJ3E6|    5.0|            77.0|     348.0| 4.1839924|
|A0103849GBVWICKXD4T6|B0002D035C|    5.0|          2880.0|     102.0|  3.920868|
|A0133510AABKV6ANFN5H|B0002GJILE|    5.0|         21439.0|     469.0| 4.5611115|
|A02196236Z4QJMRQADW1|B0002E1G5C|    5.0|         21442.0|       3.0|  4.555663|
| A0240957MWDGNUMT3GF|B0002DV6TO|    1.0|         21443.0|      42.0|0.91423136|
|A0279100VZXR9A2495P4|B0002D017M|    5.0|         21447.0|      40.0| 4.5828023|
|A0326684HE4FUBTB3FC8|B0002G

#### Evaluate the performance by checking the RMSE score

In [18]:
train_rmse = evaluator.evaluate(train_pred)
test_rmse  = evaluator.evaluate(test_pred)
print("RMSE on training : {} \nRMSE for test : {}".format(train_rmse, test_rmse))

RMSE on training : 0.534348253925742 
RMSE for test : 0.5400845854834098
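
For readers who want to see what the evaluator computes, here is a small plain-Python sketch of RMSE. The function `rmse` is illustrative, not the Spark implementation. The first four pairs are copied from the test_pred output above; the last pair is a hypothetical NaN prediction, included to mimic how coldStartStrategy="drop" removes rows for users or items the model never saw before the score is computed.

```python
import math

def rmse(pairs):
    """Root-mean-square error over (actual, predicted) pairs, skipping NaN predictions."""
    kept = [(actual, pred) for actual, pred in pairs if not math.isnan(pred)]
    return math.sqrt(sum((actual - pred) ** 2 for actual, pred in kept) / len(kept))

# (overall, prediction) pairs from the first rows of test_pred above,
# plus one hypothetical NaN row standing in for an unseen user or item.
sample = [
    (4.0, 3.636929),
    (5.0, 4.56794),
    (5.0, 4.4112897),
    (1.0, 0.91423136),
    (3.0, float("nan")),
]
print(rmse(sample))
```

Because dropped rows never reach the evaluator, the reported RMSE only describes users and items that appear in the training data.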


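The RMSE scores for the train and test sets are close (about 0.534 and 0.540), which suggests that the model is not overfitting. Both are also small relative to the 1-5 rating scale, with predictions off by roughly half a star, so the model looks reasonably good. Keep in mind that with coldStartStrategy="drop", rows whose user or item was not seen during training are excluded from these scores.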

## Get the recommendations for all users

In [31]:
recommendations = best_model.recommendForAllUsers(10)
recommendations.show(5)

+----------------+--------------------+
|reviewerID_index|     recommendations|
+----------------+--------------------+
|             148|[[1836, 4.873973]...|
|             463|[[1836, 4.735553]...|
|             471|[[1739, 4.3731275...|
|             496|[[1836, 4.792375]...|
|             833|[[1635, 4.988263]...|
+----------------+--------------------+
only showing top 5 rows



#### Convert the recommendations back into a format which tells us the actual user/item from the original data

Get the reviewerID_index and reviewerID mapping

In [23]:
map_reviewer = df_reviews_transformed.select('reviewerID_index','reviewerID').distinct()
map_reviewer.show(5)

+----------------+--------------+
|reviewerID_index|    reviewerID|
+----------------+--------------+
|        113338.0| AXHY24HWOF184|
|         52763.0|A29OWR79AM796H|
|        111456.0| AUPWU27A7X5F6|
|         37395.0|A1N69A47D4JO6K|
|        102420.0| AHTIQUMVCGBFJ|
+----------------+--------------+
only showing top 5 rows



Get the asin_index and asin mapping

In [59]:
map_asin = df_reviews_transformed.select('asin_index','asin').distinct()
map_asin.show(5)

+----------+----------+
|asin_index|      asin|
+----------+----------+
|     137.0|0470536454|
|    1692.0|0615582222|
|      27.0|0739079891|
|     485.0|0739079883|
|    1232.0|0739084542|
+----------+----------+
only showing top 5 rows



Join all the different datasets together to get the final formatted recommendations

In [60]:
final_recos = recommendations\
             .withColumn('col4',explode('recommendations'))\
             .select('reviewerID_index','col4.*')\
             .join(map_reviewer, 'reviewerID_index')\
             .join(map_asin, 'asin_index')\
             .join(df_products, 'asin')\
             .select('reviewerID','asin','rating','title')

final_recos.show(truncate=False)

+--------------+----------+---------+--------------------------------------------------------------------+
|reviewerID    |asin      |rating   |title                                                               |
+--------------+----------+---------+--------------------------------------------------------------------+
|A1A9WJHKBSTI4C|B0002D0EMO|4.873973 |Kyser Instrument Polish                                             |
|A1A9WJHKBSTI4C|B0002E2G3S|4.8514533|Fender American Vintage Stratocaster Tremolo Arm - Chrome           |
|A1A9WJHKBSTI4C|B0002DUYJW|4.8422904|Ernie Ball Pedal Steel Nickel Wound 10-String Set, E9 Tuning        |
|A1A9WJHKBSTI4C|B0002E38BW|4.796107 |Monster Bass - 21' Instrument Cable -  Straight to Straight ¼” plugs|
|A1A9WJHKBSTI4C|B0002F7402|4.7809873|Vic Firth American Classic Metal Nylon Drumsticks (CMN)             |
|A1A9WJHKBSTI4C|B0002D050U|4.7790837|Zildjian A Series 21" Sweet Ride Cymbal                             |
|A1A9WJHKBSTI4C|B0002F7J3O|4.7730007|
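
The chain of explode and joins above can be hard to picture, so here is a plain-Python sketch of the same reshaping. Everything in it is hypothetical toy data: the function `flatten_recommendations`, the IDs, and the title are made up. It assumes the joins behave as inner joins, which means a recommended item with no row in the product metadata silently disappears from the final output.

```python
def flatten_recommendations(recs, user_ids, item_ids, titles):
    """Turn {user_index: [(item_index, rating), ...]} into flat, readable rows."""
    rows = []
    for user_index, items in recs.items():
        for item_index, rating in items:  # "explode": one row per recommended item
            asin = item_ids[item_index]
            if asin in titles:            # inner join: skip items without metadata
                rows.append((user_ids[user_index], asin, rating, titles[asin]))
    return rows

# Hypothetical toy data; none of these IDs or titles come from the dataset.
recs = {0: [(10, 4.9), (11, 4.7)], 1: [(10, 4.8)]}
user_ids = {0: "USER_A", 1: "USER_B"}
item_ids = {10: "ASIN_X", 11: "ASIN_Y"}
titles = {"ASIN_X": "Guitar strings"}     # ASIN_Y has no metadata row

flat = flatten_recommendations(recs, user_ids, item_ids, titles)
for row in flat:
    print(row)
```

If a user ends up with fewer than 10 rows in final_recos, missing metadata for some of the recommended products is one possible cause worth checking.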

## Check the recommendations for a given user

Let us pick some userIDs who have reviewed a few products

In [34]:
test.groupBy('reviewerID').count().sort(col('count').desc()).show(10)

+--------------+-----+
|    reviewerID|count|
+--------------+-----+
|A2NYK9KWFMJV4Y|   13|
|A15TYOEWBQYF0X|    7|
|A10HS51L0GC1R4|    7|
|A1FOXJ8TMYVKRK|    7|
|A1S7QABO64Z554|    6|
|A3SMT15X2QVUR8|    6|
|A1MAQQXJZTMSKT|    6|
|A20JJ8634DG3FS|    6|
|A2SUG35F6A6S3C|    6|
|A2BGZ52M908MJY|    5|
+--------------+-----+
only showing top 10 rows



In [54]:
user_ID = 'A15TYOEWBQYF0X'

Show the items this user has reviewed in the test split (reviews that landed in the training split are not listed here)

In [55]:
test.filter(col('reviewerID') == user_ID)\
.select('reviewerID','asin')\
.join(df_products, 'asin')\
.show(truncate=False)

+----------+--------------+-----------------------------------------------------------------------------------------------+
|asin      |reviewerID    |title                                                                                          |
+----------+--------------+-----------------------------------------------------------------------------------------------+
|B0002DUPZU|A15TYOEWBQYF0X|Ernie Ball Earthwood Light Phosphor Bronze Acoustic String Set, .011 - .052                    |
|B0002D0JZ6|A15TYOEWBQYF0X|Pignose 7-100 Legendary portable amplifier                                                     |
|B0002E1O2W|A15TYOEWBQYF0X|Elixir Strings Electric Guitar Strings w NANOWEB Coating, Super Light (.009-.042)              |
|B0002D0LP4|A15TYOEWBQYF0X|Gibson Regular Style 2 Inch Safety Guitar Strap, Jet Black                                     |
|B0002E1NWI|A15TYOEWBQYF0X|Elixir Strings 80/20 Bronze Acoustic Guitar Strings w POLYWEB Coating, Light/Medium (.012-.056)|
|B0002GZ

Check the items recommended for this user

In [56]:
final_recos.filter(col('reviewerID') == user_ID).show(truncate=False)

+--------------+----------+---------+--------------------------------------------------------------------+
|reviewerID    |asin      |rating   |title                                                               |
+--------------+----------+---------+--------------------------------------------------------------------+
|A15TYOEWBQYF0X|B0002D0EMO|4.879595 |Kyser Instrument Polish                                             |
|A15TYOEWBQYF0X|B0002F7402|4.8672314|Vic Firth American Classic Metal Nylon Drumsticks (CMN)             |
|A15TYOEWBQYF0X|B0002E38BW|4.829496 |Monster Bass - 21' Instrument Cable -  Straight to Straight ¼” plugs|
|A15TYOEWBQYF0X|B0002E2G3S|4.817302 |Fender American Vintage Stratocaster Tremolo Arm - Chrome           |
|A15TYOEWBQYF0X|B0002E37T0|4.7553754|Monster Jazz -21'  Instrument Cable -  Straight to Straight ¼” plugs|
|A15TYOEWBQYF0X|B0002DUYJW|4.741025 |Ernie Ball Pedal Steel Nickel Wound 10-String Set, E9 Tuning        |
|A15TYOEWBQYF0X|B0002F4YF0|4.7362065|

## Summary
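This user appears to be a guitar player: the reviewed items include guitar strings, a guitar strap and a portable amplifier. Several of the recommendations are guitar-related as well (a Stratocaster tremolo arm, instrument cables, a pedal steel string set), although a few, such as the drumsticks, are not. On this evidence the recommendation engine seems to be working reasonably well!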