In [None]:
!pip install pyspark

Collecting pyspark
[?25l  Downloading https://files.pythonhosted.org/packages/f0/26/198fc8c0b98580f617cb03cb298c6056587b8f0447e20fa40c5b634ced77/pyspark-3.0.1.tar.gz (204.2MB)
[K     |████████████████████████████████| 204.2MB 65kB/s 
[?25hCollecting py4j==0.10.9
[?25l  Downloading https://files.pythonhosted.org/packages/9e/b6/6a4fb90cd235dc8e265a6a2067f2a2c99f0d91787f06aca4bcf7c23f3f80/py4j-0.10.9-py2.py3-none-any.whl (198kB)
[K     |████████████████████████████████| 204kB 43.4MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.0.1-py2.py3-none-any.whl size=204612243 sha256=92de9c79308ce25170ee3a2fb77839cde5e5a252dd896cad31c6437f4a2bb970
  Stored in directory: /root/.cache/pip/wheels/5e/bd/07/031766ca628adec8435bb40f0bd83bb676ce65ff4007f8e73f
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9 pyspark-3.0.1


In [None]:
import os
import time

# spark imports
from pyspark.sql import SparkSession
from pyspark.sql.functions import UserDefinedFunction, explode, desc
from pyspark.sql.types import StringType, ArrayType
from pyspark.mllib.recommendation import ALS

# data science imports
import math
import matplotlib.pyplot as plt

from sklearn.feature_extraction.text import TfidfVectorizer, CountVectorizer
from sklearn.cluster import KMeans

%matplotlib inline

In [None]:
# Build (or reuse) the SparkSession for this notebook.
# NOTE(review): 96g is requested for the driver and its result size -- confirm
# the host actually has that much memory available.
spark = (
    SparkSession.builder
    .appName("restaurant recommendation")
    .config("spark.driver.maxResultSize", "96g")
    .config("spark.driver.memory", "96g")
    .config("spark.executor.memory", "8g")
    .config("spark.master", "local[12]")
    .getOrCreate()
)

# Handle to the underlying SparkContext (needed for the RDD-based calls).
sc = spark.sparkContext

In [None]:
# Restaurant table as a Spark DataFrame (header row, inferred column types);
# the "_c0" column is dropped right after loading.
rest = (
    spark.read
    .load('/content/restaurants_1.csv', format='csv', header=True, inferSchema=True)
    .drop("_c0")
)

In [None]:
# Read the ratings CSV as raw text lines.
rest_rating = sc.textFile('/content/modified.csv')
# The first line is the CSV header; keep it so it can be filtered out below.
header = rest_rating.take(1)[0]
# Parse every non-header line into (int(col 1), int(col 2), float(col 3)).
# Presumably these are (user id, restaurant id, rating) -- verify against the CSV.
rating_data = (
    rest_rating
    .filter(lambda row_text: row_text != header)
    .map(lambda row_text: row_text.split(","))
    .map(lambda fields: (int(fields[1]), int(fields[2]), float(fields[3])))
    .cache()
)
# Peek at the first three parsed tuples.
rating_data.take(3)


[(0, 10, 0.0), (1, 68, 0.0), (2, 68, 0.0)]

In [None]:
# Random split with weights 6:2:2 into train / validation / test; fixed seed for repeatability.
train, validation, test = rating_data.randomSplit(weights=[6, 2, 2], seed=99)

def train_ALS(train_data, validation_data, num_iters, reg_param, ranks):
    """
    Grid-search ALS and keep the model with the lowest validation RMSE.

    One model is trained per (rank, regularization) pair; its predictions for
    the (user, item) pairs of the validation data are joined with the actual
    ratings and scored by RMSE.

    Parameters
    ----------
    train_data : RDD of (userID, productID, rating) tuples used for training
    validation_data : RDD of (userID, productID, rating) tuples used for scoring
    num_iters : number of ALS iterations for every candidate model
    reg_param : iterable of regularization values to try
    ranks : iterable of latent-factor counts to try

    Returns
    -------
    (best_model, best_rank, best_regularization); (None, -1, 0) when the grid
    is empty and nothing was trained.
    """
    # running record of the best candidate seen so far
    lowest_rmse = float('inf')
    winner = (None, -1, 0)

    # walk the grid rank-major, exactly like two nested loops
    grid = ((n_factors, lam) for n_factors in ranks for lam in reg_param)
    for n_factors, lam in grid:
        candidate = ALS.train(
            ratings=train_data,
            iterations=num_iters,
            rank=n_factors,
            lambda_=lam,
            seed=99)
        # predict a rating for every (user, item) pair in the validation data
        user_item_pairs = validation_data.map(lambda row: (row[0], row[1]))
        predicted = candidate.predictAll(user_item_pairs) \
            .map(lambda row: ((row[0], row[1]), row[2]))
        # pair each actual rating with its prediction, keyed by (user, item)
        actual = validation_data.map(lambda row: ((row[0], row[1]), row[2]))
        squared_errors = actual.join(predicted) \
            .map(lambda pair: (pair[1][0] - pair[1][1])**2)
        rmse = math.sqrt(squared_errors.mean())
        if rmse < lowest_rmse:
            lowest_rmse = rmse
            winner = (candidate, n_factors, lam)

    best_model, best_rank, best_regularization = winner
    print('\nThe best model has {} latent factors and regularization = {}'.format(best_rank, best_regularization))
    return best_model, best_rank, best_regularization

In [None]:
# hyper-param config
num_iterations = 10
ranks = [10, 14, 16, 18, 20]
reg_params = [0.005, 0.05, 0.1, 0.2]

# grid search and select best model
start_time = time.time()
final_model, best_rank, best_reg = train_ALS(train, validation, num_iterations, reg_params, ranks)
# start_time was captured but never reported; show how long the search took.
print('Total Runtime: {:.2f} seconds'.format(time.time() - start_time))


The best model has 10 latent factors and regularization = 0.2


In [None]:
def get_resId(df_res, fav_res_list):
    """
    Collect the ids of restaurants whose title contains any of the given names.

    Parameters
    ----------
    df_res : Spark DataFrame with 'title' and 'resId' columns
    fav_res_list : iterable of str, each matched as LIKE '%<name>%' against 'title'

    Returns
    -------
    list of distinct resId values, in the order they were first matched
    """
    resId_list = []
    for res in fav_res_list:
        # Build the condition from df_res itself rather than the notebook-level
        # `rest` frame, so the function works for whichever DataFrame is passed.
        resIds = df_res \
            .filter(df_res['title'].like('%{}%'.format(res))) \
            .select('resId') \
            .rdd \
            .map(lambda r: r[0]) \
            .collect()
        resId_list.extend(resIds)
    # dict.fromkeys de-duplicates like set() but keeps a deterministic order
    return list(dict.fromkeys(resId_list))


def add_new_user_to_data(train_data, resId_list, spark_context):
    """
    Append rating rows for a brand-new user to the training data.

    The new user id is the largest first tuple element in `train_data` plus
    one, and every restaurant in `resId_list` is given the largest rating
    (third tuple element) found in `train_data`.

    Returns the union of `train_data` and the new rows.
    """
    next_user_id = train_data.map(lambda row: row[0]).max() + 1
    top_rating = train_data.map(lambda row: row[2]).max()

    # one (user, restaurant, rating) row per favourite restaurant
    extra_rows = []
    for res_id in resId_list:
        extra_rows.append((next_user_id, res_id, top_rating))

    return train_data.union(spark_context.parallelize(extra_rows))


def get_inference_data(train_data, df_res, resId_list):
    """
    Build the (new_user_id, restaurant_id) pairs to score for the new user.

    Parameters
    ----------
    train_data : RDD of rating tuples; the new user id is max(first element) + 1
    df_res : Spark DataFrame of restaurants; the first column of each row is
        used as the restaurant id (assumes that column is resId -- TODO confirm)
    resId_list : restaurant ids to leave out of the result

    Returns
    -------
    RDD of (new_user_id, restaurant_id) for every distinct id not in resId_list
    """
    # get new user id
    new_id = train_data.map(lambda r: r[0]).max() + 1
    # Set lookup keeps the per-row membership test O(1) instead of scanning
    # the whole list for every restaurant.
    excluded = frozenset(resId_list)
    # return inference rdd
    return df_res.rdd \
        .map(lambda r: r[0]) \
        .distinct() \
        .filter(lambda x: x not in excluded) \
        .map(lambda x: (new_id, x))


def make_recommendation(best_model_params, ratings_data, df_res,
                        fav_movie_list, n_recommendations, spark_context):
    """
    Recommend restaurants for a new user described by a list of favourites.

    The favourites are resolved to restaurant ids, added to the ratings as a
    new user, an ALS model is retrained on the result, and the restaurants
    with the highest predicted rating for that user are returned.

    Parameters
    ----------
    best_model_params : dict read for the keys 'iterations', 'rank', 'lambda_'
    ratings_data : RDD of (user, restaurant, rating) tuples
    df_res : Spark DataFrame with 'resId' and 'title' columns
    fav_movie_list : list of favourite restaurant names (matched against 'title')
    n_recommendations : number of top-scored restaurant ids to keep
    spark_context : SparkContext used to create the new user's rows

    Returns
    -------
    RDD of title strings for the selected restaurants. The RDD comes from
    filtering df_res, so it is not re-sorted by predicted rating.
    """
    # modify train data by adding new user's rows
    resId_list = get_resId(df_res, fav_movie_list)
    train_data = add_new_user_to_data(ratings_data, resId_list, spark_context)

    # train best ALS
    model = ALS.train(
        ratings=train_data,
        iterations=best_model_params.get('iterations', None),
        rank=best_model_params.get('rank', None),
        lambda_=best_model_params.get('lambda_', None),
        seed=99)

    # get inference rdd
    inference_rdd = get_inference_data(ratings_data, df_res, resId_list)

    # inference: keep (restaurant id, predicted rating)
    predictions = model.predictAll(inference_rdd).map(lambda r: (r[1], r[2]))

    # get top n restaurant ids by predicted rating
    topn_rows = predictions.sortBy(lambda r: r[1], ascending=False).take(n_recommendations)
    topn_ids = [r[0] for r in topn_rows]

    # Return restaurant titles. The condition is built from df_res itself (not
    # the notebook-level `rest`), so the function honours the frame it is given.
    return df_res.filter(df_res['resId'].isin(topn_ids)) \
        .select('title') \
        .rdd \
        .map(lambda r: r[0])


In [None]:
import pandas as pd
# Restaurant attribute table (pandas); preview the first two rows.
data = pd.read_csv(filepath_or_buffer="/content/cleaned_british_dataset.csv")
data.head(n=2)

Unnamed: 0.1,Unnamed: 0,Establishment_Type,Name,Rating,No_of_Reviews,Timings,Latitude,Longitude,Tags,Address,Contact_No,food,service,value,atmosphere,Rating_weighted,day_a,day_a1,time_a,time_a1,time_a2,time_a3,day_b,day_b1,time_b,time_b1,time_b2,time_b3,day_c,day_c1,time_c,time_c1,time_c2,time_c3,day_d,day_d1,time_d,time_d1,time_d2,time_d3,day_e,day_e1,time_e,time_e1,time_e2,time_e3,day_f,day_f1,time_f,time_f1,Cuisine,Price
0,0,Restaurant,Little Dim Sum,4.0,4.0,"[{'days': 'Sun - Sat', 'times': ['7:00 AM - 9:...",49.04866,-122.28979,-,"33766 Essendene Ave, Abbotsford, British Colum...",+1 604-758-0888,0.0,0,0.0,0.0,3.6639,0.0,6.0,7.0,21.0,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,not_available
1,1,Restaurant,Freshii,3.5,9.0,"[{'days': 'Mon - Fri', 'times': ['7:00 AM - 8:...",49.036045,-122.293106,-,"125-1878 McCallum Rd McCallum Junction, Abbots...",+1 604-425-1055,4.0,4.5},0.0,0.0,3.610702,1.0,5.0,7.0,20.0,,,6.0,0.0,11.0,19.0,,,,,,,,,,,,,,,,,,,,,,,,,canadian,lowcost


In [None]:
# Missing cuisines become a single space so the later text steps always receive a str.
data['Cuisine'] = data['Cuisine'].fillna(value=" ")

In [None]:
import re
def remove_punc(text):
    """Return `text` with every comma replaced by a single space."""
    return ' '.join(text.split(','))

# Replace the commas in every cuisine string with spaces.
data['Cuisine'] = data['Cuisine'].apply(remove_punc)

In [None]:
# TF-IDF over the cuisine strings: word 1- to 3-grams, English stop words
# removed, terms appearing in fewer than 3 documents dropped.
tfv = TfidfVectorizer(min_df=3, max_features=None,
            strip_accents='unicode', analyzer='word', token_pattern=r'\w{1,}',
            ngram_range=(1, 3),
            stop_words='english')

tfv_matrix = tfv.fit_transform(data['Cuisine'])

# get_feature_names() is gone from newer scikit-learn releases in favour of
# get_feature_names_out(); use whichever the installed version provides.
if hasattr(tfv, 'get_feature_names_out'):
    feature_names = tfv.get_feature_names_out()
else:
    feature_names = tfv.get_feature_names()

# Weights of the first document only, one row per vocabulary term, highest first.
# toarray() yields a plain ndarray instead of the np.matrix that todense() returns.
df = pd.DataFrame(tfv_matrix[0].T.toarray(), index=feature_names, columns=["TF-IDF"])
df = df.sort_values('TF-IDF', ascending=False)
df.head()

Unnamed: 0,TF-IDF
afghani,0.0
korean canadian,0.0
italian vegetarian friendly,0.0
jamaican,0.0
jamaican vegetarian,0.0


In [None]:
# Elbow-method sweep over candidate cluster counts. It fits one KMeans model
# per candidate, so it is switched off by default; set RUN_ELBOW_SWEEP = True
# to run it.
RUN_ELBOW_SWEEP = False

if RUN_ELBOW_SWEEP:
    cluster_counts = range(30, 70)
    wcss = []
    for i in cluster_counts:
        # local name, so the sweep never clobbers the notebook's `kmeans` model
        km = KMeans(n_clusters=i, init='k-means++', random_state=0)
        km.fit(tfv_matrix)
        wcss.append(km.inertia_)
    # Plot against the cluster counts that were actually fitted; the previous
    # range(1, 50) had 49 points for 40 WCSS values and could not be plotted.
    plt.plot(cluster_counts, wcss)
    plt.title('The Elbow Method')
    plt.xlabel('Number of clusters')
    plt.ylabel('WCSS')
    plt.show()

"\nwcss = []\nfor i in range(30,70):\n    kmeans = KMeans(n_clusters=i,init='k-means++',random_state=0)\n    kmeans.fit(tfv_matrix)\n    wcss.append(kmeans.inertia_)\nplt.plot(range(1,50),wcss)\nplt.title('The Elbow Method')\nplt.xlabel('Number of clusters')\nplt.ylabel('WCSS')\nplt.show()\n"

In [None]:
# Fit 50 clusters on the TF-IDF matrix and tag every row of `data` with its
# cluster label, then preview the first three rows.
kmeans = KMeans(n_clusters=50, init='k-means++', random_state=0).fit(tfv_matrix)
data['clusters'] = kmeans.labels_
data.head(3)

Unnamed: 0.1,Unnamed: 0,Establishment_Type,Name,Rating,No_of_Reviews,Timings,Latitude,Longitude,Tags,Address,Contact_No,food,service,value,atmosphere,Rating_weighted,day_a,day_a1,time_a,time_a1,time_a2,time_a3,day_b,day_b1,time_b,time_b1,time_b2,time_b3,day_c,day_c1,time_c,time_c1,time_c2,time_c3,day_d,day_d1,time_d,time_d1,time_d2,time_d3,day_e,day_e1,time_e,time_e1,time_e2,time_e3,day_f,day_f1,time_f,time_f1,Cuisine,Price,clusters
0,0,Restaurant,Little Dim Sum,4.0,4.0,"[{'days': 'Sun - Sat', 'times': ['7:00 AM - 9:...",49.04866,-122.28979,-,"33766 Essendene Ave, Abbotsford, British Colum...",+1 604-758-0888,0.0,0,0.0,0.0,3.6639,0.0,6.0,7.0,21.0,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,not_available,5
1,1,Restaurant,Freshii,3.5,9.0,"[{'days': 'Mon - Fri', 'times': ['7:00 AM - 8:...",49.036045,-122.293106,-,"125-1878 McCallum Rd McCallum Junction, Abbots...",+1 604-425-1055,4.0,4.5},0.0,0.0,3.610702,1.0,5.0,7.0,20.0,,,6.0,0.0,11.0,19.0,,,,,,,,,,,,,,,,,,,,,,,,,canadian,lowcost,1
2,2,Restaurant,Pantry Family Restaurant,4.0,10.0,"[{'days': 'Sun - Thu', 'times': ['6:30 AM - 9:...",49.04936,-122.31277,-,"32900 South Fraser Way # 127, Abbotsford, Brit...",+1 604-504-1636,0.0,0,0.0,0.0,3.701244,0.0,4.0,6.3,21.3,,,5.0,6.0,6.3,22.0,,,,,,,,,,,,,,,,,,,,,,,,,canadian,highcost,1


In [None]:
def cluster_predict(str_input):
    """
    Predict the cluster of each input text.

    `str_input` is vectorized with the fitted notebook-level `tfv` and passed
    to the fitted notebook-level `kmeans`; the predicted labels are returned.
    """
    return kmeans.predict(tfv.transform(str_input))

In [None]:
# Ratings and restaurant CSVs loaded as pandas frames for row-level lookups.
user, res = (
    pd.read_csv(csv_path)
    for csv_path in ('/content/modified.csv', '/content/restaurants_1.csv')
)
# Every user id appearing in the ratings file (duplicates kept).
users_id = [uid for uid in user['user_id']]

In [None]:
# Inputs for the recommendation cell: the user id to serve and the cuisine
# that is used when that id has no ratings yet.
new_user, fav_cus = 100, 'Italian'

In [None]:
# ALS settings shared by both branches: same iteration count as the grid
# search, with the rank / regularization it selected.
als_params = {'iterations': num_iterations, 'rank': best_rank, 'lambda_': best_reg}

if new_user not in users_id:
    print('new user')
    # Cold start: map the favourite cuisine to a cuisine cluster and treat
    # restaurants from that cluster as the user's favourites.
    prediction = cluster_predict([fav_cus])
    data1 = data[data['clusters'] == prediction[0]]
    list2 = list(data1['Name'])

    my_favorite_restaurants = list2[:5]
    # get recommends
    r2 = recommends = make_recommendation(
        best_model_params=als_params,
        ratings_data=rating_data,
        df_res=rest,
        fav_movie_list=my_favorite_restaurants,
        n_recommendations=10,
        spark_context=sc)

    # Same-cluster restaurants first, then the ALS recommendations.
    result = list2[:7] + list(r2.collect())

else:
    print('existing user')
    # Restaurants this user rated above 3.0.
    liked_ids = list(user[(user['user_id'] == new_user) & (user['Rating'] > 3.0)]['restaurant_id'])
    my_favorite_restaurants = []
    for res_id in liked_ids:
        titles = res[res['resId'] == res_id]['title'].values
        # Skip ids missing from the restaurant table instead of raising IndexError.
        if len(titles) > 0:
            my_favorite_restaurants.append(titles[0])

    r2 = recommends = make_recommendation(
        best_model_params=als_params,
        ratings_data=rating_data,
        df_res=rest,
        fav_movie_list=my_favorite_restaurants,
        n_recommendations=10,
        spark_context=sc)

    result = list(r2.collect())

    # Cuisines of the first three recommendations that exist in `data`.
    cus = []
    for name in result[:3]:
        cuisines = data[data['Name'] == name]['Cuisine'].values
        if len(cuisines) > 0:
            cus.append(cuisines[0])

    # With no usable cuisine there is nothing to predict a cluster from.
    if cus:
        prediction = cluster_predict(cus)
        data1 = data[data['clusters'] == prediction[0]]
        # Extend with restaurants of the predicted cluster only (data1); the
        # earlier code appended every restaurant name in `data`.
        list2 = list(data1['Name'])
        result += list2
  

new user


In [None]:
# Name / Cuisine / cluster of the first 20 results. Names that do not appear
# in `data` are skipped instead of raising IndexError on `.values[0]`.
gh = []
for name in result[:20]:
    matches = data[data['Name'] == name][['Name', 'Cuisine', 'clusters']].values
    if len(matches) > 0:
        gh.append(matches[0])

# Passing the column names to the constructor also works when gh is empty.
dh = pd.DataFrame(gh, columns=['Name', 'Cuisine', 'Cluster'])
dh

Unnamed: 0,Name,Cuisine,Cluster
0,Antonio's Restaurant,italian,15
1,Pizza Garden,'italian',15
2,Vincenti's Italian Restaurant,'italian',15
3,Lac des Roches Restaurant,italian,15
4,Lac Des Roches Resort Restaurant,italian,15
5,Bayshore Resort Restaurant,italian,15
6,Pazzo Chow,italian,15
7,Subway,,5
8,Tim Hortons,,5
9,Mandarin Garden,chinese asian,10
