In [1]:
# `!export ...` runs in a short-lived subshell, so the exported variables are
# gone as soon as the command returns and never reach this kernel. Setting them
# on os.environ makes them visible to this process and to anything it launches.
import os

os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64/jre"

# Prepend the tool directories to the existing PATH, in the same order as before.
os.environ["PATH"] = ":".join([
    "/usr/lib/jvm/java-8-openjdk-amd64/jre/bin",
    "/u3/cs451/packages/spark/bin",
    "/u3/cs451/packages/hadoop/bin",
    "/u3/cs451/packages/maven/bin",
    "/u3/cs451/packages/scala/bin",
    os.environ.get("PATH", ""),
])

In [2]:
import findspark
# Locate the Spark installation so that the pyspark import below can be resolved.
# NOTE(review): the PATH setup in the first cell uses /u3/cs451/... while this
# uses /u/cs451/... — confirm both point at the same installation.
findspark.init("/u/cs451/packages/spark")

from pyspark.sql import SparkSession
# Reuse the active SparkSession if there is one, otherwise create it.
spark = SparkSession.builder.getOrCreate()

In [3]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

from pyspark.ml import Pipeline
from pyspark.sql.functions import *
from pyspark.sql.types import *


from pyspark.sql import Row
from pyspark.sql import SQLContext

from pyspark.ml.recommendation import ALS, ALSModel
from pyspark.ml.tuning import TrainValidationSplit, ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import RegressionEvaluator

# import folium
import html

In [4]:
# Directory the saved ALS model is loaded from.
model_path = 'ALS/'
# Output directory.
output_path = 'output/'
# Backward-compatible alias: the variable was originally misspelled.
outout_path = output_path

### Data Loading

In [5]:
# create the business dataframe from the parquet file
business_df = spark.read.parquet("Data/yelp_business.parquet")

# print the schema of the business dataframe
business_df.printSchema()

root
 |-- business_id: string (nullable = true)
 |-- business_name: string (nullable = true)
 |-- address: string (nullable = true)
 |-- city: string (nullable = true)
 |-- state: string (nullable = true)
 |-- postal_code: string (nullable = true)
 |-- latitude: double (nullable = true)
 |-- longitude: double (nullable = true)
 |-- stars: double (nullable = true)
 |-- review_count: long (nullable = true)
 |-- categories: string (nullable = true)



In [6]:
# create the user dataframe from the parquet file
user_df = spark.read.parquet("Data/yelp_users.parquet")

# print the schema of the user dataframe
user_df.printSchema()

root
 |-- user_id: string (nullable = true)
 |-- user_name: string (nullable = true)
 |-- review_count: long (nullable = true)
 |-- yelping_since: string (nullable = true)
 |-- useful: long (nullable = true)
 |-- fans: long (nullable = true)
 |-- average_stars: double (nullable = true)



In [7]:
# create the review dataframe from the parquet file

review_df = spark.read.parquet("Data/yelp_review.parquet")

# print the schema of the review dataframe
review_df.printSchema()

root
 |-- review_id: string (nullable = true)
 |-- user_id: string (nullable = true)
 |-- business_id: string (nullable = true)
 |-- stars: double (nullable = true)
 |-- review_date: string (nullable = true)
 |-- review_text: string (nullable = true)
 |-- useful: long (nullable = true)
 |-- funny: long (nullable = true)
 |-- cool: long (nullable = true)



In [8]:
# preview two business records (id and name columns only)

business_df.select('business_id', 'business_name').show(2)

+--------------------+--------------------+
|         business_id|       business_name|
+--------------------+--------------------+
|NDuUMJfrWk52RA-H-...|      Bolt Fresh Bar|
|SP_YXIEwkFPPl_9an...|The Steady Cafe &...|
+--------------------+--------------------+
only showing top 2 rows



In [9]:
# number of rows in the business dataframe
business_df.count()

7965

In [10]:
# preview two user records (id and name columns only)

user_df.select('user_id', 'user_name').show(2)

+--------------------+---------+
|             user_id|user_name|
+--------------------+---------+
|gvXtMj3XuPr0xHjgm...|    Peter|
|pU6GoRTcl1rIOi6zM...|   Javier|
+--------------------+---------+
only showing top 2 rows



In [11]:
# preview two review records (user, business and star rating only)

review_df.select('user_id', 'business_id', 'stars').show(2)

+--------------------+--------------------+-----+
|             user_id|         business_id|stars|
+--------------------+--------------------+-----+
|TpyOT5E16YASd7EWj...|AakkkTuGZA2KBodKi...|  1.0|
|NJlxGtouq06hhC7sS...|YvrylyuWgbP90RgMq...|  5.0|
+--------------------+--------------------+-----+
only showing top 2 rows



#### Spark ALS implementation requires the rating matrix to have the following data types:

```
ratings_df_schema = StructType(
[StructField('userId', IntegerType()),
 StructField('businessId', IntegerType()),
 StructField('rating', DoubleType())]
)
```

So, we need to map existing string user_id, and business_id to integer values

In [12]:
# Build a (user_id, userId) lookup where userId is a consecutive integer index.
sqlContext = SQLContext(spark.sparkContext)

user_id_schema = StructType([
    StructField("user_id", StringType(), True),
    StructField("userId", IntegerType(), True),
])
# x[0] is the first column of each user row (user_id); zipWithIndex pairs it with its position.
indexed_user_ids = user_df.rdd.map(lambda x: x[0]).zipWithIndex()
user_newid_df = sqlContext.createDataFrame(indexed_user_ids, user_id_schema)

user_newid_df.show(2)

+--------------------+------+
|             user_id|userId|
+--------------------+------+
|gvXtMj3XuPr0xHjgm...|     0|
|pU6GoRTcl1rIOi6zM...|     1|
+--------------------+------+
only showing top 2 rows



In [13]:
# Attach the integer userId to every row of the user dataframe.

a, b = user_df.alias("a"), user_newid_df.alias("b")

# All original user columns, followed by the new integer id.
user_cols = [col('a.' + name) for name in a.columns]
user_new_df = (
    a.join(b, col("a.user_id") == col("b.user_id"), 'inner')
     .select(user_cols + [col('b.userId')])
)

user_new_df.select('userId', 'user_id', 'user_name').show(2)

+------+--------------------+---------+
|userId|             user_id|user_name|
+------+--------------------+---------+
|     0|gvXtMj3XuPr0xHjgm...|    Peter|
|     1|pU6GoRTcl1rIOi6zM...|   Javier|
+------+--------------------+---------+
only showing top 2 rows



In [14]:
# Build a (business_id, businessId) lookup where businessId is a consecutive integer index.

business_id_schema = StructType([
    StructField("business_id", StringType(), True),
    StructField("businessId", IntegerType(), True),
])
# x[0] is the first column of each business row (business_id); zipWithIndex pairs it with its position.
indexed_business_ids = business_df.rdd.map(lambda x: x[0]).zipWithIndex()
business_newid_df = sqlContext.createDataFrame(indexed_business_ids, business_id_schema)

business_newid_df.show(2)

+--------------------+----------+
|         business_id|businessId|
+--------------------+----------+
|NDuUMJfrWk52RA-H-...|         0|
|SP_YXIEwkFPPl_9an...|         1|
+--------------------+----------+
only showing top 2 rows



In [15]:
# Attach the integer businessId to every row of the business dataframe.

a, b = business_df.alias("a"), business_newid_df.alias("b")

# All original business columns, followed by the new integer id.
business_cols = [col('a.' + name) for name in a.columns]
business_new_df = (
    a.join(b, col("a.business_id") == col("b.business_id"), 'inner')
     .select(business_cols + [col('b.businessId')])
)

business_new_df.select('businessId', 'business_id', 'business_name').show(2)

+----------+--------------------+--------------------+
|businessId|         business_id|       business_name|
+----------+--------------------+--------------------+
|         0|NDuUMJfrWk52RA-H-...|      Bolt Fresh Bar|
|         1|SP_YXIEwkFPPl_9an...|The Steady Cafe &...|
+----------+--------------------+--------------------+
only showing top 2 rows



In [16]:
# Reduce the reviews to the three columns needed for the rating matrix,
# then attach the integer userId to each review.

review_df = review_df.select('user_id', 'business_id', 'stars')

a, b = review_df.alias("a"), user_newid_df.alias("b")

# Review columns first, then the integer user id from the lookup.
review_cols = [col('a.' + name) for name in a.columns]
review_userId_df = (
    a.join(b, col("a.user_id") == col("b.user_id"), 'inner')
     .select(review_cols + [col('b.userId')])
)

In [17]:
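# # map the businessId
# Not working: this direct join with business_newid_df failed with a FileNotFound
# error, so the businessId is attached via a broadcast lookup further down instead.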

# a = review_userId_df.alias("a")
# b = business_newid_df.alias("b")

# review_userId_businessId_df = a.join(b, col("a.business_id") == col("b.business_id"), 'inner') \
#                          .select([col('a.'+xx) for xx in a.columns] + [col('b.businessId')])

# review_userId_businessId_df.show(2)

In [18]:
# Keep only the reviews whose business_id appears in business_df.
# NOTE: only the review-side columns ('a.*') are selected, so no businessId
# column is added by this join.
a = review_userId_df.alias("a")
b = business_df.alias("b")

review_userId_businessId_df = a.join(b, col("a.business_id") == col("b.business_id"), 'inner').select([col('a.'+xx) for xx in a.columns])

review_userId_businessId_df

DataFrame[user_id: string, business_id: string, stars: double, userId: int]

In [19]:
# preview the first five joined review rows
review_userId_businessId_df.show(5)

+--------------------+--------------------+-----+------+
|             user_id|         business_id|stars|userId|
+--------------------+--------------------+-----+------+
|-4Anvj46CWf57KWI9...|478TIlfHXfT3wvww5...|  3.0| 31855|
|-BUamlG3H-7yqpAl1...|MlKNIbEM-JL9WesSd...|  1.0| 29628|
|-CGdueQKCHM_KnHxO...|Ze4VPogvcD7inc3Qu...|  1.0| 28103|
|-LR8Z9Cun0VG8Rmju...|exs56JDSWmPWQ3dQO...|  5.0| 52708|
|-LyjHYhPha2loUaiM...|2PCz_uVX7GOXtGHNX...|  4.0| 68379|
+--------------------+--------------------+-----+------+
only showing top 5 rows



#### Joining review_userId_df with business_newid_df raised a FileNotFound error.
Hence, we created a broadcast variable from business_newid_df and used it to perform the join.

In [20]:
# Collect the (business_id -> businessId) pairs from business_newid_df to the driver.
# Despite its name, bn_rdd is the dict returned by collectAsMap(), not an RDD.
bn_rdd = business_newid_df.rdd.map(lambda row: (row[0], row[1])).collectAsMap()


In [21]:
# Wrap the business_id -> businessId mapping in a broadcast variable
bn_brodcast = spark.sparkContext.broadcast(bn_rdd)

In [22]:
# Append the integer businessId to every review row by looking up the row's
# business_id (position 1) in the broadcast mapping.
def _append_business_index(row):
    return (row[0], row[1], row[2], row[3], bn_brodcast.value[row[1]])

rub_rdd = review_userId_businessId_df.rdd.map(_append_business_index)

In [23]:
# Print the first five tuples of the rdd
print(rub_rdd.take(5))

[('-4Anvj46CWf57KWI9UQDLg', '478TIlfHXfT3wvww54QsPg', 3.0, 31855, 1107), ('-BUamlG3H-7yqpAl1p-msw', 'MlKNIbEM-JL9WesSdwf_Lg', 1.0, 29628, 2442), ('-CGdueQKCHM_KnHxOoTJXg', 'Ze4VPogvcD7inc3QuvY_yg', 1.0, 28103, 1370), ('-LR8Z9Cun0VG8RmjuzA51w', 'exs56JDSWmPWQ3dQOdjHag', 5.0, 52708, 1788), ('-LyjHYhPha2loUaiMPnsrw', '2PCz_uVX7GOXtGHNXAPXhw', 4.0, 68379, 6179)]


In [24]:
# Turn the tuple rdd back into a dataframe with an explicit schema.
rub_schema = StructType([
    StructField("user_id", StringType(), True),
    StructField("business_id", StringType(), True),
    StructField("stars", DoubleType(), True),
    StructField("userId", IntegerType(), True),
    StructField("businessId", IntegerType(), True),
])
review_userId_businessId_df = sqlContext.createDataFrame(rub_rdd, rub_schema)

In [25]:
# preview the first five rows, now including the integer businessId
review_userId_businessId_df.show(5)

+--------------------+--------------------+-----+------+----------+
|             user_id|         business_id|stars|userId|businessId|
+--------------------+--------------------+-----+------+----------+
|-4Anvj46CWf57KWI9...|478TIlfHXfT3wvww5...|  3.0| 31855|      1107|
|-BUamlG3H-7yqpAl1...|MlKNIbEM-JL9WesSd...|  1.0| 29628|      2442|
|-CGdueQKCHM_KnHxO...|Ze4VPogvcD7inc3Qu...|  1.0| 28103|      1370|
|-LR8Z9Cun0VG8Rmju...|exs56JDSWmPWQ3dQO...|  5.0| 52708|      1788|
|-LyjHYhPha2loUaiM...|2PCz_uVX7GOXtGHNX...|  4.0| 68379|      6179|
+--------------------+--------------------+-----+------+----------+
only showing top 5 rows



### Collaborative Filtering

In [26]:
# Build the (userId, businessId, rating) dataframe; stars is cast to float and renamed to rating.

rating_col = review_userId_businessId_df.stars.cast('float').alias('rating')
rating_df = review_userId_businessId_df.select('userId', 'businessId', rating_col)

rating_df.show(2)
print(' Rating matrx no. of rows :', rating_df.count())
rating_df.printSchema()

+------+----------+------+
|userId|businessId|rating|
+------+----------+------+
| 31855|      1107|   3.0|
| 29628|      2442|   1.0|
+------+----------+------+
only showing top 2 rows

 Rating matrx no. of rows : 376593
root
 |-- userId: integer (nullable = true)
 |-- businessId: integer (nullable = true)
 |-- rating: float (nullable = true)



In [27]:
# load the saved ALS model from <model_path>/alsb
alsn_model = ALSModel.load(model_path + 'alsb')

In [28]:
# generate the top 10 business recommendations for each user known to the model

userRecoms = alsn_model.recommendForAllUsers(10)


In [29]:
# Add the string user_id to the recommendations, cache the result and show a sample row.

a, b = userRecoms.alias("a"), user_newid_df.alias("b")

# Recommendation columns first, then the original string user id.
recom_cols = [col('a.' + name) for name in a.columns]
all_userRecoms = (
    a.join(b, col("a.userId") == col("b.userId"), 'inner')
     .select(recom_cols + [col('b.user_id')])
)

all_userRecoms.cache()
all_userRecoms.show(1, truncate=False)

+------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+----------------------+
|userId|recommendations                                                                                                                                                                       |user_id               |
+------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+----------------------+
|148   |[[2289, 4.502557], [6851, 4.5001936], [332, 4.337512], [6359, 4.329177], [3603, 4.3270016], [4891, 4.318773], [7003, 4.3168774], [660, 4.2920914], [4195, 4.264028], [7501, 4.259718]]|3xlBfGV9vFUmZapiT5NPfg|
+------+------------------------------------------------------------------------------------------------------------------------------------

In [30]:
# Show the recommendations for one example user id.

u_id = 'ZWD8UH1T7QXQr0Eq-mcWYg'

# Rows for this user only; p[1] is the recommendations column, flattened to one row per entry.
user_rows = all_userRecoms.filter(col('user_id') == u_id)
userFlatRec = sqlContext.createDataFrame(user_rows.rdd.flatMap(lambda p: p[1]))
userFlatRec.show()

+----------+-----------------+
|businessId|           rating|
+----------+-----------------+
|      2289|4.356423377990723|
|      6359|4.317990303039551|
|       660|4.297473430633545|
|      7501|4.277865886688232|
|      4195|4.256645679473877|
|      5468|   4.204833984375|
|      6851|4.203225135803223|
|      7003|4.196661949157715|
|       332|4.180690288543701|
|      4090|4.160634994506836|
+----------+-----------------+



In [31]:
# Show the details of the recommended businesses.

a, b = business_new_df.alias("a"), userFlatRec.alias("b")

# All business columns, followed by the predicted rating.
detail_cols = [col('a.' + name) for name in a.columns]
user_collab_df = (
    a.join(b, col("a.businessId") == col("b.businessId"), 'inner')
     .select(detail_cols + [col('b.rating')])
)

user_collab_df.select('business_id', 'business_name', 'rating', 'categories').toPandas()

Unnamed: 0,business_id,business_name,rating,categories
0,otsjAjxf0PNQ99xcmuj_LA,Sushi Making For the Soul,4.356423,"Japanese, Education, Restaurants, Local Flavor"
1,zsLKMCnwK_NmZJkI7TJk1A,Brando's Fried Chicken,4.160635,"Chicken Shop, Restaurants, American (Tradition..."
2,PT6tAoQxtCqsGc7r4nEXLQ,Trinity Square Cafe,4.31799,"Restaurants, Cafes"
3,bumAFxitMRHKAxZMijvUYg,Cuisine of India,4.18069,"Caterers, Event Planning & Services, Restauran..."
4,IM6pHgP2ewa6xhnDk6s2_g,Mikaku Izakaya,4.203225,"Japanese, Restaurants"
5,9GLN1xfck07CKfNfejKCwg,T-Sushi,4.204834,"Sushi Bars, Food, Food Delivery Services, Japa..."
6,v_OLzcpFA7vgVp30vxv2uQ,Silver Spoon,4.256646,"Restaurants, American (New), Canadian (New)"
7,fCZU04T_8lUdXX2aBYisEA,Freshii,4.277866,"Breakfast & Brunch, Specialty Food, Health Mar..."
8,Hn-bPW6z63BjA4XBAFsVgw,Sugar Miracles,4.196662,"Restaurants, Patisserie/Cake Shop, Chocolatier..."
9,1VAsBosvx02jpvIUxiKvmg,The Dumpling Shop,4.297473,"Specialty Food, Food, Chinese, Dim Sum, Restau..."


In [32]:
def getCollabRecom(u_id):
    """Return the businesses recommended for one user, highest rating first.

    Looks up ``u_id`` in the cached ``all_userRecoms`` dataframe, expands its
    ``recommendations`` array into one row per (businessId, rating) entry and
    joins those rows with ``business_new_df`` to attach the business details.

    Args:
        u_id: The string ``user_id`` to look up.

    Returns:
        A Spark DataFrame with the columns business_id, rating, business_name,
        categories, stars, review_count, latitude and longitude, ordered by
        rating descending. It is empty when ``u_id`` has no recommendations.
    """
    # explode() flattens the array by column name. The previous
    # rdd.flatMap + createDataFrame route relied on the column position and
    # raised on an unknown user, because a schema cannot be inferred from an
    # empty RDD.
    user_recs = (
        all_userRecoms
        .filter(col('user_id') == u_id)
        .select(explode(col('recommendations')).alias('rec'))
        .select(col('rec.businessId').alias('businessId'),
                # keep the rating as a double, as the inferred schema did
                col('rec.rating').cast('double').alias('rating'))
    )

    a = user_recs.alias("a")
    b = business_new_df.alias("b")

    return a.join(b, col("a.businessId") == col("b.businessId"), 'inner') \
             .select([col('b.business_id'), col('a.rating'), col('b.business_name'),col('b.categories'),
                                                           col('b.stars'),col('b.review_count'),
                                                           col('b.latitude'),col('b.longitude')]) \
             .orderBy("rating", ascending = False)
    

In [33]:
# example user id whose recommendations are displayed as a pandas table
u_id = 'ZWD8UH1T7QXQr0Eq-mcWYg'

getCollabRecom(u_id).toPandas()

Unnamed: 0,business_id,rating,business_name,categories,stars,review_count,latitude,longitude
0,otsjAjxf0PNQ99xcmuj_LA,4.356423,Sushi Making For the Soul,"Japanese, Education, Restaurants, Local Flavor",4.5,3,43.656233,-79.392319
1,PT6tAoQxtCqsGc7r4nEXLQ,4.31799,Trinity Square Cafe,"Restaurants, Cafes",5.0,6,43.654877,-79.38147
2,1VAsBosvx02jpvIUxiKvmg,4.297473,The Dumpling Shop,"Specialty Food, Food, Chinese, Dim Sum, Restau...",4.5,22,43.767971,-79.401363
3,fCZU04T_8lUdXX2aBYisEA,4.277866,Freshii,"Breakfast & Brunch, Specialty Food, Health Mar...",4.5,3,43.659574,-79.381027
4,v_OLzcpFA7vgVp30vxv2uQ,4.256646,Silver Spoon,"Restaurants, American (New), Canadian (New)",5.0,4,43.650883,-79.450832
5,9GLN1xfck07CKfNfejKCwg,4.204834,T-Sushi,"Sushi Bars, Food, Food Delivery Services, Japa...",5.0,20,43.644745,-79.390892
6,IM6pHgP2ewa6xhnDk6s2_g,4.203225,Mikaku Izakaya,"Japanese, Restaurants",4.5,3,43.793327,-79.419321
7,Hn-bPW6z63BjA4XBAFsVgw,4.196662,Sugar Miracles,"Restaurants, Patisserie/Cake Shop, Chocolatier...",5.0,4,43.716805,-79.400696
8,bumAFxitMRHKAxZMijvUYg,4.18069,Cuisine of India,"Caterers, Event Planning & Services, Restauran...",5.0,3,43.782522,-79.474959
9,zsLKMCnwK_NmZJkI7TJk1A,4.160635,Brando's Fried Chicken,"Chicken Shop, Restaurants, American (Tradition...",5.0,3,43.655111,-79.414505
