In [2]:
#import findspark
#findspark.init()
from pyspark import SparkConf, SparkContext
from pyspark.sql.session import SparkSession
from pyspark import sql
from pyspark.sql import SQLContext
from pyspark.sql import functions as fun
from pyspark.ml.feature import StopWordsRemover,Tokenizer, HashingTF, IDF
from pyspark.ml.classification import LogisticRegression
from pyspark.ml import Pipeline
from pyspark.sql.types import IntegerType
import re

#SparkContext
conf = SparkConf().setMaster("local").setAppName("final")
sc = SparkContext(conf=conf)
spark = SparkSession(sc)

# Business data set

In [2]:
bus = spark.read.csv("yelp-dataset/yelp_business.csv", header=True, escape = '"')
bus = bus.withColumn('categories',fun.lower(fun.col('categories')))
bus = bus.selectExpr('business_id as business_id','categories as categories','categories as cat_class')
#bus.show()

In [3]:
# bus.where(bus.cat_class.like('%restaurant%')).show()
bus = bus.withColumn('cat_class', fun.when(bus.cat_class.like('%restaurant%'), 'Resutaurant').otherwise(bus.cat_class))
bus = bus.withColumn('cat_class', fun.when(bus.cat_class.like('%dessert%') | bus.cat_class.like('%coffee%'), 'Cafe').otherwise(bus.cat_class))
bus = bus.withColumn('cat_class', fun.when(bus.cat_class.like('%hotel%'), 'Hotel').otherwise(bus.cat_class))
bus = bus.withColumn('cat_class', fun.when(bus.cat_class.like('%salon%'), 'Salon').otherwise(bus.cat_class))
bus = bus.withColumn('cat_class', fun.when(bus.cat_class.like('%arts & entertainment%'), 'Arts & Entertainment').otherwise(bus.cat_class))
#bus.show()

In [4]:
bus = bus.where(bus.categories != bus.cat_class)
bus = bus.select('business_id','cat_class')
bus.show()

+--------------------+-----------+
|         business_id|  cat_class|
+--------------------+-----------+
|He-G7vWjzVUysIKrf...|      Salon|
|PfOCPjBrlQAnz__NX...|Resutaurant|
|o9eMRCWt5PkpLDE0g...|Resutaurant|
|EsMcGiZaQuG1OOvL9...|       Cafe|
|XOSRcvtaKc_Q5H1SA...|Resutaurant|
|fNMVV_ZX7CJSDWQGd...|Resutaurant|
|l09JfMeQ6ynYs5MCJ...|Resutaurant|
|IQSlT5jGE6CCDhSG0...|      Salon|
|Gu-xs3NIQTj3Mj2xY...|Resutaurant|
|lHYiCS-y8AFjUitv6...|       Cafe|
|1K4qrnfyzKzGgJPBE...|Resutaurant|
|AtdXq_gu9NTE5rx4c...|       Cafe|
|Dj0S-Oe4ytRJzMGUP...|Resutaurant|
|gAy4LYpsScrj8POnC...|Resutaurant|
|nbhBRhZtdaZmMMeb2...|Resutaurant|
|1_3nOM7s9WqnJWTNu...|Resutaurant|
|FXHfcFVEfI1vVngW2...|Resutaurant|
|7gquCdaFoHZCcLYDt...|      Salon|
|lj0MiK5_fyv9df2tw...|      Salon|
|tRVx2c89coruPRwYh...|Resutaurant|
+--------------------+-----------+
only showing top 20 rows



# Yelp review data set

In [None]:
# yelp_11k = yelp.limit(11000) # subset for preprocessing (yelp_review_subset.csv)

In [None]:
# yelp_2m = yelp.limit(2000000) # subset for final resut

In [5]:
yelp = spark.read.csv("yelp_review_subset.csv", header=True, escape = '"', multiLine = True)
#join with business table
yelp = yelp.join(bus,['business_id'],how='inner')
#select the column that are needed
yelp = yelp.select('stars','date','text','useful','funny','cool','cat_class')
#turn stars rating to double type
yelp = yelp.withColumn("stars", yelp["stars"].cast("double"))
#turn text to lower case
yelp = yelp.withColumn('text',fun.lower(fun.col('text')))

In [6]:
#give each row a number 
yelp = yelp.withColumn('rownum', fun.monotonically_increasing_id())

In [7]:
#added the tone of the review based on stars ratings
yelp = yelp.withColumn('tone', fun.when(yelp.stars.between(1,2),'Negative').otherwise('Neutral'))
yelp = yelp.withColumn('tone', fun.when(yelp.stars.between(4,5),'Positive').otherwise(yelp.tone))
#give the each tone a number so we can run logistic regression
yelp = yelp.withColumn('tone_rating', fun.when(yelp.tone == 'Negative', 1).otherwise(2))
yelp = yelp.withColumn('tone_rating', fun.when(yelp.tone == 'Positive', 3).otherwise(yelp.tone_rating))
yelp.show()

+-----+----------+--------------------+------+-----+----+--------------------+------+--------+-----------+
|stars|      date|                text|useful|funny|cool|           cat_class|rownum|    tone|tone_rating|
+-----+----------+--------------------+------+-----+----+--------------------+------+--------+-----------+
|  3.0|2015-05-12|the town square s...|     3|    4|   4|         Resutaurant|     0| Neutral|          2|
|  5.0|2014-02-21|when blythe told ...|     4|    2|   6|         Resutaurant|     1|Positive|          3|
|  5.0|2015-09-22|this says it all....|     0|    0|   0|         Resutaurant|     2|Positive|          3|
|  5.0|2015-12-29|yum!! duck is a s...|     1|    0|   0|         Resutaurant|     3|Positive|          3|
|  4.0|2012-08-10|this place is a l...|     0|    0|   0|Arts & Entertainment|     4|Positive|          3|
|  3.0|2017-06-12|first time having...|     1|    0|   0|         Resutaurant|     5| Neutral|          2|
|  2.0|2011-02-22|i'm a sucker for ..

In [8]:
yelp.printSchema()

root
 |-- stars: double (nullable = true)
 |-- date: string (nullable = true)
 |-- text: string (nullable = true)
 |-- useful: string (nullable = true)
 |-- funny: string (nullable = true)
 |-- cool: string (nullable = true)
 |-- cat_class: string (nullable = true)
 |-- rownum: long (nullable = false)
 |-- tone: string (nullable = false)
 |-- tone_rating: integer (nullable = false)



# Text Manipulation

In [9]:
#create funcations 
token = Tokenizer(inputCol = 'clean_text', outputCol = 'clean_text_tok')
stop = StopWordsRemover(inputCol = 'clean_text_tok', outputCol = 'clean_text_nostop')

i_me = ['i','me']
count_all = fun.udf(lambda words : len(words), IntegerType())
count_i = fun.udf(lambda words: len([word for word in words if word not in i_me]), IntegerType())

In [10]:
#remove more than 1 white space
yelp = yelp.withColumn('clean_text', fun.regexp_replace(yelp.text, r'(\W+)',' '))
#remove the very first white space
yelp = yelp.withColumn('clean_text', fun.regexp_replace(yelp.clean_text, r'(^\s+)',''))

#Tokenization
yelp = token.transform(yelp)
# count the total words
yelp = yelp.withColumn('total_words', count_all(fun.col('clean_text_tok')))
# count i or me
yelp = yelp.withColumn('count_i_me', fun.col('total_words')-count_i(fun.col('clean_text_tok')))
#remove stopwords
yelp = stop.transform(yelp)
#count the stop words
yelp = yelp.withColumn('stop_words', fun.col('total_words')-count_all(fun.col('clean_text_nostop')))
yelp = yelp.drop('clean_text','clean_text_tok')
#find the % of stop words
yelp = yelp.withColumn('percent_stopwords', fun.col('stop_words')/fun.col('total_words')*100)

In [11]:
#classify fake reviews
yelp = yelp.withColumn('fake', fun.when(((fun.col('percent_stopwords')>=50.0) | (fun.col('count_i_me')>=10.0)), 1).otherwise(0))

In [12]:
# test the percetn
#df.groupBy('fake').count().show()

In [13]:
# rearrange table
yelp = yelp.select('rownum','stars','tone','tone_rating','date','text','useful','funny','cool','cat_class','clean_text_nostop','total_words','count_i_me','stop_words','percent_stopwords','fake')
yelp.show()

+------+-----+--------+-----------+----------+--------------------+------+-----+----+--------------------+--------------------+-----------+----------+----------+------------------+----+
|rownum|stars|    tone|tone_rating|      date|                text|useful|funny|cool|           cat_class|   clean_text_nostop|total_words|count_i_me|stop_words| percent_stopwords|fake|
+------+-----+--------+-----------+----------+--------------------+------+-----+----+--------------------+--------------------+-----------+----------+----------+------------------+----+
|     0|  3.0| Neutral|          2|2015-05-12|the town square s...|     3|    4|   4|         Resutaurant|[town, square, sh...|        180|         5|        84|46.666666666666664|   0|
|     1|  5.0|Positive|          3|2014-02-21|when blythe told ...|     4|    2|   6|         Resutaurant|[blythe, told, d,...|        306|        15|       156| 50.98039215686274|   1|
|     2|  5.0|Positive|          3|2015-09-22|this says it all....|   

# Filtering

In [14]:
resturant_noFake = yelp.where((yelp.cat_class == 'Resutaurant') & (yelp.fake == 0))

In [15]:
resturant_noFake.show()

+------+-----+--------+-----------+----------+--------------------+------+-----+----+-----------+--------------------+-----------+----------+----------+------------------+----+
|rownum|stars|    tone|tone_rating|      date|                text|useful|funny|cool|  cat_class|   clean_text_nostop|total_words|count_i_me|stop_words| percent_stopwords|fake|
+------+-----+--------+-----------+----------+--------------------+------+-----+----+-----------+--------------------+-----------+----------+----------+------------------+----+
|     0|  3.0| Neutral|          2|2015-05-12|the town square s...|     3|    4|   4|Resutaurant|[town, square, sh...|        180|         5|        84|46.666666666666664|   0|
|     2|  5.0|Positive|          3|2015-09-22|this says it all....|     0|    0|   0|Resutaurant|[says, husband, s...|         39|         1|        19|48.717948717948715|   0|
|     3|  5.0|Positive|          3|2015-12-29|yum!! duck is a s...|     1|    0|   0|Resutaurant|[yum, duck, speci.

# Logistic regression

In [16]:
df = resturant_noFake.select('rownum','tone_rating', 'clean_text_nostop')
df = df.withColumnRenamed('tone_rating','label')

In [17]:
train = df.where(df.rownum<=round(df.count()*.8)).select('label','clean_text_nostop')
test = df.where(df.rownum>round(df.count()*.8)).select('label','clean_text_nostop')

In [18]:
train.printSchema()

root
 |-- label: integer (nullable = false)
 |-- clean_text_nostop: array (nullable = true)
 |    |-- element: string (containsNull = true)



In [19]:
TF = HashingTF(inputCol = 'clean_text_nostop', outputCol = "tfFeatures")
idf = IDF(inputCol = "tfFeatures",outputCol = "features")
lr = LogisticRegression(maxIter = 10, regParam = 0.001)

pipeline = Pipeline(stages = [TF,idf,lr])

model = pipeline.fit(train)

In [20]:
prediction = model.transform(test)

In [21]:
selected = prediction.select("clean_text_nostop","probability","prediction")

# selected = selected.withColumn('pred_tone', fun.when(selected.prediction == 1, 'Negative').otherwise('Neutral'))
# selected = selected.withColumn('pred_tone', fun.when(selected.prediction == 3, 'Positive').otherwise(selected.pred_tone))

count = 1
for row in selected.collect():
    text, prob, prediction = row
    print("Comment (%d): --> prediction = %f" %(count, prediction))
#     text, prob, prediction, pred_tone = row
#     print("Comment (%d): --> prediction = %f , %s" %(count, prediction, pred_tone))
    count = count + 1

Comment (1): --> prediction = 3.000000
Comment (2): --> prediction = 3.000000
Comment (3): --> prediction = 3.000000
Comment (4): --> prediction = 3.000000
Comment (5): --> prediction = 3.000000
Comment (6): --> prediction = 3.000000
Comment (7): --> prediction = 2.000000
Comment (8): --> prediction = 3.000000
Comment (9): --> prediction = 1.000000
Comment (10): --> prediction = 3.000000
Comment (11): --> prediction = 3.000000
Comment (12): --> prediction = 3.000000
Comment (13): --> prediction = 3.000000
Comment (14): --> prediction = 2.000000
Comment (15): --> prediction = 2.000000
Comment (16): --> prediction = 3.000000
Comment (17): --> prediction = 3.000000
Comment (18): --> prediction = 3.000000
Comment (19): --> prediction = 3.000000
Comment (20): --> prediction = 3.000000
Comment (21): --> prediction = 3.000000
Comment (22): --> prediction = 3.000000
Comment (23): --> prediction = 2.000000
Comment (24): --> prediction = 3.000000
Comment (25): --> prediction = 2.000000
Comment (

Comment (1923): --> prediction = 3.000000
Comment (1924): --> prediction = 3.000000
Comment (1925): --> prediction = 3.000000
Comment (1926): --> prediction = 3.000000
Comment (1927): --> prediction = 3.000000
Comment (1928): --> prediction = 3.000000
Comment (1929): --> prediction = 3.000000
Comment (1930): --> prediction = 3.000000
Comment (1931): --> prediction = 3.000000
Comment (1932): --> prediction = 3.000000
Comment (1933): --> prediction = 1.000000
Comment (1934): --> prediction = 2.000000
Comment (1935): --> prediction = 3.000000
Comment (1936): --> prediction = 3.000000
Comment (1937): --> prediction = 3.000000
Comment (1938): --> prediction = 3.000000
Comment (1939): --> prediction = 3.000000
Comment (1940): --> prediction = 3.000000
Comment (1941): --> prediction = 3.000000
Comment (1942): --> prediction = 1.000000
Comment (1943): --> prediction = 2.000000
Comment (1944): --> prediction = 3.000000
Comment (1945): --> prediction = 3.000000
Comment (1946): --> prediction = 3

In [22]:
test.show()

+-----+--------------------+
|label|   clean_text_nostop|
+-----+--------------------+
|    3|[bad, daddy, make...|
|    1|[believe, people,...|
|    3|[absolutely, stun...|
|    3|[anything, cheese...|
|    3|[complete, dining...|
|    3|[j, ai, l, pour, ...|
|    3|[depuis, le, temp...|
|    3|[good, idea, make...|
|    3|[salut, je, m, ap...|
|    3|[delicious, piero...|
|    3|[super, cute, pla...|
|    3|[always, crave, s...|
|    3|[ambiance, de, ma...|
|    2|[j, aime, bien, c...|
|    2|[m, really, 3, 5,...|
|    3|[second, trip, mo...|
|    1|[went, 9, 30pm, p...|
|    3|[two, scoops, gre...|
|    1|[ready, made, piz...|
|    3|[came, sunday, af...|
+-----+--------------------+
only showing top 20 rows



In [25]:
selected.show()

+--------------------+--------------------+----------+
|   clean_text_nostop|         probability|prediction|
+--------------------+--------------------+----------+
|[bad, daddy, make...|[2.64395100159604...|       3.0|
|[believe, people,...|[2.15315783599151...|       3.0|
|[absolutely, stun...|[7.15225472022820...|       3.0|
|[anything, cheese...|[3.58669227444054...|       3.0|
|[complete, dining...|[3.65052139848579...|       3.0|
|[j, ai, l, pour, ...|[2.25659076997629...|       3.0|
|[depuis, le, temp...|[1.66573968457134...|       2.0|
|[good, idea, make...|[8.23152961169429...|       3.0|
|[salut, je, m, ap...|[1.64279962931624...|       1.0|
|[delicious, piero...|[1.20181864011936...|       3.0|
|[super, cute, pla...|[6.37713350972203...|       3.0|
|[always, crave, s...|[5.31207215218006...|       3.0|
|[ambiance, de, ma...|[5.16178740684867...|       3.0|
|[j, aime, bien, c...|[2.94809043981103...|       2.0|
|[m, really, 3, 5,...|[1.55524247471535...|       2.0|
|[second, 

In [23]:
selected.groupBy('prediction').count().show()

+----------+-----+
|prediction|count|
+----------+-----+
|       1.0|  168|
|       3.0| 2048|
|       2.0|  202|
+----------+-----+



In [24]:
test.groupBy('label').count().show()

+-----+-----+
|label|count|
+-----+-----+
|    1|  337|
|    3| 1692|
|    2|  389|
+-----+-----+



In [37]:
result = selected.join(test, on = ['clean_text_nostop'], how = "inner")
result = result.select('prediction','label')
result.show()

+----------+-----+
|prediction|label|
+----------+-----+
|       3.0|    3|
|       3.0|    3|
|       1.0|    1|
|       3.0|    1|
|       3.0|    1|
|       3.0|    1|
|       3.0|    3|
|       3.0|    1|
|       3.0|    3|
|       3.0|    3|
|       2.0|    3|
|       2.0|    1|
|       3.0|    3|
|       3.0|    3|
|       3.0|    2|
|       3.0|    1|
|       3.0|    3|
|       3.0|    3|
|       3.0|    3|
|       3.0|    3|
+----------+-----+
only showing top 20 rows



In [40]:
a = result[(result.label == 1) & (result.prediction == 1)].count()
b = result[(result.label == 1) & (result.prediction == 2)].count()
c = result[(result.label == 1) & (result.prediction == 3)].count()
print(a,b,c)

119 64 154


In [41]:
d = result[(result.label == 2) & (result.prediction == 1)].count()
e = result[(result.label == 2) & (result.prediction == 2)].count()
f = result[(result.label == 2) & (result.prediction == 3)].count()
print(d,e,f)

24 76 289


In [42]:
g = result[(result.label == 3) & (result.prediction == 1)].count()
h = result[(result.label == 3) & (result.prediction == 2)].count()
i = result[(result.label == 3) & (result.prediction == 3)].count()
print(g,h,i)

25 62 1605


In [43]:
# correct prediction / total result
(a+e+i)/(a+b+c+d+e+f+g+h+i)

0.7444168734491315