In [1]:
import os
import numpy as np
import pandas as pd
from pyspark.sql import SparkSession
import matplotlib.pyplot as plt
import seaborn as sns
%matplotlib inline

from pyspark.mllib.recommendation import ALS
from pyspark.ml.recommendation import ALS as mlals
from pyspark.ml.evaluation import RegressionEvaluator

import math
from pyspark.sql.functions import udf
from pyspark.sql.types import *
from pyspark.ml.evaluation import RegressionEvaluator

Calling spark session to register application

In [6]:

spark = SparkSession \
    .builder \
    .appName("Recom") \
    .config("spark.recom.demo", "1") \
    .getOrCreate()


Loading and Parsing Dataset

In [7]:

ratings_df = spark.read \
    .format("csv") \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .load("C:/Users/arshi/Desktop/ArshiyaUSB/Fall2018/256/Project/CMPE256-Airbnb/Module1/output/rating_prediction_by_sentiment_analysis.csv")

In [8]:
ratings_df


DataFrame[listing_id: int, reviewer_id: int, rating: int]

In [10]:

data = ratings_df.select('reviewer_id','listing_id','Rating')
data.show(5)


+-----------+----------+------+
|reviewer_id|listing_id|Rating|
+-----------+----------+------+
|    6279455|   9452127|     5|
|   23770684|   9452127|     2|
|   29070567|   9452127|     5|
|   58381183|   9452127|     3|
|   29014080|   9452127|     5|
+-----------+----------+------+
only showing top 5 rows



whole dataset is used for recommendation

In [11]:
data.describe().show()

+-------+-------------------+-----------------+------------------+
|summary|        reviewer_id|       listing_id|            Rating|
+-------+-------------------+-----------------+------------------+
|  count|             800995|           800995|            800995|
|   mean|4.230238320549816E7|7180466.618531951|3.6731240519603743|
| stddev|3.903449681193699E7|5966198.629587632| 1.573049482796602|
|    min|                  1|             2515|                 0|
|    max|          152696826|         21154538|                 5|
+-------+-------------------+-----------------+------------------+



To accurately evaluate the model,we need to split dataset into train, validation, and test datasets.

In [15]:

(trainingData,validationData,testData) = data.randomSplit([0.7,0.15,0.15])

In [16]:

validation_for_predict = validationData.select('reviewer_id','listing_id')
test_for_predict = testData.select('reviewer_id','listing_id')

Spark MLlib library for Machine Learning provides a Collaborative Filtering implementation by 
using Alternating Least Squares. The implementation in MLlib has the following parameters:

    1. numBlocks is the number of blocks used to parallelize computation (set to -1 to auto-configure).
    2. rank is the number of latent factors in the model.
    3. iterations is the number of iterations to run.
    4. lambda specifies the regularization parameter in ALS.


In [17]:

seed = 5 
iterations = 5
regularization_parameter = 0.2 
ranks = [8, 12, 16]
nonnegative = True
min_error = float('inf')
best_rank = 0
best_iteration = -1

Use the SparkMLib ALS algorithm to train a model to provide recommendations. The mandatory parameters to the ALS algorithm are the columns that identify the users, the items, and the ratings. Run the train() method to train the model

In [20]:
for rank in ranks:
    model = ALS.train(trainingData, rank, iterations=iterations,
                      lambda_=regularization_parameter,seed=seed,nonnegative=True)
    predictions = model.predictAll(validation_for_predict.rdd).map(lambda r: ((r[0], r[1]), r[2]))
    rates_and_preds = validationData.rdd.map(lambda r: ((int(r[0]), int(r[1])), float(r[2]))).join(predictions)
    error = math.sqrt(rates_and_preds.map(lambda r: (r[1][0] - r[1][1])**2).mean()) # RMSE Error
    print ('For rank %s the RMSE is %s' % (rank, error))
    if error < min_error:
        min_error = error
        best_rank = rank

print ('The best model was trained with rank %s and minimum RMSE %s' % (best_rank,min_error))


For rank 8 the RMSE is 2.3138998129294057
For rank 12 the RMSE is 2.3340921573811797
For rank 16 the RMSE is 2.3435864389544623
The best model was trained with rank 8 and minimum RMSE 2.310336342779189


Using parameters obtained from parameter tuning, train model and predict ratings on test data

In [21]:

predictions_test = model.predictAll(test_for_predict.rdd).map(lambda r: ((r[0], r[1]), r[2]))


Using model from pyspark.mllib.recommendation import ALS

In [22]:
predictions_test.collect()

[((3138636, 10731317), 3.211995674474443),
 ((3820552, 1983466), 3.6833904763252288),
 ((41757276, 20775239), 2.693834506518477),
 ((234248, 9435568), 1.5098551367010533),
 ((20707344, 3013158), 0.031187597683649537),
 ((118367668, 17404757), 4.707487232420968),
 ((12903580, 2201154), 3.7719107610021387),
 ((253716, 15797715), 2.0800273163788656),
 ((14949616, 1130680), 3.401430422450034),
 ((33765820, 9513357), 2.4873313163152266),
 ((68839808, 16145871), 1.7118276581022573),
 ((15378032, 13311392), 4.805727762903423),
 ((15378032, 13311392), 4.805727762903423),
 ((19694380, 751818), 1.435023557086831),
 ((15262064, 1085916), 1.8100878610997106),
 ((20535148, 7281066), 0.8936648760914814),
 ((23885652, 5058683), 1.7643306118015365),
 ((141341848, 16525691), 1.7741228042545418),
 ((42044696, 5826716), 1.910650435519933),
 ((16534696, 6500246), 1.667224033195731),
 ((15311576, 690516), 3.539326027844868),
 ((14028432, 155158), 1.5841176135158395),
 ((34422508, 209310), 2.536333680137498

In [28]:
def getRecommendations(user,testDf,trainDf,model):
    # get all user and his/her rated listings
    userDf = testDf.filter(testDf.reviewer_id == user)
    
    # filter listings from main set which have not been rated by selected user
    # and pass it to model we sreated above
    mov = trainDf.select('listing_id').subtract(userDf.select('listing_id'))
    
    # Again we need to covert our dataframe into RDD
    pred_rat = model.predictAll(mov.rdd.map(lambda x: (user, x[0]))).collect()
    
    # Get the top recommendations
    recommendations = sorted(pred_rat, key=lambda x: x[2], reverse=True)[:3]
    
    return recommendations

Get recommendation for a particular user

In [39]:

# Assign user id for which we need recommendations
user = 4298916

# Call getRecommendations method
derived_rec = getRecommendations(user,testData,trainingData,model)

print ("listings recommended for:%d" % user)


listings recommended for:4298916


In [40]:
derived_rec

[Rating(user=4298916, product=8719522, rating=4.397638237776303),
 Rating(user=4298916, product=20715101, rating=4.2130248885822565),
 Rating(user=4298916, product=4817016, rating=4.172347838061459)]

In [38]:
model.recommendProducts(4298916, 3)

[Rating(user=4298916, product=8719522, rating=4.397638237776303),
 Rating(user=4298916, product=20715101, rating=4.2130248885822565),
 Rating(user=4298916, product=4817016, rating=4.172347838061459)]

Recommend top 3 listings to all users

In [41]:
model.recommendProductsForUsers(3).collect()

[(1498000,
  (Rating(user=1498000, product=10120021, rating=6.220082286210225),
   Rating(user=1498000, product=16050081, rating=5.639434171363293),
   Rating(user=1498000, product=18551084, rating=5.602752889431356))),
 (2691056,
  (Rating(user=2691056, product=3457770, rating=8.370602140367826),
   Rating(user=2691056, product=20532328, rating=8.268334387166878),
   Rating(user=2691056, product=16436865, rating=8.259386258567215))),
 (79632096,
  (Rating(user=79632096, product=6627472, rating=6.579925439608241),
   Rating(user=79632096, product=20761753, rating=6.32064006852626),
   Rating(user=79632096, product=17107963, rating=6.310246916284131))),
 (1438816,
  (Rating(user=1438816, product=18793901, rating=5.273661261205453),
   Rating(user=1438816, product=7853110, rating=5.034180961932682),
   Rating(user=1438816, product=13392941, rating=4.990395396390011))),
 (30559456,
  (Rating(user=30559456, product=20482262, rating=6.083380694793209),
   Rating(user=30559456, product=20085

---------------------------------------

Using SPARK ML package 
and best parameters obtained after parameter tuning
Crossvalidate for different folds

Using the RegressionEvaluator method to compare continuous values with the root mean squared calculation.

In [42]:
als =  mlals(maxIter=5,rank=best_rank,seed=5,regParam=0.2,nonnegative=True, userCol="reviewer_id", itemCol="listing_id",ratingCol="Rating",coldStartStrategy="drop")
modelml = als.fit(trainingData)
pred = modelml.transform(trainingData)
    
# Evaluate the model by computing RMSE
evaluator = RegressionEvaluator(metricName="rmse", labelCol="Rating",predictionCol="prediction")
rmse = evaluator.evaluate(pred)

print ('RMSE is %s' % rmse)


RMSE is 0.32258807850771865


Using K fold cross validation

In [43]:
from pyspark.sql import SQLContext
from pyspark.context import SparkContext

sc = spark.sparkContext

def kfoldALS(data, k=3, userCol="reviewer_id", itemCol="listing_id", ratingCol="Rating", metricName="rmse"):
    evaluations = []
    weights = [1.0] * k
    splits = data.randomSplit(weights)
    for i in range(0, k):  
        testingSet = splits[i]
        trainingSet = spark.createDataFrame(sc.emptyRDD(), data.schema)
        for j in range(0, k):
            if i == j:
                continue
            else:
                trainingSet = trainingSet.union(splits[j])
        modelml = als.fit(trainingSet)
        predictions = modelml.transform(testingSet)
        evaluator = RegressionEvaluator(metricName=metricName, labelCol="Rating", predictionCol="prediction")
        evaluation = evaluator.evaluate(predictions.na.drop())
        print ("Loop " + str(i+1) + ": " + metricName + " = " + str(evaluation))
        evaluations.append(evaluation)
    return sum(evaluations)/float(len(evaluations))

In [44]:
print ("RMSE = " + str(kfoldALS(data, k=3)))

Loop 1: rmse = 2.339179534073168
Loop 2: rmse = 2.3353140952126887
Loop 3: rmse = 2.329706128349394
RMSE = 2.334733252545084


Test accuracy of modelml from spark.ml package on test set

In [45]:
pred_test = modelml.transform(testData)
    
# Evaluate the model by computing RMSE
evaluator = RegressionEvaluator(metricName="rmse", labelCol="Rating",predictionCol="prediction")
rmse = evaluator.evaluate(pred_test)

print ('RMSE is %s' % rmse)

RMSE is 2.315878781573675


We can use recommendForAllUsers() method from Spark.ml to recommend top-N listings to all users

In [46]:
userRecs = modelml.recommendForAllUsers(3)

In [47]:
userRecs.show(truncate=False)

+-----------+--------------------------------------------------------------------+
|reviewer_id|recommendations                                                     |
+-----------+--------------------------------------------------------------------+
|1580       |[[18412502, 5.434989], [19493897, 5.427614], [20252916, 5.3878593]] |
|5803       |[[19242867, 7.090307], [458377, 7.0143676], [19618550, 7.007602]]   |
|14570      |[[12833198, 7.8791685], [7675994, 7.693499], [20401541, 7.6932373]] |
|16386      |[[19860469, 5.341811], [12385036, 5.2844076], [10207451, 5.2550488]]|
|16861      |[[9894602, 6.4305577], [15361994, 6.4305353], [13016230, 6.412688]] |
|19530      |[[16960374, 9.53728], [11786930, 9.344186], [19679564, 9.293844]]   |
|29719      |[[18412502, 7.5303593], [20252916, 7.4749584], [9559239, 7.323085]] |
|32539      |[[15631030, 3.3553224], [16520303, 3.3189719], [8844652, 3.2789319]]|
|34061      |[[17261226, 8.34224], [18834905, 8.320983], [17733383, 8.301521]]   |
|356

In [48]:
userRecs.where(userRecs.reviewer_id == 53).select("recommendations.listing_id", "recommendations.Rating").collect()

[Row(listing_id=[3372925, 9906178, 1934765], Rating=[7.526704788208008, 7.5143327713012695, 7.486354827880859])]

In [None]:
temp_path = "/Users/arshi/Downloads/arshiya"
als_path = temp_path + "/als"
als.save(als_path)
als2 = mlals.load(als_path)


In [None]:
model_path = temp_path + "/als_model"
modelml.save(model_path)

In [61]:
from pyspark.ml.recommendation import ALSModel
model2 = ALSModel.load(model_path)
modelml.rank == model2.rank

True