# Project Big Data Yelp

**This notebook has been written using spark. Please open it with databricks**

In [2]:
spark.conf.set(
  "fs.azure.account.key.storagestudent.blob.core.windows.net", 
  "8ykAjWw0X+FVY8PnmVhnY5XyDdbpBLceLsFTbuTttdn2W76+sdAsSwYNQ9E+H8Oxlsay8/uCm8ayyWq73kB82Q=="
)

In [3]:
datasets = {
  dataset: spark.read.load( 
    "wasbs://default@storagestudent.blob.core.windows.net/datasets/S8-4/Exo/restaurant-data-with-consumer-ratings/{0}.csv".format(dataset), 
    format="csv",
    header="true"
  )
  for dataset in [
    "chefmozaccepts", 
    "rating_final", 
    "userpayment"
  ]
} 
ratings = datasets["rating_final"]
chef = datasets["chefmozaccepts"]
user_payment = datasets["userpayment"]

In [4]:
display(ratings)

userID,placeID,rating,food_rating,service_rating
U1077,135085,2,2,2
U1077,135038,2,2,1
U1077,132825,2,2,2
U1077,135060,1,2,2
U1068,135104,1,1,2
U1068,132740,0,0,0
U1068,132663,1,1,1
U1068,132732,0,0,0
U1068,132630,1,1,1
U1067,132584,2,2,2


## Preprocessing

In [6]:
from pyspark.sql import functions as F
from pyspark.sql.types import *

In [7]:
length = F.udf(len, IntegerType())

In [8]:
display(ratings.select(length(F.col("userID"))).distinct())

len(userID)
5


all userID have the same length, I can use "substr" to remove the "U" on all the dataset 

In [10]:
ratings = ratings.select(F.col("userID").substr(2,5).cast(IntegerType()).alias("userID"),
                       F.col("placeID").cast(IntegerType()),
                       F.col("rating").cast(IntegerType())
                      )

In [11]:
display(ratings)

userID,placeID,rating
1077,135085,2
1077,135038,2
1077,132825,2
1077,135060,1
1068,135104,1
1068,132740,0
1068,132663,1
1068,132732,0
1068,132630,1
1067,132584,2


In [12]:
ratings.select("placeID").distinct().count()

In [13]:
ratings.select("userID").distinct().count()

This dataset contains 130 different restaurants and 138 different clients 

# Model Training

In [16]:
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

In [17]:
train, test = ratings.randomSplit([0.7, 0.3], 100)

In [18]:
als = ALS(userCol = "userID", itemCol="placeID", ratingCol = "rating", coldStartStrategy="drop", nonnegative=True)
evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating", predictionCol="prediction")
paramGrid = ParamGridBuilder()\
            .addGrid(als.rank, [1,5])\
            .addGrid(als.maxIter, [10,15]).build()

In [19]:
cv = CrossValidator(
  estimator = als,
  evaluator = evaluator,
  estimatorParamMaps = paramGrid,
  numFolds = 5
)

In [20]:
model = cv.fit(train)

# Evaluation

In [22]:
predictions = model.transform(test)
evaluator.evaluate(predictions)

In [23]:
display(predictions)

userID,placeID,rating,prediction
1100,135000,2,1.5704908
1072,135000,0,1.5042318
1077,135027,0,1.0472511
1075,135066,2,0.7830303
1058,135066,1,1.6951612
1014,135066,2,1.5049499
1031,132663,0,0.0
1087,132663,1,0.32778248
1069,135108,0,0.52731025
1111,135108,2,1.3544235


In [24]:
import pandas as pd
params = [{p.name: v for p, v in m.items()} for m in model.getEstimatorParamMaps()]
pd.DataFrame.from_dict([
    {model.getEvaluator().getMetricName(): metric, **ps} 
    for ps, metric in zip(params, model.avgMetrics)
]).sort_values("rmse")

Unnamed: 0,maxIter,rank,rmse
1,15,1,0.748526
0,10,1,0.748696
2,10,5,0.837963
3,15,5,0.843146


# Recommendations

In [26]:
user_recommendations = model.bestModel.recommendForAllUsers(10)
display(user_recommendations)

userID,recommendations
1088,"List(List(132717, 1.9194726), List(134986, 1.6705197), List(132922, 1.6403724), List(132755, 1.6327808), List(132773, 1.619939), List(132937, 1.602175), List(132851, 1.5752245), List(132613, 1.5486337), List(135034, 1.5376678), List(135000, 1.5149316))"
1084,"List(List(132717, 2.658769), List(134986, 2.3139303), List(132922, 2.2721715), List(132755, 2.2616558), List(132773, 2.243868), List(132937, 2.2192621), List(132851, 2.1819317), List(132613, 2.1450992), List(135034, 2.1299095), List(135000, 2.0984166))"
1025,"List(List(132717, 2.3688438), List(134986, 2.061608), List(132922, 2.024403), List(132755, 2.015034), List(132773, 1.9991859), List(132937, 1.9772631), List(132851, 1.9440033), List(132613, 1.9111872), List(135034, 1.8976539), List(135000, 1.869595))"
1127,"List(List(132717, 3.2231047), List(134986, 2.8050725), List(132922, 2.7544503), List(132755, 2.7417026), List(132773, 2.7201393), List(132937, 2.6903107), List(132851, 2.6450565), List(132613, 2.6004062), List(135034, 2.5819926), List(135000, 2.543815))"
1133,"List(List(132717, 2.053721), List(134986, 1.7873563), List(132922, 1.7551005), List(132755, 1.7469778), List(132773, 1.7332379), List(132937, 1.7142315), List(132851, 1.6853962), List(132613, 1.6569455), List(135034, 1.6452125), List(135000, 1.6208863))"
1005,"List(List(132717, 2.5342004), List(134986, 2.2055182), List(132922, 2.165716), List(132755, 2.155693), List(132773, 2.1387386), List(132937, 2.1152856), List(132851, 2.079704), List(132613, 2.0445971), List(135034, 2.0301192), List(135000, 2.0001018))"
1016,"List(List(132717, 2.8693225), List(134986, 2.4971755), List(132922, 2.4521098), List(132755, 2.4407616), List(132773, 2.421565), List(132937, 2.3950105), List(132851, 2.3547237), List(132613, 2.3149743), List(135034, 2.2985818), List(135000, 2.2645948))"
1068,"List(List(132717, 1.1728821), List(134986, 1.0207609), List(132922, 1.0023396), List(132755, 0.9977007), List(132773, 0.98985386), List(132937, 0.97899926), List(132851, 0.9625314), List(132613, 0.94628316), List(135034, 0.93958247), List(135000, 0.92568976))"
1031,"List(List(132560, 0.0), List(132630, 0.0), List(132660, 0.0), List(132740, 0.0), List(132830, 0.0), List(132870, 0.0), List(135000, 0.0), List(135030, 0.0), List(135040, 0.0), List(135050, 0.0))"
1051,"List(List(132717, 1.2751657), List(134986, 1.1097785), List(132922, 1.0897508), List(132755, 1.0847074), List(132773, 1.0761762), List(132937, 1.064375), List(132851, 1.046471), List(132613, 1.0288059), List(135034, 1.0215207), List(135000, 1.0064164))"


In [27]:
item_recommendations = model.bestModel.recommendForAllItems(10)
display(item_recommendations)

placeID,recommendations
135000,"List(List(1021, 3.1574771), List(1055, 2.631266), List(1020, 2.5615323), List(1127, 2.543815), List(1052, 2.5283103), List(1078, 2.4990141), List(1102, 2.3995512), List(1110, 2.3615167), List(1071, 2.3075306), List(1054, 2.3022351))"
135027,"List(List(1021, 1.9296571), List(1055, 1.6080691), List(1020, 1.5654521), List(1127, 1.5546243), List(1052, 1.5451487), List(1078, 1.5272448), List(1102, 1.466459), List(1110, 1.4432147), List(1071, 1.4102218), List(1054, 1.4069854))"
135066,"List(List(1021, 2.690701), List(1055, 2.2422807), List(1020, 2.1828558), List(1127, 2.1677575), List(1052, 2.154545), List(1078, 2.1295798), List(1102, 2.0448205), List(1110, 2.0124087), List(1071, 1.9664037), List(1054, 1.9618909))"
132663,"List(List(1021, 1.4287605), List(1055, 1.1906496), List(1020, 1.159095), List(1127, 1.151078), List(1052, 1.1440622), List(1078, 1.1308056), List(1102, 1.0857985), List(1110, 1.0685879), List(1071, 1.0441592), List(1054, 1.0417628))"
135108,"List(List(1021, 3.1500924), List(1055, 2.625112), List(1020, 2.5555413), List(1127, 2.5378654), List(1052, 2.522397), List(1078, 2.4931693), List(1102, 2.393939), List(1110, 2.3559935), List(1071, 2.3021338), List(1054, 2.2968504))"
135071,"List(List(1021, 2.1168242), List(1055, 1.7640436), List(1020, 1.7172929), List(1127, 1.7054149), List(1052, 1.6950203), List(1078, 1.6753798), List(1102, 1.6086981), List(1110, 1.5831991), List(1071, 1.5470061), List(1054, 1.5434558))"
132723,"List(List(1021, 2.609916), List(1055, 2.174959), List(1020, 2.1173182), List(1127, 2.1026733), List(1052, 2.0898576), List(1078, 2.0656419), List(1102, 1.9834274), List(1110, 1.9519887), List(1071, 1.9073648), List(1054, 1.9029876))"
135062,"List(List(1021, 2.5019517), List(1055, 2.0849876), List(1020, 2.0297313), List(1127, 2.0156922), List(1052, 2.0034065), List(1078, 1.9801925), List(1102, 1.901379), List(1110, 1.8712409), List(1071, 1.828463), List(1054, 1.8242668))"
132862,"List(List(1021, 2.543925), List(1055, 2.1199658), List(1020, 2.0637825), List(1127, 2.0495079), List(1052, 2.0370162), List(1078, 2.0134127), List(1102, 1.933277), List(1110, 1.9026333), List(1071, 1.8591378), List(1054, 1.8548712))"
132773,"List(List(1021, 3.3763375), List(1055, 2.813652), List(1020, 2.7390847), List(1127, 2.7201393), List(1052, 2.7035599), List(1078, 2.672233), List(1102, 2.5658758), List(1110, 2.525205), List(1071, 2.467477), List(1054, 2.4618142))"


# Getting in precision ...

In [29]:
ratings_predictions = predictions.select(
  F.concat(F.lit("U"), F.col("userID")).alias("userID"),
  F.col("placeID"),
  F.col("rating"),
  F.col("prediction")
)

In [30]:
display(ratings_predictions)

userID,placeID,rating,prediction
U1100,135000,2,1.5704908
U1072,135000,0,1.5042318
U1077,135027,0,1.0472511
U1075,135066,2,0.7830303
U1058,135066,1,1.6951612
U1014,135066,2,1.5049499
U1031,132663,0,0.0
U1087,132663,1,0.32778248
U1069,135108,0,0.52731025
U1111,135108,2,1.3544235


In [31]:
display(chef)

placeID,Rpayment
135110,cash
135110,VISA
135110,MasterCard-Eurocard
135110,American_Express
135110,bank_debit_cards
135109,cash
135107,cash
135107,VISA
135107,MasterCard-Eurocard
135107,American_Express


In [32]:
display(user_payment)

userID,Upayment
U1001,cash
U1002,cash
U1003,cash
U1004,cash
U1004,bank_debit_cards
U1005,cash
U1006,cash
U1007,cash
U1008,cash
U1009,cash


In [33]:
payment_matching = user_payment.join(chef, user_payment["Upayment"]==chef["Rpayment"], how="inner")
payment_matching = payment_matching.select("userID", "placeID").dropDuplicates()

In [34]:
dataset = ratings_predictions.join(payment_matching, ["userID","placeID"], how="inner")
display(dataset)

userID,placeID,rating,prediction
U1100,135000,2,1.5704908
U1072,135000,0,1.5042318
U1077,135027,0,1.0472511
U1075,135066,2,0.7830303
U1058,135066,1,1.6951612
U1014,135066,2,1.5049499
U1124,135071,1,0.1617671
U1094,135071,0,0.0
U1115,135071,2,1.034408
U1092,135071,1,0.7642129


In [35]:
dataset.count()

In [36]:
test_2 = dataset.select(F.col("userID").substr(2,5).cast(IntegerType()).alias("userID"),
                       F.col("placeID").cast(IntegerType()),
                       F.col("rating").cast(IntegerType())
                       )

In [37]:
predictions_2 = model.transform(test_2)
evaluator.evaluate(predictions_2)

In [38]:
display(predictions_2)

userID,placeID,rating,prediction
1100,135000,2,2.1271644
1072,135000,0,1.5427417
1077,135027,0,1.2429208
1075,135066,2,1.3838029
1058,135066,1,1.891922
1014,135066,2,1.436259
1124,135071,1,0.815222
1094,135071,0,0.0
1115,135071,2,1.0773803
1092,135071,1,0.4832484
