## Look at the data

In [1]:
#Add the current path into the system path
import sys,os
curPath = os.path.abspath(os.path.dirname("/home/murrawang/"))
sys.path.append(curPath)

# Import necessary libraries
from pyspark.sql import SQLContext
import graphframes
from graphframes import *
from AggregateMessages import *

import matplotlib.pyplot as plt
%matplotlib inline

from pyspark.sql.functions import col, lit, when, avg, collect_list, mean
from pyspark.sql import Row

from datetime import datetime
import numpy as np

from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS

In [24]:
#Yelp business data
business_path = "hdfs:///yelp/yelp_academic_dataset_business.json"
yelp_business = spark.read.json(business_path)
# Check which states Yelp concentrates on
yelp_business.select("state").rdd.map(lambda x: (x, 1)).reduceByKey(lambda x,y : x+y).collect()

[(Row(state=u'IL'), 1),
 (Row(state=u'AZ'), 2),
 (Row(state=u'VA'), 2),
 (Row(state=u'ON'), 2),
 (Row(state=u'OR'), 25175),
 (Row(state=u'ABE'), 1),
 (Row(state=u'MN'), 1),
 (Row(state=u'BC'), 17298),
 (Row(state=u'NM'), 1),
 (Row(state=u'NC'), 1),
 (Row(state=u'NY'), 2),
 (Row(state=u'OH'), 11258),
 (Row(state=u'DE'), 1),
 (Row(state=u'DC'), 1),
 (Row(state=u'HI'), 1),
 (Row(state=u'AL'), 1),
 (Row(state=u'KY'), 1),
 (Row(state=u'NH'), 4),
 (Row(state=u'GA'), 18090),
 (Row(state=u'MA'), 36012),
 (Row(state=u'FL'), 21907),
 (Row(state=u'WY'), 1),
 (Row(state=u'CO'), 3198),
 (Row(state=u'CA'), 13),
 (Row(state=u'KS'), 1),
 (Row(state=u'TX'), 24485),
 (Row(state=u'ME'), 1),
 (Row(state=u'MI'), 1),
 (Row(state=u'OK'), 1),
 (Row(state=u'WI'), 1),
 (Row(state=u'WA'), 3121)]

In [25]:
# Here I select businesses in Washington state
yelp_business = yelp_business.filter(yelp_business["state"] == "WA").sort("review_count", ascending=False).limit(100)
yelp_business.show(5)

+--------------------+--------------------+--------------------+--------------------+---------+--------------------+-------+-------------+---------------+--------------------+-----------+------------+-----+-----+
|             address|          attributes|         business_id|          categories|     city|               hours|is_open|     latitude|      longitude|                name|postal_code|review_count|stars|state|
+--------------------+--------------------+--------------------+--------------------+---------+--------------------+-------+-------------+---------------+--------------------+-----------+------------+-----+-----+
|1919 SE Columbia ...|[,, 'full_bar', {...|R8fLQ6TLz06MQR69K...|Bars, Pizza, Amer...|Vancouver|[11:0-22:0, 0:0-0...|      1|   45.6148087|   -122.6520268|Beaches Restauran...|      98661|         719|  3.5|   WA|
|204 SE Park Plaza...|[,, 'beer_and_win...|0ipN94g6plg9LZIsD...|Noodles, Restaura...|Vancouver|[11:0-21:0, 0:0-0...|      1|   45.6197591|   -122.53

In [4]:
yelp_business.printSchema()

root
 |-- address: string (nullable = true)
 |-- attributes: struct (nullable = true)
 |    |-- AcceptsInsurance: string (nullable = true)
 |    |-- AgesAllowed: string (nullable = true)
 |    |-- Alcohol: string (nullable = true)
 |    |-- Ambience: string (nullable = true)
 |    |-- BYOB: string (nullable = true)
 |    |-- BYOBCorkage: string (nullable = true)
 |    |-- BestNights: string (nullable = true)
 |    |-- BikeParking: string (nullable = true)
 |    |-- BusinessAcceptsBitcoin: string (nullable = true)
 |    |-- BusinessAcceptsCreditCards: string (nullable = true)
 |    |-- BusinessParking: string (nullable = true)
 |    |-- ByAppointmentOnly: string (nullable = true)
 |    |-- Caters: string (nullable = true)
 |    |-- CoatCheck: string (nullable = true)
 |    |-- Corkage: string (nullable = true)
 |    |-- DietaryRestrictions: string (nullable = true)
 |    |-- DogsAllowed: string (nullable = true)
 |    |-- DriveThru: string (nullable = true)
 |    |-- GoodForDancing: str

In [26]:
yelp_business.count()

100

In [27]:
#Yelp review data
review_path = "hdfs:///yelp/yelp_academic_dataset_review.json"
yelp_review = spark.read.json(review_path)
# Here I only choose reviews on businesses in Washington state
# I apply "dropDuplicates" so that a user only gives one review to a particular business, otherwise our prediction may 
# diverge since the sum of the similarities can exceed one.
yelp_review = yelp_review.join(yelp_business, "business_id", "right").\
                    select("business_id", "user_id", yelp_review.stars).dropDuplicates(["business_id", "user_id"])
yelp_review.show(5)

+--------------------+--------------------+-----+
|         business_id|             user_id|stars|
+--------------------+--------------------+-----+
|kwHcdJttuelU1TTZV...|cBRHJfnMDQOmQ5Wv8...|  5.0|
|kwHcdJttuelU1TTZV...|fbOZSp-SSf01CXKLJ...|  5.0|
|kwHcdJttuelU1TTZV...|wSsDhmy-4fGr_JDq3...|  5.0|
|kwHcdJttuelU1TTZV...|NeTiqk1vFRE-UBDZq...|  5.0|
|kwHcdJttuelU1TTZV...|VVy0yyu1CkzJcPI6s...|  4.0|
+--------------------+--------------------+-----+
only showing top 5 rows



In [28]:
yelp_review.printSchema()

root
 |-- business_id: string (nullable = true)
 |-- user_id: string (nullable = true)
 |-- stars: double (nullable = true)



In [29]:
yelp_review.count()

31700

In [30]:
#save the review dataset
yelp_review.write.save("hdfs:///yelp/yelp_review1.json", format="json")

## Loading the small datasets

In [1]:
#Add the current path into the system path
import sys,os
curPath = os.path.abspath(os.path.dirname("/home/murrawang/"))
sys.path.append(curPath)

# Import necessary libraries
from pyspark.sql import SQLContext
import graphframes
from graphframes import *
from AggregateMessages import *

import matplotlib.pyplot as plt
%matplotlib inline

from pyspark.sql.functions import col, lit, when, avg, collect_list, mean
from pyspark.sql import Row

from datetime import datetime
import numpy as np

from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS

In [2]:
#loading the target dataset
review_path = "hdfs:///yelp/yelp_review1.json"
yelp_review = spark.read.json(review_path)

In [3]:
# split the review data to training and test parts
(training_r, test_r) = yelp_review.randomSplit([0.8, 0.2], seed=11)
training_r.cache()
test_r.cache()
print('Number of partitions: ', training_r.rdd.getNumPartitions())
print('Number of workers:', int(sc._conf.get('spark.executor.instances')))

('Number of partitions: ', 6)
('Number of workers:', 2)


In [4]:
yelp_review.count()

31700

In [35]:
training_r.count()

25464

In [36]:
test_r.count()

6236

In [5]:
# Construct a dataframe for users with necessary information (average ratings and review counts)
yelp_user = yelp_review.select("user_id").distinct()
yelp_user = yelp_user.join(training_r.groupBy("user_id").avg("stars").withColumnRenamed("avg(stars)", "avg_stars"), "user_id", "left").\
                  join(training_r.groupBy("user_id").count().withColumnRenamed("count", "review_count"), "user_id", "left").na.fill(0)

In [38]:
yelp_user.show(5)

+--------------------+---------+------------+
|             user_id|avg_stars|review_count|
+--------------------+---------+------------+
|bC95AAAR3aTHqZ7Mh...|      3.0|           1|
|90xWufII45Dvk-_s9...|      5.0|           1|
|bUgPFR3-6jB-BGZ6k...|      0.0|           0|
|A3Rahgq3ufx81IjbE...|      3.0|           2|
|WsdIdnGRnr3vI4CRh...|      5.0|           2|
+--------------------+---------+------------+
only showing top 5 rows



In [39]:
yelp_user.count()

19379

In [6]:
# Construct a dataframe for businesses with necessary information (average ratings and review counts)
yelp_business = yelp_review.select("business_id").distinct()
yelp_business = yelp_business.join(training_r.groupBy("business_id").avg("stars").withColumnRenamed("avg(stars)", "avg_stars"), "business_id", "left").\
                  join(training_r.groupBy("business_id").count().withColumnRenamed("count", "review_count"), "business_id", "left").na.fill(0)

In [41]:
yelp_business.show(5)

+--------------------+------------------+------------+
|         business_id|         avg_stars|review_count|
+--------------------+------------------+------------+
|kwHcdJttuelU1TTZV...|3.8520408163265305|         196|
|oqr6tcO7fV32n5s4k...| 3.393939393939394|         198|
|eY3-ZE8-njGOglACq...| 4.328813559322034|         295|
|3XO0ZkbgRdLlLLDux...| 4.289752650176679|         283|
|yeV3ECApOd1PSe9Zd...| 4.112903225806452|         186|
+--------------------+------------------+------------+
only showing top 5 rows



In [42]:
yelp_business.count()

100

In [7]:
# look at the degree of sparseness
print("Average number of reviews of users: ", float(yelp_review.count())/float(yelp_user.count()))
print("Average number of reviews of businesses: ", float(yelp_review.count())/float(yelp_business.count()))

('Average number of reviews of users: ', 1.635791320501574)
('Average number of reviews of businesses: ', 317.0)


## Build the Bipartite Graph

In [8]:
# "U" is the prefix for user id and "B" is the prefix for business id to make the vertex ids unique 
# (i.e. to avoid an user id with the same value of a business id)
businessVertices = yelp_business.rdd.map(lambda r: Row(id="B"+str(r.business_id), vtype="business", avg_stars=r.avg_stars, review_count=r.review_count)).toDF()
userVertices = yelp_user.rdd.map(lambda r: Row(id="U"+str(r.user_id), vtype="user", avg_stars=r.avg_stars, review_count=r.review_count)).toDF()

# Pack the users' information into edges for further use! (calculate the similarity)
# Pack the businesses' information into edges for the prediction part!
reviewEdges = training_r.join(yelp_user, "user_id", "left")\
                      .rdd.map(lambda r: Row(src="U"+str(r.user_id),dst="B"+str(r.business_id), \
                       ustars=["U"+str(r.user_id), r.avg_stars*r.review_count, r.stars], \
                        bstars=["B"+str(r.business_id), r.stars])).toDF()
    
# build the graph
bipartiteVertices = userVertices.unionAll(businessVertices)
bipartiteGraph = GraphFrame(bipartiteVertices, reviewEdges)

In [45]:
reviewEdges.count()

25464

In [9]:
# aggregateMessages
msgtoDst = AM.edge['ustars']
business_withur = bipartiteGraph.aggregateMessages\
                            (collect_list(AM.msg).alias('ur_list'), sendToDst=msgtoDst).join(businessVertices, "id")

In [10]:
# Create pairs of businesses
yelp_business1 = business_withur.rdd.map(lambda r: \
                    Row(b1_id=r.id, b1_avg_stars=r.avg_stars, b1_review_count=r.review_count, b1_urlist=r.ur_list, dummy=1)).toDF()
yelp_business2 = business_withur.rdd.map(lambda r: \
                    Row(b2_id=r.id, b2_avg_stars=r.avg_stars, b2_review_count=r.review_count, b2_urlist=r.ur_list, dummy=1)).toDF()
business_pairs = yelp_business1.join(yelp_business2, "dummy", "outer").drop("dummy")

In [48]:
business_pairs.show(10)

+------------------+--------------------+---------------+--------------------+------------------+--------------------+---------------+--------------------+
|      b1_avg_stars|               b1_id|b1_review_count|           b1_urlist|      b2_avg_stars|               b2_id|b2_review_count|           b2_urlist|
+------------------+--------------------+---------------+--------------------+------------------+--------------------+---------------+--------------------+
|3.2666666666666666|BKqsk9xD-gucxHusL...|            195|[[U2nBxUx32MQ4cFQ...|3.2666666666666666|BKqsk9xD-gucxHusL...|            195|[[U2nBxUx32MQ4cFQ...|
|3.2666666666666666|BKqsk9xD-gucxHusL...|            195|[[U2nBxUx32MQ4cFQ...|  2.73568281938326|Bggqr9BEklylICKl7...|            227|[[UctRgajdIN6VGyx...|
|3.2666666666666666|BKqsk9xD-gucxHusL...|            195|[[U2nBxUx32MQ4cFQ...|3.3556485355648538|B9PnB4EYjBAgpMp3b...|            239|[[ULie-hv2Ec3KRfs...|
|3.2666666666666666|BKqsk9xD-gucxHusL...|            195|[[U2nBx

In [49]:
business_pairs.count()

10000

In [50]:
business_pairs.printSchema()

root
 |-- b1_avg_stars: double (nullable = true)
 |-- b1_id: string (nullable = true)
 |-- b1_review_count: long (nullable = true)
 |-- b1_urlist: array (nullable = true)
 |    |-- element: array (containsNull = true)
 |    |    |-- element: string (containsNull = true)
 |-- b2_avg_stars: double (nullable = true)
 |-- b2_id: string (nullable = true)
 |-- b2_review_count: long (nullable = true)
 |-- b2_urlist: array (nullable = true)
 |    |-- element: array (containsNull = true)
 |    |    |-- element: string (containsNull = true)



In [11]:
# Calculating the similarity
def similarity(row):
    sim = 0
    for i in range(len(row.b1_urlist)):
        for j in range(len(row.b2_urlist)):
            if (row.b1_urlist[i][0] == row.b2_urlist[j][0]):
                sim += (float(row.b1_urlist[i][2])*float(row.b2_urlist[j][2]))/(float(row.b1_urlist[i][1])*row.b1_avg_stars*row.b1_review_count)
    return Row(b1_id=row.b1_id, b2_id=row.b2_id, b1_avg_stars=row.b1_avg_stars, b2_avg_stars=row.b2_avg_stars, \
               b1_review_count=row.b1_review_count, b2_review_count=row.b2_review_count, b1_urlist=row.b1_urlist, b2_urlist=row.b2_urlist,\
              similarity=sim)

In [12]:
business_similarity = business_pairs.rdd.map(similarity).toDF().where((col("similarity").isNotNull()) & (col("similarity") != 0))

In [None]:
business_similarity.show(10)

In [None]:
business_similarity.count()

9516

## The weighted Bipartite Graph Projection

In [14]:
# Now we construct the weighted Bipartite Graph Projection for the business set where the weights are similarities between businesses
# Pack the business_2's information into edges for the prediction part!
weightEdges = business_similarity.rdd.map(lambda r: Row(src=r.b1_id, dst=r.b2_id, sim=[r.b2_id, r.b2_avg_stars, r.similarity])).toDF()

# build the graph
BGprojection = GraphFrame(businessVertices, weightEdges)

In [15]:
# Add the prefix "U" and "B" to match the columns in the graph
testr_withprefix = test_r.rdd.map(lambda r: Row(business_id="B"+str(r.business_id), user_id="U"+str(r.user_id), stars=r.stars)).toDF()

In [16]:
# aggregateMessages for similarities
msgtoSrc = AM.edge['sim']
testr_withsim = BGprojection.aggregateMessages\
                            (collect_list(AM.msg).alias('sim_list'), sendToSrc=msgtoSrc).withColumnRenamed("id", "business_id").\
                            join(testr_withprefix, "business_id", "right").\
                            join(businessVertices.select("id", "avg_stars").withColumnRenamed("id", "business_id"), "business_id", "left")

In [None]:
testr_withsim.cache()
testr_withsim.show(5)

+--------------------+--------------------+-----+--------------------+------------------+
|         business_id|            sim_list|stars|             user_id|         avg_stars|
+--------------------+--------------------+-----+--------------------+------------------+
|BKqsk9xD-gucxHusL...|[[Bggqr9BEklylICK...|  1.0|U8-NY40fdXUIuE_qb...|3.2666666666666666|
|BKqsk9xD-gucxHusL...|[[Bggqr9BEklylICK...|  1.0|UEDi1xp0MhGHCSsIs...|3.2666666666666666|
|BKqsk9xD-gucxHusL...|[[Bggqr9BEklylICK...|  1.0|UP2pKOuQvVSNQvfQ3...|3.2666666666666666|
|BKqsk9xD-gucxHusL...|[[Bggqr9BEklylICK...|  1.0|UQrmvIxbRKJbyEzLZ...|3.2666666666666666|
|BKqsk9xD-gucxHusL...|[[Bggqr9BEklylICK...|  1.0|UsITomiR7clEXeWPK...|3.2666666666666666|
+--------------------+--------------------+-----+--------------------+------------------+
only showing top 5 rows



In [18]:
# aggregateMessages for br_lists
msgtoSrc = AM.edge['bstars']
testr_withsimbr = bipartiteGraph.aggregateMessages\
                            (collect_list(AM.msg).alias('br_list'), sendToSrc=msgtoSrc).withColumnRenamed("id", "user_id").\
                            join(testr_withsim, "user_id", "right")

In [29]:
testr_withsimbr.cache()
testr_withsimbr.show(5)

[Row(user_id=u'UVpOelcT-TOgmxaxXR7z5qQ', br_list=None, business_id=u'BYXmYrqH__hwgO2aHWGmN9w', sim_list=[[u'BYXmYrqH__hwgO2aHWGmN9w', u'2.2758620689655173', u'1.0']], stars=5.0, avg_stars=2.2758620689655173),
 Row(user_id=u'UJJ3WxVE0lt8M9yfqpI88TA', br_list=None, business_id=u'BBAt03TGTq4V4371JUJRHmA', sim_list=[[u'BBAt03TGTq4V4371JUJRHmA', u'3.933333333333333', u'0.9999999999999999']], stars=5.0, avg_stars=3.933333333333333),
 Row(user_id=u'UPqV8oL8xQsJIVFPn1l-Dxw', br_list=None, business_id=u'B0L_JnBEWB1OH0KkafQ2OYQ', sim_list=[[u'B0L_JnBEWB1OH0KkafQ2OYQ', u'4.851851851851852', u'0.9999999999999994']], stars=5.0, avg_stars=4.851851851851852)]

In [19]:
# predict the ratings
def Prediction(row):
    pred = row.avg_stars
    if (row.sim_list is not None and row.br_list is not None):
        for i in range(len(row.sim_list)):
            for j in range(len(row.br_list)):
                if row.sim_list[i][0] == row.br_list[j][0]:
                    pred += float(row.sim_list[i][2])*(float(row.br_list[j][1])-float(row.sim_list[i][1]))
    return Row(business_id=row.business_id, user_id=row.user_id, actual_stars=row.stars, pred_stars=pred)

In [20]:
#Finally, we get the predictions.
result = testr_withsimbr.rdd.map(Prediction).toDF()
result.show(5)

+------------+--------------------+------------------+--------------------+
|actual_stars|         business_id|        pred_stars|             user_id|
+------------+--------------------+------------------+--------------------+
|         4.0|BeY3-ZE8-njGOglAC...| 4.328813559322034|U-08ESkTAS5j9SZDG...|
|         2.0|BRJBCFf7Ov6dRguTr...| 3.808147915072359|U2lMVWQYWb-_Wpj7C...|
|         5.0|BTDQ0lSTHW3RyfVWQ...|3.7987484266409144|U2lMVWQYWb-_Wpj7C...|
|         5.0|BrgGJMwcrNTuCcY49...| 4.153846153846154|U3xKbi6-gT4nY_V23...|
|         5.0|BIMYh4LJm1ubY5Lw3...| 4.709283865127501|UB94Qv61bCM9Ru5_O...|
+------------+--------------------+------------------+--------------------+
only showing top 5 rows



In [21]:
# evaluate the RMSE
evaluator = RegressionEvaluator(metricName="rmse", labelCol="actual_stars", predictionCol="pred_stars")
rmse = evaluator.evaluate(result)
print("Root-mean-square error = " + str(rmse))

Root-mean-square error = 1.30947407919


In [22]:
testr_withsim.count()

6236