##CUNY - Data 612

#Project 5
#Implementing a Recommender System on Spark

##Peter Kowalchuk and Violeta Stoyanova

#Introduction

The purpose of this project is to build a recommender system using the same dataset as in the previous project (https://rpubs.com/pkowalchuk/588534) and to compare its performance with that earlier iteration. The dataset is derived from Amazon and consists of users' ratings of instant videos. In the previous project, Project 4, both item-based (IBCF) and user-based (UBCF) collaborative filtering recommenders were built on a reduced subset (80k entries) of the Amazon dataset. It was not possible to train a model on a larger dataset locally, without a distributed machine-learning framework.
In Project 5, the whole dataset (over 500k entries) is loaded and adapted to work with Apache Spark via Databricks Community Edition. An Alternating Least Squares (ALS) model was chosen instead of IBCF and UBCF. ALS is well suited to distributed computing because each alternating step splits into independent least-squares problems, one per user or one per item, which can be solved in parallel.
“ALS recommender is a matrix factorization algorithm that uses Alternating Least Squares with Weighted-Lambda-Regularization (ALS-WR). It factors the user to item matrix A into the user-to-feature matrix U and the item-to-feature matrix M: It runs the ALS algorithm in a parallel fashion. The ALS algorithm should uncover the latent factors that explain the observed user to item ratings and tries to find optimal factor weights to minimize the least squares between predicted and actual ratings.”
Source: https://www.elenacuoco.com/2016/12/22/alternating-least-squares-als-spark-ml/?cn-reloaded=1
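
To make the alternating updates in the quoted description concrete, here is a minimal pure-Python sketch of ALS-WR restricted to a single latent factor, so that each least-squares solve reduces to one division. The toy ratings, the function names (`solve_side`, `als_rank1`, `rmse`) and the default values are illustrative assumptions and are not part of this project. Spark's implementation supports higher ranks and distributes the work.

```python
# Toy ALS with weighted-lambda regularization, limited to ONE latent factor
# so that every least-squares solve is a scalar division (illustration only).

def solve_side(ratings_by_row, other, lam):
    """Closed-form update for one side while the other side is held fixed."""
    updated = {}
    for row, rated in ratings_by_row.items():
        num = sum(r * other[col] for col, r in rated.items())
        # The penalty is weighted by the number of ratings in this row (ALS-WR).
        den = sum(other[col] ** 2 for col in rated) + lam * len(rated)
        updated[row] = num / den
    return updated

def als_rank1(ratings, lam=0.1, iterations=10):
    """ratings: dict mapping (user, item) -> rating."""
    by_user, by_item = {}, {}
    for (u, i), r in ratings.items():
        by_user.setdefault(u, {})[i] = r
        by_item.setdefault(i, {})[u] = r
    items = {i: 1.0 for i in by_item}   # arbitrary starting point
    users = {}
    for _ in range(iterations):
        users = solve_side(by_user, items, lam)   # fix items, solve users
        items = solve_side(by_item, users, lam)   # fix users, solve items
    return users, items

def rmse(ratings, users, items):
    errors = [(r - users[u] * items[i]) ** 2 for (u, i), r in ratings.items()]
    return (sum(errors) / len(errors)) ** 0.5

# Hypothetical ratings, made up for this sketch.
toy = {("u1", "a"): 5.0, ("u1", "b"): 4.0,
       ("u2", "a"): 4.0, ("u2", "c"): 2.0,
       ("u3", "b"): 5.0, ("u3", "c"): 3.0}
users, items = als_rank1(toy)
print("training RMSE:", rmse(toy, users, items))
print("predicted rating of u1 for c:", users["u1"] * items["c"])
```

Inside `solve_side`, each row's update reads only that row's own ratings and the fixed factors of the other side, so the loop iterations are independent of one another. That independence is what allows the updates to be spread across workers.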

#Load Data

The datasets are available on Julian McAuley’s site: http://jmcauley.ucsd.edu/data/amazon/links.html
The CSV file for Instant Video was uploaded to the Databricks file system using the UI. The file is then loaded into a Spark DataFrame.

In [4]:
# File location and type
file_location = "/FileStore/tables/ratings_Amazon_Instant_Video.csv"
file_type = "csv"

# CSV options
infer_schema = "false"
first_row_is_header = "false"
delimiter = ","

# The applied options are for CSV files. For other file types, these will be ignored.
data = spark.read.format(file_type) \
  .option("inferSchema", infer_schema) \
  .option("header", first_row_is_header) \
  .option("sep", delimiter) \
  .load(file_location)

display(data)

_c0,_c1,_c2,_c3
A1EE2E3N7PW666,B000GFDAUG,5.0,1202256000
AGZ8SM1BGK3CK,B000GFDAUG,5.0,1198195200
A2VHZ21245KBT7,B000GIOPK2,4.0,1215388800
ACX8YW2D5EGP6,B000GIOPK2,4.0,1185840000
A9RNMO9MUSMTJ,B000GIOPK2,2.0,1281052800
A3STFVPM8NHJ7B,B000GIOPK2,5.0,1203897600
A2582KMXLK2P06,B000GIOPK2,5.0,1205884800
A1TZCLCW9QGGBH,B000GIOPK2,4.0,1209427200
A2E2I6B878CRMA,B000GIOPK2,5.0,1378684800
AD5MZA8SOVMPJ,B000GIOPK2,5.0,1218240000


Drop timestamp from dataset and rename columns

In [6]:
data=data.selectExpr("_c0 as userId","_c1 as itemId","_c2 as rating")
display(data)

userId,itemId,rating
A1EE2E3N7PW666,B000GFDAUG,5.0
AGZ8SM1BGK3CK,B000GFDAUG,5.0
A2VHZ21245KBT7,B000GIOPK2,4.0
ACX8YW2D5EGP6,B000GIOPK2,4.0
A9RNMO9MUSMTJ,B000GIOPK2,2.0
A3STFVPM8NHJ7B,B000GIOPK2,5.0
A2582KMXLK2P06,B000GIOPK2,5.0
A1TZCLCW9QGGBH,B000GIOPK2,4.0
A2E2I6B878CRMA,B000GIOPK2,5.0
AD5MZA8SOVMPJ,B000GIOPK2,5.0


In [7]:
print("Total number of user/item ratings in dataset: " + str(data.count()))

Build dataframe with average ratings for movies and movie counts

In [9]:
from pyspark.sql import functions as F

item_ids_with_avg_ratings_df = data.groupBy('itemId').agg(F.count(data.rating).alias("Count"), F.avg(data.rating).alias("average"))
print ('item_ids_with_avg_ratings_df:')
item_ids_with_avg_ratings_df.show(5, truncate=False)

In [10]:
print("Total number of items in dataset: " + str(item_ids_with_avg_ratings_df.count()))

#Select relevant data

To build this model, we select only the items with more than 1,000 ratings.

In [12]:
from pyspark.sql.functions import col

In [13]:
items_with_many_ratings_or_more = item_ids_with_avg_ratings_df.filter(item_ids_with_avg_ratings_df.Count>1000).sort(col("average").desc())
print ('Items with highest ratings:')
items_with_many_ratings_or_more.show(20, truncate=False)

In [14]:
print("Number of items with many ratings: " + str(items_with_many_ratings_or_more.count()))

In [15]:
df=data.join(items_with_many_ratings_or_more,data.itemId==items_with_many_ratings_or_more.itemId).drop(items_with_many_ratings_or_more.itemId)
print("Number of user/item ratings with items with more than 1000 ratings: " + str(df.count()))

In [16]:
display(df)

userId,rating,itemId,Count,average
A3OSZISQRMTWZX,5.0,B003NRWVMC,1132,4.845406360424028
AUMYZ3TS44MNP,5.0,B003NRWVMC,1132,4.845406360424028
A3RA3ZTC0HDP9U,5.0,B003NRWVMC,1132,4.845406360424028
A18NIX1QH8L1Z9,4.0,B003NRWVMC,1132,4.845406360424028
A1HQ4RXXJNS5TC,5.0,B003NRWVMC,1132,4.845406360424028
A36GS5K3YYXFMX,5.0,B003NRWVMC,1132,4.845406360424028
A34IPQQL7NIJQL,5.0,B003NRWVMC,1132,4.845406360424028
A20ZO8UFW5EFS6,5.0,B003NRWVMC,1132,4.845406360424028
A38IOP4TW32Y2Q,5.0,B003NRWVMC,1132,4.845406360424028
A133G8UNAV44BP,5.0,B003NRWVMC,1132,4.845406360424028


#Prepare dataframe for pySpark

To build a model with PySpark ALS, the string user and item ids have to be replaced, because ALS does not accept string ids. The next step is therefore to create corresponding numeric indices for users and items.
From the Apache Spark documentation:
"Note: The DataFrame-based API for ALS currently only supports integers for user and item ids. Other numeric types are supported for the user and item id columns, but the ids must be within the integer value range."
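
As a rough illustration of what the indexing step does, the sketch below mimics StringIndexer's default behaviour in plain Python: labels are ordered by descending frequency, and the most frequent label receives index 0. The helper name `fit_string_index` is hypothetical, and the alphabetical tie-break is an assumption of this sketch. The real transformer also emits the index as a double column rather than an integer.

```python
from collections import Counter

def fit_string_index(labels):
    """Map each distinct label to an integer index, most frequent label first.

    Ties are broken alphabetically here, which is an assumption of this sketch.
    """
    counts = Counter(labels)
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: index for index, label in enumerate(ordered)}

# Item ids copied from the ten rows displayed when the CSV was first loaded.
item_ids = ["B000GFDAUG", "B000GFDAUG",
            "B000GIOPK2", "B000GIOPK2", "B000GIOPK2", "B000GIOPK2",
            "B000GIOPK2", "B000GIOPK2", "B000GIOPK2", "B000GIOPK2"]

item_index = fit_string_index(item_ids)
# Keeping the reverse mapping makes it easy to translate indices back to ids.
index_to_item = {index: label for label, index in item_index.items()}

print(item_index)      # B000GIOPK2 appears most often, so it gets index 0
print(index_to_item)
```

Keeping a reverse mapping such as `index_to_item` is useful later, when recommended item indices have to be translated back into the original ids.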

In [18]:
from pyspark.ml.feature import StringIndexer

In [19]:
stringIndexer = StringIndexer(inputCol="userId", outputCol="userIndex")
model = stringIndexer.fit(df)
dataIndexed = model.transform(df)

stringIndexer = StringIndexer(inputCol="itemId", outputCol="itemIndex")
model = stringIndexer.fit(dataIndexed)
dataIndexed = model.transform(dataIndexed)

display(dataIndexed)

userId,rating,itemId,Count,average,userIndex,itemIndex
A3OSZISQRMTWZX,5.0,B003NRWVMC,1132,4.845406360424028,3930.0,60.0
AUMYZ3TS44MNP,5.0,B003NRWVMC,1132,4.845406360424028,31909.0,60.0
A3RA3ZTC0HDP9U,5.0,B003NRWVMC,1132,4.845406360424028,15727.0,60.0
A18NIX1QH8L1Z9,4.0,B003NRWVMC,1132,4.845406360424028,85495.0,60.0
A1HQ4RXXJNS5TC,5.0,B003NRWVMC,1132,4.845406360424028,114301.0,60.0
A36GS5K3YYXFMX,5.0,B003NRWVMC,1132,4.845406360424028,148614.0,60.0
A34IPQQL7NIJQL,5.0,B003NRWVMC,1132,4.845406360424028,65610.0,60.0
A20ZO8UFW5EFS6,5.0,B003NRWVMC,1132,4.845406360424028,6588.0,60.0
A38IOP4TW32Y2Q,5.0,B003NRWVMC,1132,4.845406360424028,48178.0,60.0
A133G8UNAV44BP,5.0,B003NRWVMC,1132,4.845406360424028,6488.0,60.0


In [20]:
dataIndexed.printSchema()

Now userIndex and itemIndex correspond to userId and itemId. We select only the columns required for modeling and cast rating to float.

In [22]:
df=dataIndexed.select(dataIndexed.userId,dataIndexed.itemId,dataIndexed.rating.cast("float"),dataIndexed.userIndex,dataIndexed.itemIndex)
df.printSchema()

Split the data into 80% training and 20% test

In [24]:
(training,test)=df.randomSplit([0.8,0.2])

print('Training: {0}, test: {1}\n'.format(
  training.count(), test.count())
)
training.show(3)
test.show(3)

#Model

Train ALS models with different numbers of latent factors (K) and other hyperparameters

In [26]:
from pyspark.ml.recommendation import ALS
from pyspark.ml.tuning import TrainValidationSplit,ParamGridBuilder
from pyspark.ml.evaluation import RegressionEvaluator

"ALS is an iterative optimization process where we for every iteration try to arrive closer and closer to a factorized representation of our original data."
Just like other recommender systems, ALS has its own hyperparameters that need to be tuned. For that purpose we use the built-in ParamGridBuilder class, together with TrainValidationSplit, to find the best combination of:

•	rank: the number of latent factors in the model (defaults to 10)
•	maxIter: the maximum number of iterations to run (defaults to 10)
•	regParam: the regularization parameter in ALS (defaults to 0.1 in pyspark.ml)
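
To get a feel for the size of the search, the sketch below enumerates the same candidate values that the grid in the following cell uses. It relies only on `itertools.product` and is meant as an illustration of what a parameter grid expands to, not as a replacement for ParamGridBuilder. The variable names `grid` and `combinations` are introduced here for illustration.

```python
from itertools import product

# Same candidate values as the grid defined in the next cell.
grid = {
    "rank": [8, 12, 20],
    "maxIter": [10, 20],
    "regParam": [0.05, 0.1],
}

# Cartesian product: one dictionary per candidate model.
combinations = [dict(zip(grid, values)) for values in product(*grid.values())]

print(len(combinations))   # 3 * 2 * 2 = 12 candidate models
for params in combinations[:3]:
    print(params)
```

With TrainValidationSplit, each of these 12 candidates is fitted once on a portion of the training data (75% by default) and scored on the remainder. The best parameter set is then refitted on the whole training set.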

In [28]:
als = ALS()

als.setUserCol('userIndex')\
   .setItemCol('itemIndex')

param_grid=ParamGridBuilder()\
            .addGrid(als.rank,[8,12,20])\
            .addGrid(als.maxIter,[10,20])\
            .addGrid(als.regParam,[.05, .1])\
            .build()

evaluator=RegressionEvaluator(metricName="rmse",labelCol="rating",predictionCol="prediction")

tvs=TrainValidationSplit(estimator=als,estimatorParamMaps=param_grid,evaluator=evaluator)

In [29]:
# Import mlflow
import mlflow
import mlflow.sklearn

In [30]:
model=tvs.fit(training)

In [31]:
best_model=model.bestModel

predictions=best_model.transform(test)

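# Users or items that are absent from the training split get a NaN prediction; drop those rows.
# Spark SQL treats NaN as equal to NaN, so this comparison removes them.
predicted_ratings_df = predictions.filter(predictions.prediction != float('nan'))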

rmse=evaluator.evaluate(predicted_ratings_df)

print("RMSE = " + str(rmse))
print("**Best Model**")
print(" Rank: " + str(best_model.rank))
print(" MaxIter: " + str(best_model._java_obj.parent().getMaxIter()))
print(" RegParam: " + str(best_model._java_obj.parent().getRegParam()))

The best model has 22 latent factors and a regularization parameter of 0.1.
Its RMSE is 1.89644273379, which is higher (worse) than both IBCF (1.1570069) and UBCF (0.9240670) from the previous project.

In [33]:
display(predicted_ratings_df)

userId,itemId,rating,userIndex,itemIndex,prediction
A3UZV6KHEPQKQ5,B00CDBQNZU,5.0,17753.0,31.0,2.8665416
A2UWXEEWIKFPBP,B00CDBQNZU,5.0,897.0,31.0,3.7309794
A2Y2JTGER56QR4,B00CDBQNZU,5.0,4119.0,31.0,1.6496632
A2L3FAPDVEUL5K,B00CDBQNZU,1.0,16234.0,31.0,2.8665416
A07210292XA7ZPKESLMC8,B00CDBQNZU,4.0,5375.0,31.0,2.9191685
A2T4OL4DO5CF6V,B00CDBQNZU,5.0,15093.0,31.0,3.5831769
A3S48Q63UPO9L6,B00CDBQNZU,5.0,13170.0,31.0,2.323049
AEQ5O09E7ERTB,B00CDBQNZU,4.0,18193.0,31.0,-0.09269881
A2NJOS6PYGAB5H,B00CDBQNZU,5.0,17196.0,31.0,2.199551
A1CFRFLS49G8R8,B00CDBQNZU,1.0,3313.0,31.0,3.799327
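
The RMSE reported by the evaluator can be reproduced by hand on the ten rows displayed above. The sketch below does that in plain Python and also shows the effect of clipping predictions to the 1 to 5 rating scale, since one of the displayed predictions is negative. The clipping step and the helper name `rmse` are suggestions of this sketch and were not applied in the project. Ten rows are far too few to stand in for the full test set.

```python
import math

def rmse(pairs, clip=None):
    """Root mean squared error over (actual, predicted) pairs.

    Pairs with a NaN prediction are skipped. If clip=(low, high) is given,
    predictions are first limited to that range.
    """
    squared_errors = []
    for actual, predicted in pairs:
        if math.isnan(predicted):
            continue
        if clip is not None:
            low, high = clip
            predicted = min(max(predicted, low), high)
        squared_errors.append((actual - predicted) ** 2)
    return math.sqrt(sum(squared_errors) / len(squared_errors))

# (rating, prediction) pairs copied from the ten rows displayed above.
sample = [(5.0, 2.8665416), (5.0, 3.7309794), (5.0, 1.6496632),
          (1.0, 2.8665416), (4.0, 2.9191685), (5.0, 3.5831769),
          (5.0, 2.323049), (4.0, -0.09269881), (5.0, 2.199551),
          (1.0, 3.799327)]

print("RMSE on the sample:", rmse(sample))
print("RMSE with predictions clipped to [1, 5]:", rmse(sample, clip=(1.0, 5.0)))
```

Because every true rating lies between 1 and 5, clipping can only move a prediction closer to its target, so the clipped RMSE is never larger than the unclipped one.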


In [34]:
reg_eval = RegressionEvaluator(predictionCol="prediction", labelCol="rating", metricName="rmse")

#Recommendations

Generate Recommendations for all users

In [36]:
user_rec=best_model.recommendForAllUsers(10)

In [37]:
display(user_rec)

userIndex,recommendations
148,"List(List(15, 4.9310813), List(46, 4.7800746), List(6, 4.7537084), List(63, 4.7201915), List(1, 4.637907), List(60, 4.601981), List(10, 4.5879793), List(18, 4.5705414), List(39, 4.5512996), List(20, 4.523264))"
463,"List(List(24, 6.560636), List(62, 6.3836765), List(3, 6.249261), List(25, 6.135927), List(61, 5.940811), List(21, 5.9244184), List(53, 5.7974215), List(64, 5.7418127), List(57, 5.699632), List(33, 5.609172))"
471,"List(List(62, 6.1830344), List(32, 4.943763), List(7, 4.702999), List(14, 4.658553), List(61, 4.318445), List(12, 4.3122706), List(65, 3.9326615), List(1, 3.5566816), List(3, 3.4101014), List(37, 3.2326102))"
496,"List(List(12, 5.872194), List(32, 5.850674), List(51, 5.5642624), List(37, 5.533883), List(61, 5.507338), List(29, 5.4738364), List(9, 5.361377), List(18, 5.3560534), List(23, 5.082713), List(14, 5.0502806))"
833,"List(List(62, 6.6936264), List(3, 4.775024), List(31, 4.4702916), List(17, 4.4406714), List(36, 4.314538), List(61, 4.2573624), List(1, 4.2495894), List(65, 4.0683), List(30, 3.9462829), List(38, 3.9390357))"
1088,"List(List(54, 5.4669986), List(28, 5.211794), List(26, 5.1970687), List(4, 5.1481004), List(68, 5.0985236), List(35, 5.04649), List(27, 4.960038), List(16, 4.930458), List(5, 4.920898), List(56, 4.769057))"
1238,"List(List(67, 5.0436606), List(64, 5.037113), List(6, 5.035599), List(33, 4.958264), List(29, 4.907932), List(39, 4.8235645), List(55, 4.818003), List(35, 4.756812), List(28, 4.751462), List(15, 4.7511744))"
1342,"List(List(15, 4.9602337), List(10, 4.8338776), List(18, 4.826921), List(60, 4.802887), List(63, 4.757739), List(46, 4.611818), List(6, 4.576256), List(59, 4.5233107), List(58, 4.4695516), List(2, 4.463224))"
1580,"List(List(28, 5.0033064), List(67, 4.973325), List(0, 4.8970094), List(22, 4.8352075), List(11, 4.8274565), List(8, 4.772457), List(68, 4.7621307), List(55, 4.752386), List(35, 4.7518287), List(49, 4.701833))"
1591,"List(List(24, 5.280665), List(54, 5.0922127), List(56, 4.6291947), List(41, 4.614948), List(50, 4.5113573), List(44, 4.4997964), List(43, 4.4841704), List(0, 4.3178077), List(28, 4.2971864), List(27, 4.229585))"


In [38]:
user_rec.count()

Produce recommendations for one user

In [40]:
user = user_rec.filter(user_rec.userIndex==148)

In [41]:
display(user)

userIndex,recommendations
148,"List(List(15, 4.9310813), List(46, 4.7800746), List(6, 4.7537084), List(63, 4.7201915), List(1, 4.637907), List(60, 4.601981), List(10, 4.5879793), List(18, 4.5705414), List(39, 4.5512996), List(20, 4.523264))"


We can see that the predicted ratings of the recommended items all fall in a narrow range (roughly 4.5 to 4.9), which suggests that the user is being recommended similar items.

In [43]:
rec=user.select("recommendations.itemIndex","recommendations.rating")

In [44]:
display(rec)

itemIndex,rating
"List(15, 46, 6, 63, 1, 60, 10, 18, 39, 20)","List(4.9310813, 4.7800746, 4.7537084, 4.7201915, 4.637907, 4.601981, 4.5879793, 4.5705414, 4.5512996, 4.523264)"


In [45]:
import pandas as pd

In [46]:
rec_items=rec.select("itemIndex").toPandas().iloc[0,0]
rec_ratings=rec.select("rating").toPandas().iloc[0,0]

In [47]:
rec_matrix=pd.DataFrame(rec_items,columns=["itemsIndex"])
rec_matrix["ratings"]=rec_ratings
ratings_matrix_df=spark.createDataFrame(rec_matrix)
display(ratings_matrix_df)

itemsIndex,ratings
15,4.931081295013428
46,4.780074596405029
6,4.753708362579346
63,4.720191478729248
1,4.637907028198242
60,4.601981163024902
10,4.587979316711426
18,4.5705413818359375
39,4.551299571990967
20,4.523263931274414


We map the indices of the recommended items back to their ids

In [49]:
test_items=test.dropDuplicates(['itemId','itemIndex']).select("itemId","itemIndex")
display(test_items)

itemId,itemIndex
B004GTOKM0,67.0
B00BLCHYKU,38.0
B0083IJGBU,45.0
B000VZUWZW,56.0
B006Z48TZS,15.0
B00C7KXUOE,20.0
B00L86ZKAK,61.0
B00I3MMN4I,2.0
B003RRW3BC,58.0
B008EQHT4M,54.0


In [50]:
user_recommendations=test_items.join(ratings_matrix_df,test_items.itemIndex==ratings_matrix_df.itemsIndex).drop("itemIndex","itemsIndex")
display(user_recommendations)

itemId,ratings
B004MWZLYC,4.5705413818359375
B00I3MPDP4,4.637907028198242
B00F0XPJH6,4.551299571990967
B003NRVE6Q,4.720191478729248
B003NRWVMC,4.601981163024902
B00H7NDSPC,4.780074596405029
B00APE00H4,4.587979316711426
B00B8P8O9K,4.753708362579346
B00C7KXUOE,4.523263931274414
B006Z48TZS,4.931081295013428


This table shows the recommended items for the selected user and the corresponding ratings.
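
The join returns the rows in no particular order, so the ranking by predicted rating is lost in the table above. The sketch below rebuilds the same mapping in plain Python and sorts the result so that the strongest recommendation comes first. The index-to-id pairs were recovered by matching the predicted ratings in the two tables, and the ratings are rounded to three decimals for brevity. The names `index_to_item`, `recommendations` and `ranked` are introduced here for illustration.

```python
# Index -> item id pairs, recovered from the tables shown above.
index_to_item = {
    15: "B006Z48TZS", 46: "B00H7NDSPC", 6: "B00B8P8O9K", 63: "B003NRVE6Q",
    1: "B00I3MPDP4", 60: "B003NRWVMC", 10: "B00APE00H4", 18: "B004MWZLYC",
    39: "B00F0XPJH6", 20: "B00C7KXUOE",
}

# (itemIndex, predicted rating) pairs for userIndex 148, rounded to 3 decimals.
recommendations = [(15, 4.931), (46, 4.780), (6, 4.754), (63, 4.720),
                   (1, 4.638), (60, 4.602), (10, 4.588), (18, 4.571),
                   (39, 4.551), (20, 4.523)]

# Look up each id, then sort so the strongest recommendation comes first.
ranked = sorted(
    ((index_to_item[index], rating) for index, rating in recommendations),
    key=lambda pair: pair[1],
    reverse=True,
)

for item_id, rating in ranked:
    print(item_id, rating)
```

In Spark, the equivalent would be to order the joined DataFrame by the ratings column in descending order before displaying it.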

##Conclusion

The goal of this project was to practice working with a distributed recommender system.
We were able to adapt an ALS recommender system to work with Apache Spark via the Databricks platform. The dataset used to build the model was the same as in Project 4, where IBCF and UBCF collaborative filtering systems were built in order to recommend new items to users. The best performing algorithm there was UBCF, with an RMSE of 0.9240670.
The advantages of working with Spark, as opposed to local memory, were its ability to handle the entire dataset of 583933 ratings, which was previously almost impossible, and its high-level libraries, including support for SQL queries.

The architecture of Apache Spark: "At a fundamental level, an Apache Spark application consists of two main components: a driver, which converts the user's code into multiple tasks that can be distributed across worker nodes, and executors, which run on those nodes and execute the tasks assigned to them. Some form of cluster manager is necessary to mediate between the two." (from https://www.infoworld.com/article/3236869/what-is-apache-spark-the-big-data-platform-that-crushed-hadoop.html)

The ALS model that we built has 22 latent factors and a regularization parameter of 0.1, with an RMSE of 1.89644273379, which is higher (worse) than both IBCF (1.1570069) and UBCF (0.9240670) from the previous project. However, when inspecting the model's output, we can see that the items recommended to one user all had similar predicted ratings.
A possible improvement of the model would be to include implicit data, that is, additional information such as genre, number of views, clicks, purchases, etc., and to set implicitPrefs=True, in order to get a better idea of the similarity of the items and improve the results.