### Commercial Data Analysis

### University of Virginia
### DS 5110: Big Data Systems
### Last Updated: February 9, 2022

### INSTRUCTIONS  
In this assignment, you will work with a dataset containing information about businesses.  Each record is a business location.  Follow the steps below, writing and running the code in blocks, and displaying the solutions.  

Each question part is worth 1 POINT, for a total of 15 POINTS.

Hint: nested fields in the JSON hierarchy can be reached with dot notation, like this:  

`df.select('address.street_number')`
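
As a rough illustration of what the dotted path means, the sketch below walks a nested Python dictionary the same way. It needs no Spark session; `get_nested` and the sample record are hypothetical and are not part of PySpark.

```python
from functools import reduce


def get_nested(record, path):
    """Follow a dotted path such as 'address.street_number' through nested dicts.

    Returns None as soon as a level is missing or is not a dict.
    """
    def step(current, key):
        return current.get(key) if isinstance(current, dict) else None

    return reduce(step, path.split("."), record)


# Hypothetical record shaped like one line of the dataset
sample_record = {"id": "example-id", "address": {"city": "Houston", "street_number": "100"}}

street_number = get_nested(sample_record, "address.street_number")
missing_value = get_nested(sample_record, "hours.friday_close")
print(street_number, missing_value)
```

A missing level resolves to `None` here, which loosely corresponds to the `null` values seen in the Spark output.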

In [2]:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
        .appName("comm") \
        .getOrCreate()


In [2]:
pwd

'/gpfs/gpfs0/project/SDS/instructional/ds5559/large_datasets'

In [3]:
# note that spark.read.json can read a gzip-compressed JSON file directly
df = spark.read.json('part-00000-a159c41a-bc58-4476-9b78-c437667f9c2b-c000.json.gz')
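
For intuition about the file format, here is a minimal standard-library sketch, assuming the file holds one JSON object per line (the default layout Spark's JSON reader expects). The records and the temporary file name are made up for illustration.

```python
import gzip
import json
import os
import tempfile

# Hypothetical records; the real file has many more fields
sample_records = [
    {"id": "a1", "address": {"city": "Houston"}},
    {"id": "b2", "address": {"city": "Rochester"}},
]

with tempfile.TemporaryDirectory() as tmp_dir:
    sample_path = os.path.join(tmp_dir, "sample.json.gz")

    # Write one JSON object per line, gzip-compressed
    with gzip.open(sample_path, "wt", encoding="utf-8") as handle:
        for record in sample_records:
            handle.write(json.dumps(record) + "\n")

    # Read it back line by line, decompressing on the fly
    with gzip.open(sample_path, "rt", encoding="utf-8") as handle:
        loaded_records = [json.loads(line) for line in handle if line.strip()]

print(len(loaded_records))
```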

In [4]:
df.show(1)

+--------------------+-------------+-----+----------------+----+-------+--------------------+--------------------+
|             address|business_tags|hours|              id|menu|reviews|                urls|             webpage|
+--------------------+-------------+-----+----------------+----+-------+--------------------+--------------------+
|{Woodburn, {45.15...|         null| null|000023995a540868|null|     []|{woodburn.k12.or....|{Educational Tech...|
+--------------------+-------------+-----+----------------+----+-------+--------------------+--------------------+
only showing top 1 row



**1. (1 PT) Read in the dataset and show the number of records**

In [5]:
record_count = df.count()
print("Count of records:", record_count)

Count of records: 154679


**2. (1 PT) Print the schema**

In [6]:
df.printSchema()

root
 |-- address: struct (nullable = true)
 |    |-- city: string (nullable = true)
 |    |-- coordinates: struct (nullable = true)
 |    |    |-- lat: double (nullable = true)
 |    |    |-- lon: double (nullable = true)
 |    |-- country: string (nullable = true)
 |    |-- county: string (nullable = true)
 |    |-- full_address: string (nullable = true)
 |    |-- highway_number: string (nullable = true)
 |    |-- is_headquarters: boolean (nullable = true)
 |    |-- is_parsed: boolean (nullable = true)
 |    |-- post_direction: string (nullable = true)
 |    |-- pre_direction: string (nullable = true)
 |    |-- secondary_number: string (nullable = true)
 |    |-- state: string (nullable = true)
 |    |-- street: string (nullable = true)
 |    |-- street_address: string (nullable = true)
 |    |-- street_number: string (nullable = true)
 |    |-- street_type: string (nullable = true)
 |    |-- type_of_address: string (nullable = true)
 |    |-- zip: string (nullable = true)
 |    |-- 

**3. (1 PT) Show the first 5 records**  

In [7]:
df.show(5)

+--------------------+--------------------+--------------------+----------------+----+--------------------+--------------------+--------------------+
|             address|       business_tags|               hours|              id|menu|             reviews|                urls|             webpage|
+--------------------+--------------------+--------------------+----------------+----+--------------------+--------------------+--------------------+
|{Woodburn, {45.15...|                null|                null|000023995a540868|null|                  []|{woodburn.k12.or....|{Educational Tech...|
|{Hialeah, {25.884...|{[], [{has_atm, Y...|{null, 1900, null...|0000821a1394916e|null|                null|{null, [yelp.com]...|                null|
|{Rochester, {43.1...|{[], [{accepts_cr...|{null, 1700, null...|000136e65d50c3b7|null|[{New (to me) qui...|{usps.com, [yelp....|{Welcome | USPS G...|
|{West Palm Beach,...|                null|                null|00014329a70b9869|null|              

**4. (1 PT) Location**  

Count the number of records where the city is Houston

In [8]:
from pyspark.sql.functions import col

In [9]:
houston_count = df.filter(col("address.city") == "Houston").count()
print("Count of records where the city is Houston:", houston_count)

Count of records where the city is Houston: 1668


**5. (1 PT) Hours**  

Count the number of records where closing time on Friday is 7pm

In [10]:
friday_closing_count = df.filter(col("hours.friday_close") == "1900").count()
print("Count of records where closing time on Friday is 7pm:", friday_closing_count)

Count of records where closing time on Friday is 7pm: 3305


**6. (1 PT) Location and Hours**  

Count the number of records where city is Houston and closing time on Friday is 7pm

In [11]:
houston_friday_closing_count = df.filter((col("address.city") == "Houston") & (col("hours.friday_close") == "1900")).count()
print("Count of records where city is Houston and closing time on Friday is 7pm:", houston_friday_closing_count)

Count of records where city is Houston and closing time on Friday is 7pm: 42
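
The parentheses around each comparison in the filter matter because, in Python, `&` binds more tightly than `==`. The plain-Python sketch below uses integers to show how the unparenthesized form is parsed; the variable names are illustrative only.

```python
a, b = 1, 2

# With parentheses: each comparison is evaluated first, then the results are combined
with_parens = (a == 1) & (b == 2)

# Without parentheses this parses as the chained comparison a == (1 & b) == 2,
# and 1 & 2 is 0, so the whole expression is False
without_parens = a == 1 & b == 2

print(with_parens, without_parens)
```

With column expressions the unparenthesized form typically fails or gives an unintended result, so wrapping each condition is the safer habit.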


**7. (1 PT) Price Range**  

Price range is quoted in number of dollar signs.  Count the number of records with price range greater than or equal to three.

In [12]:
price_range_count = df.filter(col("menu.price_range") >= 3).count()
print("Count of records with price range greater than or equal to three:", price_range_count)

Count of records with price range greater than or equal to three: 115


**8. (1 PT) Company Headquarters**  

For the `address.is_headquarters` field:  
how many locations are HQ / are NOT HQ / are null?

In [13]:
# Count the number of locations that are HQ
hq_count = df.filter(col("address.is_headquarters") == True).count()

# Count the number of locations that are NOT HQ
not_hq_count = df.filter(col("address.is_headquarters") == False).count()

# Count the number of locations where is_headquarters field is null
null_hq_count = df.filter(col("address.is_headquarters").isNull()).count()

print("Count of locations that are HQ:", hq_count)
print("Count of locations that are NOT HQ:", not_hq_count)
print("Count of locations where is_headquarters field is null:", null_hq_count)

Count of locations that are HQ: 318
Count of locations that are NOT HQ: 66736
Count of locations where is_headquarters field is null: 87625
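
In Spark SQL, comparing a null with `True` or `False` yields null rather than a match, so null rows fall into neither of the first two filters and need the separate `isNull()` check. The sketch below mimics that three-way split in plain Python on hypothetical values, then checks that the three reported counts add up to the record count from question 1.

```python
# Hypothetical is_headquarters values; None stands in for a Spark null
hq_flags = [True, False, None, False, None, True, False]

hq_total = sum(1 for flag in hq_flags if flag is True)
not_hq_total = sum(1 for flag in hq_flags if flag is False)
null_total = sum(1 for flag in hq_flags if flag is None)

# The three buckets are disjoint and together cover every record
assert hq_total + not_hq_total + null_total == len(hq_flags)

# Same check on the counts reported above
reported_total = 318 + 66736 + 87625
print(reported_total)  # 154679
```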


**9. (1 PT) Webpage URLs**  

Register the dataframe as a temp table.  
Next, use Spark SQL to select only the webpage title column, filtering on rows where the webpage url (accessed under `webpage.url`) is *Target.com*. 

Show only one resulting row and don't truncate the output.

In [14]:
df.createOrReplaceTempView("business_data")

In [16]:
# Reuse the `spark` session created at the top of the notebook
query = "SELECT webpage.title FROM business_data WHERE webpage.url = 'Target.com' LIMIT 1"
result = spark.sql(query)
result.show(truncate=False)

+-------------------------------+
|title                          |
+-------------------------------+
|Target : Expect More. Pay Less.|
+-------------------------------+



**10. (1 PT) Analysis on Ratings**  

The reviews contain information such as the number of stars for each review (the *rating*).  
The ratings are stored in an array (`reviews.stars`) for each business location (you should check this for yourself). Return the top five most common rating arrays. For example, an array might look like `[5, 5]`.

In [17]:
from pyspark.sql.functions import explode

In [18]:
exploded_ratings = df.select(explode("reviews.stars").alias("rating"))
exploded_ratings.show()

+------+
|rating|
+------+
|     4|
|     4|
|     5|
|  null|
|  null|
|  null|
|  null|
|  null|
|  null|
|  null|
|  null|
|  null|
|  null|
|  null|
|  null|
|     1|
|     5|
|     2|
|     4|
|     5|
+------+
only showing top 20 rows



In [19]:
from pyspark.sql.functions import col, desc

In [20]:
rating_arrays = df.groupBy("reviews.stars").count()
rating_arrays = rating_arrays.sort(desc("count"))
top_five_ratings = rating_arrays.limit(5)
top_five_ratings.show(truncate=False)

+------+-----+
|stars |count|
+------+-----+
|null  |74679|
|[]    |42419|
|[5]   |4258 |
|[null]|3067 |
|[5, 5]|1610 |
+------+-----+
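
The grouping above treats each whole array as a single key, so `[5]` and `[5, 5]` are counted separately, and `null` and `[]` are distinct keys as well. A plain-Python sketch of the same idea, using hypothetical arrays and `collections.Counter`:

```python
from collections import Counter

# Hypothetical per-business rating arrays; None marks a missing reviews field
rating_arrays_sample = [[5], [5, 5], None, [], [5], [], None, None, [4, 4], [5], None]

# Lists are not hashable, so convert each array to a tuple before counting
array_counts = Counter(
    tuple(stars) if stars is not None else None for stars in rating_arrays_sample
)

top_two = array_counts.most_common(2)
print(top_two)
```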



**11. More work with Ratings**  

For this question, you will filter out null ratings and then compute the average rating for each business location (using the field: `id`).


a) (1 PT) Create a new dataframe retaining two fields: `id`, `reviews.stars`


In [21]:
filtered_df = df.filter(col("reviews.stars").isNotNull()).select("id", "reviews.stars")
filtered_df.show()

+----------------+--------------------+
|              id|               stars|
+----------------+--------------------+
|000023995a540868|                  []|
|000136e65d50c3b7|              [4, 4]|
|0003b7589a4e12a0|                 [5]|
|00045f958e4bb02a|[null, null, null...|
|00059519f0dba1b4|[null, null, null...|
|0006d5aa170bae22|                  []|
|0008bc70f8ba62bf|              [null]|
|000a1df4c8e0ecd2|[null, null, 4, 5...|
|000bf1e934ac9cb6|                  []|
|000c4037ef6d4b3b|                  []|
|000c7b7a30623083|                 [5]|
|000c9ffc8b89af03|[5, 2, 5, 3, 3, 1...|
|000ca67c3cf252e5|                  []|
|000de20baa847ecc|  [1, 1, 1, 1, 5, 1]|
|000e439e7667839d|                  []|
|001064359d9f162f|     [5, 5, 5, 5, 5]|
|0010c9f495d87dd7|[5, 1, 1, 5, 3, 5...|
|0012eac5aaf0bd45|                  []|
|0013cd52c783f818|                  []|
|0017774db5e6400a|[null, 5, 5, 5, 5...|
+----------------+--------------------+
only showing top 20 rows



b) (1 PT) Create a row for each rating  
hint: use the `withColumn()` and `explode()` functions  
you will need to import the `explode()` function by issuing:

`from pyspark.sql.functions import explode`


In [22]:
from pyspark.sql.functions import explode

In [23]:
expanded_df = df.withColumn("rating", explode("reviews.stars"))
expanded_df.show()

+--------------------+--------------------+--------------------+----------------+--------------------+--------------------+--------------------+--------------------+------+
|             address|       business_tags|               hours|              id|                menu|             reviews|                urls|             webpage|rating|
+--------------------+--------------------+--------------------+----------------+--------------------+--------------------+--------------------+--------------------+------+
|{Rochester, {43.1...|{[], [{accepts_cr...|{null, 1700, null...|000136e65d50c3b7|                null|[{New (to me) qui...|{usps.com, [yelp....|{Welcome | USPS G...|     4|
|{Rochester, {43.1...|{[], [{accepts_cr...|{null, 1700, null...|000136e65d50c3b7|                null|[{New (to me) qui...|{usps.com, [yelp....|{Welcome | USPS G...|     4|
|{Birmingham, {33....|                null|                null|0003b7589a4e12a0|                null|[{Dr Cox and his ...|{null, [goog

c) (1 PT) Return a count of the number of ratings in this dataframe

In [24]:
rating_count = expanded_df.count()
print("Count of ratings:", rating_count)

Count of ratings: 600082


d) (1 PT) Drop rows where the rating is null, and return a count of the number of non-null ratings

In [25]:
non_null_rating_count = expanded_df.na.drop(subset=["rating"]).count()
print("Count of non-null ratings:", non_null_rating_count)

Count of non-null ratings: 538241
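
The two counts differ because `explode()` produces no rows for null or empty arrays but does keep null elements inside an array. The plain-Python sketch below mirrors that behavior on hypothetical rows; `explode_ratings` is an illustrative helper, not a PySpark function.

```python
def explode_ratings(rows):
    """Yield one (id, rating) pair per array element.

    Null arrays and empty arrays contribute no rows, while null
    elements inside an array are kept.
    """
    for business_id, stars in rows:
        for rating in stars or []:
            yield business_id, rating


# Hypothetical rows in the shape (id, reviews.stars)
sample_rows = [
    ("a", [4, 4]),
    ("b", []),
    ("c", None),
    ("d", [None, 5]),
]

exploded = list(explode_ratings(sample_rows))
non_null = [(business_id, rating) for business_id, rating in exploded if rating is not None]
print(len(exploded), len(non_null))
```

By the same reasoning, the counts reported above imply 600082 - 538241 = 61841 null ratings in the dataset.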


e) (1 PT) Compute the average rating, grouped by `id`. After the average is computed, sort by `id` in ascending order and show the top 10 records.  
 
hint:   
this can all be done in one line using the `agg()` function  
this `id` should be at the top: `000136e65d50c3b7`

In [26]:
from pyspark.sql.functions import avg

In [27]:
result = expanded_df.groupBy("id").agg(avg("rating").alias("average_rating")).orderBy("id")
result.show(10)

+----------------+------------------+
|              id|    average_rating|
+----------------+------------------+
|000136e65d50c3b7|               4.0|
|0003b7589a4e12a0|               5.0|
|00045f958e4bb02a|              null|
|00059519f0dba1b4|3.3333333333333335|
|0008bc70f8ba62bf|              null|
|000a1df4c8e0ecd2|               4.6|
|000c7b7a30623083|               5.0|
|000c9ffc8b89af03|               3.0|
|000de20baa847ecc|1.6666666666666667|
|001064359d9f162f|               5.0|
+----------------+------------------+
only showing top 10 rows
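
The `null` averages in this output come from ids whose ratings are all null: `avg()` skips null values, and a group with no non-null values has no average. A plain-Python sketch of that behavior on hypothetical exploded rows:

```python
from collections import defaultdict

# Hypothetical exploded (id, rating) rows; None stands in for a Spark null
exploded_rows = [("a", 4), ("a", 4), ("b", None), ("b", None), ("c", None), ("c", 5), ("c", 2)]

ratings_by_id = defaultdict(list)
for business_id, rating in exploded_rows:
    ratings_by_id[business_id].append(rating)

average_by_id = {}
for business_id in sorted(ratings_by_id):
    present = [r for r in ratings_by_id[business_id] if r is not None]
    # Nulls are skipped; a group with no non-null ratings has no average
    average_by_id[business_id] = sum(present) / len(present) if present else None

print(average_by_id)
```

Dropping the null ratings before grouping, as in part d), would remove the all-null ids from the result instead of listing them with a `null` average.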

