### Lab Assignment: Commercial Data Analysis

### University of Virginia
### DS 7200: Distributed Computing
### Last Updated: August 20, 2023

---

### INSTRUCTIONS  
In this assignment, you will work with a dataset containing information about businesses.  
Each record is a business location.  Follow the steps below, writing and running the code in blocks, and displaying the solutions.  

Each question part is worth 1 POINT, for a total of 15 POINTS.

Hint: nested fields in the JSON hierarchy can be reached with dot notation, like this:  

`df.select('address.street_number')`
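
As a rough illustration of what the dotted path means, the plain-Python sketch below (not Spark code) walks a nested record one level per dot. The record contents and the `get_path` helper are hypothetical and exist only for this example. It also models one behavior worth knowing: when a parent struct is null, the nested field comes back as null rather than raising.

```python
from functools import reduce

# Hypothetical record shaped like one row of the dataset; the values are made up.
record = {
    "id": "example-id",
    "address": {"city": "Phoenix", "street_number": "100"},
    "hours": None,  # models a null struct
}

def get_path(row, dotted_path):
    """Follow a dotted path through nested dicts, returning None once a level is None or absent."""
    def step(node, key):
        return node.get(key) if isinstance(node, dict) else None
    return reduce(step, dotted_path.split("."), row)

print(get_path(record, "address.street_number"))  # 100
print(get_path(record, "hours.thursday_close"))   # None
```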

---

In [1]:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
        .appName("comm") \
        .getOrCreate()


**1. (1 PT) Read in the dataset and show the number of records**

In [2]:
# note that read.json can read a gzip-compressed JSON file directly
df = spark.read.json("/standard/ds7200-apt4c/large_datasets/part-00000-a159c41a-bc58-4476-9b78-c437667f9c2b-c000.json.gz")
df.count()

                                                                                

154679

**2. (1 PT) Show the first 5 records**

In [4]:
df.show(5)

+--------------------+--------------------+--------------------+----------------+----+--------------------+--------------------+--------------------+
|             address|       business_tags|               hours|              id|menu|             reviews|                urls|             webpage|
+--------------------+--------------------+--------------------+----------------+----+--------------------+--------------------+--------------------+
|{Woodburn, {45.15...|                null|                null|000023995a540868|null|                  []|{woodburn.k12.or....|{Educational Tech...|
|{Hialeah, {25.884...|{[], [{has_atm, Y...|{null, 1900, null...|0000821a1394916e|null|                null|{null, [yelp.com]...|                null|
|{Rochester, {43.1...|{[], [{accepts_cr...|{null, 1700, null...|000136e65d50c3b7|null|[{New (to me) qui...|{usps.com, [yelp....|{Welcome | USPS G...|
|{West Palm Beach,...|                null|                null|00014329a70b9869|null|              

**3. (1 PT) Show the first 5 street addresses which are not null**  

In [5]:
# column operator
from pyspark.sql.functions import col

df.filter(col("address.street_address").isNotNull()).show(5) 

+--------------------+--------------------+-----+----------------+--------------------+--------------------+--------------------+--------------------+
|             address|       business_tags|hours|              id|                menu|             reviews|                urls|             webpage|
+--------------------+--------------------+-----+----------------+--------------------+--------------------+--------------------+--------------------+
|{Glen Burnie, {39...|                null| null|0005f8f0b2beeac4|                null|                null|{ecoopercontracti...|{24 HR Contractin...|
|{Clintwood, null,...|                null| null|003a0db3040b3677|                null|                null|{dickenson.k12.va...|{Home - Dickenson...|
|{San Francisco, {...|{[takes_reservati...| null|004afc1f90734d81|{1, https://www.y...|[{As you can see ...|{null, [yelp.com,...|                null|
|{Oxford, null, US...|                null| null|005044ced143cf64|                null|       

**4. (1 PT) Location**  

Count the number of records where the city is Phoenix

In [6]:
df.filter(col("address.city")=="Phoenix").count()

                                                                                

762

**5. (1 PT) Hours**  

Count the number of records where closing time on Thursday is 8pm

In [7]:
df.filter(col("hours.thursday_close")=="2000").count()

                                                                                

3313
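
The filter above compares against `"2000"`, which suggests that closing times are stored in 24-hour `HHMM` form. Assuming that is the case, a small helper can translate a clock time from the question text into the value to filter on. The `to_hhmm` function below is a hypothetical convenience for this sketch, not part of the dataset or of Spark.

```python
from datetime import datetime

def to_hhmm(clock_time):
    """Convert a 12-hour time such as '8pm' or '8:30pm' to a 24-hour 'HHMM' string."""
    fmt = "%I:%M%p" if ":" in clock_time else "%I%p"
    return datetime.strptime(clock_time.strip().upper(), fmt).strftime("%H%M")

print(to_hhmm("8pm"))     # 2000
print(to_hhmm("8:30pm"))  # 2030
```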

**6. (1 PT) Location and Hours**  

Count the number of records where city is Phoenix and closing time on Thursday is 8pm

In [8]:
#df.filter((col("address.city")=="Phoenix") & (col("hours.thursday_close")=="2000")).count()
df.where((df.address.city == "Phoenix") & (df.hours.thursday_close == "2000")).count()

                                                                                

12
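
Both versions of the filter wrap each comparison in parentheses before combining them with `&`. The reason is Python operator precedence: `&` binds more tightly than `==`. The sketch below shows the effect with plain integers (the variable names are made up); with Spark columns, the unparenthesized form likewise does not express the intended condition.

```python
a, b = 1, 2

# Each comparison is evaluated first, then the results are combined.
with_parens = (a == 1) & (b == 2)   # True & True -> True

# Parsed as the chained comparison a == (1 & b) == 2, i.e. 1 == 0 == 2.
without_parens = a == 1 & b == 2    # False

print(with_parens)     # True
print(without_parens)  # False
```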

**7. (1 PT) Price Range**  

Price range is quoted in number of dollar signs.  Count the number of records with price range greater than or equal to two.

In [9]:
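# "greater than or equal to two" calls for a threshold of 2;
# the count recorded below was produced with a threshold of 0
df.where(df.menu.price_range >= 2).count()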

                                                                                

2487

**8. (1 PT) COMPANY HEADQUARTERS**  

For the `address.is_headquarters` field:  
how many locations are HQ / are NOT HQ / are null?

In [10]:
print(f"{df.where(df.address.is_headquarters == True).count()} locations are HQ")
print(f"{df.where(df.address.is_headquarters == False).count()} locations are not HQ")
print(f"{df.where(df.address.is_headquarters.isNull()).count()} locations are null")

318 locations are HQ

66736 locations are not HQ


87625 locations are null
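
A comparison against null is neither true nor false in Spark SQL, so the `== True` and `== False` filters both skip null rows and the nulls need their own `isNull()` check. As a quick sanity check, the three counts should partition the dataset. The arithmetic below uses only the numbers printed in this notebook.

```python
hq_count = 318
not_hq_count = 66736
null_count = 87625
total_records = 154679  # from df.count() in question 1

# Every record should fall into exactly one of the three groups.
accounted_for = hq_count + not_hq_count + null_count
print(accounted_for == total_records)  # True
```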


                                                                                

**9. (1 PT) Webpage URLs**  

Register the dataframe as a temp table.  
Next, use Spark SQL to select only the webpage title column, filtering on rows where the webpage url (accessed under `webpage.url`) is *Target.com*. 

Show only one resulting row and don't truncate the output.

In [11]:
# Register df as temp table
df.createOrReplaceTempView("company")

# query the view
sqlDF = spark.sql("SELECT webpage.title FROM company WHERE webpage.url = 'Target.com'")

# show only one record
sqlDF.show(1, truncate=False)

+-------------------------------+
|title                          |
+-------------------------------+
|Target : Expect More. Pay Less.|
+-------------------------------+
only showing top 1 row



**10. (1 PT) Analysis on Ratings**  

The `reviews` field contains information such as the number of stars for each review (the *rating*).  
The ratings are stored in an array (`reviews.stars`) for each business location (you should check this for yourself). Return the five most common rating arrays.  For example, an array might look like:  
[5, 5]



In [12]:
from pyspark.sql import functions as F

sql_rating_DF = spark.sql("SELECT reviews.stars, COUNT(reviews.stars) AS Count FROM company GROUP BY reviews.stars ORDER BY Count DESC")
# show only five records
sql_rating_DF.show(5)

+------+-----+
| stars|Count|
+------+-----+
|    []|42419|
|   [5]| 4258|
|[null]| 3067|
|[5, 5]| 1610|
|   [1]| 1559|
+------+-----+
only showing top 5 rows
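
Grouping on an array column treats each whole array as the key, so `[5]` and `[5, 5]` are counted separately and element order matters. A plain-Python analogue of this grouping, using a small made-up sample rather than the real data, might look like this:

```python
from collections import Counter

# Hypothetical star arrays, one per business location (None models a null rating).
star_arrays = [[5], [], [5, 5], [], [5], [None], [1], [], [5, 5], [5], []]

# Lists are unhashable, so convert each array to a tuple before counting.
counts = Counter(tuple(stars) for stars in star_arrays)

# Most frequent arrays first, limited to five.
for stars, n in counts.most_common(5):
    print(list(stars), n)
```

In this sample, as in the real output above, the empty array is the most common key.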



                                                                                

**11. More work with Ratings**  

For this question, you will filter out null ratings and then compute the average rating for each business location (using the field: `id`).


a) (1 PT) Create a new dataframe retaining two fields: `id`, `reviews.stars`


In [13]:
sqlDF2 = spark.sql("SELECT id, reviews.stars FROM company")

b) (1 PT) Create a row for each rating  
hint: use the `withColumn()` and `explode()` functions  
you will need to import the `explode()` function by issuing:

`from pyspark.sql.functions import explode`


In [14]:
from pyspark.sql.functions import explode

explode_df = sqlDF2.withColumn("stars", explode(sqlDF2.stars))

c) (1 PT) Return a count of the number of ratings in this dataframe

In [15]:
explode_df.count()

                                                                                

600082

d) (1 PT) Drop rows where the rating is null, and return a count of the number of non-null ratings

In [16]:
explode_df = explode_df.dropna()
print(f"There are {explode_df.count()} non-null ratings")

There are 538241 non-null ratings
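
To make the two steps concrete, here is a plain-Python model of what `explode()` followed by `dropna()` does, run on a few hypothetical rows. It assumes the documented behavior of `explode()`: rows whose array is empty or null produce no output rows, while a null element inside an array still becomes a row.

```python
# Hypothetical (id, stars) rows; None models a null array or a null rating.
rows = [
    ("a", [5, 4]),
    ("b", []),
    ("c", None),
    ("d", [None, 3]),
]

# explode: one output row per array element; empty and null arrays yield no rows.
exploded = [(rid, star) for rid, stars in rows for star in (stars or [])]

# dropna: discard rows whose rating is null.
non_null = [(rid, star) for rid, star in exploded if star is not None]

print(len(exploded))  # 4
print(len(non_null))  # 3
```

By the same logic, the two counts above imply 600082 minus 538241 = 61841 null ratings in the dataset.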


                                                                                

e) (1 PT) Compute the average rating, grouped by `id`. After the average is computed, sort by `id` in ascending order and show the top 10 records.  
 
hint:   
this can all be done in one line using the `agg()` function  
this `id` should be at the top: 000136e65d50c3b7

In [17]:
from pyspark.sql.functions import asc

explode_df.groupBy("id").agg(F.avg("stars")).sort(asc("id")).show(10)

+----------------+------------------+
|              id|        avg(stars)|
+----------------+------------------+
|000136e65d50c3b7|               4.0|
|0003b7589a4e12a0|               5.0|
|00059519f0dba1b4|3.3333333333333335|
|000a1df4c8e0ecd2|               4.6|
|000c7b7a30623083|               5.0|
|000c9ffc8b89af03|               3.0|
|000de20baa847ecc|1.6666666666666667|
|001064359d9f162f|               5.0|
|0010c9f495d87dd7|               3.0|
|0017774db5e6400a| 4.333333333333333|
+----------------+------------------+
only showing top 10 rows
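
The aggregation can be pictured as three steps: collect the ratings for each `id`, average each group, and sort by `id`. The sketch below mirrors those steps in plain Python on made-up ratings. The ids and values are hypothetical and are not taken from the dataset.

```python
from collections import defaultdict

# Hypothetical non-null (id, stars) rows, as left after exploding and dropping nulls.
ratings = [("b2", 5), ("a1", 3), ("a1", 3), ("b2", 4), ("a1", 4), ("c3", 1)]

# groupBy("id"): collect the ratings belonging to each id.
by_id = defaultdict(list)
for rid, star in ratings:
    by_id[rid].append(star)

# agg(avg("stars")), then sort by id in ascending order.
averages = sorted((rid, sum(stars) / len(stars)) for rid, stars in by_id.items())

for rid, avg in averages:
    print(rid, avg)
```

Because `id` is a string, the sort is lexicographic, which is why an id beginning with `0001` appears at the top of the real output.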



                                                                                

In [27]:
#from pyspark.sql.functions import grouping, count, desc

#df.cube("name").agg(grouping("name"), sum("age")).orderBy("name").show()
#explode_df.cube("stars").agg(grouping("stars"), count("stars")).sort(desc(count("stars"))).show()
#explode_df.groupBy("id").sort(asc("id")).show(10)