In [38]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

from pyspark.ml import Pipeline
from pyspark.sql.functions import *
from pyspark.sql.types import *


from pyspark.sql import Row

from pyspark.ml.recommendation import ALS, ALSModel
from pyspark.ml.tuning import TrainValidationSplit, ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import RegressionEvaluator


In [3]:
# Directory the parquet inputs (business / user / review samples) are read from.
data_path = '/home/osboxes/yelp-data/dataset/'
# Directory under which the fitted ALS model is saved and later re-loaded.
model_path = '/home/osboxes/yelp-data/'

### Collaborative Filtering

In [4]:
# Load the business sample from its parquet file.
business_df = spark.read.format('parquet').load(data_path + 'business-small.parquet')

# Inspect the column names and types of the business dataframe.
business_df.printSchema()

root
 |-- business_id: string (nullable = true)
 |-- business_name: string (nullable = true)
 |-- neighborhood: string (nullable = true)
 |-- address: string (nullable = true)
 |-- city: string (nullable = true)
 |-- state: string (nullable = true)
 |-- postal_code: string (nullable = true)
 |-- latitude: double (nullable = true)
 |-- longitude: double (nullable = true)
 |-- stars: double (nullable = true)
 |-- review_count: long (nullable = true)
 |-- categories: array (nullable = true)
 |    |-- element: string (containsNull = true)



In [5]:
# Load the user sample from its parquet file.
user_df = spark.read.format('parquet').load(data_path + 'user-small.parquet')

# Inspect the column names and types of the user dataframe.
user_df.printSchema()

root
 |-- user_id: string (nullable = true)
 |-- user_name: string (nullable = true)
 |-- review_count: long (nullable = true)
 |-- yelping_since: string (nullable = true)
 |-- useful: long (nullable = true)
 |-- funny: long (nullable = true)
 |-- cool: long (nullable = true)
 |-- fans: long (nullable = true)
 |-- average_stars: double (nullable = true)



In [6]:
# Load the review sample from its parquet file.

review_df = spark.read.format('parquet').load(data_path + 'review-small.parquet')

# Inspect the column names and types of the review dataframe.
review_df.printSchema()

root
 |-- review_id: string (nullable = true)
 |-- user_id: string (nullable = true)
 |-- business_id: string (nullable = true)
 |-- stars: long (nullable = true)
 |-- review_date: string (nullable = true)
 |-- review_text: string (nullable = true)
 |-- useful: long (nullable = true)
 |-- funny: long (nullable = true)
 |-- cool: long (nullable = true)



In [7]:
# Peek at the string business ids next to their names.
business_df.select(['business_id', 'business_name']).show(n=2)

+--------------------+--------------------+
|         business_id|       business_name|
+--------------------+--------------------+
|qim0lD112TkDhm8Zy...|McCarthy's Irish Pub|
|Wf5C8Amv_SlhoYE3_...|         Oishi Sushi|
+--------------------+--------------------+
only showing top 2 rows



In [8]:
# Peek at the string user ids next to their names.
user_df.select(['user_id', 'user_name']).show(n=2)

+--------------------+---------+
|             user_id|user_name|
+--------------------+---------+
|om5ZiponkpRqUNa3p...|   Andrea|
|Wc5L6iuvSNF5WGBlq...|     Risa|
+--------------------+---------+
only showing top 2 rows



In [9]:
# Peek at the (user, business, stars) triples that make up the ratings.
review_df.select(['user_id', 'business_id', 'stars']).show(n=2)

+--------------------+--------------------+-----+
|             user_id|         business_id|stars|
+--------------------+--------------------+-----+
|djpMXOA1ic5wv3FPt...|mr4FiPaXTWlJ3qGzp...|    3|
|-pXs08gJq9ExIk275...|mr4FiPaXTWlJ3qGzp...|    3|
+--------------------+--------------------+-----+
only showing top 2 rows



The Spark ALS implementation requires the rating matrix to have the following data types:

```
ratings_df_schema = StructType(
[StructField('userId', IntegerType()),
 StructField('businessId', IntegerType()),
 StructField('rating', DoubleType())]
)
```

So, we need to map the existing string `user_id` and `business_id` values to integer values.

In [10]:
# Build a lookup table pairing each string user_id (first column of user_df)
# with a sequential integer userId produced by zipWithIndex.

user_newid_df = sqlContext.createDataFrame(
    user_df.rdd.map(lambda row: row[0]).zipWithIndex(),
    schema=StructType([
        StructField("user_id", StringType(), nullable=True),
        StructField("userId", IntegerType(), nullable=True),
    ]),
)

user_newid_df.show(n=2)

+--------------------+------+
|             user_id|userId|
+--------------------+------+
|om5ZiponkpRqUNa3p...|     0|
|Wc5L6iuvSNF5WGBlq...|     1|
+--------------------+------+
only showing top 2 rows



In [11]:
# Attach the integer userId to every row of the user dataframe.

a = user_df.alias("a")
b = user_newid_df.alias("b")

# Keep all original user columns (a.*) plus the new integer key from the lookup.
user_new_df = (
    a.join(b, on=col("a.user_id") == col("b.user_id"), how='inner')
     .select('a.*', 'b.userId')
)

user_new_df.select(['userId', 'user_id', 'user_name']).show(n=2)

+------+--------------------+---------+
|userId|             user_id|user_name|
+------+--------------------+---------+
|     0|om5ZiponkpRqUNa3p...|   Andrea|
|     1|Wc5L6iuvSNF5WGBlq...|     Risa|
+------+--------------------+---------+
only showing top 2 rows



In [12]:

# Build a lookup table pairing each string business_id (first column of
# business_df) with a sequential integer businessId produced by zipWithIndex.

business_newid_df = sqlContext.createDataFrame(
    business_df.rdd.map(lambda row: row[0]).zipWithIndex(),
    schema=StructType([
        StructField("business_id", StringType(), nullable=True),
        StructField("businessId", IntegerType(), nullable=True),
    ]),
)

business_newid_df.show(n=2)

+--------------------+----------+
|         business_id|businessId|
+--------------------+----------+
|qim0lD112TkDhm8Zy...|         0|
|Wf5C8Amv_SlhoYE3_...|         1|
+--------------------+----------+
only showing top 2 rows



In [13]:
# Attach the integer businessId to every row of the business dataframe.

a = business_df.alias("a")
b = business_newid_df.alias("b")

# Keep all original business columns (a.*) plus the new integer key from the lookup.
business_new_df = (
    a.join(b, on=col("a.business_id") == col("b.business_id"), how='inner')
     .select('a.*', 'b.businessId')
)

business_new_df.select(['businessId', 'business_id', 'business_name']).show(n=2)

+----------+--------------------+--------------------+
|businessId|         business_id|       business_name|
+----------+--------------------+--------------------+
|         0|qim0lD112TkDhm8Zy...|McCarthy's Irish Pub|
|         1|Wf5C8Amv_SlhoYE3_...|         Oishi Sushi|
+----------+--------------------+--------------------+
only showing top 2 rows



In [14]:
# Add the integer userId and businessId keys to every review.

# Only the rating triple is needed from here on.
review_df = review_df.select(['user_id', 'business_id', 'stars'])


# Step 1: look up the integer userId for each review's user_id.
a = review_df.alias("a")
b = user_newid_df.alias("b")

review_userId_df = (
    a.join(b, on=col("a.user_id") == col("b.user_id"), how='inner')
     .select('a.*', 'b.userId')
)

# Step 2: look up the integer businessId for each review's business_id.
a = review_userId_df.alias("a")
b = business_newid_df.alias("b")

review_userId_businessId_df = (
    a.join(b, on=col("a.business_id") == col("b.business_id"), how='inner')
     .select('a.*', 'b.businessId')
)

review_userId_businessId_df.show(n=2)

+--------------------+--------------------+-----+------+----------+
|             user_id|         business_id|stars|userId|businessId|
+--------------------+--------------------+-----+------+----------+
|u642WP1g6Z3oRA9qd...|1RFIVcZYV77tGIwVV...|    5| 23561|       872|
|CGmWH1Nwx1hbasHqo...|1RFIVcZYV77tGIwVV...|    4|  6268|       872|
+--------------------+--------------------+-----+------+----------+
only showing top 2 rows



In [15]:
# Reduce the mapped reviews to the (userId, businessId, rating) triple fed to ALS;
# the integer star count is cast to float and exposed as `rating`.

rating_df = review_userId_businessId_df.select(
    col('userId'),
    col('businessId'),
    col('stars').cast('float').alias('rating'),
)
rating_df.show(n=2)
rating_df.printSchema()

+------+----------+------+
|userId|businessId|rating|
+------+----------+------+
| 23561|       872|   5.0|
|  6268|       872|   4.0|
+------+----------+------+
only showing top 2 rows

root
 |-- userId: integer (nullable = true)
 |-- businessId: integer (nullable = true)
 |-- rating: float (nullable = true)



In [33]:

# 80/20 random split of the ratings, seeded so the split is repeatable.
train, test = rating_df.randomSplit(weights=[0.8, 0.2], seed=123)

```
# very lengthy process of Cross Validation

als = ALS(userCol="userId", itemCol="businessId", ratingCol="rating", coldStartStrategy="drop")

param_grid = ParamGridBuilder().addGrid(
    als.rank,
    [10, 15, 20],
).addGrid(
    als.maxIter,
    [10, 15, 20],
).build()

evaluator = RegressionEvaluator(
    metricName="rmse",
    labelCol="rating",
)

cv = CrossValidator(estimator=als, estimatorParamMaps=param_grid, evaluator=evaluator, numFolds=5, seed=123)
cv_als_model = cv.fit(train)

# Evaluate the model by computing the RMSE on the test data

als_predictions = cv_als_model.bestModel.transform(test)
evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating", predictionCol="prediction")
rmse = evaluator.evaluate(als_predictions)
print("Root-mean-square error = " + str(rmse))

best_model = cv_als_model.bestModel

#best_rank is 20
best_model.rank

#best_maxIter is 20
(best_model
    ._java_obj     # Get Java object
    .parent()      # Get parent (ALS estimator)
    .getMaxIter()) # Get maxIter


# Root-mean-square error is 1.3383152747968081
```

In [34]:

# Fit ALS on the training split with fixed hyper-parameters; rows whose
# prediction cannot be computed are dropped (coldStartStrategy="drop").
alsb = ALS(
    userCol="userId",
    itemCol="businessId",
    ratingCol="rating",
    rank=20,
    maxIter=20,
    regParam=0.3,
    coldStartStrategy="drop",
    seed=123,
)
alsb_model = alsb.fit(train)

# Score the held-out split and report its RMSE.
alsb_predictions = alsb_model.transform(test)
evaluator = RegressionEvaluator(predictionCol="prediction", labelCol="rating", metricName="rmse")
rmse = evaluator.evaluate(alsb_predictions)
print("Root-mean-square error = " + str(rmse))

# Persist the fitted model, replacing any previous copy at the same path.
alsb_model.write().overwrite().save(model_path + 'als')


Root-mean-square error = 1.2579688933524986


In [41]:
# Read the persisted ALS model back from disk into a fresh ALSModel instance.
alsn_model = ALSModel.read().load(model_path + 'als')

In [42]:
# Ask the loaded model for the 10 highest-scoring businesses per user.

userRecs = alsn_model.recommendForAllUsers(numItems=10)


In [82]:
# Cache the recommendations because later cells filter them repeatedly,
# then display one row without truncating the recommendations array.
userRecs.cache()
userRecs.show(n=1, truncate=False)

+------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|userId|recommendations                                                                                                                                                             |
+------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|148   |[[4592,4.6922364], [2432,4.614214], [401,4.4767885], [6596,4.439251], [2848,4.3852043], [810,4.359094], [4267,4.340767], [1410,4.307626], [4390,4.2542963], [4408,4.245825]]|
+------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
only showing top 1 row



In [63]:
# Example user, identified by the original string id.
u_id = 'ZWD8UH1T7QXQr0Eq-mcWYg'

# Translate the string id into the integer userId used by the ALS model.
uId = user_newid_df.where(col('user_id') == u_id).select('userId').collect()[0]['userId']

# Flatten that user's recommendations array into one row per (businessId, rating).
userFlatRec = sqlContext.createDataFrame(
    userRecs.where(col('userId') == uId).rdd.flatMap(lambda rec_row: rec_row['recommendations'])
)


In [64]:
# Display the flattened recommendations (show()'s defaults spelled out).
userFlatRec.show(n=20, truncate=True)

+----------+------------------+
|businessId|            rating|
+----------+------------------+
|      2432| 4.570364475250244|
|      4592| 4.527902126312256|
|      2668| 4.490123271942139|
|       401| 4.438344955444336|
|       810| 4.414823055267334|
|      2028|4.4012932777404785|
|      3237| 4.399106025695801|
|      4133| 4.392766952514648|
|      4408| 4.377004623413086|
|      5664| 4.362335205078125|
+----------+------------------+



In [68]:

# Enrich the flattened recommendations with every business column.
a = business_new_df.alias("a")
b = userFlatRec.alias("b")

user_collab_df = (
    a.join(b, on=col("a.businessId") == col("b.businessId"), how='inner')
     .select('a.*', 'b.rating')
)

user_collab_df.show(n=20, truncate=True)

+--------------------+--------------------+--------------------+--------------------+-------+-----+-----------+-------------+--------------+-----+------------+--------------------+----------+------------------+
|         business_id|       business_name|        neighborhood|             address|   city|state|postal_code|     latitude|     longitude|stars|review_count|          categories|businessId|            rating|
+--------------------+--------------------+--------------------+--------------------+-------+-----+-----------+-------------+--------------+-----+------------+--------------------+----------+------------------+
|2H5EaBEreDzzP7sPm...|          Vila Verde|   Trinity Bellwoods| 869 Dundas Street W|Toronto|   ON|    M6J 1V6|   43.6512434|   -79.4106314|  4.0|           4|[Restaurants, Eve...|      5664| 4.362335205078125|
|mpDcuUs6dB5uBsYVK...| Druxy's Famous Deli|       Downtown Core|Commerce Court, 1...|Toronto|   ON|    M5J 2S4|   43.6482347|   -79.3795255|  4.0|          

In [76]:
def getCollabRecom(u_id):
    """Return the pre-computed ALS recommendations for one user, with business details.

    Parameters
    ----------
    u_id : str
        Original (string) Yelp ``user_id``.

    Returns
    -------
    pyspark.sql.DataFrame
        One row per recommended business with the columns ``business_id``,
        ``rating`` (the predicted rating), ``business_name``, ``categories``,
        ``stars``, ``review_count``, ``latitude`` and ``longitude``, ordered by
        ``rating`` descending. The frame is empty when ``u_id`` is not present
        in ``user_newid_df`` or when ``userRecs`` has no row for that user.

    Notes
    -----
    Reads the notebook-level frames ``user_newid_df``, ``userRecs`` and
    ``business_new_df``.
    """
    # Resolve the string id to its integer userId with a join rather than
    # collect()[0][0], which raised IndexError for an unknown u_id.
    # Assumes user_id is unique in user_newid_df -- TODO confirm.
    user_recs = user_newid_df.filter(col('user_id') == u_id) \
                             .join(userRecs, 'userId', 'inner')

    # Flatten the recommendations array with explode(). The earlier
    # rdd.flatMap + createDataFrame needed at least one row to infer a schema
    # and therefore failed for users without any recommendations.
    # rating is cast to double to keep the column type the RDD-based version produced.
    a = user_recs.select(explode(col('recommendations')).alias('rec')) \
                 .select(col('rec.businessId').alias('businessId'),
                         col('rec.rating').cast('double').alias('rating')) \
                 .alias("a")
    b = business_new_df.alias("b")

    return a.join(b, col("a.businessId") == col("b.businessId"), 'inner') \
             .select([col('b.business_id'), col('a.rating'), col('b.business_name'), col('b.categories'),
                      col('b.stars'), col('b.review_count'),
                      col('b.latitude'), col('b.longitude')]) \
             .orderBy("rating", ascending = False)
    

In [75]:
# NOTE(review): getBusinessDetails is not defined in any cell visible in this
# notebook, so this call would raise NameError on a fresh kernel. Its recorded
# output matches getCollabRecom(u_id) -- presumably an earlier name for that
# helper; confirm and replace or remove this cell.
getBusinessDetails(userFlatRec).toPandas()

Unnamed: 0,business_id,rating,business_name,categories,stars,review_count,latitude,longitude
0,LcIgUlWaJJwtOfPoPWCmBg,4.570364,Souppe Shoppe,"[Restaurants, Street Vendors, Food, Soup, Food...",5.0,4,43.651425,-79.404123
1,mpDcuUs6dB5uBsYVKDWCNQ,4.527902,Druxy's Famous Deli,"[Restaurants, Sandwiches, Delis, Breakfast & B...",4.0,4,43.648235,-79.379525
2,1VAsBosvx02jpvIUxiKvmg,4.490123,The Dumpling Shop,"[Restaurants, Specialty Food, Chinese, Dim Sum...",4.5,11,43.767971,-79.401363
3,9GLN1xfck07CKfNfejKCwg,4.438345,T-Sushi,"[Food, Restaurants, Sushi Bars, Food Delivery ...",5.0,13,43.644745,-79.390892
4,vAz5pelrjwkpMDo_OHCDAg,4.414823,Kuya Willie's Kainan,"[Breakfast & Brunch, Filipino, Restaurants]",3.5,3,43.759288,-79.310866
5,y9yeMK6N0UINVECI3Ijz3Q,4.401293,Hot Dog Stand,"[Hot Dogs, Restaurants]",4.0,3,43.681236,-79.377222
6,XKa5R1lJSvNrbo8InhNliQ,4.399106,Toronto Star Food Building,"[Food, Fast Food, Restaurants]",4.5,3,43.632265,-79.420313
7,LIjlU7K-0SPXPtYFQiXamQ,4.392767,Magic Oven,"[Food Stands, Sandwiches, Restaurants, Indian]",5.0,3,43.652294,-79.405521
8,fxRcHzovnRyWh_WMdQoNOQ,4.377005,Taj Restaurant,"[Restaurants, Russian, Mediterranean]",5.0,4,43.696764,-79.446227
9,2H5EaBEreDzzP7sPmD_oDQ,4.362335,Vila Verde,"[Restaurants, Event Planning & Services, Portu...",4.0,4,43.651243,-79.410631


In [83]:
# Look up the recommendations for one example user and render them as a pandas table.
u_id = 'ZWD8UH1T7QXQr0Eq-mcWYg'

getCollabRecom(u_id=u_id).toPandas()

Unnamed: 0,business_id,rating,business_name,categories,stars,review_count,latitude,longitude
0,LcIgUlWaJJwtOfPoPWCmBg,4.570364,Souppe Shoppe,"[Restaurants, Street Vendors, Food, Soup, Food...",5.0,4,43.651425,-79.404123
1,mpDcuUs6dB5uBsYVKDWCNQ,4.527902,Druxy's Famous Deli,"[Restaurants, Sandwiches, Delis, Breakfast & B...",4.0,4,43.648235,-79.379525
2,1VAsBosvx02jpvIUxiKvmg,4.490123,The Dumpling Shop,"[Restaurants, Specialty Food, Chinese, Dim Sum...",4.5,11,43.767971,-79.401363
3,9GLN1xfck07CKfNfejKCwg,4.438345,T-Sushi,"[Food, Restaurants, Sushi Bars, Food Delivery ...",5.0,13,43.644745,-79.390892
4,vAz5pelrjwkpMDo_OHCDAg,4.414823,Kuya Willie's Kainan,"[Breakfast & Brunch, Filipino, Restaurants]",3.5,3,43.759288,-79.310866
5,y9yeMK6N0UINVECI3Ijz3Q,4.401293,Hot Dog Stand,"[Hot Dogs, Restaurants]",4.0,3,43.681236,-79.377222
6,XKa5R1lJSvNrbo8InhNliQ,4.399106,Toronto Star Food Building,"[Food, Fast Food, Restaurants]",4.5,3,43.632265,-79.420313
7,LIjlU7K-0SPXPtYFQiXamQ,4.392767,Magic Oven,"[Food Stands, Sandwiches, Restaurants, Indian]",5.0,3,43.652294,-79.405521
8,fxRcHzovnRyWh_WMdQoNOQ,4.377005,Taj Restaurant,"[Restaurants, Russian, Mediterranean]",5.0,4,43.696764,-79.446227
9,2H5EaBEreDzzP7sPmD_oDQ,4.362335,Vila Verde,"[Restaurants, Event Planning & Services, Portu...",4.0,4,43.651243,-79.410631
