In [0]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

from pyspark.ml import Pipeline
from pyspark.sql.functions import *
from pyspark.sql.types import *


from pyspark.sql import Row

from pyspark.ml.recommendation import ALS, ALSModel
from pyspark.ml.tuning import TrainValidationSplit, ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import RegressionEvaluator

import folium
import html
import pickle
from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.sql.session import SparkSession

In [0]:
from google.colab import drive
drive.mount('/content/gdrive')

Drive already mounted at /content/gdrive; to attempt to forcibly remount, call drive.mount("/content/gdrive", force_remount=True).


In [0]:
with open('gdrive/My Drive/Yelp Data/dfRestaurants.pickle', 'rb') as f:
    df_business = pickle.load(f)

In [0]:
sc = SparkContext('local')
spark = SparkSession(sc)

In [0]:
df_business_spark = spark.createDataFrame(df_business)

In [0]:
df_business_spark.printSchema()

root
 |-- business_id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- neighborhood: string (nullable = true)
 |-- address: string (nullable = true)
 |-- city: string (nullable = true)
 |-- state: string (nullable = true)
 |-- postal_code: string (nullable = true)
 |-- latitude: double (nullable = true)
 |-- longitude: double (nullable = true)
 |-- stars: double (nullable = true)
 |-- review_count: long (nullable = true)
 |-- is_open: long (nullable = true)
 |-- categories: string (nullable = true)



In [0]:
df_business_spark.show(5)

+--------------------+--------------------+------------------+--------------------+-------+-----+-----------+-----------------+------------------+-----+------------+-------+--------------------+
|         business_id|                name|      neighborhood|             address|   city|state|postal_code|         latitude|         longitude|stars|review_count|is_open|          categories|
+--------------------+--------------------+------------------+--------------------+-------+-----+-----------+-----------------+------------------+-----+------------+-------+--------------------+
|l09JfMeQ6ynYs5MCJ...|    "Alize Catering"|Yonge and Eglinton|     "2459 Yonge St"|Toronto|   ON|    M4P 2H6|       43.7113993|       -79.3993388|  3.0|          12|      0|Italian;French;Re...|
|1K4qrnfyzKzGgJPBE...|"Chula Taberna Me...|       Leslieville|"1058 Gerrard Str...|Toronto|   ON|    M4M 3A6|43.66925620000001|       -79.3359022|  3.5|          39|      1|Tiki Bars;Nightli...|
|dTWfATVrBfKj7Vdn0...|   

In [0]:
business_rdd = df_business_spark.rdd

In [0]:
business_rdd.take(2)


[Row(business_id='l09JfMeQ6ynYs5MCJtrcmQ', name='"Alize Catering"', neighborhood='Yonge and Eglinton', address='"2459 Yonge St"', city='Toronto', state='ON', postal_code='M4P 2H6', latitude=43.7113993, longitude=-79.3993388, stars=3.0, review_count=12, is_open=0, categories='Italian;French;Restaurants'),
 Row(business_id='1K4qrnfyzKzGgJPBEcJaNQ', name='"Chula Taberna Mexicana"', neighborhood='Leslieville', address='"1058 Gerrard Street E"', city='Toronto', state='ON', postal_code='M4M 3A6', latitude=43.66925620000001, longitude=-79.3359022, stars=3.5, review_count=39, is_open=1, categories='Tiki Bars;Nightlife;Mexican;Restaurants;Bars')]

In [0]:
with open('gdrive/My Drive/Yelp Data/reviews_reduced_columns.pkl', 'rb') as f:
    df_ratings = pickle.load(f)

In [0]:
df_ratings

Unnamed: 0,review_id,user_id,business_id,stars
0,vkVSCC7xljjrAI4UGfnKEQ,bv2nCi5Qv5vroFiqKGopiw,AEx2SYEUJmTxVVB18LlCwA,5
1,n6QzIUObkYshz4dz2QRJTw,bv2nCi5Qv5vroFiqKGopiw,VR6GpWIda3SfvPC-lg9H3w,5
2,MV3CcKScW05u5LVfF6ok0g,bv2nCi5Qv5vroFiqKGopiw,CKC0-MOWMqoeWf6s-szl8g,5
3,IXvOzsEMYtiJI0CARmj77Q,bv2nCi5Qv5vroFiqKGopiw,ACFtxLv8pGrrxMm6EgjreA,4
4,L_9BTb55X0GDtThi6GlZ6w,bv2nCi5Qv5vroFiqKGopiw,s2I_Ni76bjJNK9yG60iD-Q,4
...,...,...,...,...
5261663,PoGSiNz1X5SUu0qEt-qM5w,mPjPyipaD0C_myqWqDipZg,Ngk84Ax1tXgpoJFEGxot3w,1
5261664,-CJNPrDWgIkorx4iEZJXIg,mPjPyipaD0C_myqWqDipZg,pOEL97ld-FJMKO8Ki8JmYg,3
5261665,W9eVvOcpBvG6lpJPoJOxuA,mPjPyipaD0C_myqWqDipZg,5ubokMNw8qfbX2WtxgJG1Q,4
5261666,hqQ1UTFKMN2P1ezUow48OQ,mPjPyipaD0C_myqWqDipZg,EO3i5kTUG7_S2OIQ23sdSA,3


In [0]:
def get_clean_df(df, min_user_review = 30, min_res_review = 0, cols = ['user_id', 'business_id', 'stars']):
    '''Cleans the df and gets rid of the unwanted cols and also allows to filter the user and business based on the min number of reviews received'''
    df_new = df[cols]
    df_new.dropna(axis = 0, how = 'any', inplace = True)
    df_new[cols[1]+'_freq'] = df_new.groupby(cols[1])[cols[1]].transform('count')
    df_clean = df_new[df_new[cols[1]+'_freq']>=min_res_review]
    df_clean[cols[0]+'_freq'] = df_clean.groupby(cols[0])[cols[0]].transform('count')
    df_clean_2 = df_clean[df_clean[cols[0]+'_freq']>=min_user_review]
    return df_clean_2

In [0]:
def get_sparsity(sparse_matrix):
    return 1 - sparse_matrix.nnz/(sparse_matrix.shape[0]*sparse_matrix.shape[1])

In [0]:
'''df_ratings_sparse = get_clean_df(df_ratings, min_user_review=30)'''

'df_ratings_sparse = get_clean_df(df_ratings, min_user_review=30)'

In [0]:
  '''df_ratings_sparse = df_ratings_sparse.sample(n=20000)'''

'df_ratings_sparse = df_ratings_sparse.sample(n=20000)'

In [0]:

'''with open('gdrive/My Drive/Yelp Data/df_ratings_sparse.pkl', 'wb') as f:
  pickle.dump(df_ratings_sparse, f)'''

"with open('gdrive/My Drive/Yelp Data/df_ratings_sparse.pkl', 'wb') as f:\n  pickle.dump(df_ratings_sparse, f)"

In [0]:
with open('gdrive/My Drive/Yelp Data/df_ratings_sparse.pkl', 'rb') as f:
    df_ratings_sparse = pickle.load(f)

In [0]:
df_ratings_sparse.shape

(20000, 5)

In [0]:
df_ratings_spark = spark.createDataFrame(df_ratings_sparse)

In [0]:
#users_df = pd.read_csv('gdrive/My Drive/Yelp Data/yelp_user.csv')

In [0]:
#users_df = users_df.drop(columns = ['review_count','friends','average_stars','name','yelping_since','useful','funny','cool','fans','elite','compliment_hot','compliment_more','compliment_profile','compliment_cute','compliment_list','compliment_note','compliment_plain','compliment_funny','compliment_cool','compliment_writer','compliment_photos',])

In [0]:
#users_df.drop(columns=['review_count','friends','average_stars'])

In [0]:
'''with open('gdrive/My Drive/Yelp Data/df_users.pkl', 'wb') as f:
  pickle.dump(users_df, f)'''

In [0]:
with open('gdrive/My Drive/Yelp Data/df_users.pkl', 'rb') as f:
    df_users = pickle.load(f)

In [0]:
df_users.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 1326100 entries, 0 to 1326099
Data columns (total 1 columns):
user_id    1326100 non-null object
dtypes: object(1)
memory usage: 10.1+ MB


In [0]:
df_users_spark = spark.createDataFrame(df_users)

In [0]:
user_newid_df = spark.createDataFrame(df_users_spark.rdd.map(lambda x: x[0]).zipWithIndex(), \
        StructType([StructField("user_id", StringType(), True),StructField("userId", IntegerType(), True)]))

user_newid_df.show(2)

+--------------------+------+
|             user_id|userId|
+--------------------+------+
|JJ-aSuM4pCFPdkfoZ...|     0|
|uUzsFQn_6cXDh6rPN...|     1|
+--------------------+------+
only showing top 2 rows



In [0]:
a = df_users_spark.alias("a")
b = user_newid_df.alias("b")
    
user_new_df = a.join(b, col("a.user_id") == col("b.user_id"), 'inner') \
             .select([col('a.'+xx) for xx in a.columns] + [col('b.userId')])

user_new_df.select('userId','user_id').show(5)

+-------+--------------------+
| userId|             user_id|
+-------+--------------------+
| 545966|--CJT4d-S8UhwqHe0...|
|1079599|-0Ji0nOyFe-4yo8BK...|
|1278580|-0XPr1ilUAfp-yIXZ...|
| 950379|-1KKYzibGPyUX-Mwk...|
|1106539|-1zQA2f_syMAdA04P...|
+-------+--------------------+
only showing top 5 rows



In [0]:
business_newid_df = spark.createDataFrame(df_business_spark.rdd.map(lambda x: x[0]).zipWithIndex(), \
        StructType([StructField("business_id", StringType(), True),StructField("businessId", IntegerType(), True)]))

business_newid_df.show(5)

+--------------------+----------+
|         business_id|businessId|
+--------------------+----------+
|l09JfMeQ6ynYs5MCJ...|         0|
|1K4qrnfyzKzGgJPBE...|         1|
|dTWfATVrBfKj7Vdn0...|         2|
|1nhf9BPXOBFBkbRkp...|         3|
|sJ0MYSAIVK28cMzh-...|         4|
+--------------------+----------+
only showing top 5 rows



In [0]:
a = df_business_spark.alias("a")
b = business_newid_df.alias("b")
    
business_new_df = a.join(b, col("a.business_id") == col("b.business_id"), 'inner') \
             .select([col('a.'+xx) for xx in a.columns] + [col('b.businessId')])

business_new_df.select('businessId','business_id', 'name').show(5)

+----------+--------------------+--------------------+
|businessId|         business_id|                name|
+----------+--------------------+--------------------+
|      4417|1RFIVcZYV77tGIwVV...|    "More Than Pies"|
|      3002|2OFWvbHVwvnva7GxP...|"Pam's Caribbean ...|
|      1287|2vBo1wWJckBnGOHhx...|"St. Louis Bar & ...|
|       653|Be9hkCoOJZB7pUxvj...|"Kebaberie Yorkvi...|
|       642|CyQXHdumQvxKAkXgl...|        "Number One"|
+----------+--------------------+--------------------+
only showing top 5 rows



In [0]:
review_df = df_ratings_spark.select('user_id', 'business_id', 'stars')


# map the userId
a = df_ratings_spark.alias("a")
b = user_newid_df.alias("b")
    
review_userId_df = a.join(b, col("a.user_id") == col("b.user_id"), 'inner') \
                     .select([col('a.'+xx) for xx in a.columns] + [col('b.userId')])

# map the businessId
a = review_userId_df.alias("a")
b = business_newid_df.alias("b")

review_userId_businessId_df = a.join(b, col("a.business_id") == col("b.business_id"), 'inner') \
                         .select([col('a.'+xx) for xx in a.columns] + [col('b.businessId')])

review_userId_businessId_df.show(10)

+--------------------+--------------------+-----+----------------+------------+------+----------+
|             user_id|         business_id|stars|business_id_freq|user_id_freq|userId|businessId|
+--------------------+--------------------+-----+----------------+------------+------+----------+
|KOHvhD69T7rlnK7zK...|RtUvSWO_UZ8V3Wpj0...|    2|            1184|         112|488725|       322|
|4TVcAVhnypLekIauJ...|RtUvSWO_UZ8V3Wpj0...|    5|            1184|          30|311736|       322|
|SxMBY64n31UHPjxzI...|RtUvSWO_UZ8V3Wpj0...|    5|            1184|         293|583137|       322|
|DKURijAmB5Xz8dHLo...|RtUvSWO_UZ8V3Wpj0...|    4|            1184|          52|231561|       322|
|FREeRQtjdJU83AFtd...|RtUvSWO_UZ8V3Wpj0...|    4|            1184|         565|411995|       322|
|HyXK2GNR9i4SWpLQG...|RtUvSWO_UZ8V3Wpj0...|    5|            1184|         277|842245|       322|
|KyuznsYoqjSYxAK6j...|RtUvSWO_UZ8V3Wpj0...|    4|            1184|          35|747100|       322|
|MFnXQb1R1IGLL_-W4..

In [0]:
rating_df = review_userId_businessId_df.select('userId', 'businessId', review_userId_businessId_df.stars.cast('float').alias('rating'))
rating_df.show(10)
rating_df.printSchema()

+------+----------+------+
|userId|businessId|rating|
+------+----------+------+
|488725|       322|   2.0|
|311736|       322|   5.0|
|583137|       322|   5.0|
|231561|       322|   4.0|
|411995|       322|   4.0|
|842245|       322|   5.0|
|747100|       322|   4.0|
|476157|       322|   3.0|
|963661|      2152|   4.0|
|935280|      2837|   4.0|
+------+----------+------+
only showing top 10 rows

root
 |-- userId: integer (nullable = true)
 |-- businessId: integer (nullable = true)
 |-- rating: float (nullable = true)



In [0]:
(train, test) = rating_df.randomSplit([0.8, 0.2], seed=123)


In [0]:
als = ALS(userCol="userId", itemCol="businessId", ratingCol="rating", coldStartStrategy="drop")

param_grid = ParamGridBuilder().addGrid(
    als.rank,
    [10, 15, 20],
).addGrid(
    als.maxIter,
    [10, 15, 20],
).build()

evaluator = RegressionEvaluator(
    metricName="rmse",
    labelCol="rating",
)

cv = CrossValidator(estimator=als, estimatorParamMaps=param_grid, evaluator=evaluator, numFolds=5, seed=123)
cv_als_model = cv.fit(train)

# Evaluate the model by computing the RMSE on the test data

als_predictions = cv_als_model.bestModel.transform(test)
evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating", predictionCol="prediction")
rmse = evaluator.evaluate(als_predictions)
print("Root-mean-square error = " + str(round(rmse,2)))

best_model = cv_als_model.bestModel

(best_model
    ._java_obj     
    .parent()      
    .getMaxIter())

Root-mean-square error = 0.89


In [0]:
with open('gdrive/My Drive/Yelp Data/collab_model.pkl', 'wb') as f:
  pickle.dump(best_model, f)

In [0]:
userRecoms = best_model.recommendForAllUsers(5)

In [0]:
userRecoms.show(5)

+------+--------------------+
|userId|     recommendations|
+------+--------------------+
|   833|[[3428, 3.9349976...|
|606094|[[333, 4.9199047]...|
|934964|[[4402, 3.9349976...|
|141737|[[3662, 4.9299006...|
|809289|[[1222, 3.9548275...|
+------+--------------------+
only showing top 5 rows



In [0]:
userRecoms.filter('userId == 606094').show(5)

+------+--------------------+
|userId|     recommendations|
+------+--------------------+
|606094|[[333, 4.9199047]...|
+------+--------------------+

