In [10]:
!pip install pyspark findspark



In [11]:
import findspark
findspark.init()

In [None]:
from bigdata_a3_utils import *
import pyspark
from pyspark.sql import SparkSession
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.sql.functions import *
from pyspark.ml.feature import StringIndexer
import pickle
import numpy as np
import pandas as pd
from pathlib import Path
import gc

In [None]:
def get_needed_cols(base_path, category, frame):
    #columns we want
    needed= ["user_id", "asin", "rating"]
    #read in pickle file
    df= pd.read_pickle(base_path / f"cleaned_data_{category}.pkl")
    #drop everything but the columns we want
    subset= df[needed]
    #clear large pickle file from memory
    del df
    gc.collect()
    #concat into unified dataframe
    frame= pd.concat([frame, subset])

    del subset
    gc.collect()
    return frame

In [13]:
spark= SparkSession.builder \
      .master("local") \
      .appName("Recommendation System") \
      .getOrCreate()

In [14]:
sc= spark.sparkContext

In [None]:
# #read pickle file
# pandas_df= pd.read_pickle('test.pkl')
# pandas_df.head(5)
base_path= Path(r"C:\Users\saeed\OneDrive\Desktop")
pandas_df= pd.DataFrame()

# run after test is successful
for category in VALID_CATEGORIES:
    pandas_df= get_needed_cols(base_path, category, pandas_df)

#test run. Please run this to ensure you are getting the categories in the list below
#Once working, comment test code and uncomment real code and then Run All
# categories= ["Amazon_Fashion"]
# for category in categories:
#     pandas_df= get_needed_cols(base_path, category, pandas_df)

Unnamed: 0,rating,title_reviews,text,images_reviews,asin,parent_asin,user_id,timestamp,helpful_vote,verified_purchase,...,videos,store,categories,details,bought_together,subtitle,author,brand,review_length,year
0,5.0,Pretty locket,I think this locket is really pretty. The insi...,[],B00LOPVX74,B00LOPVX74,AGBFYI2DDIKXC5Y4FARTYDTQBMFQ,1578528394489,3,True,...,{'title': ['Irish Locket Celtic Locket Necklac...,CHUVORA,[],"{""Is Discontinued By Manufacturer"": ""No"", ""Pro...",,,,CHUVORA,58,2020
1,5.0,A,Great,[],B07B4JXK8D,B07B4JXK8D,AFQLNQNQYFWQZPJQZS6V3NZU4QBQ,1608426246701,0,True,...,"{'title': [], 'url': [], 'user_id': []}",PrimeMed,[],"{""Is Discontinued By Manufacturer"": ""No"", ""Pac...",,,,PrimeMed,1,2020
2,2.0,Two Stars,One of the stones fell out within the first 2 ...,[],B007ZSEQ4Q,B007ZSEQ4Q,AHITBJSS7KYUBVZPX7M2WJCOIVKQ,1432344828000,3,True,...,"{'title': [], 'url': [], 'user_id': []}",KEZEF,[],"{""Material"": ""not-applicable"", ""Brand"": ""KEZEF...",,,,KEZEF,18,2015
3,1.0,Won’t buy again,Crappy socks. Money wasted. Bought to wear wit...,[],B07F2BTFS9,B07F2BTFS9,AFVNEEPDEIH5SPUN5BWC6NKL3WNQ,1546289847095,2,True,...,"{'title': [], 'url': [], 'user_id': []}",VERO MONTE,[],"{""Is Discontinued By Manufacturer"": ""No"", ""Pro...",,,,VERO MONTE,15,2018
4,5.0,I LOVE these glasses,I LOVE these glasses! They fit perfectly over...,[],B00PKRFU4O,B00XESJTDE,AHSPLDNW5OOUK2PLH7GXLACFBZNQ,1439476166000,0,True,...,"{'title': [], 'url': [], 'user_id': []}",SA106,[],"{""Is Discontinued By Manufacturer"": ""No"", ""Ite...",,,,SA106,56,2015


In [16]:
#Retain user_id, product_id, rating
pandas_df= pandas_df[['user_id', 'asin', 'rating']]
pandas_df.head(5)

Unnamed: 0,user_id,asin,rating
0,AGBFYI2DDIKXC5Y4FARTYDTQBMFQ,B00LOPVX74,5.0
1,AFQLNQNQYFWQZPJQZS6V3NZU4QBQ,B07B4JXK8D,5.0
2,AHITBJSS7KYUBVZPX7M2WJCOIVKQ,B007ZSEQ4Q,2.0
3,AFVNEEPDEIH5SPUN5BWC6NKL3WNQ,B07F2BTFS9,1.0
4,AHSPLDNW5OOUK2PLH7GXLACFBZNQ,B00PKRFU4O,5.0


In [17]:
#convert to spark dataframe
spark_df= spark.createDataFrame(pandas_df)
spark_df.show()

+--------------------+----------+------+
|             user_id|      asin|rating|
+--------------------+----------+------+
|AGBFYI2DDIKXC5Y4F...|B00LOPVX74|   5.0|
|AFQLNQNQYFWQZPJQZ...|B07B4JXK8D|   5.0|
|AHITBJSS7KYUBVZPX...|B007ZSEQ4Q|   2.0|
|AFVNEEPDEIH5SPUN5...|B07F2BTFS9|   1.0|
|AHSPLDNW5OOUK2PLH...|B00PKRFU4O|   5.0|
|AHTTU2FL6FCNBBAES...|B089S8MFCQ|   3.0|
|AFWHJ6O3PV4JC7PVO...|B01MZDN9Z4|   5.0|
|AEWJXW46LKSW22FWT...|B00N9X4XNU|   4.0|
|AEQAYV7RXZEBXMQIQ...|B087M39LVV|   5.0|
|AFNT6ZJCYQN3WDIKU...|B0799L7FMC|   2.0|
|AF232QQ7UDHGQOVR6...|B077GN3YZJ|   2.0|
|AEVPPTMG43C6GWSR7...|B091GMMYPS|   5.0|
|AHREXOGQPZDA6354M...|B09DQ5M2BB|   2.0|
|AHREXOGQPZDA6354M...|B095M3HHTJ|   5.0|
|AHREXOGQPZDA6354M...|B089PWHFVW|   3.0|
|AHREXOGQPZDA6354M...|B097DQPCP2|   5.0|
|AHREXOGQPZDA6354M...|B092J4ZT1V|   5.0|
|AH6CATODIVPVUOJEW...|B09QH7GPH1|   5.0|
|AH6CATODIVPVUOJEW...|B07C3P1DKX|   5.0|
|AEO4M665ZOCBF7HEF...|B01E39FOVS|   5.0|
+--------------------+----------+------+
only showing top

In [18]:
#need to index the user_id and asin to numerical of model
user_index= StringIndexer(inputCol= 'user_id', outputCol= 'user_id_index')
asin_index= StringIndexer(inputCol= 'asin', outputCol= 'asin_index')

user_index_model= user_index.fit(spark_df)
asin_index_model= asin_index.fit(spark_df)

spark_df= user_index_model.transform(spark_df)
spark_df= asin_index_model.transform(spark_df)

spark_df.show()

+--------------------+----------+------+-------------+----------+
|             user_id|      asin|rating|user_id_index|asin_index|
+--------------------+----------+------+-------------+----------+
|AGBFYI2DDIKXC5Y4F...|B00LOPVX74|   5.0|        202.0|      42.0|
|AFQLNQNQYFWQZPJQZ...|B07B4JXK8D|   5.0|        170.0|     280.0|
|AHITBJSS7KYUBVZPX...|B007ZSEQ4Q|   2.0|        252.0|      23.0|
|AFVNEEPDEIH5SPUN5...|B07F2BTFS9|   1.0|        179.0|     323.0|
|AHSPLDNW5OOUK2PLH...|B00PKRFU4O|   5.0|        262.0|      52.0|
|AHTTU2FL6FCNBBAES...|B089S8MFCQ|   3.0|        264.0|     728.0|
|AFWHJ6O3PV4JC7PVO...|B01MZDN9Z4|   5.0|        182.0|     191.0|
|AEWJXW46LKSW22FWT...|B00N9X4XNU|   4.0|        133.0|      45.0|
|AEQAYV7RXZEBXMQIQ...|B087M39LVV|   5.0|        121.0|     709.0|
|AFNT6ZJCYQN3WDIKU...|B0799L7FMC|   2.0|        164.0|     278.0|
|AF232QQ7UDHGQOVR6...|B077GN3YZJ|   2.0|        141.0|     261.0|
|AEVPPTMG43C6GWSR7...|B091GMMYPS|   5.0|        130.0|     885.0|
|AHREXOGQP

In [19]:
#drop users with less than 5 full reviews
user_number= spark_df.groupBy('user_id_index').count().alias('num')
print("User_Number:")
user_number.show()

valid_user= user_number.filter(col('count')>= 5).select('user_id_index')
print("Valid_User:")
valid_user.show()

spark_df= spark_df.join(valid_user, on= 'user_id_index')
spark_df.show()

User_Number:
+-------------+-----+
|user_id_index|count|
+-------------+-----+
|        170.0|    1|
|        184.0|    1|
|        147.0|    1|
|        160.0|    1|
|        169.0|    1|
|         70.0|    2|
|         67.0|    2|
|          8.0|   20|
|        168.0|    1|
|        206.0|    1|
|         69.0|    2|
|          0.0|  145|
|        249.0|    1|
|          7.0|   32|
|        142.0|    1|
|        191.0|    1|
|        112.0|    1|
|        154.0|    1|
|        232.0|    1|
|        124.0|    1|
+-------------+-----+
only showing top 20 rows

Valid_User:
+-------------+
|user_id_index|
+-------------+
|          8.0|
|          0.0|
|          7.0|
|         18.0|
|          1.0|
|          4.0|
|         23.0|
|         11.0|
|         21.0|
|         14.0|
|         22.0|
|         19.0|
|          3.0|
|          2.0|
|         17.0|
|         10.0|
|         13.0|
|          6.0|
|         20.0|
|         15.0|
+-------------+
only showing top 20 rows

+----------

In [20]:
spark_df.printSchema()

root
 |-- user_id_index: double (nullable = false)
 |-- user_id: string (nullable = true)
 |-- asin: string (nullable = true)
 |-- rating: double (nullable = true)
 |-- asin_index: double (nullable = false)



In [21]:
#Split data 80/20 (train/test)

train, test= spark_df.randomSplit([0.8, 0.2])
print("Train: ", train.count())
print("Test: ", test.count())

Train:  510
Test:  130


In [22]:
#ALS Model
model= ALS(maxIter= 5, regParam= 0.1,
           userCol= 'user_id_index', itemCol= 'asin_index',
           ratingCol= 'rating', coldStartStrategy= 'drop')

In [27]:
fittedModel= model.fit(train)

In [28]:
#RMSE eval on model
pred= fittedModel.transform(test)

eval= RegressionEvaluator(metricName= "rmse", labelCol= 'rating', predictionCol= 'prediction')
rmse= eval.evaluate(pred)
print("RMSE = ", rmse)

RMSE =  4.148684000007313


In [45]:
#Demo
unique_test_users= spark_df.select('user_id').distinct()
random_selection= unique_test_users.orderBy(rand()).limit(3)

#model requires dataframes
random_id_df= random_selection.select(col("user_id")) #puts user_id in dataframe
user_id_index_df= spark_df.select("user_id", "user_id_index").distinct()
random_id_indexed_df= random_id_df.join(user_id_index_df, "user_id", "inner")
# random_id_indexed_df.show()

recommendations= fittedModel.recommendForUserSubset(random_id_indexed_df, 5)
recommendations= recommendations.join(spark_df.select("user_id", "user_id_index")).distinct().limit(3)
recommendations= recommendations.select("user_id", "recommendations")
recommendations.show(truncate= False)

+----------------------------+------------------------------------------------------------------------------------------+
|user_id                     |recommendations                                                                           |
+----------------------------+------------------------------------------------------------------------------------------+
|AFA26DYXVLJYTYZ3KET77GE27N2A|[{271, 3.9448905}, {144, 3.9448905}, {672, 3.5661492}, {305, 3.5661492}, {299, 3.5661492}]|
|AHEJ5LC7BSEADCIZQQQPZPVWOLCA|[{631, 4.9485283}, {540, 4.9485283}, {436, 4.9485283}, {434, 4.9485283}, {307, 4.9485283}]|
|AEIMJ5XEUIE4TKJLGSZYNNKB2WDQ|[{271, 3.9448905}, {144, 3.9448905}, {672, 3.5661492}, {305, 3.5661492}, {299, 3.5661492}]|
+----------------------------+------------------------------------------------------------------------------------------+

