In [1]:
data = '/Users/gauravsharma/CMPE256/Project/yelp_dataset'
from pyspark import SparkConf, SparkContext, SQLContext
from pyspark.ml.feature import *
from pyspark.sql import *
from pyspark.sql.functions import desc
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS

In [2]:
conf = SparkConf()
sc = SparkContext(conf=conf)
sqlCon = SQLContext(sc)

In [3]:
business = sqlCon.read.json(data + "/business.json")
business.count()

192609

In [4]:
#view schema for the businesses
#business.printSchema()
business.groupBy("city").count().sort(desc("count")).show()

+---------------+-----+
|           city|count|
+---------------+-----+
|      Las Vegas|29370|
|        Toronto|18906|
|        Phoenix|18766|
|      Charlotte| 9509|
|     Scottsdale| 8837|
|        Calgary| 7736|
|     Pittsburgh| 7017|
|       Montréal| 6449|
|           Mesa| 6080|
|      Henderson| 4892|
|          Tempe| 4550|
|       Chandler| 4309|
|      Cleveland| 3605|
|       Glendale| 3543|
|        Madison| 3494|
|        Gilbert| 3462|
|    Mississauga| 3112|
|         Peoria| 1919|
|        Markham| 1766|
|North Las Vegas| 1548|
+---------------+-----+
only showing top 20 rows



In [5]:
#only going to keep business address, business_id, categories, name, stars, city, state
#filtering out only Phoenix businesses
business_restaurants = business.select('address', 'business_id', 'categories', 'city', 'stars', 'state').filter(business.categories.contains("Restaurants")).filter(business.city == 'Phoenix')


In [6]:
print('# of businesses in Phoenix:', business_restaurants.count())

# of businesses in Phoenix: 3999


In [7]:
read_reviews = sqlCon.read.json(data + "/review.json")
read_reviews.printSchema()

root
 |-- business_id: string (nullable = true)
 |-- cool: long (nullable = true)
 |-- date: string (nullable = true)
 |-- funny: long (nullable = true)
 |-- review_id: string (nullable = true)
 |-- stars: double (nullable = true)
 |-- text: string (nullable = true)
 |-- useful: long (nullable = true)
 |-- user_id: string (nullable = true)



In [8]:
reviews = read_reviews.select("user_id", "business_id", "stars")
reviews.printSchema()
reviews.show()

root
 |-- user_id: string (nullable = true)
 |-- business_id: string (nullable = true)
 |-- stars: double (nullable = true)

+--------------------+--------------------+-----+
|             user_id|         business_id|stars|
+--------------------+--------------------+-----+
|hG7b0MtEbXx5QzbzE...|ujmEBvifdJM6h6RLv...|  1.0|
|yXQM5uF2jS6es16SJ...|NZnhc2sEQy3RmzKTZ...|  5.0|
|n6-Gk65cPZL6Uz8qR...|WTqjgwHlXbSFevF32...|  5.0|
|dacAIZ6fTM6mqwW5u...|ikCg8xy5JIg_NGPx-...|  5.0|
|ssoyf2_x0EQMed6fg...|b1b1eb3uo-w561D0Z...|  1.0|
|w31MKYsNFMrjhWxxA...|eU_713ec6fTGNO4Be...|  4.0|
|jlu4CztcSxrKx56ba...|3fw2X5bZYeW9xCz_z...|  3.0|
|d6xvYpyzcfbF_AZ8v...|zvO-PJCpNk4fgAVUn...|  1.0|
|sG_h0dIzTKWa3Q6fm...|b2jN2mm9Wf3RcrZCg...|  2.0|
|nMeCE5-xsdleyxYuN...|oxwGyA17NL6c5t1Et...|  3.0|
|FIk4lQQu1eTe2EpzQ...|8mIrX_LrOnAqWsB5J...|  4.0|
|-mA3-1mN4JIEkqOtd...|mRUVMJkUGxrByzMQ2...|  1.0|
|GYNnVehQeXjty0xH7...|FxLfqxdYPA6Z85PFK...|  4.0|
|bAhqAPoWaZYcyYi7b...|LUN6swQYa4xJKaM_U...|  4.0|
|TpyOT5E16YASd7EWj...|Aak

In [9]:
reviews.count()

6685900

In [10]:
#Phoenix_only_reviews = reviews.join(business_restaurants, reviews.business_id == business_restaurants.business_id, "outer")
Phoenix_only_reviews = reviews.join(business_restaurants, reviews.business_id == business_restaurants.business_id).select(reviews["*"])
Phoenix_only_reviews.count()

427491

In [11]:
Phoenix_only_reviews.printSchema()
type(Phoenix_only_reviews)

root
 |-- user_id: string (nullable = true)
 |-- business_id: string (nullable = true)
 |-- stars: double (nullable = true)



pyspark.sql.dataframe.DataFrame

In [12]:
from pyspark.ml.feature import OneHotEncoder, StringIndexer
from pyspark.ml import Pipeline
indexers = [StringIndexer(inputCol=column, outputCol=column+"_index").fit(Phoenix_only_reviews) for column in list(set(Phoenix_only_reviews.columns)-set(['stars'])) ]

pipeline = Pipeline(stages=indexers)
data = pipeline.fit(Phoenix_only_reviews).transform(Phoenix_only_reviews)

In [13]:
data.printSchema()

root
 |-- user_id: string (nullable = true)
 |-- business_id: string (nullable = true)
 |-- stars: double (nullable = true)
 |-- business_id_index: double (nullable = false)
 |-- user_id_index: double (nullable = false)



In [14]:
(training, test) = data.randomSplit([0.8, 0.2])

In [15]:
als = ALS(maxIter=10,regParam=0.09,rank=10,userCol="user_id_index",itemCol="business_id_index",ratingCol="stars",coldStartStrategy="drop",nonnegative=True)
model = als.fit(training)   

In [16]:
evaluator=RegressionEvaluator(metricName="rmse",labelCol="stars",predictionCol="prediction")
predictions=model.transform(test)
rmse=evaluator.evaluate(predictions)
print("RMSE="+str(rmse))

RMSE=1.557922916929912


In [17]:
als = ALS(maxIter=15,regParam=0.09,rank=15,userCol="user_id_index",itemCol="business_id_index",ratingCol="stars",coldStartStrategy="drop",nonnegative=True)
model = als.fit(training)

In [18]:
evaluator=RegressionEvaluator(metricName="rmse",labelCol="stars",predictionCol="prediction")
predictions=model.transform(test)
rmse=evaluator.evaluate(predictions)
print("RMSE="+str(rmse))

RMSE=1.5233345840638375


In [15]:
als = ALS(maxIter=20,regParam=0.09,rank=20,userCol="user_id_index",itemCol="business_id_index",ratingCol="stars",coldStartStrategy="drop",nonnegative=True)
model = als.fit(training)

In [16]:
evaluator=RegressionEvaluator(metricName="rmse",labelCol="stars",predictionCol="prediction")
predictions=model.transform(test)
rmse=evaluator.evaluate(predictions)
print("RMSE="+str(rmse))
predictions.show()

RMSE=1.4962786386107436
+--------------------+--------------------+-----+-----------------+-------------+----------+
|             user_id|         business_id|stars|business_id_index|user_id_index|prediction|
+--------------------+--------------------+-----+-----------------+-------------+----------+
|gmgQ4HnzCKeZpAa_g...|LtNgP4FqXp5nMFOHE...|  2.0|            148.0|       5300.0| 3.3272412|
|E-VrONfypxIWIf_jx...|LtNgP4FqXp5nMFOHE...|  2.0|            148.0|      12006.0| 3.7092657|
|jt2BvQSjk3fDp8qDx...|LtNgP4FqXp5nMFOHE...|  5.0|            148.0|      19729.0| 3.2537029|
|QURuwUr6U4nzpBneb...|LtNgP4FqXp5nMFOHE...|  5.0|            148.0|      39420.0| 3.8487353|
|lCQ-2TSe8E9rxZxd6...|LtNgP4FqXp5nMFOHE...|  5.0|            148.0|      10506.0| 3.0340836|
|ZbTLIxe1i2qsBJJJR...|LtNgP4FqXp5nMFOHE...|  5.0|            148.0|      17983.0| 4.1562223|
|-3UR6uwjT2kRjbQv1...|LtNgP4FqXp5nMFOHE...|  4.0|            148.0|       9387.0| 3.9020534|
|2bmNcJA9EQFSu0Tmv...|LtNgP4FqXp5nMFOHE...|  4