# Big Data Final Project Yelp 
## Group Members: Haoning Liu, Jiaqi Chen, Mengqi Liu, Xiaolu Li

In [1]:
import findspark
findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("BD4").getOrCreate()
spark

### Data Cleaning

In [14]:
import pandas as pd
import csv

In [None]:
# Load one of four data files 'review' from local
review = pd.read_csv("/Users/lhnzm/yelp_review.csv")

In [None]:
# Remove comma, quotation mark, and changing line character from text column
review['text'] = review['text'].str.replace(',', '')
review['text'] = review['text'].str.replace('"', '')
review['text'] = review['text'].str.replace('\r\n', '')
review['text'] = review['text'].str.replace('\n', '')

In [None]:
# Write out separating with comma and upload to S3 bucket
review.to_csv('review1.csv', sep=',')

### Read in CSV files from S3 bucket

#### 1. Review data

In [2]:
review = spark.read\
  .format('csv')\
  .option('header', 'true')\
  .option('inferSchema', 'true')\
  .load('s3://bigdataclasslhn/Prj_Yelp/review1.csv')

In [3]:
review=review.drop('_c0')

#### Check schema

In [4]:
review.printSchema()

root
 |-- review_id: string (nullable = true)
 |-- user_id: string (nullable = true)
 |-- business_id: string (nullable = true)
 |-- stars: integer (nullable = true)
 |-- useful: integer (nullable = true)
 |-- funny: integer (nullable = true)
 |-- cool: integer (nullable = true)
 |-- text: string (nullable = true)
 |-- date: timestamp (nullable = true)



#### Check unique values for each column and whether data has been read in correct format

In [5]:
review.select('business_id').distinct().show()

+--------------------+
|         business_id|
+--------------------+
|f4mh1Y0rnvbJRfQ3j...|
|cKwg6HFaLYXl7Ar0r...|
|jcpgiXF0PCyS9hrvq...|
|R_M4P9XetEM-aLE7e...|
|DEBqmgxv2yhJ93LqG...|
|Cml4Yt5cTx64cOMan...|
|bo3SQVtErnMOqO6lk...|
|Cl-xl1vTUwHeaGgBx...|
|oIEmXWLtoh5blz-iw...|
|Op2IR4FffXZ5KXYFn...|
|yB5FMuc9Y3oyhsOmu...|
|cEqOh78v1g1RCWHyu...|
|lt8IW9Bpy9GMeKGxy...|
|uC3qwaxsOkdJzpOc0...|
|686oeWNsbc-aczplC...|
|gPuxh3HNvoVt8aWVW...|
|mA27CG2U3ytmkxIGV...|
|x6qH9HXhzuKM03jcZ...|
|74LU6K2ro5AQXKT0J...|
|TdefcbsFAj6WXHwlG...|
+--------------------+
only showing top 20 rows



In [6]:
review.select('business_id').distinct().count()

76755

In [7]:
review.select('stars').distinct().show()

+-----+
|stars|
+-----+
| null|
|    1|
|    3|
|    5|
|    4|
|    2|
+-----+



In [8]:
review.select('user_id').distinct().show()

+--------------------+
|             user_id|
+--------------------+
|rs3pq6wRmaSIADCIn...|
|xS6kmkMXp0PRrFwkS...|
|aNOSjqQFsrfcgmFtO...|
|-9da1xk7zgnnfO1uT...|
|PLjruA-EMskWfirBU...|
|O-frog8VhICKAT0gr...|
|7o473jeLWW-zgKN-Q...|
|L1XxGWFJ3S7xBQCT8...|
|D2ljL5ejuqpa4f8fn...|
|CzkWUMIYDxUSetfCR...|
|5avk-VCo_6Bx65ct1...|
|oKWVVqPWVzq5s6nS4...|
|e5kxYMksMVWApEJdO...|
|f-6oae7TltlfJicUi...|
|NL9jmu5jSkCdMM-i9...|
|z6gjzFENiQf-K3lPy...|
|Al2g2P9gt057Julh1...|
|midS4e50ZmuOeGyNm...|
|yTr8nlIjQCJWc0ZIC...|
|yb0AdKzhYwQIlt47r...|
+--------------------+
only showing top 20 rows



In [9]:
review.select('useful').distinct().show()

+------+
|useful|
+------+
|   148|
|    31|
|    85|
|   251|
|   808|
|   137|
|    65|
|    53|
|   255|
|   970|
|   133|
|    78|
|   362|
|   108|
|   155|
|    34|
|   193|
|   101|
|   126|
|   115|
+------+
only showing top 20 rows



In [10]:
review.select('funny').distinct().show()

+-----+
|funny|
+-----+
|  148|
|   31|
|   85|
|  137|
|   65|
|   53|
|  970|
|  133|
|   78|
|  322|
|  108|
|  155|
|   34|
|  101|
|  115|
|   81|
|   28|
|  183|
|  412|
|   76|
+-----+
only showing top 20 rows



In [11]:
review.select('date').distinct().show()

+-------------------+
|               date|
+-------------------+
|2010-10-05 19:12:35|
|2016-02-11 22:26:21|
|2015-01-18 16:00:39|
|2012-11-06 05:23:38|
|2013-11-14 04:08:35|
|2015-06-21 02:01:34|
|2018-09-25 05:26:16|
|2015-04-01 17:43:03|
|2014-04-19 12:06:45|
|2018-09-25 03:51:25|
|2014-02-23 21:39:30|
|2016-08-03 23:31:26|
|2016-07-25 07:00:01|
|2018-04-04 03:02:29|
|2017-08-28 19:16:03|
|2017-10-11 02:23:26|
|2016-06-22 21:16:05|
|2008-08-18 22:32:45|
|2007-12-09 15:16:21|
|2015-05-12 16:16:59|
+-------------------+
only showing top 20 rows



#### Select columns we need

In [5]:
review = review.drop('review_id','text','date')

In [37]:
review.printSchema()

root
 |-- user_id: string (nullable = true)
 |-- business_id: string (nullable = true)
 |-- stars: integer (nullable = true)
 |-- useful: integer (nullable = true)
 |-- funny: integer (nullable = true)
 |-- cool: integer (nullable = true)



#### 2. Business data

In [7]:
business = spark.read\
  .format('csv')\
  .option('header', 'true')\
  .option('inferSchema', 'true')\
  .load('s3://bigdataclasslhn/Prj_Yelp/yelp_business.csv')

In [8]:
business = business.withColumnRenamed('stars', 'stars_business')

In [9]:
business.printSchema()

root
 |-- business_id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- address: string (nullable = true)
 |-- city: string (nullable = true)
 |-- state: string (nullable = true)
 |-- postal_code: string (nullable = true)
 |-- latitude: double (nullable = true)
 |-- longitude: double (nullable = true)
 |-- stars_business: double (nullable = true)
 |-- review_count: double (nullable = true)
 |-- is_open: integer (nullable = true)
 |-- attributes.GoodForKids: string (nullable = true)
 |-- attributes.RestaurantsReservations: string (nullable = true)
 |-- attributes.GoodForMeal: string (nullable = true)
 |-- attributes.BusinessParking: string (nullable = true)
 |-- attributes.Caters: string (nullable = true)
 |-- attributes.NoiseLevel: string (nullable = true)
 |-- attributes.RestaurantsTableService: string (nullable = true)
 |-- attributes.RestaurantsTakeOut: string (nullable = true)
 |-- attributes.RestaurantsPriceRange2: string (nullable = true)
 |-- attributes.OutdoorS

#### Select columns we need

In [40]:
business = business.select('business_id','state','city','latitude','longitude',"stars_business","review_count")

In [41]:
business.printSchema()

root
 |-- business_id: string (nullable = true)
 |-- state: string (nullable = true)
 |-- city: string (nullable = true)
 |-- latitude: double (nullable = true)
 |-- longitude: double (nullable = true)
 |-- stars_business: double (nullable = true)
 |-- review_count: double (nullable = true)



#### 3. Tip data

In [12]:
tip = spark.read\
  .format('csv')\
  .option('header', 'true')\
  .option('inferSchema', 'true')\
  .load('s3://bigdataclasslhn/Prj_Yelp/yelp_tip.csv')

In [13]:
tip.printSchema()

root
 |-- user_id: string (nullable = true)
 |-- business_id: string (nullable = true)
 |-- text: string (nullable = true)
 |-- date: string (nullable = true)
 |-- compliment_count: string (nullable = true)



#### Select columns we need

In [42]:
tip = tip.select('user_id','business_id','compliment_count')

In [43]:
tip.printSchema()

root
 |-- user_id: string (nullable = true)
 |-- business_id: string (nullable = true)
 |-- compliment_count: string (nullable = true)



#### 4. User data

In [16]:
user = spark.read\
  .format('csv')\
  .option('header', 'true')\
  .option('inferSchema', 'true')\
  .load('s3://bigdataclasslhn/Prj_Yelp/yelp_user.csv')

In [17]:
user = user.withColumnRenamed('useful','useful_user')\
.withColumnRenamed('funny','funny_user')\
.withColumnRenamed('cool','cool_user')\
.withColumnRenamed('review_count','review_count_user')

#### Select columns we need

In [44]:
user = user.select('user_id','average_stars')

In [45]:
user.printSchema()

root
 |-- user_id: string (nullable = true)
 |-- average_stars: double (nullable = true)



### Merge data by business ID and user ID

In [46]:
combined1 = review.join(business, on='business_id', how='left_outer')
#combined1.printSchema()

In [47]:
combined2 = combined1.join(user, on='user_id', how='left_outer')
#combined2.printSchema()

#### Print schema of combined data 

In [48]:
combined = combined2.join(tip, on=['business_id', 'user_id',], how='left_outer')
combined.printSchema()

root
 |-- business_id: string (nullable = true)
 |-- user_id: string (nullable = true)
 |-- stars: integer (nullable = true)
 |-- useful: integer (nullable = true)
 |-- funny: integer (nullable = true)
 |-- cool: integer (nullable = true)
 |-- state: string (nullable = true)
 |-- city: string (nullable = true)
 |-- latitude: double (nullable = true)
 |-- longitude: double (nullable = true)
 |-- stars_business: double (nullable = true)
 |-- review_count: double (nullable = true)
 |-- average_stars: double (nullable = true)
 |-- compliment_count: string (nullable = true)



In [85]:
combined.count()

2555074

### Prepare data for Logistic model

#### Check null value in stars column

In [86]:
combined.groupby(combined.stars).count().show()

+-----+-------+
|stars|  count|
+-----+-------+
| null|    134|
|    1| 371228|
|    3| 281425|
|    5|1131301|
|    4| 567828|
|    2| 203158|
+-----+-------+



#### Remove rows with nulls in stars column because the number of nulls is only 134

In [49]:
combined = combined.filter(combined.stars.isNotNull())

#### Convert stars no more than 3 to 0 while stars more than 3 to 1

In [50]:
from pyspark.sql.functions import when

df = combined.withColumn('stars',when(combined.stars <= 3, '0').otherwise('1'))

#### Count boolean values in stars column

In [25]:
df.groupby(df.stars).count().show()

+-----+-------+
|stars|  count|
+-----+-------+
|    0| 855811|
|    1|1699129|
+-----+-------+



#### Create pipeline and train a model

In [51]:
from pyspark.ml.feature import OneHotEncoder, StringIndexer, IndexToString, VectorAssembler
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml import Pipeline, Model

In [52]:
# Convert all the string fields to numeric ones
stringIndexer_label = StringIndexer(inputCol="stars", outputCol="label",handleInvalid='keep')

stringIndexer_city = StringIndexer(inputCol="city", outputCol="city_SI",handleInvalid='keep')
stringIndexer_state = StringIndexer(inputCol="state", outputCol="state_SI",handleInvalid='keep')
stringIndexer_compliment_count = StringIndexer(inputCol="compliment_count", outputCol="compliment_count_SI",handleInvalid='keep')


In [53]:
# Look at values for one of the re-encoded columns using fit method
si_label_fit = StringIndexer(inputCol="stars", outputCol="label",handleInvalid='keep').fit(df)

In [54]:
si_label_fit.labels

['1', '0']

In [55]:
# Create a feature vector by combining all features together

vectorAssembler_features = VectorAssembler(
    inputCols=["state_SI", 
               "city_SI",
               "compliment_count_SI", 
               "useful",
               "funny",
               "cool",
               "latitude",
               "longitude",
               "stars_business",
               "review_count",
               "average_stars"], 
    outputCol="features",
    handleInvalid='keep')
vectorAssembler_features

VectorAssembler_de90272bd2bc

###  Build a logistic regression model

In [56]:
from pyspark.ml.classification import LogisticRegression

lr = LogisticRegression(labelCol="label", featuresCol="features")

### Index labels back to original labels

In [57]:
labelConverter = IndexToString(inputCol="prediction", 
                               outputCol="predictedLabel", 
                               labels=stringIndexer_label.fit(df).labels)

### Create a feature vector by combining all features together

In [58]:
pipeline_lr = Pipeline(stages=[stringIndexer_label,
                               stringIndexer_state,
                               stringIndexer_city,
                               stringIndexer_compliment_count,
                               vectorAssembler_features,
                               lr,labelConverter])

### Create 2 splits of combined data (train, test) by using the randomSplit method

In [62]:
splitted_data = df.randomSplit([0.8, 0.2])
train_data = splitted_data[0]
test_data = splitted_data[1]

print("Number of training records: " + str(train_data.count()))
print("Number of testing records : " + str(test_data.count()))

Number of training records: 2045032
Number of testing records : 509908


#### Check schema of training data

In [63]:
train_data.printSchema()

root
 |-- business_id: string (nullable = true)
 |-- user_id: string (nullable = true)
 |-- stars: string (nullable = false)
 |-- useful: integer (nullable = true)
 |-- funny: integer (nullable = true)
 |-- cool: integer (nullable = true)
 |-- state: string (nullable = true)
 |-- city: string (nullable = true)
 |-- latitude: double (nullable = true)
 |-- longitude: double (nullable = true)
 |-- stars_business: double (nullable = true)
 |-- review_count: double (nullable = true)
 |-- average_stars: double (nullable = true)
 |-- compliment_count: string (nullable = true)



### Train model with training data

In [64]:
model_lr = pipeline_lr.fit(train_data)

### Do predictions with testing data

In [65]:
predictions = model_lr.transform(test_data)

### Area under the curve (AUC) for binary classifier

In [66]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

evaluatorLR = BinaryClassificationEvaluator()
accuracy = evaluatorLR.evaluate(predictions)

print("Accuracy = %g" % accuracy)

Accuracy = 0.818937


In [None]:
spark.stop()