Author : Alex Aw  
Last Edit : 22 Mar 2022  
Reference: https://schaper.io/2017/10/building-a-recommendation-engine-with-spark-and-emr/

## PySpark Script
- This script was sandboxed on Google Colab with PySpark. 
- ALS recommender system built and evaluated with RMSE
- Preparing script for AWS EMR

In [1]:
pip install pyspark

Collecting pyspark
  Downloading pyspark-3.2.1.tar.gz (281.4 MB)
[K     |████████████████████████████████| 281.4 MB 35 kB/s 
[?25hCollecting py4j==0.10.9.3
  Downloading py4j-0.10.9.3-py2.py3-none-any.whl (198 kB)
[K     |████████████████████████████████| 198 kB 52.0 MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.2.1-py2.py3-none-any.whl size=281853642 sha256=b7a52ea9263bb850450dae0042e7441642d3904e633a962574baea501d9fa4f9
  Stored in directory: /root/.cache/pip/wheels/9f/f5/07/7cd8017084dce4e93e84e92efd1e1d5334db05f2e83bcef74f
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.3 pyspark-3.2.1


In [2]:
# Mount Google Drive into the Colab VM so files stored there are reachable.
from google.colab import drive

MOUNT_POINT = '/content/drive'
drive.mount(MOUNT_POINT)

Mounted at /content/drive


In [3]:
from pyspark.sql import Row
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.functions import *
from pyspark.sql.functions import explode
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import IndexToString
import numpy as np
import pandas as pd

#### Start Spark

In [4]:
# Reuse the active SparkSession if there is one, otherwise create a new one.
spark = (
    SparkSession.builder
    .getOrCreate()
)

# Display the session summary.
spark

In [None]:
# Optional: uncomment to shut down the Spark session when finished with it.
# spark.stop()

#### Loading and Prepping the data

In [5]:
# Read the raw beer-review export. The first row supplies the column names; no
# schema inference is requested, so values arrive as strings (see schema below).
file = "datasets/beerReview_150kFull.csv"
ratings_full = spark.read.option("header", True).csv(file)

# Show the available columns.
ratings_full.columns

['_c0',
 'brewery_id',
 'brewery_name',
 'review_time',
 'review_overall',
 'review_aroma',
 'review_appearance',
 'review_profilename',
 'beer_style',
 'review_palate',
 'review_taste',
 'beer_name',
 'beer_abv',
 'beer_beerid']

In [6]:
# Keep only the three columns the recommender needs and give them generic
# user / item / rating names that the rest of the notebook relies on.
# NOTE(review): beer_name may not be unique across breweries (generic names such
# as "ESB" appear), so distinct beers sharing a name would collapse into one item;
# beer_beerid is available as a safer item key if names are joined back for display.
ratings_spdf = ratings_full.select(
    F.col('review_profilename').alias('userid'),
    F.col('beer_name').alias('itemid'),
    F.col('review_overall').alias('rating'),
)
ratings_spdf.printSchema()
ratings_spdf.show(10)

root
 |-- userid: string (nullable = true)
 |-- itemid: string (nullable = true)
 |-- rating: string (nullable = true)

+------------+--------------------+------+
|      userid|              itemid|rating|
+------------+--------------------+------+
|    Schmidts|  Black Butte Porter|   4.5|
|     oline73|Schlafly Pumpkin Ale|   4.5|
|   aforbes10|   Mouthy Muskie Ale|   1.0|
|  birdman200|          Squall IPA|   4.0|
|    Phyl21ca|Noël Christmas We...|   2.0|
|    billybob|         Beck's Dark|   3.5|
|       mjl21|                 ESB|   4.0|
|     Hands22|         Grey Monday|   4.5|
|magictrokini|Portsmouth Lupe's...|   4.0|
| Huhzubendah|Ayinger Celebrato...|   4.5|
+------------+--------------------+------+
only showing top 10 rows



In [7]:
# The CSV was read without schema inference, so `rating` is a string; ALS needs a
# numeric rating. cast() silently turns unparseable values into null, and both
# StringIndexer (default handleInvalid='error') and ALS fail on null inputs, so
# rows missing a user, item, or rating are dropped here rather than crashing later.
ratings_spdf = (
    ratings_spdf
    .withColumn("rating", F.col("rating").cast("float"))
    .dropna(subset=["userid", "itemid", "rating"])
)
ratings_spdf.printSchema()

root
 |-- userid: string (nullable = true)
 |-- itemid: string (nullable = true)
 |-- rating: float (nullable = true)



In [8]:
# Indexing UserID and ItemID
# ALS needs numeric user/item columns, so map each string id to an index.
# The fitted StringIndexer models are kept because their label lists are used
# to translate recommended indices back into user and beer names.
userIndexer = StringIndexer(inputCol='userid', outputCol='userIndex').fit(ratings_spdf)
itemIndexer = StringIndexer(inputCol='itemid', outputCol='itemIndex').fit(ratings_spdf)

# Both indexers are already fitted models (Transformers), so apply them directly;
# wrapping them in a Pipeline and calling fit() again adds nothing.
indexedRatings = itemIndexer.transform(userIndexer.transform(ratings_spdf))

indexedRatings.show()

+---------------+--------------------+------+---------+---------+
|         userid|              itemid|rating|userIndex|itemIndex|
+---------------+--------------------+------+---------+---------+
|       Schmidts|  Black Butte Porter|   4.5|   3947.0|    199.0|
|        oline73|Schlafly Pumpkin Ale|   4.5|    322.0|   1257.0|
|      aforbes10|   Mouthy Muskie Ale|   1.0|    296.0|  17819.0|
|     birdman200|          Squall IPA|   4.0|   4849.0|    423.0|
|       Phyl21ca|Noël Christmas We...|   2.0|     10.0|  10316.0|
|       billybob|         Beck's Dark|   3.5|    809.0|    915.0|
|          mjl21|                 ESB|   4.0|   1078.0|   1688.0|
|        Hands22|         Grey Monday|   4.5|   7305.0|  15470.0|
|   magictrokini|Portsmouth Lupe's...|   4.0|    307.0|  10582.0|
|    Huhzubendah|Ayinger Celebrato...|   4.5|    174.0|     43.0|
|    BeerPanther| Hercules Double IPA|   4.5|    848.0|    100.0|
|     fattire513|Samuel Adams Nobl...|   5.0|   5554.0|    101.0|
|  Buckeye

#### Training model

In [9]:
# Fixed seed so the train/test split (and therefore the reported RMSE) is the same
# on every Restart & Run All; randomSplit without a seed draws a new split each run.
SEED = 42

# test, train, split
(training, test) = indexedRatings.randomSplit([0.7, 0.3], seed=SEED)

# train model
# coldStartStrategy="drop" discards test rows whose user or item was not seen in
# training, so they do not produce NaN predictions that would make RMSE NaN.
als = ALS(
    maxIter=20,
    rank=40,
    regParam=0.25,
    userCol="userIndex",
    itemCol="itemIndex",
    ratingCol="rating",
    coldStartStrategy="drop",
    implicitPrefs=False,
    seed=SEED,
)
model = als.fit(training)

# evaluate model
predictions = model.transform(test)
evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating", predictionCol="prediction")
rmse = evaluator.evaluate(predictions)
print("Root Mean Square Error = ", rmse)


Root Mean Sqaure Error =  0.7450735160189643


#### Recommender List

In [10]:
# Top-N lists in both directions: the best beers for every user, and for every
# beer the users predicted to rate it highest.
TOP_N = 10
userRecs = model.recommendForAllUsers(TOP_N)
beerRecs = model.recommendForAllItems(TOP_N)

# Preview the first rows of each result.
for recs in (userRecs, beerRecs):
    recs.show(10)



+---------+--------------------+
|userIndex|     recommendations|
+---------+--------------------+
|        1|[{21168, 5.152205...|
|        3|[{21168, 4.869573...|
|        5|[{21168, 4.785491...|
|        6|[{21168, 5.567945...|
|        9|[{21168, 5.161485...|
|       12|[{21168, 5.115019...|
|       13|[{21168, 5.218595...|
|       15|[{21168, 4.709941...|
|       16|[{18245, 4.992645...|
|       17|[{21168, 5.058577...|
+---------+--------------------+
only showing top 10 rows

+---------+--------------------+
|itemIndex|     recommendations|
+---------+--------------------+
|        1|[{8144, 6.357976}...|
|        3|[{8144, 6.3653316...|
|        5|[{8144, 6.5584245...|
|        6|[{8144, 6.104564}...|
|        9|[{14080, 6.238182...|
|       12|[{14080, 6.053038...|
|       13|[{14080, 6.108824...|
|       15|[{14080, 6.688264...|
|       16|[{14080, 5.731580...|
|       17|[{14080, 7.010308...|
+---------+--------------------+
only showing top 10 rows



In [11]:
# One row per (user, recommended beer): explode the recommendation array, then
# unpack each {itemIndex, rating} struct into its own columns.
flatUserRecs = (
    userRecs
    .select("userIndex", explode("recommendations").alias("itemAndRating"))
    .select("userIndex", "itemAndRating.*")
)
flatUserRecs.show()

+---------+---------+---------+
|userIndex|itemIndex|   rating|
+---------+---------+---------+
|        1|    21168| 5.152205|
|        1|    18245|5.1228256|
|        1|    19165|5.0577483|
|        1|    10019| 5.040913|
|        1|    20432|5.0111403|
|        1|    13316|4.9878473|
|        1|    15313|4.9878473|
|        1|    18847| 4.978482|
|        1|    20017| 4.945473|
|        1|    12127|4.9454045|
|        3|    21168| 4.869573|
|        3|    18245| 4.841165|
|        3|    19165|4.7826595|
|        3|    10019|  4.75526|
|        3|    20432| 4.716397|
|        3|    13316|4.7123113|
|        3|    15313|4.7123113|
|        3|    12127|  4.67973|
|        3|    20017| 4.675818|
|        3|    18847|4.6733046|
+---------+---------+---------+
only showing top 20 rows



In [12]:
# Optional: the same flattening for the item-side recommendations
# (one row per (beer, recommended user)). Disabled; uncomment to inspect.
# flatItemRecs = beerRecs.withColumn("userAndRating", explode(beerRecs.recommendations)).select("itemIndex", "userAndRating.*")
# flatItemRecs.show()

In [13]:
# Map the numeric indices in the flattened recommendations back to the original
# user and beer names using the label lists of the fitted StringIndexers.
# `labelsArray` replaces the `labels` property deprecated in Spark 3.1; each
# single-column indexer holds exactly one label list, at position 0.
userConverter = IndexToString(inputCol="userIndex", outputCol="userid",
                              labels=list(userIndexer.labelsArray[0]))
itemConverter = IndexToString(inputCol="itemIndex", outputCol="itemid",
                              labels=list(itemIndexer.labelsArray[0]))

# IndexToString is a plain Transformer with nothing to fit, so apply the
# converters directly instead of fitting a Pipeline on unrelated data.
convertedUserRecs = itemConverter.transform(userConverter.transform(flatUserRecs))
CUR = convertedUserRecs.select("userid", "itemid", "rating")
CUR.show()

+-------------+--------------------+---------+
|       userid|              itemid|   rating|
+-------------+--------------------+---------+
|BuckeyeNation|       Unique Singel| 5.152205|
|BuckeyeNation|Old Combine 4-Gra...|5.1228256|
|BuckeyeNation|           Red Storm|5.0577483|
|BuckeyeNation|    Love (Foeder #3)| 5.040913|
|BuckeyeNation|Strongman Belgian...|5.0111403|
|BuckeyeNation| Buckeye Engine Mild|4.9878473|
|BuckeyeNation|         Golden Funk|4.9878473|
|BuckeyeNation|             Premier| 4.978482|
|BuckeyeNation|Smuttynose Belgia...| 4.945473|
|BuckeyeNation|Arthur's Robust P...|4.9454045|
|    Thorpe429|       Unique Singel| 4.869573|
|    Thorpe429|Old Combine 4-Gra...| 4.841165|
|    Thorpe429|           Red Storm|4.7826595|
|    Thorpe429|    Love (Foeder #3)|  4.75526|
|    Thorpe429|Strongman Belgian...| 4.716397|
|    Thorpe429| Buckeye Engine Mild|4.7123113|
|    Thorpe429|         Golden Funk|4.7123113|
|    Thorpe429|Arthur's Robust P...|  4.67973|
|    Thorpe42

In [51]:
# Look up userid 'davidbowers13' that had done 20 reviews previously to see what he would like.
# To use default user, simply press enter at input value
useridx = input('Enter userid : ') or 'davidbowers13'

# Spark does not guarantee row order after a filter, so sort explicitly by the
# predicted rating (best first); itemid breaks ties so the listing is deterministic.
matchUser = (
    CUR.where(CUR.userid == useridx)
    .orderBy(F.col('rating').desc(), F.col('itemid').asc())
)
# Only a handful of rows per user, so collecting to the driver is cheap.
result = [row['itemid'] for row in matchUser.select('itemid').collect()]

print("\n")
print("Target User:" + useridx)
print("*" * 40 + "\n")
if not result:
    # The model was fit on the training split only, so a user that is unknown
    # or absent from that split has no recommendation rows.
    print("No recommendations found for this user (unknown userid, or not in the training data).")
else:
    print("Top 10 Recommended beer :")
    for i in result:
        print(i)

Enter userid : Thorpe429


Target User:Thorpe429
****************************************

Top 10 Recommended beer :
Unique Singel
Old Combine 4-Grain Lager
Red Storm
Love (Foeder #3)
Strongman Belgian Strong Dark
Buckeye Engine Mild
Golden Funk
Arthur's Robust Porter
Smuttynose Belgian Style White Ale (Big Beer Series)
Premier
