## UDA CHALLENGE

This notebook has been created to answer the below questions:

Using the business.json dataset, and filtering out all businesses that are closed can you calculate:

Q1 - Median and p95 opening time during the week, by postal code, city, and state triplet.

Q2 - Median and p95 closing time during the week, by postal code, city, and state triplet.

Q3 - The number of businesses that are open past 21:00, by city and state pair.

By combining the business.json and review.json dataset, can you calculate:

Q4 - For each postal code, city, and state triplet, the business with the highest number of “cool” review votes that are not open on Sunday.

Though this exercise could have been done in a Python file to ease production. For readability, it has been done in a Jupiter notebook.

## LIBRARIES

In [35]:
import os

import utils.mining_data as md 

from pyspark.sql import SparkSession
from pyspark.sql.functions import col

## ACCESS

To access previous local folders

In [36]:
# path to the data folder where downloaded data has been saved
path_data = md.route(1) + os.sep + "data" + os.sep

## READ DATA

As data can grow exponentially PySpark is used to be able to process larger datasets, (i.e. pandas library wouldn't be able to cope with an extremely large amount of data).

In [37]:
# Start a SparkSession
spark = SparkSession.builder.appName("uda_test").getOrCreate()
sc = spark.sparkContext

The first question relates to the 'business' data. Therefore, this notebook will start working with this json first

In [38]:
# Read the json file
df_business = spark.read.json(path_data + "yelp_academic_dataset_business.json")

# Quick check the dataframe has been loaded correctly
df_business.show(3)

+-------------------+--------------------+--------------------+--------------------+--------+--------------------+-------+-------------+---------------+--------------------+-----------+------------+-----+-----+
|            address|          attributes|         business_id|          categories|    city|               hours|is_open|     latitude|      longitude|                name|postal_code|review_count|stars|state|
+-------------------+--------------------+--------------------+--------------------+--------+--------------------+-------+-------------+---------------+--------------------+-----------+------------+-----+-----+
|       921 Pearl St|{null, null, 'bee...|6iYb2HFDywm3zjuRg...|Gastropubs, Food,...| Boulder|{11:0-23:0, 11:0-...|      1|   40.0175444|   -105.2833481| Oskar Blues Taproom|      80302|          86|  4.0|   CO|
|7000 NE Airport Way|{null, null, u'be...|tCbdrRPZA0oiIYSmH...|Salad, Soup, Sand...|Portland|{5:0-18:0, 5:0-18...|      1|45.5889058992|-122.5933307507|Flyi

## UNDERSTANDING DATA

Filtering out all businesses that are closed, the first question asks for the median and p95 opening time during the week, by postal code, city, and state triplet. Therefore, the first thing that will be done will be to filter out all closed business. Then opening hours of the week will be extracted, converted to float, and grouped them by postal code, city and state to calculate the median and p95.


First, filtering out all businessess that are closed:

In [39]:
# Check which column describes if the business is opened or closed
df_business.describe()

DataFrame[summary: string, address: string, business_id: string, categories: string, city: string, is_open: string, latitude: string, longitude: string, name: string, postal_code: string, review_count: string, stars: string, state: string]

In [40]:
# The column 'is_open'= 0 or 1 for closed or opened respectively
df = df_business.filter("is_open == 1")

# Quick check to ensure only opened businesses are kept
df.select("is_open").head(5)

[Row(is_open=1),
 Row(is_open=1),
 Row(is_open=1),
 Row(is_open=1),
 Row(is_open=1)]

For clarity only the relevant business characteristics for this exercise are maintained in the dataframe. 

In [41]:
# Hours are extracted from the dictionary to show each day of the week per column
# Some days contain Null values. They can be replaced by zeros but in this case they will be ignored when calculating the median and p95 
# to avoid misleading results as zeros can also represent midnight

# The days of the week are not in order, however this isn't relevant for the calculations we are going to perform, so they won't be modified
df = df["hours.*", "postal_code", "city", "state"]
df.show()

+----------+----------+---------+---------+----------+----------+----------+-----------+--------------+-----+
|    Friday|    Monday| Saturday|   Sunday|  Thursday|   Tuesday| Wednesday|postal_code|          city|state|
+----------+----------+---------+---------+----------+----------+----------+-----------+--------------+-----+
| 11:0-23:0| 11:0-23:0|11:0-23:0|11:0-23:0| 11:0-23:0| 11:0-23:0| 11:0-23:0|      80302|       Boulder|   CO|
|  5:0-18:0|  5:0-18:0| 5:0-18:0| 5:0-18:0|  5:0-18:0|  5:0-17:0|  5:0-18:0|      97218|      Portland|   OR|
| 11:0-18:0|      null|11:0-18:0|11:0-18:0| 11:0-18:0|      null|      null|      97214|      Portland|   OR|
|      null|      null|     null|     null|      null|      null|      null|      32763|   Orange City|   FL|
| 16:0-19:0| 16:0-19:0| 9:0-11:0|     null| 16:0-19:0| 16:0-19:0| 16:0-19:0|      30316|       Atlanta|   GA|
| 17:0-21:0| 17:0-21:0|17:0-21:0|17:0-21:0| 17:0-21:0| 17:0-21:0| 17:0-21:0|        V5V|     Vancouver|   BC|
| 8:0-17:3

Check there aren't any Null values in the columns 'postal_code', 'city' or 'state' as those are the columns that will be used for our analysis

In [8]:
# These columns don't have any Nulls. Therefore, we can carry on the exercise without any modifications to them
df.where(col("state").isNull()).show()
df.where(col("city").isNull()).show()
df.where(col("postal_code").isNull()).show()

+------+------+--------+------+--------+-------+---------+-----------+----+-----+
|Friday|Monday|Saturday|Sunday|Thursday|Tuesday|Wednesday|postal_code|city|state|
+------+------+--------+------+--------+-------+---------+-----------+----+-----+
+------+------+--------+------+--------+-------+---------+-----------+----+-----+

+------+------+--------+------+--------+-------+---------+-----------+----+-----+
|Friday|Monday|Saturday|Sunday|Thursday|Tuesday|Wednesday|postal_code|city|state|
+------+------+--------+------+--------+-------+---------+-----------+----+-----+
+------+------+--------+------+--------+-------+---------+-----------+----+-----+

+------+------+--------+------+--------+-------+---------+-----------+----+-----+
|Friday|Monday|Saturday|Sunday|Thursday|Tuesday|Wednesday|postal_code|city|state|
+------+------+--------+------+--------+-------+---------+-----------+----+-----+
+------+------+--------+------+--------+-------+---------+-----------+----+-----+



## Convert hours from string to float format

Business hours are saved as string. They will be converted to float to calculate the median and p95 of all businesses during the week.

In [9]:
# Check data format
df.printSchema()

root
 |-- Friday: string (nullable = true)
 |-- Monday: string (nullable = true)
 |-- Saturday: string (nullable = true)
 |-- Sunday: string (nullable = true)
 |-- Thursday: string (nullable = true)
 |-- Tuesday: string (nullable = true)
 |-- Wednesday: string (nullable = true)
 |-- postal_code: string (nullable = true)
 |-- city: string (nullable = true)
 |-- state: string (nullable = true)



In [10]:
# Replace ':' to '.' to be able to convert the data to float
df = md.change_time(dataframe=df, column_range=df.columns[:7])

# Quick check this operation was done successfully
df.show(5)

+---------+---------+---------+---------+---------+---------+---------+-----------+-----------+-----+
|   Friday|   Monday| Saturday|   Sunday| Thursday|  Tuesday|Wednesday|postal_code|       city|state|
+---------+---------+---------+---------+---------+---------+---------+-----------+-----------+-----+
|11.0-23.0|11.0-23.0|11.0-23.0|11.0-23.0|11.0-23.0|11.0-23.0|11.0-23.0|      80302|    Boulder|   CO|
| 5.0-18.0| 5.0-18.0| 5.0-18.0| 5.0-18.0| 5.0-18.0| 5.0-17.0| 5.0-18.0|      97218|   Portland|   OR|
|11.0-18.0|     null|11.0-18.0|11.0-18.0|11.0-18.0|     null|     null|      97214|   Portland|   OR|
|     null|     null|     null|     null|     null|     null|     null|      32763|Orange City|   FL|
|16.0-19.0|16.0-19.0| 9.0-11.0|     null|16.0-19.0|16.0-19.0|16.0-19.0|      30316|    Atlanta|   GA|
+---------+---------+---------+---------+---------+---------+---------+-----------+-----------+-----+
only showing top 5 rows



In [11]:
# Opening and closing hours are split to different columns to be able to calculate the median and p95 at each time.
# As the first question only requires opening times, closing times won't be printed just yet to avoid confusion. 
# The same function will be used when working with closing times.
df_open = md.split_hours(df, df.schema.names[:7], time_to_check = "opening")

# Check the function worked
df_open.show(5)

+------+------+--------+------+--------+-------+---------+-----------+-----------+-----+
|Friday|Monday|Saturday|Sunday|Thursday|Tuesday|Wednesday|postal_code|       city|state|
+------+------+--------+------+--------+-------+---------+-----------+-----------+-----+
|  11.0|  11.0|    11.0|  11.0|    11.0|   11.0|     11.0|      80302|    Boulder|   CO|
|   5.0|   5.0|     5.0|   5.0|     5.0|    5.0|      5.0|      97218|   Portland|   OR|
|  11.0|  null|    11.0|  11.0|    11.0|   null|     null|      97214|   Portland|   OR|
|  null|  null|    null|  null|    null|   null|     null|      32763|Orange City|   FL|
|  16.0|  16.0|     9.0|  null|    16.0|   16.0|     16.0|      30316|    Atlanta|   GA|
+------+------+--------+------+--------+-------+---------+-----------+-----------+-----+
only showing top 5 rows



As mentioned before Null values are ignored for the purpose of this exercise. It is not recommended to convert them to zeros as these represent 24.00h and can alter results.

In [12]:
# Converting hours from string to float format
df_open = md.convert_to_num(df_open.schema.names[:7], df_open, "float")

df_open.printSchema()

root
 |-- Friday: float (nullable = true)
 |-- Monday: float (nullable = true)
 |-- Saturday: float (nullable = true)
 |-- Sunday: float (nullable = true)
 |-- Thursday: float (nullable = true)
 |-- Tuesday: float (nullable = true)
 |-- Wednesday: float (nullable = true)
 |-- postal_code: string (nullable = true)
 |-- city: string (nullable = true)
 |-- state: string (nullable = true)



## Question 1

Median and p95 opening time during the week by postal code, city, and state triplet.

In [13]:
# To keep the results clear the median and p95 will be shown in different dataframes

# Median
# The median is the value where 50% or the data values fall at or below it. Therefore, it is the same than the 50% percentile. 
# The below function uses the in-build 50% percentile method in pyspark library.
df1_median = md.calc_percentile(df_open, df_open.schema.names[:7], 0.5, df_open.schema.names[:7], ["postal_code", "city", "state"])
df1_median.show()

+-----------+----------------+-----+--------+--------+--------+--------+--------+--------+--------+
|postal_code|            city|state|Fri_p0.5|Mon_p0.5|Sat_p0.5|Sun_p0.5|Thu_p0.5|Tue_p0.5|Wed_p0.5|
+-----------+----------------+-----+--------+--------+--------+--------+--------+--------+--------+
|      02119|         Roxbury|   MA|     9.0|     7.3|     9.0|     8.0|     8.3|     9.0|     8.3|
|      02129|          Boston|   MA|     8.0|     7.0|     8.0|     8.0|     8.0|     8.0|     8.0|
|      32830|Lake Buena Vista|   FL|    10.0|     9.0|    10.0|    10.0|    10.0|    10.0|    10.0|
|      43140|          London|   OH|     8.0|     7.0|     7.0|     6.0|     8.0|     7.0|     8.0|
|      97034|     Lake Oswego|   OR|     9.0|     7.3|     9.3|    10.0|     9.0|     9.0|     9.0|
|    V5Z 3A3|       Vancouver|   BC|     9.3|     9.3|     9.3|     9.3|     9.3|     9.3|     9.3|
|    V7V 1H8|  West Vancouver|   BC|    10.0|     8.0|    10.0|     9.0|    10.0|     8.0|     8.0|


Now that opening/closing hours are split, all days can be easily grouped to calculate the median and p95 of all days together if required. In this case a new column, being a list of all week-days' hours, could be created and the median and p95 would be calculated to this column.

In [14]:
# Calculate the median of all days together
total_median = md.total_percentile(df_open, df_open.schema.names[:7], 0.5, "Total Median", ["postal_code", "city", "state"])

total_median.show()

+-----------+----------------+-----+------------+
|postal_code|            city|state|Total Median|
+-----------+----------------+-----+------------+
|      02119|         Roxbury|   MA|         9.0|
|      02129|          Boston|   MA|         8.0|
|      32830|Lake Buena Vista|   FL|        10.0|
|      43140|          London|   OH|         8.0|
|      97034|     Lake Oswego|   OR|         9.0|
|    V5Z 3A3|       Vancouver|   BC|         9.3|
|    V7V 1H8|  West Vancouver|   BC|        10.0|
|    V3H 2B2|      Port Moody|   BC|        16.0|
|    V5V 4B9|       Vancouver|   BC|        11.3|
|    V5Z 3V6|       Vancouver|   BC|        11.0|
|    V6E 1V8|       Vancouver|   BC|         8.0|
|    V6G 1C9|       Vancouver|   BC|        11.0|
|    V7M 2H5| North Vancouver|   BC|        17.3|
|    V4C 2S2|           Delta|   BC|        16.0|
|    V5J 1G3|         Burnaby|   BC|        null|
|    V7J 2A1| North Vancouver|   BC|        11.0|
|    V3B 5E2|       Coquitlam|   BC|        12.0|


In [15]:
# The total median is appended to the previous dataframe which was showing the median per day

df1_median = df1_median.join(total_median, ["postal_code", "city", "state"])
df1_median.show()

+-----------+----------------+-----+--------+--------+--------+--------+--------+--------+--------+------------+
|postal_code|            city|state|Fri_p0.5|Mon_p0.5|Sat_p0.5|Sun_p0.5|Thu_p0.5|Tue_p0.5|Wed_p0.5|Total Median|
+-----------+----------------+-----+--------+--------+--------+--------+--------+--------+--------+------------+
|      02119|         Roxbury|   MA|     9.0|     7.3|     9.0|     8.0|     8.3|     9.0|     8.3|         9.0|
|      02129|          Boston|   MA|     8.0|     7.0|     8.0|     8.0|     8.0|     8.0|     8.0|         8.0|
|      32830|Lake Buena Vista|   FL|    10.0|     9.0|    10.0|    10.0|    10.0|    10.0|    10.0|        10.0|
|      43140|          London|   OH|     8.0|     7.0|     7.0|     6.0|     8.0|     7.0|     8.0|         8.0|
|      97034|     Lake Oswego|   OR|     9.0|     7.3|     9.3|    10.0|     9.0|     9.0|     9.0|         9.0|
|    V5Z 3A3|       Vancouver|   BC|     9.3|     9.3|     9.3|     9.3|     9.3|     9.3|     9

The same procedure is used to calculate the 95% percentile

In [16]:
# P95

# p95 is calculated per week-day
df1_p95 = md.calc_percentile(df_open, df_open.schema.names[:7], 0.95, df_open.schema.names[:7], ["postal_code", "city", "state"])

# The total p95 of all days together is calculated
total_p95 = md.total_percentile(df_open, df_open.schema.names[:7], 0.95, "Total P95", ["postal_code", "city", "state"])

# Both answers are combined in one dataframe
df1_p95 = df1_p95.join(total_p95, ["postal_code", "city", "state"])

df1_p95.show()

+-----------+----------------+-----+---------+---------+---------+---------+---------+---------+---------+---------+
|postal_code|            city|state|Fri_p0.95|Mon_p0.95|Sat_p0.95|Sun_p0.95|Thu_p0.95|Tue_p0.95|Wed_p0.95|Total P95|
+-----------+----------------+-----+---------+---------+---------+---------+---------+---------+---------+---------+
|      02119|         Roxbury|   MA|     16.0|     12.0|     13.0|     13.0|     12.0|     16.0|     12.0|     13.0|
|      02129|          Boston|   MA|     16.0|     12.0|     12.0|     12.0|     16.0|     16.0|     16.0|     16.0|
|      32830|Lake Buena Vista|   FL|     17.0|     15.3|     16.3|     16.3|     17.0|     17.0|     17.0|     16.3|
|      43140|          London|   OH|     16.0|     16.0|     16.0|     11.0|     16.0|     16.0|     16.0|     16.0|
|      97034|     Lake Oswego|   OR|     16.0|     14.0|     16.0|     16.0|     16.0|     16.0|     16.0|     16.0|
|    V5Z 3A3|       Vancouver|   BC|      9.3|      9.3|      9.

## Question 2

Median and p95 closing time during the week, by postal code, city, and state triplet.

In [17]:
# The second question is very similar to the first one, so it is not necessary to start the same process from the beginning
# Closing hours are saved in a dataframe. In this case opening hours are disregarded
df_close = md.split_hours(df, df.schema.names[:7], time_to_check = "closing")

# Converting hours from string to float format
df_close = md.convert_to_num(df_close.schema.names[:7], df_close, "float")

df_close.show(5)

# Quick check to ensure the column 'hours' has been converted to float successfuly
df_close.printSchema()

+------+------+--------+------+--------+-------+---------+-----------+-----------+-----+
|Friday|Monday|Saturday|Sunday|Thursday|Tuesday|Wednesday|postal_code|       city|state|
+------+------+--------+------+--------+-------+---------+-----------+-----------+-----+
|  23.0|  23.0|    23.0|  23.0|    23.0|   23.0|     23.0|      80302|    Boulder|   CO|
|  18.0|  18.0|    18.0|  18.0|    18.0|   17.0|     18.0|      97218|   Portland|   OR|
|  18.0|  null|    18.0|  18.0|    18.0|   null|     null|      97214|   Portland|   OR|
|  null|  null|    null|  null|    null|   null|     null|      32763|Orange City|   FL|
|  19.0|  19.0|    11.0|  null|    19.0|   19.0|     19.0|      30316|    Atlanta|   GA|
+------+------+--------+------+--------+-------+---------+-----------+-----------+-----+
only showing top 5 rows

root
 |-- Friday: float (nullable = true)
 |-- Monday: float (nullable = true)
 |-- Saturday: float (nullable = true)
 |-- Sunday: float (nullable = true)
 |-- Thursday: floa

In [18]:
# To keep the results clear the median and p95 will be calculated in different dataframes

# Similar to before, the same procedure is used to calculate the median for the closing hours 
df2_median = md.calc_percentile(df_close, df_close.schema.names[:7], 0.5, df_close.schema.names[:7], ["postal_code", "city", "state"])
total_median_2 = md.total_percentile(df_close, df_close.schema.names[:7], 0.5, "Total Median", ["postal_code", "city", "state"])
df2_median = df2_median.join(total_median_2, ["postal_code", "city", "state"])

df2_median.show()

+-----------+----------------+-----+--------+--------+--------+--------+--------+--------+--------+------------+
|postal_code|            city|state|Fri_p0.5|Mon_p0.5|Sat_p0.5|Sun_p0.5|Thu_p0.5|Tue_p0.5|Wed_p0.5|Total Median|
+-----------+----------------+-----+--------+--------+--------+--------+--------+--------+--------+------------+
|      02119|         Roxbury|   MA|    18.0|    17.0|    17.0|    21.3|    19.0|    18.0|    18.0|        18.0|
|      02129|          Boston|   MA|    18.0|    17.0|    17.0|    17.0|    18.0|    18.0|    18.0|        17.3|
|      32830|Lake Buena Vista|   FL|    21.0|    19.0|    21.0|    21.0|    21.0|    21.0|    21.0|        21.0|
|      43140|          London|   OH|    20.0|    20.0|    20.0|     0.0|    20.0|    19.0|    20.0|        20.0|
|      97034|     Lake Oswego|   OR|    18.0|    17.0|    18.0|    18.0|    18.0|    18.0|    18.0|        18.0|
|    V5Z 3A3|       Vancouver|   BC|    23.0|    23.0|    23.0|    21.0|    23.0|    23.0|    23

In [19]:
# The same procedure is used to calculate the 95% percentile
df2_p95 = md.calc_percentile(df_close, df_close.schema.names[:7], 0.95, df_close.schema.names[:7], ["postal_code", "city", "state"])
total_p95_2 = md.total_percentile(df_close, df_close.schema.names[:7], 0.95, "Total P95", ["postal_code", "city", "state"])
df2_p95 = df2_p95.join(total_p95_2, ["postal_code", "city", "state"])

df2_p95.show()

+-----------+----------------+-----+---------+---------+---------+---------+---------+---------+---------+---------+
|postal_code|            city|state|Fri_p0.95|Mon_p0.95|Sat_p0.95|Sun_p0.95|Thu_p0.95|Tue_p0.95|Wed_p0.95|Total P95|
+-----------+----------------+-----+---------+---------+---------+---------+---------+---------+---------+---------+
|      02119|         Roxbury|   MA|     23.0|     23.0|     23.0|     23.0|     23.0|     23.0|     23.0|     23.0|
|      02129|          Boston|   MA|     22.0|     22.0|     22.0|     23.0|     22.0|     22.0|     22.0|     22.0|
|      32830|Lake Buena Vista|   FL|     23.3|     23.0|     23.3|     23.0|     23.0|     23.0|     23.0|     23.0|
|      43140|          London|   OH|     23.0|     23.0|     23.0|     23.0|     23.0|     23.0|     23.0|     23.0|
|      97034|     Lake Oswego|   OR|     22.0|     21.3|     22.0|     22.0|     21.0|     22.0|     22.0|     22.0|
|    V5Z 3A3|       Vancouver|   BC|     23.0|     23.0|     23.

## Question 3

The number of businesses that are open past 21:00, by city and state pair.

As we have already worked with the 'business' data, we already have a dataframe showing businesses' timetable as float. It is now only required to keep the businesses that are open past 21.00 and grouped them by city and state.

In [20]:
# The previous dataframe 'df_close' already shows the closing information of each business in float format
# As before Null values are ignored to avoid misleading results. It is not recommended to convert them to zeros as this represents 24.00h
df_close.show(5)
df_close.printSchema()

+------+------+--------+------+--------+-------+---------+-----------+-----------+-----+
|Friday|Monday|Saturday|Sunday|Thursday|Tuesday|Wednesday|postal_code|       city|state|
+------+------+--------+------+--------+-------+---------+-----------+-----------+-----+
|  23.0|  23.0|    23.0|  23.0|    23.0|   23.0|     23.0|      80302|    Boulder|   CO|
|  18.0|  18.0|    18.0|  18.0|    18.0|   17.0|     18.0|      97218|   Portland|   OR|
|  18.0|  null|    18.0|  18.0|    18.0|   null|     null|      97214|   Portland|   OR|
|  null|  null|    null|  null|    null|   null|     null|      32763|Orange City|   FL|
|  19.0|  19.0|    11.0|  null|    19.0|   19.0|     19.0|      30316|    Atlanta|   GA|
+------+------+--------+------+--------+-------+---------+-----------+-----------+-----+
only showing top 5 rows

root
 |-- Friday: float (nullable = true)
 |-- Monday: float (nullable = true)
 |-- Saturday: float (nullable = true)
 |-- Sunday: float (nullable = true)
 |-- Thursday: floa

Hours can be misleading, when comparing number 21.00 > 3.00 but when talking about hours 21.00pm < 3.00am. Therefore, to avoid any mistakes a two-parts check will be done. 

First, if the time is past midnight (0.00) it will automatically be considered as past 21.00h. If the hour is before midnight it will be compared in a one by one basis to 21.00h.

For clarification, if a business is opened past 21.00 one day per week it will be already considered. It won't be required for all days to be opened past 21.00.

In [21]:
# Obtain businesses opened between midnight and 10.00am 

# When reviewing results some days seem to be open before 21.00. 
# This is becuase only one day per week needs to be opened past 21.00 to consider the business as acceptable answer to this question. 
week = df_close.schema.names[:7]
df_past_midnight = md.filtering_hours(df_close, week)

# Quick check hours are past midnight, at least one day per week
df_past_midnight.show(5)

+------+------+--------+------+--------+-------+---------+-----------+-----------+-----+
|Friday|Monday|Saturday|Sunday|Thursday|Tuesday|Wednesday|postal_code|       city|state|
+------+------+--------+------+--------+-------+---------+-----------+-----------+-----+
|   3.0|   3.0|     3.0|   0.0|     3.0|    3.0|      3.0|      30309|    Atlanta|   GA|
|   2.0|   0.0|     2.0|   0.0|     0.0|    0.0|      0.0|      80301|    Boulder|   CO|
|   1.0|   1.0|     1.0|   1.0|     1.0|    1.0|      1.0|      78729|     Austin|   TX|
|  17.3|  null|     5.3|  null|    17.3|   17.3|     17.3|      32789|Winter Park|   FL|
|   1.0|   1.0|     1.0|   1.0|     1.0|    1.0|      1.0|      01803| Burlington|   MA|
+------+------+--------+------+--------+-------+---------+-----------+-----------+-----+
only showing top 5 rows



In [22]:
# Obtain businesses opened past 21.00
df_past_nine = md.filtering_hours(df_close, week, hours= [21.00, 23.59])

# Quick check hours are past 21h, at least one day per week
df_past_nine.show(5)

+------+------+--------+------+--------+-------+---------+-----------+--------+-----+
|Friday|Monday|Saturday|Sunday|Thursday|Tuesday|Wednesday|postal_code|    city|state|
+------+------+--------+------+--------+-------+---------+-----------+--------+-----+
|  23.0|  23.0|    23.0|  23.0|    23.0|   23.0|     23.0|      80302| Boulder|   CO|
|  22.0|  21.0|    22.0|  null|    21.0|   21.0|     21.0|      01960| Peabody|   MA|
|  22.0|   0.0|    22.0|  22.0|    22.0|   22.0|     22.0|      32830| Orlando|   FL|
|  22.0|  22.0|    22.0|  20.0|    22.0|   22.0|     22.0|      02215|  Boston|   MA|
|   0.0|  23.0|     0.0|  23.0|    23.0|   23.0|     23.0|      97230|Portland|   OR|
+------+------+--------+------+--------+-------+---------+-----------+--------+-----+
only showing top 5 rows



In [23]:
# Join together both dataframes
df_past_nine = df_past_nine.union(df_past_midnight)

# Grouping the combined dataframe by 'city' and 'state' as requested
df_past_nine = df_past_nine.groupBy("city", "state").count()

df_past_nine.show()

+---------------+-----+-----+
|           city|state|count|
+---------------+-----+-----+
|     Harrisburg|   OH|    1|
|      Blacklick|   OH|    8|
|      Lake City|   GA|    6|
|  North Reading|   MA|   18|
|        Deltona|   FL|   39|
|        Hanover|   MA|    1|
|   Portland, OR|   OR|    1|
|    Casselberry|   FL|   59|
|   West Roxbury|   MA|   31|
|          COCOA|   FL|    1|
|  ChampionsGate|   FL|    1|
|       St Cloud|   FL|    4|
|  Boston-Fenway|   MA|    2|
|        Medford|   MA|  109|
|          Brice|   OH|    1|
|       Superior|   CO|    9|
|      Vancouver|  ABE|    1|
|North Vancouver|   BC|  166|
|      Whitehall|   OH|   17|
|           Hull|   MA|   15|
+---------------+-----+-----+
only showing top 20 rows



## PART II
## Question 4

By combining the business.json and review.json dataset, can you calculate:

For each postal code, city, and state triplet, the business with the highest number of “cool” review votes that are not open on Sunday.

Now a new json file will need to be loaded and manipulated. The column 'cool' review will need to be converted to int format before combining the dataframes, to then be able to rank businesses based on this column. To avoid a combined dataframe with a lot of irrelevant columns only the required ones will be kept. Only businesses not open on Sunday will be ranked so this column will be maintained and opening/closing hours split to compare these times. Only then the dataframe will be grouped by postal code, city and state to provide the businesses with the higher number of 'cool' votes.

In [24]:
# business.json has already been loaded to this notebook. Now we need to read the review.json file
df_review = spark.read.json(path_data + "yelp_academic_dataset_review.json")

# Quick check the file has been loaded correctly
df_review.show(5)

+--------------------+----+-------------------+-----+--------------------+-----+--------------------+------+--------------------+
|         business_id|cool|               date|funny|           review_id|stars|                text|useful|             user_id|
+--------------------+----+-------------------+-----+--------------------+-----+--------------------+------+--------------------+
|buF9druCkbuXLX526...|   1|2014-10-11 03:34:02|    1|lWC-xP3rd6obsecCY...|  4.0|Apparently Prides...|     3|ak0TdVmGKo4pwqdJS...|
|RA4V8pr014UyUbDvI...|   0|2015-07-03 20:38:25|    0|8bFej1QE5LXp4O05q...|  4.0|This store is pre...|     1|YoVfDbnISlW0f7abN...|
|_sS2LBIGNT5NQb6PD...|   0|2013-05-28 20:38:06|    0|NDhkzczKjLshODbqD...|  5.0|I called WVM on t...|     0|eC5evKn1TWDyHCyQA...|
|0AzLzHfOJgL7ROwhd...|   1|2010-01-08 02:29:15|    1|T5fAqjjFooT4V0OeZ...|  2.0|I've stayed at ma...|     1|SFQ1jcnGguO0LYWnb...|
|8zehGz9jnxPqXtOc7...|   0|2011-07-28 18:05:01|    0|sjm_uUcQVxab_EeLC...|  4.0|The food i

## UNDERSTANDING DATA OF 'REVIEW' DATAFRAME

In [25]:
df_review.describe()

DataFrame[summary: string, business_id: string, cool: string, date: string, funny: string, review_id: string, stars: string, text: string, useful: string, user_id: string]

## Convert 'cool' votes from string to integer format

The column 'cool' will need to be converted to int format to be able to order reviews by the number of votes

In [26]:
# Converting 'cool' from string to int format
df_review = md.convert_to_num(df_review.schema.names[1:2], df_review, "int")

df_review.printSchema()

root
 |-- business_id: string (nullable = true)
 |-- cool: integer (nullable = true)
 |-- date: string (nullable = true)
 |-- funny: long (nullable = true)
 |-- review_id: string (nullable = true)
 |-- stars: double (nullable = true)
 |-- text: string (nullable = true)
 |-- useful: long (nullable = true)
 |-- user_id: string (nullable = true)



For clarity only the relevant characteristics for this exercise are maintained in both dataframes prior to their join. 

Filtering out info in 'review' dataframe:

In [27]:
# Only the columns 'business_id' (only common column with 'business' dataframe, required to join them) and 'cool' will be maintained
df_review = df_review.select(col("business_id"), col("cool"))

df_review.show(3)

+--------------------+----+
|         business_id|cool|
+--------------------+----+
|buF9druCkbuXLX526...|   1|
|RA4V8pr014UyUbDvI...|   0|
|_sS2LBIGNT5NQb6PD...|   0|
+--------------------+----+
only showing top 3 rows



## UNDERSTANDING DATA OF 'BUSINESS' DATAFRAME

Filtering out info in 'business' dataframe:

In [28]:
# This dataframe has already been loaded to this notebook. Double check columns format for new question.
print(df_business.describe())

df_business.select("hours.*").describe()

DataFrame[summary: string, address: string, business_id: string, categories: string, city: string, is_open: string, latitude: string, longitude: string, name: string, postal_code: string, review_count: string, stars: string, state: string]


DataFrame[summary: string, Friday: string, Monday: string, Saturday: string, Sunday: string, Thursday: string, Tuesday: string, Wednesday: string]

In [29]:
# Based on the question the required columns are 'business_id' (common column with the other dataframe that will be used to join them),
# 'name' (name of businesses), 'postal_code', 'state', 'city' (columns to group by the final dataframe),'hours' (to check which Sundays are closed).
# All business, permanently opened or closed will be considered. Therefore, the column 'is_open' won't be used to filter any information. 

df_b = df_business.select(col("business_id"), col("name"), col("postal_code"), col("state"), col("city"), col("hours.*"))

# Check which position the column 'Sunday' is located in the dataframe
df_b.schema.names[8:9]

# It is understood that null values don't represent when a business is closed, but that no information was available from that business
# Therefore, a business is closed on Sunday when their opening and closing hours are the same on that day
# Reusing the function 'split_hours' it is possible to separate this info in different columns to ease their comparison.
sun_open = md.split_hours(df_b, df_b.schema.names[8:9], "opening").withColumnRenamed("Sunday","Sun_opening")
sun_close = md.split_hours(df_b, df_b.schema.names[8:9], "closing").withColumnRenamed("Sunday","Sun_closing")

# Join dataframes with opening and closing hours
df4 = sun_open.join(sun_close, ["business_id", "name", "postal_code", "state", "city"])\
                                .drop("Monday", "Tuesday", "Wednesday", "Thursday", "Friday", "Saturday")

df4.show(5)

+--------------------+--------------------+-----------+-----+--------+-----------+-----------+
|         business_id|                name|postal_code|state|    city|Sun_opening|Sun_closing|
+--------------------+--------------------+-----------+-----+--------+-----------+-----------+
|-1rvXk4zbX3I6ddMC...|     ZenCha Tea Cafe|      43209|   OH|  Bexley|       10:0|       17:0|
|-56y4ePrMKCZbi059...|Cosmopolitan Rest...|      30067|   GA|Marietta|       11:0|      23:30|
|-BgWU01VUnurXHlFo...|Charles Barber & ...|      30084|   GA|  Tucker|       null|       null|
|-DzPWvc2PRDtIPa-2...|   Meg B White Salon|      30324|   GA| Atlanta|       null|       null|
|-N5w3E16qNruBDGd0...|Green's Grille an...|      01801|   MA|  Woburn|        8:0|       13:0|
+--------------------+--------------------+-----------+-----+--------+-----------+-----------+
only showing top 5 rows



In [30]:
# Dataframe containing only businesses that are not open on Sunday
df4 = df4.filter(df4["Sun_opening"] == df4["Sun_closing"])

# Quick check this info is correct in the new dataframe
df4.show(5)

+--------------------+--------------------+-----------+-----+----------+-----------+-----------+
|         business_id|                name|postal_code|state|      city|Sun_opening|Sun_closing|
+--------------------+--------------------+-----------+-----+----------+-----------+-----------+
|-NEVig2nUPOxHjAkR...|   Remediation Group|      30318|   GA|   Atlanta|        0:0|        0:0|
|0GEgDlxdQqMngSEjy...|Urban Express Cha...|      43219|   OH|  Columbus|        0:0|        0:0|
|0aJBoc-8eIUH0ysDE...|       Trial Pro, PA|      32801|   FL|   Orlando|        0:0|        0:0|
|0jlUpkdXg3LCE44UK...| Portland City Grill|      97204|   OR|  Portland|        0:0|        0:0|
|1l_FrJJkI22OMgL9L...|The Two Brothers ...|      02145|   MA|Somerville|        0:0|        0:0|
+--------------------+--------------------+-----------+-----+----------+-----------+-----------+
only showing top 5 rows



Joining both dataframes

In [31]:
df4 = df4.join(df_review, on="business_id")

df4.show(5)

+--------------------+-----------+-----------+-----+-------+-----------+-----------+----+
|         business_id|       name|postal_code|state|   city|Sun_opening|Sun_closing|cool|
+--------------------+-----------+-----------+-----+-------+-----------+-----------+----+
|3_vvDzgMFfLEMo8P9...|Ashley Fuel|      01915|   MA|Beverly|        0:0|        0:0|   0|
|3_vvDzgMFfLEMo8P9...|Ashley Fuel|      01915|   MA|Beverly|        0:0|        0:0|   0|
|3_vvDzgMFfLEMo8P9...|Ashley Fuel|      01915|   MA|Beverly|        0:0|        0:0|   0|
|3_vvDzgMFfLEMo8P9...|Ashley Fuel|      01915|   MA|Beverly|        0:0|        0:0|   0|
|3_vvDzgMFfLEMo8P9...|Ashley Fuel|      01915|   MA|Beverly|        0:0|        0:0|   0|
+--------------------+-----------+-----------+-----+-------+-----------+-----------+----+
only showing top 5 rows



Grouping businesses by postal code, state and city and organising the results based on the highest number of “cool” review votes.

In [32]:
# Group the dataframe and calculate the max 'cool' votes
df_cool = df4.groupBy("postal_code", "city", "state", "name").agg({"cool": "max"})

# Order the dataframe by the highest number of 'cool' reviews
df_cool = df_cool.sort(df_cool["max(cool)"].desc()) 

df_cool.show()

+-----------+-----------+-----+--------------------+---------+
|postal_code|       city|state|                name|max(cool)|
+-----------+-----------+-----+--------------------+---------+
|      02111|     Boston|   MA|The Ritz-Carlton,...|      297|
|      01923|    Danvers|   MA|Residence Inn Bos...|      137|
|      32819|    Orlando|   FL|Hyatt Regency Orl...|      128|
|      32830|    Orlando|   FL|Club Wyndham Bonn...|      123|
|      01742|    Concord|   MA|Residence Inn Bos...|      119|
|      32821|    Orlando|   FL|Wyndham Grand Orl...|      114|
|      02045|       Hull|   MA|The Beacon Luxury...|      112|
|      97202|   Portland|   OR|Original Hotcake ...|      106|
|      32771|    Sanford|   FL|Comfort Inn & Sui...|      105|
|      97204|   Portland|   OR| Portland City Grill|      100|
|      32763|Orange City|   FL|Quality Inn near ...|      100|
|    V7B 0A4|   Richmond|   BC|Vancouver Interna...|       98|
|      78758|     Austin|   TX| Archer Hotel Austin|   

## OUTPUT

All produced dataframes will be saved in the folder 'reports' so answers to all questions will be easily accesible.

In [33]:
# When saving dataframes as CSV files Pyspark saves them as a set of CSVs instead of a single file. 
# Therefore, to avoid having to concatenate them again the dataframe is converted to CSV file using Pandas, 
# (though it takes more time to do this operation).

save_path = md.route(1) + os.sep + "reports" + os.sep 

# Question 1: Median and p95 opening time during the week, by postal code, city, and state triplet.
md.df_to_csv(df1_median, save_path, "q1_median_opening")
md.df_to_csv(df1_p95, save_path, "q1_p95_opening")

# Question 2: Median and p95 closing time during the week, by postal code, city, and state triplet.
md.df_to_csv(df2_median, save_path, "q2_median_closing")
md.df_to_csv(df2_p95, save_path, "q2_p95_closing")

# Question 3: The number of businesses that are open past 21:00, by city and state pair.
md.df_to_csv(df_past_nine, save_path, "q3_open_late")

# Question 4: For each postal code, city, and state triplet, the business with the highest number of “cool” review votes that are not open on Sunday.
md.df_to_csv(df_cool, save_path, "q4_cool_votes")

Same operation is done again but this time files will be saved as json


In [34]:
# To save the results of each question in one Avro file per question, 
# the dataframe showing the median and p95 of question 1 and 2 will be combined in one before converting those to json.
df_q1 = df1_median.join(df1_p95, ["postal_code", "city", "state"])
df_q2 = df2_median.join(df2_p95, ["postal_code", "city", "state"])

# Question 1: Median and p95 opening time during the week, by postal code, city, and state triplet.
md.df_to_json(df_q1, save_path, "q1_opening")

# Question 2: Median and p95 closing time during the week, by postal code, city, and state triplet.
md.df_to_json(df_q2, save_path, "q2_closing")

# Question 3: The number of businesses that are open past 21:00, by city and state pair.
md.df_to_json(df_past_nine, save_path, "q3_open_late")

# Question 4: For each postal code, city, and state triplet, the business with the highest number of “cool” review votes that are not open on Sunday.
md.df_to_json(df_cool, save_path, "q4_cool_votes")