### Lab Assignment: Commercial Data Analysis

### University of Virginia
### DS 7200: Distributed Computing
### Last Updated: August 20, 2023

---

### INSTRUCTIONS  
In this assignment, you will work with a dataset containing information about businesses.  
Each record is a business location.  Follow the steps below, writing and running the code in blocks, and displaying the solutions.  

Each question part is worth 1 POINT, for a total of 15 POINTS.

Hint: nested fields in the JSON hierarchy can be reached with dot notation, like this:  

`df.select('address.street_number')`

---

**NOTE: I worked with Zoe on portions of this assignment**

In [1]:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
        .appName("comm") \
        .getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/09/20 13:11:39 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
24/09/20 13:11:40 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


In [2]:
# note that read.json can read a zipped JSON directly

**1. (1 PT) Read in the dataset and show the number of records**

In [3]:
# define the file path
file_path = '/standard/ds7200-apt4c/large_datasets/part-00000-a159c41a-bc58-4476-9b78-c437667f9c2b-c000.json.gz'

# create the data frame
df = spark.read.json(file_path)

df.count()

24/09/20 13:11:56 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
                                                                                

154679

**2. (1 PT) Show the first 5 records**

In [4]:
# use the show command
df.show(5)

+--------------------+--------------------+--------------------+----------------+----+--------------------+--------------------+--------------------+
|             address|       business_tags|               hours|              id|menu|             reviews|                urls|             webpage|
+--------------------+--------------------+--------------------+----------------+----+--------------------+--------------------+--------------------+
|{Woodburn, {45.15...|                null|                null|000023995a540868|null|                  []|{woodburn.k12.or....|{Educational Tech...|
|{Hialeah, {25.884...|{[], [{has_atm, Y...|{null, 1900, null...|0000821a1394916e|null|                null|{null, [yelp.com]...|                null|
|{Rochester, {43.1...|{[], [{accepts_cr...|{null, 1700, null...|000136e65d50c3b7|null|[{New (to me) qui...|{usps.com, [yelp....|{Welcome | USPS G...|
|{West Palm Beach,...|                null|                null|00014329a70b9869|null|              

**3. (1 PT) Show the first 5 street addresses which are not null**  

In [5]:
from pyspark.sql.functions import col

In [6]:
df.filter( col('address').isNotNull() ).show(5)

+--------------------+--------------------+--------------------+----------------+----+--------------------+--------------------+--------------------+
|             address|       business_tags|               hours|              id|menu|             reviews|                urls|             webpage|
+--------------------+--------------------+--------------------+----------------+----+--------------------+--------------------+--------------------+
|{Woodburn, {45.15...|                null|                null|000023995a540868|null|                  []|{woodburn.k12.or....|{Educational Tech...|
|{Hialeah, {25.884...|{[], [{has_atm, Y...|{null, 1900, null...|0000821a1394916e|null|                null|{null, [yelp.com]...|                null|
|{Rochester, {43.1...|{[], [{accepts_cr...|{null, 1700, null...|000136e65d50c3b7|null|[{New (to me) qui...|{usps.com, [yelp....|{Welcome | USPS G...|
|{West Palm Beach,...|                null|                null|00014329a70b9869|null|              

**4. (1 PT) Location**  

Count the number of records where the city is Phoenix

In [7]:
# print the schema for myself
df.printSchema()

root
 |-- address: struct (nullable = true)
 |    |-- city: string (nullable = true)
 |    |-- coordinates: struct (nullable = true)
 |    |    |-- lat: double (nullable = true)
 |    |    |-- lon: double (nullable = true)
 |    |-- country: string (nullable = true)
 |    |-- county: string (nullable = true)
 |    |-- full_address: string (nullable = true)
 |    |-- highway_number: string (nullable = true)
 |    |-- is_headquarters: boolean (nullable = true)
 |    |-- is_parsed: boolean (nullable = true)
 |    |-- post_direction: string (nullable = true)
 |    |-- pre_direction: string (nullable = true)
 |    |-- secondary_number: string (nullable = true)
 |    |-- state: string (nullable = true)
 |    |-- street: string (nullable = true)
 |    |-- street_address: string (nullable = true)
 |    |-- street_number: string (nullable = true)
 |    |-- street_type: string (nullable = true)
 |    |-- type_of_address: string (nullable = true)
 |    |-- zip: string (nullable = true)
 |    |-- 

In [8]:
# filter on city == 'Phoenix' and count the matching records
df.filter( col('address.city') == 'Phoenix' ).count()

                                                                                

762

**5. (1 PT) Hours**  

Count the number of records where closing time on Thursday is 8pm

In [9]:
# look at the distinct values first
# they are strings in 24-hour (military) time, mostly HHMM
df.select( col('hours.thursday_close') ).distinct().show()

+--------------+
|thursday_close|
+--------------+
|          1159|
|          1436|
|          1500|
|          1445|
|          2200|
|          1435|
|          2115|
|          2012|
|          1620|
|          0430|
|          2201|
|          1735|
|          1100|
|          1915|
|          0630|
|          1215|
|          1300|
|        000000|
|          2100|
|          1530|
+--------------+
only showing top 20 rows



                                                                                

In [10]:
# now filter on the closing time
# 8pm in 24-hour time is 1200 + 800 = 2000, stored as the string '2000'
df.filter( col('hours.thursday_close') == '2000' ).count()

                                                                                

3313
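The conversion from a 12-hour clock time to the `HHMM` strings seen in `hours.thursday_close` can be sketched as a small helper. The function `to_hhmm` is hypothetical and not part of the assignment; it assumes the target is a zero-padded four-character string. The distinct values above also include at least one six-character entry (`000000`), which an exact match on a four-character string would not cover.

```python
import re


def to_hhmm(clock: str) -> str:
    """Convert a 12-hour time such as '8pm' or '4:30 am' to a 24-hour 'HHMM' string."""
    match = re.fullmatch(r"\s*(\d{1,2})(?::(\d{2}))?\s*([ap])m\s*", clock.lower())
    if match is None:
        raise ValueError(f"unrecognized time: {clock!r}")
    hour = int(match.group(1))
    minute = int(match.group(2) or 0)
    if not 1 <= hour <= 12 or minute > 59:
        raise ValueError(f"time out of range: {clock!r}")
    # 12am maps to hour 0 and 12pm to hour 12; other pm hours gain 12.
    hour = hour % 12 + (12 if match.group(3) == "p" else 0)
    return f"{hour:02d}{minute:02d}"


print(to_hhmm("8pm"))
```

The returned string can then be compared against the column, as in the filter above.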

**6. (1 PT) Location and Hours**  

Count the number of records where city is Phoenix and closing time on Thursday is 8pm

In [11]:
# code to filter based on two conditions
df.filter( ( col('address.city') == 'Phoenix' ) & ( col('hours.thursday_close') == '2000' ) ).count()

                                                                                

12
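The parentheses around each condition in the filter above matter because Python evaluates `&` before `==`. The sketch below shows the same precedence rule with plain integers rather than Spark columns, so it illustrates the parsing only, not Spark's behavior.

```python
# Without parentheses, & binds tighter than ==, so this parses as the
# chained comparison 1 == (1 & 2) == 2, that is 1 == 0 == 2.
unparenthesized = 1 == 1 & 2 == 2

# With parentheses, each comparison is evaluated first and the results are combined.
parenthesized = (1 == 1) & (2 == 2)

print(unparenthesized, parenthesized)
```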

**7. (1 PT) Price Range**  

Price range is quoted in number of dollar signs.  Count the number of records with price range greater than or equal to two.

In [12]:
# code to just look at the possible values
df.select( col('menu.price_range') ).distinct().show()

+-----------+
|price_range|
+-----------+
|          3|
|       null|
|          1|
|          4|
|          2|
+-----------+



                                                                                

In [13]:
# code to answer this question
df.filter( col('menu.price_range') >= 2 ).count()

                                                                                

1135

**8. (1 PT) COMPANY HEADQUARTERS**  

For the `address.is_headquarters` field:  
how many locations are HQ / are NOT HQ / are null?

In [14]:
# can use group by and count
df.groupby( col('address.is_headquarters') ).count().show()

+---------------+-----+
|is_headquarters|count|
+---------------+-----+
|           null|87625|
|           true|  318|
|          false|66736|
+---------------+-----+



                                                                                

**9. (1 PT) Webpage URLs**  

Register the dataframe as a temp table.  
Next, use Spark SQL to select only the webpage title column, filtering on rows where the webpage url (accessed under `webpage.url`) is *Target.com*. 

Show only one resulting row and don't truncate the output.

In [15]:
# create a temp table
df.createOrReplaceTempView("df")

sql_command = '''

SELECT webpage.title as web_title
FROM df
WHERE webpage.url = 'Target.com'

'''

# selection
webpage_title = spark.sql(
    sql_command
)

# show 1 and don't truncate
webpage_title.show(1, truncate = False)

+-------------------------------+
|web_title                      |
+-------------------------------+
|Target : Expect More. Pay Less.|
+-------------------------------+
only showing top 1 row



**10. (1 PT) Analysis on Ratings**  

The reviews field contains information such as the number of stars for each review (the *rating*).  
The ratings are stored in an array (`reviews.stars`) for each business location (you should check for yourself). Return the top five most common rating arrays.  For example, an array might look like: 
[5, 5]



In [16]:
# look at the possible values
df.select(col('reviews.stars')).distinct().show(5)

+--------------------+
|               stars|
+--------------------+
|[1, 5, 1, 5, 5, 1...|
|[3, 2, 5, 5, 5, 4...|
|[null, null, null...|
|[4, 5, 5, 1, null...|
|[null, null, null...|
+--------------------+
only showing top 5 rows



                                                                                

In [17]:
# count how often each rating array occurs and show the five most common
df.groupby( col('reviews.stars') ).count().sort('count', ascending = False).show(5)

+------+-----+
| stars|count|
+------+-----+
|  null|74679|
|    []|42419|
|   [5]| 4258|
|[null]| 3067|
|[5, 5]| 1610|
+------+-----+
only showing top 5 rows



                                                                                

**11. More work with Ratings**  

For this question, you will filter out null ratings and then compute the average rating for each business location (using the field: `id`).


a) (1 PT) Create a new dataframe retaining two fields: `id`, `reviews.stars`


In [18]:
# use spark sql to get a new data frame with only id and reviews.stars

# we already defined the temp view so don't have to do it again

sql_command = '''

SELECT id, reviews.stars
FROM df

'''

new_df = spark.sql(sql_command)

new_df.show(5)

+----------------+------+
|              id| stars|
+----------------+------+
|000023995a540868|    []|
|0000821a1394916e|  null|
|000136e65d50c3b7|[4, 4]|
|00014329a70b9869|  null|
|00031c0a83f00657|  null|
+----------------+------+
only showing top 5 rows



b) (1 PT) Create a row for each rating  
hint: use the `withColumn()` and `explode()` functions  
you will need to import the `explode()` function by issuing:

`from pyspark.sql.functions import explode`


In [19]:
# import line
from pyspark.sql.functions import explode

In [20]:
# now explode
exploded_df = new_df.withColumn('exploded_stars', explode('stars'))
exploded_df.show(5)

+----------------+--------------------+--------------+
|              id|               stars|exploded_stars|
+----------------+--------------------+--------------+
|000136e65d50c3b7|              [4, 4]|             4|
|000136e65d50c3b7|              [4, 4]|             4|
|0003b7589a4e12a0|                 [5]|             5|
|00045f958e4bb02a|[null, null, null...|          null|
|00045f958e4bb02a|[null, null, null...|          null|
+----------------+--------------------+--------------+
only showing top 5 rows



c) (1 PT) Return a count of the number of ratings in this dataframe

In [21]:
# use the exploded dataframe
# each row is one rating (null ratings included), so the row count is the number of ratings
exploded_df.count()

                                                                                

600082

d) (1 PT) Drop rows where the rating is null, and return a count of the number of non-null ratings

In [22]:
# keep only the rows whose exploded rating is not null
non_null_df = exploded_df.filter( col('exploded_stars').isNotNull() )
non_null_df.count()

                                                                                

538241

e) (1 PT) Compute the average rating, grouped by `id`. After the average is computed, sort by `id` in ascending order and show the top 10 records.  
 
hint:   
this can all be done in one line using the `agg()` function  
this `id` should be at the top: 000136e65d50c3b7

In [23]:
from pyspark.sql import functions as F

In [24]:
# get the average rating per id
# sorting based on ID
avg_rating = non_null_df \
                .groupby( col('id') ) \
                .agg( F.avg('exploded_stars') ) \
                .sort('id', ascending = True)

# show the top 10 records
avg_rating.show(10)

+----------------+-------------------+
|              id|avg(exploded_stars)|
+----------------+-------------------+
|000136e65d50c3b7|                4.0|
|0003b7589a4e12a0|                5.0|
|00059519f0dba1b4| 3.3333333333333335|
|000a1df4c8e0ecd2|                4.6|
|000c7b7a30623083|                5.0|
|000c9ffc8b89af03|                3.0|
|000de20baa847ecc| 1.6666666666666667|
|001064359d9f162f|                5.0|
|0010c9f495d87dd7|                3.0|
|0017774db5e6400a|  4.333333333333333|
+----------------+-------------------+
only showing top 10 rows
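The explode, null filter, and per-`id` average steps can be cross-checked on a handful of records without Spark. The sketch below uses plain Python and made-up records; the names `records`, `ratings_by_id`, and `averages` are hypothetical, and it is a sanity check of the logic rather than a replacement for the Spark pipeline.

```python
from collections import defaultdict

# Hypothetical (id, stars) pairs covering empty, null, and partly null arrays.
records = [
    ("a1", [4, 4]),
    ("b2", [None, 5, 3]),
    ("c3", []),
    ("d4", None),
    ("e5", [None]),
]

# "Explode" each array and keep only the non-null ratings.
ratings_by_id = defaultdict(list)
for business_id, stars in records:
    for star in stars or []:
        if star is not None:
            ratings_by_id[business_id].append(star)

# Average per id, sorted by id in ascending order.
averages = {
    business_id: sum(values) / len(values)
    for business_id, values in sorted(ratings_by_id.items())
}

print(averages)
```

An `id` with no non-null ratings has no entry in `averages`, which mirrors how such locations are absent from the grouped Spark result.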



                                                                                