## Spark + Yelp Exercise
A Spark exercise using the Yelp Academic Dataset to build familiarity with PySpark and practice the basics of Spark UDFs.

`%%time` cell magics are included, but timings will vary with hardware and operating system configuration.
_________________________

First, spin up a Spark session.

In [1]:
from pyspark.sql import SparkSession
spark = SparkSession \
    .builder \
    .master("local[*]") \
    .appName('test1') \
    .getOrCreate() 

sc = spark.sparkContext

Load the datasets (this will take a few minutes).

In [2]:
# business = spark.read.json('../assets/yelp_academic/yelp_academic_dataset_business.json.gz')
# checkin = spark.read.json('../assets/yelp_academic/yelp_academic_dataset_checkin.json.gz')
# review = spark.read.json('../assets/yelp_academic/yelp_academic_dataset_review.json.gz')
# tip = spark.read.json('../assets/yelp_academic/yelp_academic_dataset_tip.json.gz')
# user = spark.read.json('../assets/yelp_academic/yelp_academic_dataset_user.json.gz')

business = spark.read.json('../non_auto_assignments/data/yelp_academic/yelp_academic_dataset_business.json.gz')
checkin = spark.read.json('../non_auto_assignments/data/yelp_academic/yelp_academic_dataset_checkin.json.gz')
review = spark.read.json('../non_auto_assignments/data/yelp_academic/yelp_academic_dataset_review.json.gz')
tip = spark.read.json('../non_auto_assignments/data/yelp_academic/yelp_academic_dataset_tip.json.gz')
user = spark.read.json('../non_auto_assignments/data/yelp_academic/yelp_academic_dataset_user.json.gz')

#### How many users have received more than 5000 "cool" compliments?

In [3]:
user.printSchema()

root
 |-- average_stars: double (nullable = true)
 |-- compliment_cool: long (nullable = true)
 |-- compliment_cute: long (nullable = true)
 |-- compliment_funny: long (nullable = true)
 |-- compliment_hot: long (nullable = true)
 |-- compliment_list: long (nullable = true)
 |-- compliment_more: long (nullable = true)
 |-- compliment_note: long (nullable = true)
 |-- compliment_photos: long (nullable = true)
 |-- compliment_plain: long (nullable = true)
 |-- compliment_profile: long (nullable = true)
 |-- compliment_writer: long (nullable = true)
 |-- cool: long (nullable = true)
 |-- elite: string (nullable = true)
 |-- fans: long (nullable = true)
 |-- friends: string (nullable = true)
 |-- funny: long (nullable = true)
 |-- name: string (nullable = true)
 |-- review_count: long (nullable = true)
 |-- useful: long (nullable = true)
 |-- user_id: string (nullable = true)
 |-- yelping_since: string (nullable = true)



In [4]:
res1 = user.filter(user['compliment_cool'] > 5000).count()
res1

79

#### What are the top 10 most complimented businesses?

In [5]:
%%time
from pyspark.sql.functions import col

# Reduce the tip dataset to the 10 individual tips with the most compliments
top_10 = tip.sort(col("compliment_count").desc()).head(10)
top_10 = spark.createDataFrame(top_10)

# Join the two on a left join (just top 10), using the business id, and return the name and compliment count
res2 = top_10.join(business, top_10.business_id == business.business_id, how='left') \
           .select(business.name, top_10.compliment_count)

# Show DataFrame
#res2.show()
res2.toPandas()

CPU times: user 254 ms, sys: 39 ms, total: 293 ms
Wall time: 6.08 s


| | name | compliment_count |
|---|---|---|
| 0 | A Peaceful Farewell | 15 |
| 1 | 1st Pet Veterinary Centers | 12 |
| 2 | Department of Motor Vehicles | 11 |
| 3 | Baladie Café | 9 |
| 4 | Moko Ramen Bar | 8 |
| 5 | Dr. William M. Jacobsen MD | 7 |
| 6 | Bomboba | 7 |
| 7 | Southwest Airlines | 7 |
| 8 | Burgh'ers Brewing | 7 |
| 9 | Pineapple Park | 6 |


Interestingly, the highest compliment count is only 15. With a dataset this large, I expected at least one or two outliers, especially with large companies such as Southwest Airlines included. Then again, the loudest (and angriest) voices are typically the ones that get posted. Just an interesting observation!
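
One caveat: the cell above sorts individual tips by `compliment_count`, so the table lists the ten most complimented tips and the business each one belongs to. Ranking businesses would require summing compliments per `business_id` first. A minimal plain-Python sketch of that aggregation follows; the IDs and counts are hypothetical and not taken from the dataset.

```python
from collections import Counter

# Hypothetical tip records: (business_id, compliment_count)
tips = [
    ("biz_a", 3),
    ("biz_b", 5),
    ("biz_a", 4),
    ("biz_c", 1),
    ("biz_b", 1),
]

# Sum compliments per business rather than ranking single tips
totals = Counter()
for business_id, compliment_count in tips:
    totals[business_id] += compliment_count

# most_common(n) returns the n largest totals in descending order
top_businesses = totals.most_common(2)
print(top_businesses)
```

In PySpark the same idea would presumably be a `groupBy('business_id')` with a summed `compliment_count` before the sort. That variant was not run here, so no results are shown for it.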

#### What are the top 10 most useful reviews?

In [6]:
%%time
# Sort dataset by most useful, descending, and take top 10
most_useful = review.sort(col("useful").desc()).head(10)

# Put back into dataframe form
most_useful = spark.createDataFrame(most_useful)

# Select desired columns (useful ranking, and text of the review)
most_useful = most_useful.select(most_useful.useful, most_useful.text)

# Show dataframe
#most_useful.show()
most_useful.toPandas()

CPU times: user 4.07 ms, sys: 10.1 ms, total: 14.2 ms
Wall time: 59 s


| | useful | text |
|---|---|---|
| 0 | 1241 | Dinner for 1.\n\n- Preface\nI went to Amy's Ba... |
| 1 | 1122 | In retrospect, I should have known better than... |
| 2 | 970 | This restaurant is horrible. \n\nFirst off the... |
| 3 | 846 | This is the second time I've been here and the... |
| 4 | 808 | I actually suspect that a lot of people who ha... |
| 5 | 781 | "scary that people like this can own a biz; sa... |
| 6 | 694 | I really wanted to like this place.\nIt's clea... |
| 7 | 668 | A few years ago after attending a movie across... |
| 8 | 650 | I'd put zero stars if it was an option- the fo... |
| 9 | 578 | We were very disappointed with this restaurant... |


At this point in the exercise, I saw that most of these were low-star reviews. I found that interesting, as it suggests that unfavorable reviews are often rated as more helpful, probably because they are more specific about what went wrong, whereas a favorable review mostly confirms that everything went as expected. Funnily enough, after further research, I found that Amy's Baking Company was featured on Kitchen Nightmares, so the public exposure may have resulted in more reviews.

#### What are the top 10 most useful <i>positive</i> reviews?
- A 5-star review is considered "positive".
- The top ten 5-star reviews are taken in sorted order; ties are not considered.
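
Ignoring ties means the list is cut at exactly ten rows, so a review tied with the tenth one may be left out. If ties mattered, one option would be to keep every row whose score is at least the n-th highest score. A minimal plain-Python sketch of that idea, using a handful of sample counts (`top_n_with_ties` is a hypothetical helper, not part of the notebook):

```python
def top_n_with_ties(scores, n):
    """Return all scores >= the n-th largest score, in descending order."""
    ranked = sorted(scores, reverse=True)
    if len(ranked) <= n:
        return ranked
    cutoff = ranked[n - 1]
    return [s for s in ranked if s >= cutoff]

# Sample "useful" counts containing a tie at the cutoff
useful_counts = [358, 278, 241, 215, 215, 210]

strict_top = sorted(useful_counts, reverse=True)[:4]  # cuts at exactly 4 rows
tied_top = top_n_with_ties(useful_counts, 4)          # keeps both tied values

print(strict_top)
print(tied_top)
```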

In [7]:
%%time
# Keep 5-star reviews, sort by most useful (descending), and take the top 10
most_useful_pos = review['stars','text','useful'].filter(review['stars'] == 5) \
                                                 .sort(col("useful").desc()).head(10)

# Put back into dataframe form
most_useful_pos = spark.createDataFrame(most_useful_pos)

# Select desired columns (useful ranking, and text of the review)
most_useful_pos = most_useful_pos.select(most_useful_pos.useful, most_useful_pos.text)

# Show dataframe
#most_useful_pos.show()
most_useful_pos.toPandas()

CPU times: user 11.2 ms, sys: 4.62 ms, total: 15.8 ms
Wall time: 54.4 s


| | useful | text |
|---|---|---|
| 0 | 358 | The Wynn Hotel. One word. Opulent. \n\nWynn, i... |
| 1 | 278 | Wayne does a fantastic Job, always on time, my... |
| 2 | 241 | This is one of the most interesting classic lo... |
| 3 | 215 | Wenn man auf dem Strip zu Fuß unterwegs ist, s... |
| 4 | 215 | I stopped by Echo and Rig during lunch time on... |
| 5 | 210 | After spending a bunch of money on lunches and... |
| 6 | 208 | Why spend hundreds of dollars bribing doucheba... |
| 7 | 207 | Unser eher bescheidenes Motel war in der Nähe ... |
| 8 | 207 | Auf unserer Rundreise haben wir häufig die Res... |
| 9 | 203 | Auf unserer Casino Besichtigungstour sind wir ... |


#### During what hour of the day do most checkins occur?

In [8]:
checkin.printSchema()

root
 |-- business_id: string (nullable = true)
 |-- date: string (nullable = true)



In [9]:
%%time
import pyspark.sql.functions as f
from pyspark.sql.functions import col, explode, hour, udf
from pyspark.sql.types import ArrayType, StringType

# Split the comma-separated date string into an array of timestamp strings
datesplit = udf(lambda x: x.split(','), ArrayType(StringType()))

checkin_hour = checkin.withColumn("date_exploded", f.explode(datesplit(col("date")))) \
                      .withColumn('hour', hour(col("date_exploded"))) \
                      .groupby(['hour']).count().sort('count', ascending=False)

checkin_hour.show(25) # show 24 hours + nulls

+----+--------+
|hour|   count|
+----+--------+
|null|18927198|
|  19|   13481|
|  23|   13207|
|  22|   13191|
|  18|   13177|
|  21|   12960|
|  20|   12553|
|  17|   12304|
|   0|   11577|
|  16|   10416|
|   1|    9803|
|   2|    7258|
|  15|    7000|
|   3|    5225|
|  14|    4340|
|   4|    3547|
|  13|    2910|
|   5|    2348|
|  12|    1597|
|   6|    1450|
|   7|    1020|
|  11|     824|
|   8|     809|
|  10|     483|
|   9|     470|
+----+--------+

CPU times: user 25.7 ms, sys: 8.02 ms, total: 33.7 ms
Wall time: 3min 1s


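The PySpark `hour` function used above returns null for most rows because of spacing issues in the data: splitting on `,` appears to leave a leading space on every timestamp after the first, which `hour` then fails to parse. Instead, we will use a second UDF that splits each timestamp on whitespace, takes the time portion, and extracts the hour as an integer before grouping.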

In [10]:
%%time
from pyspark.sql.types import IntegerType

new_datesplit = udf(lambda x: int(x.split()[1].split(':')[0]), IntegerType())

checkin_hour_v2 = checkin.withColumn("date_exploded", f.explode(datesplit(col("date")))) \
                         .withColumn("hour", new_datesplit(col("date_exploded"))) \
                         .groupby(['hour']).count() \
                         .sort('hour')

checkin_hour_v2.show(24)

+----+-------+
|hour|  count|
+----+-------+
|   0|1491176|
|   1|1561788|
|   2|1411255|
|   3|1078939|
|   4| 747453|
|   5| 485129|
|   6| 321764|
|   7| 231417|
|   8| 151065|
|   9| 100568|
|  10|  88486|
|  11| 111769|
|  12| 178910|
|  13| 270145|
|  14| 418340|
|  15| 617830|
|  16| 852076|
|  17|1006102|
|  18|1272108|
|  19|1502271|
|  20|1350195|
|  21|1238808|
|  22|1257437|
|  23|1344117|
+----+-------+

CPU times: user 26.5 ms, sys: 2.14 ms, total: 28.7 ms
Wall time: 3min 16s


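The additional UDF adds about 15 seconds to the execution time (3min 16s versus 3min 1s), but that is worthwhile because every check-in is now assigned an hour. Hour 1 has the most check-ins (1,561,788), followed by hour 19 (1,502,271) and hour 0 (1,491,176).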
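
The string handling inside the two UDFs can be checked in plain Python before committing to a three-minute Spark job. The sketch below assumes the `date` field is a comma-separated string of `YYYY-MM-DD HH:MM:SS` timestamps. The sample value is made up, and `extract_hours` is a hypothetical helper name.

```python
def extract_hours(date_field):
    """Return the hour of each timestamp in a comma-separated string."""
    hours = []
    for stamp in date_field.split(','):
        # split() with no argument ignores the leading space left by split(',')
        time_part = stamp.split()[1]
        hours.append(int(time_part.split(':')[0]))
    return hours

# Made-up sample in the assumed format
sample = "2016-04-26 19:49:16, 2016-08-30 18:36:57, 2016-10-15 02:45:18"

# Splitting on ',' alone leaves a leading space on every element but the first
pieces = sample.split(',')
print(pieces)
print(extract_hours(sample))
```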

#### Sentiment Analysis

List the 50 most common non-stopword words that are unique to <i>positive</i> reviews, and the 50 most common that are unique to <i>negative</i> reviews. We will consider `1` star as negative and `5` stars as positive.

<b>Note:</b> There are some considerations here, such as embellishment and typos, but the data will be left as-is at this juncture so as to not assume intent of the writer.
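
One way to read "unique to" is as a set difference: count words per class, then keep only the words that never appear in the other class. A minimal plain-Python sketch under that assumption follows, with a shortened stopword set and made-up review texts (`tokenize` and `unique_top_words` are hypothetical helper names).

```python
import re
from collections import Counter

STOPWORDS = {"the", "was", "and", "a", "i", "it"}  # shortened for illustration
WORD_RE = re.compile(r"[\w']+")

def tokenize(text):
    """Lowercase, split into word tokens, and drop stopwords."""
    return [w for w in WORD_RE.findall(text.lower()) if w not in STOPWORDS]

def unique_top_words(own_texts, other_texts, n):
    """Most common words in own_texts that never occur in other_texts."""
    own = Counter(w for t in own_texts for w in tokenize(t))
    other = {w for t in other_texts for w in tokenize(t)}
    unique = Counter({w: c for w, c in own.items() if w not in other})
    return unique.most_common(n)

# Made-up review texts
pos = ["The food was great and the staff was great", "Great food, I loved it"]
neg = ["The food was cold and the staff was rude", "Rude and cold"]

print(unique_top_words(pos, neg, 2))
print(unique_top_words(neg, pos, 2))
```

Words shared by both classes, such as "food" and "staff" here, drop out entirely, which is the point of the "unique to" requirement.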

In [11]:
import re

from pyspark.sql.types import MapType

def stop_word_cleaning(text):
    STOPWORDS = {'i', 'we', 'ourselves', 'hers', 'between', 'yourself', 'but', 'again', 'there', 
                 'about', 'once', 'during', 'out', 'very', 'having', 'with', 'they', 'own', 'an', 
                 'be', 'some', 'for', 'do', 'its', 'yours', 'such', 'into', 'of', 'most', 'itself', 
                 'other', 'off', 'is', 's', 'am', 'or', 'who', 'as', 'from', 'him', 'each', 'the', 
                 'themselves', 'until', 'below', 'are', 'we', 'these', 'your', 'his', 'through', 
                 'don', 'nor', 'me', 'were', 'her', 'more', 'himself', 'this', 'down', 'should', 
                 'our', 'their', 'while', 'above', 'both', 'up', 'to', 'ours', 'had', 'she', 'all', 
                 'no', 'when', 'at', 'any', 'before', 'them', 'same', 'and', 'been', 'have', 'in', 
                 'will', 'on', 'does', 'yourselves', 'then', 'that', 'because', 'what', 'over', 
                 'why', 'so', 'can', 'did', 'not', 'now', 'under', 'he', 'you', 'herself', 'has', 
                 'just', 'where', 'too', 'only', 'myself', 'which', 'those', 'i', 'after', 'few', 
                 'whom', 't', 'being', 'if', 'theirs', 'my', 'against', 'a', 'by', 'doing', 'it',
                 'how', 'further', 'was', 'here', 'than'}
    WORD_RE = re.compile(r"[\w']+") 

    r_list = []
    for word in WORD_RE.findall(text.lower()):
        if word not in STOPWORDS:
            r_list.append(word)
            
    return r_list

stop_words_udf = udf(lambda x: stop_word_cleaning(x), ArrayType(StringType()))

In [12]:
reduced_pos_data = review['stars','text'].filter(review['stars'] == 5)
reduced_neg_data = review['stars','text'].filter(review['stars'] == 1)

In [13]:
# positive word count
pos_cnt = reduced_pos_data.withColumn('word', f.explode(stop_words_udf(col('text')))) \
                          .groupBy('word').count() \
                          .alias("pos_cnt")

# negative word count
neg_cnt = reduced_neg_data.withColumn('word', f.explode(stop_words_udf(col('text')))) \
                          .groupBy('word').count() \
                          .alias("neg_cnt")

# Reduce computational time for overlapping words
# intersections = pos_cnt.select('word').intersect(neg_cnt.select('word')).rdd.flatMap(lambda x:x).collect()

In [14]:
# top 50 positive words
top_50_pos = pos_cnt.sort(col('count').desc())#.show(50)

In [15]:
# top 50 negative words
top_50_neg = neg_cnt.sort(col('count').desc())#.show(50)

As of 12/2021, calling `.show()` on these DataFrames returns a Java error. I am looking into this, as it may be a dependency or package version issue.

Spun up this alternative solution in the meantime:

In [16]:
STOPWORDS = {'i', 'we', 'ourselves', 'hers', 'between', 'yourself', 'but', 'again', 'there', 'about', 'once', 'during', 'out', 'very', 'having', 'with', 'they', 'own', 'an', 'be', 'some', 'for', 'do', 'its', 'yours', 'such', 'into', 'of', 'most', 'itself', 'other', 'off', 'is', 's', 'am', 'or', 'who', 'as', 'from', 'him', 'each', 'the', 'themselves', 'until', 'below', 'are', 'we', 'these', 'your', 'his', 'through', 'don', 'nor', 'me', 'were', 'her', 'more', 'himself', 'this', 'down', 'should', 'our', 'their', 'while', 'above', 'both', 'up', 'to', 'ours', 'had', 'she', 'all', 'no', 'when', 'at', 'any', 'before', 'them', 'same', 'and', 'been', 'have', 'in', 'will', 'on', 'does', 'yourselves', 'then', 'that', 'because', 'what', 'over', 'why', 'so', 'can', 'did', 'not', 'now', 'under', 'he', 'you', 'herself', 'has', 'just', 'where', 'too', 'only', 'myself', 'which', 'those', 'i', 'after', 'few', 'whom', 't', 'being', 'if', 'theirs', 'my', 'against', 'a', 'by', 'doing', 'it', 'how', 'further', 'was', 'here', 'than'}

# Define UDFs for use in the analysis
# For this exercise, for now, we will consider 5 stars as "positive" and 1 star as "negative"; everything else is labeled 'Neutral'.
review_type = udf(lambda x: '+' if (x >= 5.0) else ('-' if (x <= 1.0) else 'Neutral'),StringType())
rev_wrd_lst = udf(lambda x: [w.lower() for w in x.split() if w.lower() not in STOPWORDS], ArrayType(StringType()))

# Create columns assigning positive and negative sentiment, as well as
# a list of words contained in the text that excludes stopwords
rev = review.select('review_id', review_type('stars').alias('pos_neg'), rev_wrd_lst('text').alias('words'))
#rev = spark.createDataFrame(rev) # Sometimes this is required to run without errors, sometimes it is not.... ?

# Explode so each word gets its own row and retains its assignment of positive and negative.
rev = rev.withColumn('word',explode('words'))

# Groupby sentiment and word, with an aggregate count of the word's use.
v2 = rev.groupby(['pos_neg','word']).agg(f.count('word').alias('wrd_cnt'))

# Create separate list for positive sentiment
v2_pos = v2.filter(v2['pos_neg'] == '+').sort(col('wrd_cnt').desc()).head(50)
v2_pos = spark.createDataFrame(v2_pos)

# Create separate list for negative sentiment
v2_neg = v2.filter(v2['pos_neg'] == '-').sort(col('wrd_cnt').desc()).head(50)
v2_neg = spark.createDataFrame(v2_neg)

# Show dataframes
v2_pos.show(50)
v2_neg.show(50)

+-------+----------+-------+
|pos_neg|      word|wrd_cnt|
+-------+----------+-------+
|      +|     great|1336194|
|      +|     place| 992555|
|      +|      food| 830427|
|      +|      good| 712097|
|      +|       get| 691053|
|      +|      like| 660552|
|      +|      best| 648208|
|      +|       one| 643653|
|      +|      time| 640396|
|      +|   service| 619802|
|      +|      it's| 611242|
|      +|      love| 602691|
|      +|    always| 579908|
|      +|    really| 573165|
|      +|        go| 565067|
|      +|     would| 555440|
|      +|      also| 531716|
|      +|      back| 515453|
|      +|definitely| 479365|
|      +|      i've| 475280|
|      +|      even| 414864|
|      +|     staff| 413054|
|      +|       got| 411035|
|      +| recommend| 407038|
|      +|         -| 400456|
|      +|       i'm| 382370|
|      +|        us| 371761|
|      +|  friendly| 362067|
|      +|     first| 359682|
|      +|      nice| 359092|
|      +|      made| 347581|
|      +|   am

______________________
<div style="text-align: right"><sub>Exercise adapted and modified from UMSI homework assignment for SIADS 516.</sub></div>