In [1]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('recommender').getOrCreate()
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import TrainValidationSplit, ParamGridBuilder

In [None]:
# SIMPLE WAY

In [2]:
import pandas as pd
reviews = pd.read_csv('data/upload/all_reviews_through_SA.csv')
reviews2 = pd.read_csv('data/upload/all_reviews_through_SA_class.csv')
datatest, datatrain  = reviews.sample(frac =.2), reviews.sample(frac=.8)
datatest2, datatrain2  = reviews2.sample(frac =.2), reviews2.sample(frac=.8)
DataTrain = spark.createDataFrame(datatrain)
DataTest = spark.createDataFrame(datatest)
DataTrain2 = spark.createDataFrame(datatrain2)
DataTest2 = spark.createDataFrame(datatest2)

In [6]:
def RE2(thecolumn, datatrain, datatest):
    als = ALS(userCol='userId', itemCol='itemId', 
              ratingCol=thecolumn, nonnegative=True,
              )
    #Tune model using ParamGridBuilder
    param_grid = ParamGridBuilder()\
                .addGrid(als.rank,[12,13,14])\
                .addGrid(als.maxIter,[18,19,20])\
                .addGrid(als.regParam, [.17, .18, .19])\
                .build()
    #Define evaluator as RMSE
    evaluator_ = RegressionEvaluator(metricName="rmse", labelCol=thecolumn,
                                   predictionCol = "prediction")
    #Build cross validation using TrainvalidationSplit
    tvs = TrainValidationSplit(
        estimator = als,
        estimatorParamMaps = param_grid,
        evaluator=evaluator_)
    #Fit ALS model to training data
    model = tvs.fit(datatrain)
    #Extract best model from the tuning exercise using ParamGridBuilder
    bmodel = model.bestModel
    # Generate Predictions and evaluate using RMSE
    predictions = bmodel.transform(datatest)
    predictions = predictions.na.drop()
    rmse = evaluator_.evaluate(predictions)
    mIter = bmodel._java_obj.parent().getMaxIter()
    rParam = bmodel._java_obj.parent().getRegParam()
    return (predictions, model, rmse, mIter, rParam)

In [7]:
prediction_rating_re2, model_rating_re2, rmse_re2, mIter_re2, rParam_re2 = RE2('rating', DataTrain, DataTest)

In [8]:
print("RMSE = " + str(rmse_re2))
print("best model :")
print("maxIter = " + str(mIter_re2))
print("regParam = " + str(rParam_re2))

RMSE = 0.33665470660912095
best model :
maxIter = 18
regParam = 0.17


In [9]:
prediction_tes_re2, model_tes_re2, rmse_tes_re2, mIter_tes_re2, rParam_tes_re2 = RE2('testimony_class', DataTrain, DataTest)

In [10]:
print("RMSE = " + str(rmse_tes_re2))
print("best model :")
print("maxIter = " + str(mIter_tes_re2))
print("regParam = " + str(rParam_tes_re2))

RMSE = 0.22924448598954295
best model :
maxIter = 18
regParam = 0.17


In [11]:
prediction_rating2_re2, model_rating2_re2, rmse2_re2, mIter2_re2, rParam2_re2 = RE2('rating', DataTrain2, DataTest2)

In [12]:
print("RMSE = " + str(rmse2_re2))
print("best model :")
print("maxIter = " + str(mIter2_re2))
print("regParam = " + str(rParam2_re2))

RMSE = 0.32202640819037937
best model :
maxIter = 18
regParam = 0.17


In [13]:
prediction2_tes_re2, model2_tes_re2, rmse2_tes_re2, mIter2_tes_re2, rParam2_tes_re2 = RE2('testimony_class', DataTrain2, DataTest2)

In [14]:
print("RMSE = " + str(rmse2_tes_re2))
print("best model :")
print("maxIter = " + str(mIter2_tes_re2))
print("regParam = " + str(rParam2_tes_re2))

RMSE = 0.320391667778923
best model :
maxIter = 18
regParam = 0.17


In [6]:
def RE(thecolumn, datatrain, datatest):
    als = ALS(userCol='userId', itemCol='itemId', 
              ratingCol=thecolumn, nonnegative=True)
    model = als.fit(datatrain)
    # Generate Predictions and evaluate using RMSE
    predictions = model.transform(datatest)
    predictions = predictions.na.drop()
    return (predictions, model)

In [7]:
prediction_rating, model_rating = RE('rating', DataTrain, DataTest)
prediction_testimony_class, model_tclass = RE('testimony_class', DataTrain, DataTest)

In [8]:
prediction_rating2, model_rating2 = RE('rating', DataTrain2, DataTest2)
prediction_testimony_class2, model_tclass2 = RE('testimony_class', DataTrain2, DataTest2)

In [5]:
user_recs_tc_top3 = model_tclass.recommendForAllUsers(3)
user_recs_rat_top3 = model_rating.recommendForAllUsers(3)

In [6]:
user_recs_tc_top3.show(3)
user_recs_rat_top3.show(3)

+------+--------------------+
|userId|     recommendations|
+------+--------------------+
|  1580|[[16, 1.8787358],...|
|  1591|[[5, 1.8766649], ...|
|  1342|[[28, 1.9247665],...|
+------+--------------------+
only showing top 3 rows

+------+--------------------+
|userId|     recommendations|
+------+--------------------+
|  1580|[[36, 5.064891], ...|
|  1591|[[18, 4.4343705],...|
|  1342|[[3, 5.155411], [...|
+------+--------------------+
only showing top 3 rows



In [7]:
user_recs_tc_top3.select(['recommendations']).filter(
    user_recs_tc_top3.userId == 1580).first()

Row(recommendations=[Row(itemId=16, rating=1.8787357807159424), Row(itemId=40, rating=1.856463074684143), Row(itemId=7, rating=1.8411509990692139)])

In [8]:
user_recs_rat_top3.select(['recommendations']).filter(
    user_recs_rat_top3.userId == 1580).first()

Row(recommendations=[Row(itemId=36, rating=5.0648908615112305), Row(itemId=16, rating=4.81621789932251), Row(itemId=24, rating=4.691054344177246)])

In [6]:
import numpy as np

In [7]:
TopRecBasedOnTestimony = pd.DataFrame(np.array([[16, 3], [40, 2], [7,1]]),
                                              columns=['itemId', 'points_tes'])
TopRecBasedOnRating= pd.DataFrame(np.array([[36, 3], [16, 2], [24,1]]),
                                              columns=['itemId', 'points_rat'])

In [8]:
topRecommendations = pd.merge(
    TopRecBasedOnTestimony, TopRecBasedOnRating,
    on='itemId', how='outer').fillna(0)

In [9]:
# calculate the result
topRecommendations['Result'] = topRecommendations.apply(
    lambda row: (row['points_rat']+row['points_tes'])/2,
    axis=1
)
topRecommendations = topRecommendations.sort_values(by=['Result'],ascending=[0])
topRecommendations

Unnamed: 0,itemId,points_tes,points_rat,Result
0,16,3.0,2.0,2.5
3,36,0.0,3.0,1.5
1,40,2.0,0.0,1.0
2,7,1.0,0.0,0.5
4,24,0.0,1.0,0.5


In [12]:
item = pd.read_csv('data/modal/all_item.csv')

In [22]:
rec_item_df = item.loc[item.itemId.isin(topRecommendations.itemId)]

In [24]:
topRecommendationsTitle= pd.merge(
    topRecommendations, rec_item_df,
    on='itemId', how='outer').fillna(0)
topRecommendationsTitle

Unnamed: 0,itemId,points_tes,points_rat,Result,itemname,Category
0,16,3.0,2.0,2.5,"Fire Kids Edition Tablet, 7 Display, Wi-Fi, 16...",Tablet
1,36,0.0,3.0,1.5,"Travel Adapter,WGGE Multi-Nation Travel Adapte...",Adapter
2,40,2.0,0.0,1.0,"Travel Adapter, 2400W International Power Adap...",Adapter
3,7,1.0,0.0,0.5,"Fire Kids Edition Tablet, 7 Display, Wi-Fi, 16...",Tablet
4,24,0.0,1.0,0.5,TCL 32S325 32 Inch 720p Roku Smart LED TV (2019),TV


In [28]:
print("Top recommendation for user", 1580)
topRecommendationsTitle[['itemId', 'itemname']].head(3)


Top recommendation for user 1580


Unnamed: 0,itemId,itemname
0,16,"Fire Kids Edition Tablet, 7 Display, Wi-Fi, 16..."
1,36,"Travel Adapter,WGGE Multi-Nation Travel Adapte..."
2,40,"Travel Adapter, 2400W International Power Adap..."


In [9]:
def Evaluation(thecolumn, theprediction):
    evaluator = RegressionEvaluator(metricName='rmse', 
                                    labelCol=thecolumn, 
                                    predictionCol='prediction')    
    # RMSE Root Mean Square Error
    rmse = evaluator.evaluate(theprediction)
    return (rmse)


In [13]:
rating_rmse = round(Evaluation('rating', 
                               prediction_rating), 3)
testimony_class = round(Evaluation('testimony_class', 
                                   prediction_testimony_class), 3)

In [14]:
print("RMSE of rating predictions is " + str(rating_rmse))
print("RMSE of testimony class predictions is " + str(testimony_class))

RMSE of rating predictions is 0.404
RMSE of testimony class predictions is 0.21


In [15]:
rating_rmse2 = round(Evaluation('rating', 
                               prediction_rating2), 3)
testimony_class2 = round(Evaluation('testimony_class', 
                                   prediction_testimony_class2), 3)

In [16]:
print("RMSE of rating predictions is " + str(rating_rmse2))
print("RMSE of testimony class predictions is " + str(testimony_class2))

RMSE of rating predictions is 0.456
RMSE of testimony class predictions is 0.39


In [None]:
# from pyspark.sql.functions import lit

# def recommendMovies(model, user, nbRecommendations):
#     # Create a Spark DataFrame with the specified user and all the movies listed in the ratings DataFrame
#     dataSet = ratings.select("movieId").distinct().withColumn("userId", lit(user))

#     # Create a Spark DataFrame with the movies that have already been rated by this user
#     moviesAlreadyRated = ratings.filter(ratings.userId == user).select("movieId", "userId")

#     # Apply the recommender system to the data set without the already rated movies to predict ratings
#     predictions = model.transform(dataSet.subtract(moviesAlreadyRated)).dropna().orderBy("prediction", ascending=False).limit(nbRecommendations).select("movieId", "prediction")

#     # Join with the movies DataFrame to get the movies titles and genres
#     recommendations = predictions.join(movies, predictions.movieId == movies.movieId).select(predictions.movieId, movies.title, movies.genres, predictions.prediction)

#     recommendations.show(truncate=False)


RMSE of rating predictions is 2.961
