---
<h1><center> BrainStation Capstone - Recommender System</center></h1>

--- 

In [1]:
 # For Google Drive, mounting your local hard drive
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


# Installing Pyspark & Other Libraries in Google Colab

In [2]:
# Download and install Pyspark
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://apache.mirror.iweb.ca/spark/spark-2.4.0/spark-2.4.0-bin-hadoop2.7.tgz
!tar xf spark-2.4.0-bin-hadoop2.7.tgz
!pip install -q findspark

# Java
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.4.0-bin-hadoop2.7"

# Initiating Spark Session
import findspark
findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()

# Initiating spark context
from pyspark import SparkConf
from pyspark import SparkContext
sc = SparkContext.getOrCreate(SparkConf().setMaster("local[*]"))

# Evaluation metrics for recommender system 
!pip install ml_metrics
!pip install recmetrics

Collecting ml_metrics
  Downloading https://files.pythonhosted.org/packages/c1/e7/c31a2dd37045a0c904bee31c2dbed903d4f125a6ce980b91bae0c961abb8/ml_metrics-0.1.4.tar.gz
Building wheels for collected packages: ml-metrics
  Building wheel for ml-metrics (setup.py) ... [?25ldone
[?25h  Stored in directory: /root/.cache/pip/wheels/b3/61/2d/776be7b8a4f14c5db48c8e5451451cabc58dc6aa7ee3801163
Successfully built ml-metrics
Installing collected packages: ml-metrics
Successfully installed ml-metrics-0.1.4
Collecting recmetrics
  Downloading https://files.pythonhosted.org/packages/c7/6b/f56a11c953b8d7c5431fc990a5864615f5bd4315f303f91ce4f44a2da787/recmetrics-0.0.12.tar.gz
Collecting surprise (from recmetrics)
  Downloading https://files.pythonhosted.org/packages/61/de/e5cba8682201fcf9c3719a6fdda95693468ed061945493dea2dd37c5618b/surprise-0.1-py2.py3-none-any.whl
Collecting scikit-surprise (from surprise->recmetrics)
[?25l  Downloading https://files.pythonhosted.org/packages/4d/fc/cd4210b247d1dca42

# Packages Used

In [0]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

import folium
import html

# NLP
from operator import add
from pyspark.ml.feature import RegexTokenizer, CountVectorizer
from pyspark.ml.feature import StopWordsRemover, VectorAssembler
from pyspark.ml.feature import Word2Vec, Word2VecModel
from pyspark.ml.feature import IDF
from pyspark.ml import Pipeline, PipelineModel

# SQL
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql import Row

# Collaborative
from pyspark.ml.recommendation import ALS, ALSModel
from pyspark.ml.tuning import TrainValidationSplit, ParamGridBuilder, CrossValidator, CrossValidatorModel
from pyspark.ml.evaluation import RegressionEvaluator

# Evaluation metrics
import ml_metrics as metrics
import recmetrics as met

from sklearn.preprocessing import MinMaxScaler


#  Data Preparation

## Loading Data

In [0]:
data_path = '/content/drive/My Drive/BrainStation/Data Science FT/7 - Capstone/Data/'
feature_path = '/content/drive/My Drive/BrainStation/Data Science FT/7 - Capstone/Feature/'
model_path = '/content/drive/My Drive/BrainStation/Data Science FT/7 - Capstone/Model/'

In [0]:
# Loading csvs
all_restaurant_df = spark.read.format("csv").option("header", "true").load(data_path + "/TO_restaurant.csv")
all_user_df = spark.read.format("csv").option("header", "true").load(data_path + "/TO_user.csv")
all_review_df = spark.read.format("csv").option("header", "true").load(data_path + "/TO_review.csv")
friend_df = spark.read.format("csv").option("header", "true").load(data_path + "/TO_friends.csv")

# Selecting columns that are gonna be needed
all_review_df = all_review_df.dropna()
restaurant_df = all_restaurant_df.select("business_id","name","categories","restaurants_star",
                                     "review_count","latitude","longitude")
user_df = all_user_df.select("user_id","user_name","user_average_stars","user_review_count","elite",
                           "fans","friends","yelping_since")
review_df = all_review_df.select("business_id","name","review_id","user_id","date","review_stars","text")

In [0]:
# Changing the column types of the dataframe to the appropriate ones: 
restaurant_df = restaurant_df.withColumn("latitude", restaurant_df.latitude.cast('float'))
restaurant_df = restaurant_df.withColumn("longitude", restaurant_df.longitude.cast('float'))
restaurant_df = restaurant_df.withColumn("restaurants_star", restaurant_df.restaurants_star.cast('float'))
restaurant_df = restaurant_df.withColumn("review_count", restaurant_df.review_count.cast('integer'))

user_df = user_df.withColumn("user_average_stars", user_df.user_average_stars.cast('float'))
user_df = user_df.withColumn("user_review_count", user_df.user_review_count.cast('integer'))
user_df = user_df.withColumn("fans", user_df.fans.cast('integer'))

review_df = review_df.withColumn("review_stars", review_df.review_stars.cast('integer'))

## Creating Indexed User and Restaurant ID

In [7]:
# Create index for each user ID
user_index_df = spark.createDataFrame(user_df.rdd.map(lambda x: x[0]).zipWithIndex(), 
                                      StructType([StructField("user_id", StringType(), True), 
                                                  StructField("user_index", IntegerType(), True)]))

user_index_df.show(2)

# Add the new user_index column the user dataframe

a = user_df.alias("a")
b = user_index_df.alias("b")
    
user_df_tmp = a.join(b, col("a.user_id") == col("b.user_id"), 'inner') \
             .select([col('a.'+xx) for xx in a.columns] + [col('b.user_index')])

user_df = user_df_tmp

user_df.show(2)

+--------------------+----------+
|             user_id|user_index|
+--------------------+----------+
|fd25NWbvLdNFJV_Mo...|         0|
|AyxTBqPJjYqlY55vf...|         1|
+--------------------+----------+
only showing top 2 rows

+--------------------+---------+------------------+-----------------+-----+----+--------------------+-------------------+----------+
|             user_id|user_name|user_average_stars|user_review_count|elite|fans|             friends|      yelping_since|user_index|
+--------------------+---------+------------------+-----------------+-----+----+--------------------+-------------------+----------+
|-4Anvj46CWf57KWI9...|   Cookie|               3.5|                2| null|   1|kUWW9YR-2xC9YUSav...|2016-08-17 14:02:45|     25581|
|-BUamlG3H-7yqpAl1...|    Jason|               1.5|                2| null|   0|AZNbOt_Rm7M5LxfBb...|2016-02-09 21:49:40|     44556|
+--------------------+---------+------------------+-----------------+-----+----+--------------------+-----

In [8]:
# Create index for each restaurant ID
res_index_df = spark.createDataFrame(restaurant_df.rdd.map(lambda x: x[0]).zipWithIndex(),
                                     StructType([StructField("business_id", StringType(), True),
                                                 StructField("business_index", IntegerType(), True)]))

res_index_df.show(2)

# Add the new restaurant index column the restaurant dataframe
a = restaurant_df.alias("a")
b = res_index_df.alias("b")
    
res_df_tmp = a.join(b, col("a.business_id") == col("b.business_id"), 'inner') \
             .select([col('a.'+xx) for xx in a.columns] + [col('b.business_index')])

restaurant_df = res_df_tmp

restaurant_df.show(2)

+--------------------+--------------+
|         business_id|business_index|
+--------------------+--------------+
|NDuUMJfrWk52RA-H-...|             0|
|SP_YXIEwkFPPl_9an...|             1|
+--------------------+--------------+
only showing top 2 rows

+--------------------+--------------------+--------------------+----------------+------------+---------+---------+--------------+
|         business_id|                name|          categories|restaurants_star|review_count| latitude|longitude|business_index|
+--------------------+--------------------+--------------------+----------------+------------+---------+---------+--------------+
|NDuUMJfrWk52RA-H-...|      Bolt Fresh Bar|Juice Bars & Smoo...|             3.0|          57|43.642887|-79.42543|             0|
|SP_YXIEwkFPPl_9an...|The Steady Cafe &...|Restaurants, Nigh...|             3.5|          29|43.660492| -79.4321|             1|
+--------------------+--------------------+--------------------+----------------+------------+---

In [9]:
# Combine all the restaurant, user and review dataframes into 1

# Choose the relevant columns from the review_df
review_tmp_df = review_df.select('business_id','user_id','review_stars')

# Combine the review and user dataframes based on user_id
a = review_tmp_df.alias("a")
b = user_df.alias("b")
rev_user_df = a.join(b, col("a.user_id") == col("b.user_id"), 'inner') \
                     .select([col('a.'+xx) for xx in a.columns] + [col('b.user_index')])

# Combine the review-user with restaurant dataframes based on business id
a = rev_user_df.alias("a")
b = restaurant_df.alias("b")
all_df = a.join(b, col("a.business_id") == col("b.business_id"), 'inner') \
                         .select([col('a.'+xx) for xx in a.columns] + [col('b.business_index')])

all_df.show(2, truncate = False)

+----------------------+----------------------+------------+----------+--------------+
|business_id           |user_id               |review_stars|user_index|business_index|
+----------------------+----------------------+------------+----------+--------------+
|1RFIVcZYV77tGIwVVGGClw|u642WP1g6Z3oRA9qdP39PQ|5           |33554     |5199          |
|1RFIVcZYV77tGIwVVGGClw|CGmWH1Nwx1hbasHqoLlpBQ|4           |8339      |5199          |
+----------------------+----------------------+------------+----------+--------------+
only showing top 2 rows



## Creating SQL Views

In [0]:
# Create SQL views
restaurant_df.createOrReplaceTempView("restaurants")
user_df.createOrReplaceTempView("users")
review_df.createOrReplaceTempView("reviews")
friend_df.createOrReplaceTempView("friends")
all_df.createOrReplaceTempView("all_indexed")

# Text Processing and Featurization

In [11]:
# Create review dataframe 
review_text = spark.sql("SELECT business_id, text FROM reviews")
review_text.show(3)


# Concatenating all review texts for each restaurant
review_text_rdd = review_text.rdd
review_concat_rdd = review_text_rdd.map(tuple).reduceByKey(add)  
review_concat_df = spark.createDataFrame(review_concat_rdd)
review_concat_df = review_concat_df \
                            .withColumnRenamed('_1', 'business_id') \
                            .withColumnRenamed('_2', 'text')
review_concat_df.show(3)

+--------------------+--------------------+
|         business_id|                text|
+--------------------+--------------------+
|NDuUMJfrWk52RA-H-...|Pretty solid vega...|
|NDuUMJfrWk52RA-H-...|"What a great spo...|
|NDuUMJfrWk52RA-H-...|I really do love ...|
+--------------------+--------------------+
only showing top 3 rows

+--------------------+--------------------+
|         business_id|                text|
+--------------------+--------------------+
|NPHZkn1e-tSJAbo8Z...|After spending th...|
|gyFYZV4b_9TxG1ulQ...|I have eaten midd...|
|aql8K6zVoJDGRJ3P-...|I had delivery, f...|
+--------------------+--------------------+
only showing top 3 rows



In [12]:
'''# DON'T RUN THIS IF YOU ALREADY HAVE THE PIPELINE MODEL

# Create text-processing pipeline


# Build the pipeline 
regexTokenizer = RegexTokenizer(gaps = False, pattern = '\w+', inputCol = 'text', outputCol = 'token')
stopWordsRemover = StopWordsRemover(inputCol = 'token', outputCol = 'nostopwrd')
countVectorizer = CountVectorizer(inputCol="nostopwrd", outputCol="rawFeature")
iDF = IDF(inputCol="rawFeature", outputCol="idf_vec")
word2Vec = Word2Vec(vectorSize = 100, minCount = 5, inputCol = 'nostopwrd', outputCol = 'word_vec', seed=123)
vectorAssembler = VectorAssembler(inputCols=['idf_vec', 'word_vec'], outputCol='comb_vec')
pipeline = Pipeline(stages=[regexTokenizer, stopWordsRemover, countVectorizer, iDF, word2Vec, vectorAssembler])

# Fit the model
pipeline_mdl = pipeline.fit(review_concat_df)

# Save the pipeline model 
pipeline_mdl.write().overwrite().save(model_path + 'pipe_txt')
'''

'# DON\'T RUN THIS IF YOU ALREADY HAVE THE PIPELINE MODEL\n\n# Create text-processing pipeline\n\n\n# Build the pipeline \nregexTokenizer = RegexTokenizer(gaps = False, pattern = \'\\w+\', inputCol = \'text\', outputCol = \'token\')\nstopWordsRemover = StopWordsRemover(inputCol = \'token\', outputCol = \'nostopwrd\')\ncountVectorizer = CountVectorizer(inputCol="nostopwrd", outputCol="rawFeature")\niDF = IDF(inputCol="rawFeature", outputCol="idf_vec")\nword2Vec = Word2Vec(vectorSize = 100, minCount = 5, inputCol = \'nostopwrd\', outputCol = \'word_vec\', seed=123)\nvectorAssembler = VectorAssembler(inputCols=[\'idf_vec\', \'word_vec\'], outputCol=\'comb_vec\')\npipeline = Pipeline(stages=[regexTokenizer, stopWordsRemover, countVectorizer, iDF, word2Vec, vectorAssembler])\n\n# Fit the model\npipeline_mdl = pipeline.fit(review_concat_df)\n\n# Save the pipeline model \npipeline_mdl.write().overwrite().save(model_path + \'pipe_txt\')\n'

In [0]:
# Load the pipeline trained model 

pipeline_mdl = PipelineModel.load(model_path + 'pipe_txt')

In [0]:
# Transform the review data

review_pipeline_df = pipeline_mdl.transform(review_concat_df)

In [15]:
# Show the transformed review data

review_pipeline_df.select( 'text', 'nostopwrd', 'idf_vec', 'word_vec', 'comb_vec').show(10)

+--------------------+--------------------+--------------------+--------------------+--------------------+
|                text|           nostopwrd|             idf_vec|            word_vec|            comb_vec|
+--------------------+--------------------+--------------------+--------------------+--------------------+
|After spending th...|[spending, day, w...|(179949,[0,1,2,3,...|[0.08068626681052...|(180049,[0,1,2,3,...|
|I have eaten midd...|[eaten, middle, e...|(179949,[0,1,2,3,...|[0.11516802900923...|(180049,[0,1,2,3,...|
|I had delivery, f...|[delivery, food, ...|(179949,[0,1,2,3,...|[-0.0153703065401...|(180049,[0,1,2,3,...|
|I've been here th...|[ve, three, times...|(179949,[0,1,2,5,...|[0.02640753209898...|(180049,[0,1,2,5,...|
|Not worth it. My ...|[worth, friend, f...|(179949,[0,2,3,4,...|[0.02037788828788...|(180049,[0,2,3,4,...|
|Excellent value.....|[excellent, value...|(179949,[0,1,2,3,...|[0.05887368812146...|(180049,[0,1,2,3,...|
|Decent dim sum bu...|[decent, dim, s

# Content-Based Recommender

## Functions:

In [0]:
# Let's gather all the vectors in a list
restaurant_vecs = review_pipeline_df.select('business_id', 'word_vec').rdd.map(lambda x: (x[0], x[1])).collect()

In [17]:
# Take a look of the first 5 vectors
restaurant_vecs[:5]

[('NPHZkn1e-tSJAbo8Zm9rYw',
  DenseVector([0.0807, 0.016, 0.0037, 0.0495, 0.02, 0.0073, -0.0355, 0.0177, 0.0028, -0.004, 0.0333, -0.0499, -0.0343, 0.0583, 0.0408, -0.0311, -0.0758, -0.0082, 0.0347, -0.0342, 0.0572, -0.0095, -0.0101, 0.0285, 0.026, -0.0715, 0.0312, -0.0333, -0.0478, -0.0171, -0.0808, 0.0094, 0.0138, 0.0538, 0.0044, -0.0503, -0.0445, 0.0358, -0.0084, -0.0096, 0.0059, -0.0404, 0.0051, 0.0589, 0.0193, -0.0603, 0.0749, 0.08, -0.0502, 0.0116, -0.002, -0.0453, -0.0794, 0.0019, 0.0846, -0.0061, 0.0168, -0.0155, 0.0146, -0.0086, 0.0291, 0.0368, 0.0106, 0.0121, -0.0429, -0.007, -0.0531, 0.0156, 0.06, 0.0413, -0.0139, 0.0392, 0.0156, -0.0084, -0.0023, 0.0716, -0.0016, 0.0756, -0.0175, -0.0057, 0.0139, 0.0051, -0.008, 0.0865, 0.0894, -0.0171, 0.0075, -0.0145, -0.0004, -0.0004, -0.0554, 0.0093, 0.0291, -0.1809, -0.0473, -0.0521, -0.0451, 0.035, -0.0037, -0.0129])),
 ('gyFYZV4b_9TxG1ulQNi0Ig',
  DenseVector([0.1152, 0.02, -0.0388, 0.0677, 0.0465, -0.0116, -0.0176, 0.032, -0.0236, 0.

### Cosine Similarity

In [0]:
# Function to calculate the cosine similarity of two vectors

def CosineSim(vec1, vec2): 
  return np.dot(vec1, vec2) / np.sqrt(np.dot(vec1, vec1)) / np.sqrt(np.dot(vec2, vec2)) 

### Getting Similar Restaurants

In [0]:
# A function to find similar restaurants to the given input 

# Note that restaurant_vecs[i][0] is the business_id and restaurant_vecs[i][1] is the vector

def GetSimilarRestaurants(b_ids, sim_res_limit=10):
  
  # Define a schema for the similar restaurants dataframe
  schema = StructType([StructField("input_business_id", StringType(), True),
                       StructField("business_id", StringType(), True),
                       StructField("score", IntegerType(), True)])
  
  # Create an empty dataframe to initialize 
  sim_restaurants_df = spark.createDataFrame([], schema)
  
  # Get all the business id and their vectors from the input then score their similarities
  for b_id in b_ids:
    
    # Get the input restaurant vector by finding the same business id in the restaurant vectors variable
    input_vec = [(r[1]) for r in restaurant_vecs if r[0] == b_id][0]
    
    # Get the cosine similarity values and their business id and parallelize them in rdd
    sim_res_rdd = sc.parallelize((i[0], float(CosineSim(input_vec, i[1]))) for i in restaurant_vecs)

    # Create dataframe of the similar business id with their scores
    # Rename the columns accordingly and order them by their similarity scores
    sim_res_df = spark.createDataFrame(sim_res_rdd) \
                .withColumnRenamed('_1', 'business_id') \
                .withColumnRenamed('_2', 'score') \
                .orderBy("score", ascending = False)
    
    # Get the input business id, 'lit' is the literal value
    sim_res_df = sim_res_df.withColumn('input_business_id', lit(b_id))
    
    # Filter: don't include the input restaurant and limit to certain number of restaurants (default is 10)
    sim_res_df = sim_res_df.filter(col("business_id") != b_id).limit(sim_res_limit)
    
    # Now fill these values in to the big dataframe
    sim_restaurants_df = sim_res_df.union(sim_restaurants_df)
     

  return sim_restaurants_df.select("input_business_id","business_id","score")


In [20]:
# Test GetSimilarRestaurants function
test = GetSimilarRestaurants(['gyFYZV4b_9TxG1ulQNi0Ig'])
test.show(5)

+--------------------+--------------------+------------------+
|   input_business_id|         business_id|             score|
+--------------------+--------------------+------------------+
|gyFYZV4b_9TxG1ulQ...|hdqhXOzPrwskT55T9...|0.9836209839469366|
|gyFYZV4b_9TxG1ulQ...|8QwAWJJ1FtICnE266...|0.9749664416213984|
|gyFYZV4b_9TxG1ulQ...|8Ul8_OsM2Xxg8sC5x...|0.9728009435398479|
|gyFYZV4b_9TxG1ulQ...|aPAljlYaHdDDKZngS...|0.9725079985076595|
|gyFYZV4b_9TxG1ulQ...|reBk7BlqFC3eoE695...|0.9694352585453385|
+--------------------+--------------------+------------------+
only showing top 5 rows



### Getting Restaurant Details

In [0]:
# A function to get the restaurant details
def GetRestaurantDetails(input_res):
  
  a = input_res.alias("a")
  b = restaurant_df.alias("b")
  
  return a.join(b, col("a.business_id") == col("b.business_id"), 'inner') \
        .select([col('a.'+xx) for xx in a.columns] + [col('b.name'),col('b.categories'),
                                                      col('b.restaurants_star'),col('b.review_count'),
                                                      col('b.latitude'),col('b.longitude')])

In [22]:
# Test getting restaurant details with GetSimilarRestaurants
GetRestaurantDetails(test).show(5)

+--------------------+--------------------+------------------+--------------------+--------------------+----------------+------------+---------+---------+
|   input_business_id|         business_id|             score|                name|          categories|restaurants_star|review_count| latitude|longitude|
+--------------------+--------------------+------------------+--------------------+--------------------+----------------+------------+---------+---------+
|gyFYZV4b_9TxG1ulQ...|2V8r_VDFqbGNZclTW...|0.9528853824302909|           Mr. Greek|  Greek, Restaurants|             2.0|           7|43.769344|-79.46845|
|gyFYZV4b_9TxG1ulQ...|aPAljlYaHdDDKZngS...|0.9725079985076595|Paramount Fine Foods|Mediterranean, Mi...|             3.0|          49|43.736427|-79.34427|
|gyFYZV4b_9TxG1ulQ...|8QwAWJJ1FtICnE266...|0.9749664416213984|            Tzatziki|  Greek, Restaurants|             2.0|          55|43.683964|-79.34668|
|gyFYZV4b_9TxG1ulQ...|HdA60YKlC1HQ7MfkM...|0.9519252613076906|        

### Testing Functions

In [23]:
# Create restaurant summary dataframe

a = review_pipeline_df.alias("a")
b = restaurant_df.alias("b")
    
summary_df = a.join(b, a.business_id == b.business_id)\
             .select([col('a.'+xx) for xx in a.columns] + [col('b.name'),col('b.categories'),
                                                           col('b.restaurants_star'),col('b.review_count'),
                                                           col('b.latitude'),col('b.longitude')])
summary_df.show(10)    

+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+----------------+------------+---------+----------+
|         business_id|                text|               token|           nostopwrd|          rawFeature|             idf_vec|            word_vec|            comb_vec|                name|          categories|restaurants_star|review_count| latitude| longitude|
+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+----------------+------------+---------+----------+
|1RFIVcZYV77tGIwVV...|So I've never bee...|[so, i, ve, never...|[ve, never, shop,...|(179949,[1,3,5,6,...|(179949,[1,3,5,6,...|[0.02784062431993...|(180049,[1,3,5,6,...|      More Than Pies|Food, Restaurants...|

In [24]:
# Test with the top restaurants in Toronto

# Get a list of the business id of the top restaurants
b_id = summary_df.select('business_id') \
        .filter((summary_df.restaurants_star == 5) & (summary_df.review_count > 50)) \
        .rdd.flatMap(lambda x: x).collect()

# Get the details of the input restaurants:
print('\nInput restaurants details:')
restaurant_df.select('name', 'categories') \
    .filter(restaurant_df.business_id.isin(b_id) == True).show(truncate=False)


    
# Get the top 5 similar business
sims = GetRestaurantDetails(GetSimilarRestaurants(b_id, 2))
print('Top 5 similar restaurants for each input restaurant are:"')
sims.select('input_business_id','name','restaurants_star','score','categories').show()


Input restaurants details:
+--------------------------------+-----------------------------------------------+
|name                            |categories                                     |
+--------------------------------+-----------------------------------------------+
|New Orleans Seafood & Steakhouse|Steakhouses, Cajun/Creole, Restaurants, Seafood|
|Baretto Caffe                   |Restaurants, Italian, Cafes                    |
|Veghed                          |Restaurants, Vegan, Vegetarian                 |
|Zeal Burgers                    |Restaurants, Burgers                           |
+--------------------------------+-----------------------------------------------+

Top 5 similar restaurants for each input restaurant are:"
+--------------------+--------------------+----------------+------------------+--------------------+
|   input_business_id|                name|restaurants_star|             score|          categories|
+--------------------+--------------------+----

## Previous User Reviews

In [0]:
def ContentRecommender(u_id, sim_res_limit=10, return_map=True):  
  # Select restaurants previously reviewed (4+) by the user
  query = """
  SELECT DISTINCT business_id FROM reviews  
  where review_stars >= 4.0 
  and user_id = "{}"
  """.format(u_id)

  usr_rev_bus = spark.sql(query).sample(False, 0.5).limit(sim_res_limit)
  
  usr_rev_bus_det = GetRestaurantDetails(usr_rev_bus)
    
  # Show details of restaurants previously reviewed by user
  print('\nRestaurants previously rated 4+ by user:')
  usr_rev_bus_det.select(['name', 'categories']).limit(sim_res_limit).show(truncate = False)
  
  res_list = [i.business_id for i in usr_rev_bus.collect()]
  
  # Get restaurants similar to the sample
  sim_res_df = GetSimilarRestaurants(res_list, sim_res_limit)
  
  # Filter out restaurants that have been reviewed before by the use                   r
  s = sim_res_df.alias("s")
  r = usr_rev_bus.alias("r")
  rec = s.join(r, col("s.business_id") == col("r.business_id"), 'left_outer') \
        .where(col("r.business_id").isNull()) \
        .select([col('s.business_id'),col('s.score')])
                
  # Remove duplicates
  rec = rec.groupby('business_id').agg(max('score').alias('score'))
   
  # Sort it by the score
  rec_list = rec.orderBy("score", ascending = False).limit(sim_res_limit)
  
  df = GetRestaurantDetails(rec_list)
  df.limit(sim_res_limit)
  
  df = df.withColumn('map_marker_colour', lit('red')) \
         .withColumn('rec_type', lit('Content-Based'))
    
  if (return_map == True):
    mp = folium.Map(location=[43.70011, -79.4163], zoom_start=12)

    for i, r in df.toPandas().iterrows():
      folium.Marker(location =[r.latitude, r.longitude], 
                    popup = html.escape(r["name"]) + '<br>' + 'Stars: ' 
                    + str(r.restaurants_star) + '<br>' + 'Reviews: ' + str(r.review_count), 
                    icon = folium.Icon(color=r.map_marker_colour)).add_to(mp)
    return mp
        
  else:
    print("Restaurants recommended to user based on their previously reviewed businesses:")  
    df.select(["name","score","categories","restaurants_star","review_count"]).show(truncate = False)
        
    return df

In [26]:
# Test content recommender for a user

u_id = 'GvIJqIr7GcbGUUxxrgXg2A'
ContentRecommender(u_id, return_map = False)


Restaurants previously rated 4+ by user:
+------------------------+------------------------------------------------------------------------------------------+
|name                    |categories                                                                                |
+------------------------+------------------------------------------------------------------------------------------+
|College Falafel         |Middle Eastern, Restaurants, Mediterranean, Sandwiches, Vegetarian                        |
|Byblos                  |Restaurants, Middle Eastern, Mediterranean                                                |
|KINKA IZAKAYA BLOOR     |Tapas Bars, Pubs, Japanese, Tapas/Small Plates, Nightlife, Local Flavor, Restaurants, Bars|
|Curry Twist Restaurant  |Restaurants, Indian                                                                       |
|Delux                   |Restaurants, Women's Clothing, Cuban, Fashion, French, Shopping                           |
|Boom Breakfas

DataFrame[business_id: string, score: string, name: string, categories: string, restaurants_star: float, review_count: int, latitude: float, longitude: float, map_marker_colour: string, rec_type: string]

## Keyword-Based

In [0]:
def KeywordRecommender(key_words, sim_res_limit=10, return_map=True):
    
  print('\nBusinesses similar to key words: "' + key_words + '"')
    
  input_words_df = sc.parallelize([(0, key_words)]).toDF(['business_id', 'text'])
    
  # Transform the keywords to vectors
  input_words_df = pipeline_mdl.transform(input_words_df)
    
  # Choose word2vec vectors
  input_key_words_vec = input_words_df.select('word_vec').collect()[0][0]
    
  # Get similarity
  sim_res_byword_rdd = sc.parallelize((i[0], float(CosineSim(input_key_words_vec, i[1]))) for i in restaurant_vecs)

  sim_res_byword_df = spark.createDataFrame(sim_res_byword_rdd) \
         .withColumnRenamed('_1', 'business_id') \
         .withColumnRenamed('_2', 'score') \
         .orderBy("score", ascending = False)
  
  # Return top 10 similar businesses
  res_det = GetRestaurantDetails(sim_res_byword_df)
  res_det.createOrReplaceTempView("tmp")
    
  # Filter out recommended restaurants   
  query = '''SELECT * FROM tmp
  WHERE restaurants_star >= 4.0 AND review_count >=10 AND score >= 0.6
  '''
  
  filtered = spark.sql(query)
  df = filtered.orderBy("score", ascending = False).limit(sim_res_limit)
  
  df = df.withColumn('map_marker_colour', lit('orange')) \
         .withColumn('rec_type', lit('Keyword'))
    
  if (return_map == True):
    mp = folium.Map(location=[43.70011, -79.4163], zoom_start=12)

    for i, r in df.toPandas().iterrows():
      folium.Marker(location =[r.latitude, r.longitude], 
                    popup = html.escape(r["name"]) + '<br>' + 'Stars: ' 
                    + str(r.restaurants_star) + '<br>' + 'Reviews: ' + str(r.review_count), 
                    icon = folium.Icon(color=r.map_marker_colour)).add_to(mp)
    return mp
        
  else:
            
    return df

In [28]:
# Test key word similarity to review text

key_words = 'sushi sashimi'

KeywordRecommender(key_words, return_map = False).show()


Businesses similar to key words: "sushi sashimi"
+--------------------+------------------+-------------------+--------------------+----------------+------------+---------+----------+-----------------+--------+
|         business_id|             score|               name|          categories|restaurants_star|review_count| latitude| longitude|map_marker_colour|rec_type|
+--------------------+------------------+-------------------+--------------------+----------------+------------+---------+----------+-----------------+--------+
|plLSP5HwCfCWOh_7A...|0.6775575866603375|        Kappo Sushi|Sushi Bars, Resta...|             4.0|          10|43.656116| -79.39239|           orange| Keyword|
|bTPwIYhcPG_kQGC0Y...|0.6663122134544127|  Sushi Bar Sushiya|Japanese, Sushi B...|             4.5|          17|43.663555|-79.370674|           orange| Keyword|
|wtftZecNakP5WKwi-...|0.6405305102452118|         Kiro Sushi|Sushi Bars, Japan...|             4.0|          47|43.672268| -79.38744|           o

# Collaborative Recommender

  ## Rating Dataframe


In [29]:
# Create the rating dataframe required by the ALS model

rating_df = all_df.select('user_index', 'business_index', all_df.review_stars.cast('float').alias('rating'))
rating_df.show(5)
print('Rating matrix no. of rows:', rating_df.count())
rating_df.printSchema()
rating_df.count()

+----------+--------------+------+
|user_index|business_index|rating|
+----------+--------------+------+
|     33554|          5199|   5.0|
|      8339|          5199|   4.0|
|     69609|          5199|   5.0|
|      6204|          5199|   4.0|
|      4378|          1814|   4.0|
+----------+--------------+------+
only showing top 5 rows

Rating matrix no. of rows: 376593
root
 |-- user_index: integer (nullable = true)
 |-- business_index: integer (nullable = true)
 |-- rating: float (nullable = true)



376593

## ALS

In [0]:
# Split the data into train and test dataset
train, test = rating_df.randomSplit([0.8, 0.2], seed=123)

In [31]:
'''
# Hyperparameter tuning using cross-validation
als = ALS(userCol="user_index", itemCol="business_index", ratingCol="rating", coldStartStrategy="drop")

# Define the parameter to do grid search on 
param_grid = ParamGridBuilder().addGrid(als.rank,[10, 15, 20]).addGrid(als.maxIter, [10, 15, 20]).build()
evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating")
cross_val_als = CrossValidator(estimator=als, estimatorParamMaps=param_grid, 
                    evaluator=evaluator, numFolds=5, seed=123)

# Fit the model on train data
cross_val_als_model = cross_val_als.fit(train)

# Save the model
cross_val_als_model.write().overwrite().save(model_path + 'als_model')
'''

'\n# Hyperparameter tuning using cross-validation\nals = ALS(userCol="user_index", itemCol="business_index", ratingCol="rating", coldStartStrategy="drop")\n\n# Define the parameter to do grid search on \nparam_grid = ParamGridBuilder().addGrid(als.rank,[10, 15, 20]).addGrid(als.maxIter, [10, 15, 20]).build()\nevaluator = RegressionEvaluator(metricName="rmse", labelCol="rating")\ncross_val_als = CrossValidator(estimator=als, estimatorParamMaps=param_grid, \n                    evaluator=evaluator, numFolds=5, seed=123)\n\n# Fit the model on train data\ncross_val_als_model = cross_val_als.fit(train)\n\n# Save the model\ncross_val_als_model.write().overwrite().save(model_path + \'als_model\')\n'

In [32]:
# Load previously cross-validated ALS model
cross_val_als_model = CrossValidatorModel.load(model_path + 'als_model')

# Evaluate the model by computing the RMSE on the test data
als_predictions = cross_val_als_model.bestModel.transform(test)
evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating", predictionCol="prediction")

# Find the best model 
best_model = cross_val_als_model.bestModel

# Print the parameters for the best model: 
print(f' Rank: {best_model.rank}')

# Get the rmse
rmse = evaluator.evaluate(als_predictions)
print("Root-mean-square error = " + str(rmse))

 Rank: 20
Root-mean-square error = 1.335875196837811


In [33]:
'''
# Try regulating more
alsb = ALS(rank=20, maxIter=20, regParam=0.3, userCol="user_index", itemCol="business_index", 
           ratingCol="rating", coldStartStrategy="drop", seed=123)

alsb_model = alsb.fit(train)
alsb_predictions = alsb_model.transform(test)
evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating", predictionCol="prediction")
rmse = evaluator.evaluate(alsb_predictions)
print("RMSE = " + str(rmse))

# Save the ALS model
alsb_model.write().overwrite().save(model_path + 'als_best')
'''

'\n# Try regulating more\nalsb = ALS(rank=20, maxIter=20, regParam=0.3, userCol="user_index", itemCol="business_index", \n           ratingCol="rating", coldStartStrategy="drop", seed=123)\n\nalsb_model = alsb.fit(train)\nalsb_predictions = alsb_model.transform(test)\nevaluator = RegressionEvaluator(metricName="rmse", labelCol="rating", predictionCol="prediction")\nrmse = evaluator.evaluate(alsb_predictions)\nprint("RMSE = " + str(rmse))\n\n# Save the ALS model\nalsb_model.write().overwrite().save(model_path + \'als_best\')\n'

In [34]:
# Load the best ALS model
als_model = ALSModel.load(model_path + 'als_best')

# Generate top 10 restaurant recommendations for all users
user_recs = als_model.recommendForAllUsers(10)

# Add the column user_id, cache the recommendaton dataframe and show recommedations sample
a = user_recs.alias("a")
b = user_df.alias("b")
    
all_user_recs = a.join(b, col("a.user_index") == col("b.user_index"), 'inner') \
             .select([col('a.'+xx) for xx in a.columns] + [col('b.user_id')])

all_user_recs.cache()   
all_user_recs.show(1, truncate=False)

+----------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+----------------------+
|user_index|recommendations                                                                                                                                                                             |user_id               |
+----------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+----------------------+
|148       |[[5738, 3.6491408], [2898, 3.6238441], [7003, 3.572372], [6423, 3.5670912], [2289, 3.5079298], [6137, 3.4774523], [3065, 3.4553866], [3877, 3.4468915], [325, 3.4312227], [6603, 3.4248624]]|VrMaL32wWNed_DjOcsO3Ng|
+----------+----------------------------------------------------------------------------------------

In [35]:
# Show recommendations a sample user id

u_id = 'q9iOvVsQU7V2_cIHYnKAcw'

user_flat_rec =  spark.createDataFrame(all_user_recs.filter(col('user_id') == u_id).rdd.flatMap(lambda p: p[1]))
user_flat_rec.show()

+--------------+-----------------+
|business_index|           rating|
+--------------+-----------------+
|          5738|4.876997470855713|
|          2898|4.793347358703613|
|          7003| 4.71449613571167|
|          1827|4.670722007751465|
|          6423|4.654711723327637|
|           325| 4.63226842880249|
|          2289|4.620192527770996|
|          7233|4.594776153564453|
|          3877|4.569246768951416|
|           395|4.552909851074219|
+--------------+-----------------+



In [36]:
# Show the recommended restaurant details

a = restaurant_df.alias("a")
b = user_flat_rec.alias("b")

user_collab_df = a.join(b, col("a.business_index") == col("b.business_index"), 'inner') \
                         .select([col('a.'+xx) for xx in a.columns] + [col('b.rating')])
    
user_collab_df.show()

+--------------------+--------------------+--------------------+----------------+------------+---------+----------+--------------+-----------------+
|         business_id|                name|          categories|restaurants_star|review_count| latitude| longitude|business_index|           rating|
+--------------------+--------------------+--------------------+----------------+------------+---------+----------+--------------+-----------------+
|LcIgUlWaJJwtOfPoP...|       Souppe Shoppe|Food, Soup, Food ...|             5.0|           4|43.651424| -79.40412|          7233|4.594776153564453|
|fKmxQe2HPANmWkFiM...|             Freshii|Fast Food, Salad,...|             3.5|           3|43.688972| -79.41286|           325| 4.63226842880249|
|otsjAjxf0PNQ99xcm...|Sushi Making For ...|Japanese, Educati...|             4.5|           3|43.656235| -79.39232|          2289|4.620192527770996|
|chwG4PHe_wwyLIGIa...|Nantana Thai Food...|Restaurants, Dess...|             4.5|           3|43.651764| -

In [0]:
all_flat_rec = spark.createDataFrame(all_user_recs.rdd.flatMap(lambda p: p[1]))

# Scaling the predicted to be from 0 to 1
all_panda_rec = all_flat_rec.toPandas()
scaler = MinMaxScaler()
scaler.fit(all_panda_rec["rating"].values.reshape(-1,1))
all_panda_rec["scaled_rating"] = scaler.transform(all_panda_rec["rating"].values.reshape(-1,1))
all_scaled_rec = spark.createDataFrame(all_panda_rec).rdd.collect()


## Collaborative-Filtering

In [0]:
# Make a function for collaborative recommendations

def CollaborativeRecommender(u_id, return_map=True):
      
  user_flat_rec = spark.createDataFrame(all_user_recs.filter(col('user_id') == u_id).rdd.flatMap(lambda p: p[1]))
                                         
  a = user_flat_rec.alias("a")
  b = restaurant_df.alias("b")
  
  df = a.join(b, col("a.business_index") == col("b.business_index"), 'inner') \
        .select([col('b.business_index'), col('a.rating'), col('b.name'),col('b.categories'),
                                                           col('b.restaurants_star'),col('b.review_count'),
                                                           col('b.latitude'),col('b.longitude')]) \
        .orderBy("rating", ascending = False)
  
  df = df.withColumnRenamed('rating', 'score') \
           .withColumn('map_marker_colour', lit('blue')) \
           .withColumn('rec_type', lit('Collaborative'))
  

  if (return_map == True):
    mp = folium.Map(location=[43.70011, -79.4163], zoom_start=12)

    for i, r in df.toPandas().iterrows():
      folium.Marker(location =[r.latitude, r.longitude], 
                    popup = html.escape(r["name"]) + '<br>' + 'Stars: ' 
                    + str(r.restaurants_star) + '<br>' + 'Reviews: ' + str(r.review_count), 
                    icon = folium.Icon(color=r.map_marker_colour)).add_to(mp)
    return mp
        
  else:
    
    print("\nRestaurants recommended to user based on collaborative filtering:")
    df.select(["name","score","categories","restaurants_star","review_count"]).show(truncate = False)
    
    return df

In [39]:
u_id = 'GvIJqIr7GcbGUUxxrgXg2A'
CollaborativeRecommender(u_id, return_map = False)


Restaurants recommended to user based on collaborative filtering:
+----------------------------+------------------+--------------------------------------------------------------------------------------------------+----------------+------------+
|name                        |score             |categories                                                                                        |restaurants_star|review_count|
+----------------------------+------------------+--------------------------------------------------------------------------------------------------+----------------+------------+
|Papa John's                 |4.54070520401001  |Restaurants, Pizza                                                                                |3.5             |3           |
|Away Kitchen and Cafe       |4.503584384918213 |Vegan, Ice Cream & Frozen Yogurt, Food, Restaurants                                               |5.0             |3           |
|Sugar Miracles              |4.501307

DataFrame[business_index: int, score: double, name: string, categories: string, restaurants_star: float, review_count: int, latitude: float, longitude: float, map_marker_colour: string, rec_type: string]

# Friends Recommender

In [0]:
def FriendsRecommender(u_id, sim_res_limit = 10, return_map=True):

  """ 
  Get top-N friends recommendations for a specific user. 
            
  """  
  query = """
  SELECT business_id, COUNT(*) AS 4_5_count 
  FROM reviews
  WHERE user_id IN (SELECT f.friends FROM friends f
                    INNER JOIN users u on f.friends = u.user_id
                    WHERE f.user_id = "{}") 
  AND review_stars >= 4 
  AND business_id NOT IN (SELECT business_id from reviews
                          WHERE user_id = "{}")
  GROUP BY business_id
  ORDER BY COUNT(*) DESC LIMIT 100
  """.format(u_id, u_id)

  friend_rec_df = spark.sql(query)

  a = friend_rec_df.sample(False, 0.5).limit(sim_res_limit)

  df = GetRestaurantDetails(a).orderBy("4_5_count", ascending = False)
        
  df = df.withColumnRenamed('4_5_count', 'score') \
         .withColumn('map_marker_colour', lit('green')) \
         .withColumn('rec_type', lit('Friend'))    
  

  if (return_map == True):
    mp = folium.Map(location=[43.70011, -79.4163], zoom_start=12)

    for i, r in df.toPandas().iterrows():
      folium.Marker(location =[r.latitude, r.longitude], 
                    popup = html.escape(r["name"]) + '<br>' + 'Stars: ' 
                    + str(r.restaurants_star) + '<br>' + 'Reviews: ' + str(r.review_count), 
                    icon = folium.Icon(color=r.map_marker_colour)).add_to(mp)
    return mp
        
  else:
    
    print("Restaurants recommended to user from their social network:")    
    df.select(["name","score","categories", "restaurants_star","review_count"]).show(truncate = False)

    return df

In [41]:
u_id = 'GvIJqIr7GcbGUUxxrgXg2A'

FriendsRecommender(u_id, return_map = False).show()

Restaurants recommended to user from their social network:
+----------------------------------------+-----+--------------------------------------------------------------------------------+----------------+------------+
|name                                    |score|categories                                                                      |restaurants_star|review_count|
+----------------------------------------+-----+--------------------------------------------------------------------------------+----------------+------------+
|KINTON RAMEN                            |4    |Restaurants, Japanese, Ramen                                                    |3.5             |110         |
|Grand Electric                          |4    |Nightlife, Mexican, Restaurants, Bars                                           |3.5             |645         |
|White Brick Kitchen                     |3    |Restaurants, Comfort Food, Breakfast & Brunch                                   |4.0         

#Mapping Result

In [42]:
# Test content recommender for a user

u_id = 'q9iOvVsQU7V2_cIHYnKAcw'
ContentRecommender(u_id, 10, return_map = True)


Restaurants previously rated 4+ by user:
+-----------------+-------------------------------------------------------------------------------------------------------+
|name             |categories                                                                                             |
+-----------------+-------------------------------------------------------------------------------------------------------+
|The Hogtown Vegan|Restaurants, Southern, Beer, Wine & Spirits, Food, Vegan                                               |
|Bar Neon         |Beer, Wine & Spirits, Restaurants, Food, Mediterranean, Nightlife, Bars, Tapas/Small Plates, Tapas Bars|
|The Beaconsfield |Pubs, Nightlife, American (New), Canadian (New), Bars, Restaurants                                     |
|The Rushton      |American (New), Restaurants, French                                                                    |
|Holy Oak         |Restaurants, Breakfast & Brunch, Food, Coffee & Tea, Cafes             

In [43]:
# Test key word similarity to review text

key_words = 'sushi sashimi'

KeywordRecommender(key_words, return_map = True)


Businesses similar to key words: "sushi sashimi"


In [44]:
# Test the collaborative recommender
u_id = 'q9iOvVsQU7V2_cIHYnKAcw'
CollaborativeRecommender(u_id, return_map = True)

In [45]:
# Test the friends recommender
u_id = 'q9iOvVsQU7V2_cIHYnKAcw'
FriendsRecommender(u_id, return_map = True)

# Hybrid Recommender

In [0]:
def HybridRecommender(u_id, content_size=4, collab_size=4, friend_size=2, return_map=True):
    
  """ 
  Get hybrid recommendations for a specific user by selecting top-N from each sub-recommender
     
  """  
        
  content_recs = ContentRecommender(u_id, return_map=False)
  collaborative_recs = CollaborativeRecommender(u_id, return_map=False)
  friends_recs = FriendsRecommender(u_id, return_map=False)
        
  df = content_recs.limit(content_size) \
                   .union(collaborative_recs.limit(collab_size)) \
                   .union(friends_recs.limit(friend_size))
  df.select(["name","score","categories"]).show(truncate = False)
  
  df = df.withColumn("latitude", df.latitude.cast('float'))
  df = df.withColumn("longitude", df.longitude.cast('float'))
  print("\nRestaurants recommended by the hybrid recommender")
  df.select("name","categories","restaurants_star","review_count").show(truncate = False)
  
  if (return_map == True):
            
    mp = folium.Map(location=[43.70011, -79.4163], zoom_start=12)

    for i, r in df.toPandas().iterrows():
        folium.Marker(location =[r.latitude, r.longitude], 
                  popup = html.escape(r["name"]) + '<br>' + 'Stars: ' + str(r.restaurants_star) + '<br>' + 'Reviews: ' + str(r.review_count), 
                      icon = folium.Icon(color=r.map_marker_colour)).add_to(mp)
    return mp
        
  else:
            
    return df     

In [47]:
# Test hybrid recommender for a user

u_id = 'GvIJqIr7GcbGUUxxrgXg2A'
HybridRecommender(u_id)


Restaurants previously rated 4+ by user:
+----------------------+----------------------------------------------------------------------------+
|name                  |categories                                                                  |
+----------------------+----------------------------------------------------------------------------+
|Bolt Fresh Bar        |Juice Bars & Smoothies, Food, Restaurants, Fast Food, Vegan                 |
|The Hogtown Vegan     |Restaurants, Southern, Beer, Wine & Spirits, Food, Vegan                    |
|Rock Lobster Food Co  |Restaurants, Seafood                                                        |
|Byblos                |Restaurants, Middle Eastern, Mediterranean                                  |
|Pizzeria Libretto     |Restaurants, Italian, Pizza                                                 |
|Bellwoods Brewery     |Canadian (New), Breweries, Food, Restaurants                                |
|Dog & Bear Pub        |Sports Bars, Bri