# SIADS 516 Homework 4: Spark SQL
Version 1.0.20200221.1
### Dr. Chris Teplovs, School of Information, University of Michigan
<small><a rel="license" href="http://creativecommons.org/licenses/by-nc-sa/4.0/"><img alt="Creative Commons License" style="border-width:0" src="https://i.creativecommons.org/l/by-nc-sa/4.0/88x31.png" /></a>This work is licensed under a <a rel="license" href="http://creativecommons.org/licenses/by-nc-sa/4.0/">Creative Commons Attribution-NonCommercial-ShareAlike 4.0 International License</a>.

This homework assignment uses the Yelp Academic dataset, with which you should now be familiar.
We have created a few cells to get you started, but you're largely on your own to devise solutions to the
"real-world" questions below.

The best solutions will use spark.sql() calls as a preferred way to query the dataset and also use the fewest number of steps.  For example, to find the answer to "How many users have more than 100 "cool" votes?", this:
```
query = """
SELECT count(*) FROM user WHERE cool > 100
"""
spark.sql(query).show()
```
is preferable to:
```
user.filter('cool > 100').show()
```
or 
```
query = """
SELECT * FROM user
"""
df = spark.sql(query)
df.filter('cool > 100').show()
```
(Note that the last number is somewhat ridiculous.)

Our usual Spark mantra:

In [1]:
from pyspark.sql import SparkSession
spark = SparkSession \
    .builder \
    .master("local[*]") \
    .appName('My First Spark application') \
    .getOrCreate() 

sc = spark.sparkContext

Load the JSON files:

In [2]:
review = spark.read.json('data/yelp_academic/yelp_academic_dataset_review.json.gz')

In [3]:
business = spark.read.json('data/yelp_academic/yelp_academic_dataset_business.json.gz')
checkin = spark.read.json('data/yelp_academic/yelp_academic_dataset_checkin.json.gz')
tip = spark.read.json('data/yelp_academic/yelp_academic_dataset_tip.json.gz')
user = spark.read.json('data/yelp_academic/yelp_academic_dataset_user.json.gz')

Create temp views for the DataFrames:

In [4]:
business.createOrReplaceTempView("business")
checkin.createOrReplaceTempView("checkin")
tip.createOrReplaceTempView("tip")
review.createOrReplaceTempView("review")
user.createOrReplaceTempView("user")

## Q1. How many users have more than 500 fans?

In [4]:
#To get an idea of how to structure this query, let's print the schema of the users df

user.printSchema()

root
 |-- average_stars: double (nullable = true)
 |-- compliment_cool: long (nullable = true)
 |-- compliment_cute: long (nullable = true)
 |-- compliment_funny: long (nullable = true)
 |-- compliment_hot: long (nullable = true)
 |-- compliment_list: long (nullable = true)
 |-- compliment_more: long (nullable = true)
 |-- compliment_note: long (nullable = true)
 |-- compliment_photos: long (nullable = true)
 |-- compliment_plain: long (nullable = true)
 |-- compliment_profile: long (nullable = true)
 |-- compliment_writer: long (nullable = true)
 |-- cool: long (nullable = true)
 |-- elite: string (nullable = true)
 |-- fans: long (nullable = true)
 |-- friends: string (nullable = true)
 |-- funny: long (nullable = true)
 |-- name: string (nullable = true)
 |-- review_count: long (nullable = true)
 |-- useful: long (nullable = true)
 |-- user_id: string (nullable = true)
 |-- yelping_since: string (nullable = true)



In [6]:
query = """
SELECT count(*) FROM user WHERE fans > 500
"""
spark.sql(query).show()

+--------+
|count(1)|
+--------+
|     185|
+--------+



There are a total of 185 users than have over 500 fans.

## Q2. How many businesses from Madison, Wisconsin are represented in the dataset?

In [None]:
#To get an idea of how to structure this query, let's print the schema of the business df

In [31]:
business.printSchema()

root
 |-- address: string (nullable = true)
 |-- attributes: struct (nullable = true)
 |    |-- AcceptsInsurance: string (nullable = true)
 |    |-- AgesAllowed: string (nullable = true)
 |    |-- Alcohol: string (nullable = true)
 |    |-- Ambience: string (nullable = true)
 |    |-- BYOB: string (nullable = true)
 |    |-- BYOBCorkage: string (nullable = true)
 |    |-- BestNights: string (nullable = true)
 |    |-- BikeParking: string (nullable = true)
 |    |-- BusinessAcceptsBitcoin: string (nullable = true)
 |    |-- BusinessAcceptsCreditCards: string (nullable = true)
 |    |-- BusinessParking: string (nullable = true)
 |    |-- ByAppointmentOnly: string (nullable = true)
 |    |-- Caters: string (nullable = true)
 |    |-- CoatCheck: string (nullable = true)
 |    |-- Corkage: string (nullable = true)
 |    |-- DietaryRestrictions: string (nullable = true)
 |    |-- DogsAllowed: string (nullable = true)
 |    |-- DriveThru: string (nullable = true)
 |    |-- GoodForDancing: str

In [17]:
#It seems like the relevant data may be in the city column. Let's see how it's formatted.

query = """
SELECT city FROM business LIMIT 10
"""
spark.sql(query).show()

+-------------+
|         city|
+-------------+
|      Phoenix|
|  Mississauga|
|    Charlotte|
|     Goodyear|
|    Charlotte|
|  Mississauga|
|      Calgary|
|    Las Vegas|
|     Glendale|
|Fairview Park|
+-------------+



In [18]:
#Now let's get the count of businesses based in Madison
query = """
SELECT count(*) from business WHERE city = 'Madison' AND state = 'WI'
"""
spark.sql(query).show()

+--------+
|count(1)|
+--------+
|    3493|
+--------+



There are 3493 businesses that are based in Madison, WI.

## Q3: How many users have more than 500 fans?

In [19]:
# This question is a duplicate, so we will ignore it

## Q4: Which US states are represented in the data set?  Use full names of states (you will need to look up the list of state abbreviations is you don't know them).

In [None]:
#In order to answer this question, let's utilize the business dataframe, given that it contains a state column

In [20]:
query = """
SELECT DISTINCT state FROM business
"""
spark.sql(query).show()

+-----+
|state|
+-----+
|   AZ|
|   SC|
|  BAS|
|   NJ|
|   VA|
|   QC|
|   BC|
|   NV|
|   WI|
|   CA|
|   NE|
|   CT|
|   NC|
|   VT|
|   IL|
|  XGL|
|   WA|
|   AL|
|  XGM|
|   OH|
+-----+
only showing top 20 rows



Selecting all distinct values from states returns this list. After disregarding the abbreviations that don't reference US administered locales, there appears to be 15 states present within the dataset. 

## Q5: What is the text of the funniest review?

In [25]:
#To structure this query, we'll utilize the review dataframe
review.printSchema()

root
 |-- business_id: string (nullable = true)
 |-- cool: long (nullable = true)
 |-- date: string (nullable = true)
 |-- funny: long (nullable = true)
 |-- review_id: string (nullable = true)
 |-- stars: double (nullable = true)
 |-- text: string (nullable = true)
 |-- useful: long (nullable = true)
 |-- user_id: string (nullable = true)



In [31]:
query = """
SELECT text, MAX(funny) AS maximum_funny
FROM review
GROUP BY text
ORDER BY maximum_funny DESC
LIMIT 1
"""
spark.sql(query).show()

+--------------------+-------------+
|                text|maximum_funny|
+--------------------+-------------+
|Flew to Arizona a...|         1290|
+--------------------+-------------+



Based on the output, the funniest review was rated as such a total of 1290 times.

## Q6: Which review(s) has/have the most words?  What do you notice about the maximum word count?

In [137]:
#In order to answer this question, we will try to create two user defined function
#The first udf will get the total word count for row of text
#The second udf will strip the square brackets from the word_count column


from pyspark.sql import functions as F
from pyspark.sql.types import IntegerType

def count_words_in_each_review(review):
    return [len(x.split()) for x in review]

def list_to_string_without_brackets(row):
    return str(row).replace('[','').replace(']','')

count_words = F.udf(lambda x: count_words_in_each_review(x.split('. | ! | ? ')))

no_brackets = F.udf(lambda x : list_to_string_without_brackets(x))

review =  review.withColumn('word_count', count_words(review['text']))

review =  review.withColumn('word_count_no_brackets', no_brackets(review['word_count']))

review = review.withColumn("word_count_no_brackets", review["word_count_no_brackets"].cast(IntegerType()))

review.select('word_count_no_brackets').show(10)



+----------------------+
|word_count_no_brackets|
+----------------------+
|                    39|
|                   274|
|                   109|
|                    68|
|                   673|
|                   295|
|                   244|
|                    72|
|                   263|
|                    19|
+----------------------+
only showing top 10 rows



In [133]:
#So we avoid overwriting the original 'review' view, let's create another temp one

review_temp = review
review_temp.createOrReplaceTempView("review_temp")

review_temp.printSchema()

root
 |-- business_id: string (nullable = true)
 |-- cool: long (nullable = true)
 |-- date: string (nullable = true)
 |-- funny: long (nullable = true)
 |-- review_id: string (nullable = true)
 |-- stars: double (nullable = true)
 |-- text: string (nullable = true)
 |-- useful: long (nullable = true)
 |-- user_id: string (nullable = true)
 |-- word_count: string (nullable = true)
 |-- word_count_no_brackets: integer (nullable = true)



In [136]:
# Now that we've created a word count column, lets get the review with the highest word count, along with its id
query = """
SELECT review_id, MAX(word_count_no_brackets) AS MAX_COUNT
FROM review_temp
GROUP BY review_id
ORDER BY MAX_COUNT DESC
LIMIT 5
"""
spark.sql(query).show()


+--------------------+---------+
|           review_id|MAX_COUNT|
+--------------------+---------+
|6-7Krt1orAWzvEEGw...|     1056|
|zskhSqDePgk7NWUV_...|     1052|
|LFQgXFHWlYmMCqICp...|     1051|
|HUr1-JjbUwbp5GKAS...|     1051|
|GmnfN9kvRx_KMS6jH...|     1049|
+--------------------+---------+



The review with the max word count, based on our word count calculation logic, dials in at 1056 words in total.

## Q7: What are the names of the top 10 users who provided the most tips?

In [139]:
# It is likely that the tip dataframe will be useful here. Let's print the schema
tip.printSchema()

root
 |-- business_id: string (nullable = true)
 |-- compliment_count: long (nullable = true)
 |-- date: string (nullable = true)
 |-- text: string (nullable = true)
 |-- user_id: string (nullable = true)



In [21]:
#In order to answer this, we can likely just get a count of the user IDs that appear the most often in this dataframe
#Naturally, the more a user_id appears, the more tips that user is writing. We can create a new column called tip_count.
#We can pull in the name column by joining tip on user, since these dataframes have a one to many relationship.



inner_join = user.join(tip, user.user_id == tip.user_id)

inner_join.createOrReplaceTempView("inner_join")

inner_join.printSchema()

root
 |-- average_stars: double (nullable = true)
 |-- compliment_cool: long (nullable = true)
 |-- compliment_cute: long (nullable = true)
 |-- compliment_funny: long (nullable = true)
 |-- compliment_hot: long (nullable = true)
 |-- compliment_list: long (nullable = true)
 |-- compliment_more: long (nullable = true)
 |-- compliment_note: long (nullable = true)
 |-- compliment_photos: long (nullable = true)
 |-- compliment_plain: long (nullable = true)
 |-- compliment_profile: long (nullable = true)
 |-- compliment_writer: long (nullable = true)
 |-- cool: long (nullable = true)
 |-- elite: string (nullable = true)
 |-- fans: long (nullable = true)
 |-- friends: string (nullable = true)
 |-- funny: long (nullable = true)
 |-- name: string (nullable = true)
 |-- review_count: long (nullable = true)
 |-- useful: long (nullable = true)
 |-- user_id: string (nullable = true)
 |-- yelping_since: string (nullable = true)
 |-- business_id: string (nullable = true)
 |-- compliment_count: long

In [22]:
#Now we can structure the query
query = """
SELECT name, COUNT(*) as NAME_COUNT
FROM inner_join 
GROUP BY name
ORDER BY NAME_COUNT DESC
LIMIT 10
"""

spark.sql(query).show()

+--------+----------+
|    name|NAME_COUNT|
+--------+----------+
| Michael|     11007|
|    John|     10819|
|    Mike|     10320|
|Jennifer|     10057|
|   Chris|      9966|
|   David|      9032|
|Michelle|      7466|
|   Jason|      7226|
| Jessica|      6969|
|    Lisa|      6011|
+--------+----------+



The output displays the top ten tip writers within the yelp dataset, with Michael taking the number one spot.

## Q8: List the names, number of reviews of businesses in Arizona and total number of reviews of the top 5 users (as determined by who has created the most number of reviews of businesses in Arizona).  Include a column that shows the percentage of reviews that are of businesses from Arizona.  The first row of the results should be:
```
+--------+--------+-----------+---------+
|    name|az_count|total_count|  percent|
+--------+--------+-----------+---------+
|    Brad|    1637|       1642|99.695496|
+--------+--------+-----------+---------+
```

In [5]:
#For this question, we'll likely have to create another inner join. We'll join review.business_id on business.business_id
#since the 'name' columns contain different data in both dataframes, we can rename one of them so there is no confusion
#after that, we can create another inner join, of inner_join_2.user_id == user.user_id, to pull in the users' names.
#we'll make sure to also rename the review_count column in business so there is no ambiguity


business_temp = business.withColumnRenamed("name","business_name").withColumnRenamed("review_count", "business_review_count")

inner_join_2 = business_temp.join(review, business_temp.business_id == review.business_id)

inner_join_2.createOrReplaceTempView("inner_join_2")

inner_join_3 = inner_join_2.join(user, inner_join_2.user_id == user.user_id)

inner_join_3.createOrReplaceTempView("inner_join_3")


In [6]:
query = """
SELECT name, COUNT(state) AS az_count, review_count AS total_count, (COUNT(state)/review_count)*100 AS percent
FROM inner_join_3 
WHERE state = 'AZ'
GROUP BY name, total_count
ORDER BY az_count DESC
LIMIT 5
"""

spark.sql(query).show()

+--------+--------+-----------+-----------------+
|    name|az_count|total_count|          percent|
+--------+--------+-----------+-----------------+
|    Brad|    1637|       1642|99.69549330085262|
|   Karen|    1559|       2340|66.62393162393163|
|Jennifer|    1250|       1929|64.80041472265422|
|    Gabi|    1151|       1932|59.57556935817805|
|    Judy|    1059|       1193|88.76781223805533|
+--------+--------+-----------+-----------------+



This output displays the top five users, in terms of total number of establishments reviewed that are located within Arizona, ordered by total count of their Arizona-based reviews. In terms of users with the largest proportion of reviews based on Arizona-based establishments, Brad takes the number one spot, at close to 100%, while Judy takes second place, at about 89%.