In [0]:
# run if haversine not already installed
# %pip install haversine 

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, collect_list, count, dayofweek, explode, hour, lit, lower, regexp_replace, row_number, size, sort_array, split, struct, sum, udf
from pyspark.ml.feature import NGram
from pyspark.sql.types import StringType, FloatType
from pyspark.sql.window import Window
from haversine import haversine as hs

# single Spark session shared by every cell; getOrCreate() reuses a session
# that is already running instead of starting a second one
spark = (
    SparkSession.builder
    .appName("dh3382-hw2")
    .getOrCreate()
)

# locations of the input datasets used by the questions below
BAKERY_PATH = '/FileStore/tables/BreadBasket_DMS.csv'
POP_PATH = '/FileStore/tables/populationbycountry19802010millions.csv'
REST_PATH = '/FileStore/tables/Restaurants_in_Durham_County_NC.csv'
REST_FORECLOSED_PATH = '/FileStore/tables/durham_nc_foreclosure_2006_2016.json'
WORD_DIR_PATH = '/FileStore/tables/hw1text/'

In [0]:
# load the bakery CSV (header row present, column types inferred) and discard
# the placeholder rows whose Item is the literal string 'NONE'
bakery_data = (
    spark.read
    .option('header', True)
    .option('inferSchema', True)
    .csv(BAKERY_PATH)
    .where(col('Item') != 'NONE')
)

In [0]:
##### QUESTION 1: Show the total number bought by item, per day, between 11AM and 1PM #####

# keep only transactions whose Time lies in the closed range 11:00:00-13:00:00
bakery_data_lunch = bakery_data.where(
    (col('Time') >= '11:00:00') & (col('Time') <= '13:00:00')
)

# one row per (Item, Date) pair holding the number of lunch-time purchases
bakery_q1_res = (
    bakery_data_lunch
    .groupBy('Item', 'Date')
    .agg(count('*').alias('Quantity'))
)

# answer to question 1
bakery_q1_res.show()

+-----------------+----------+--------+
|             Item|      Date|Quantity|
+-----------------+----------+--------+
|         Focaccia|2016-11-03|       1|
|          Tartine|2016-11-04|       1|
|            Bread|2016-12-13|       4|
|           Coffee|2017-01-05|       7|
|     Scandinavian|2017-01-20|       1|
|         Art Tray|2017-01-24|       1|
|            Bread|2017-03-22|       6|
| Coffee granules |2017-03-25|       1|
|           Muffin|2016-11-14|       1|
|           Coffee|2016-11-24|       7|
|Gingerbread syrup|2016-12-21|       1|
|           Coffee|2017-01-06|       8|
|     Scandinavian|2017-01-07|       3|
|            Bread|2017-01-11|       6|
|           Coffee|2017-02-01|       4|
|     Chicken Stew|2017-02-08|       1|
|    Hot chocolate|2017-02-09|       1|
|       Farm House|2017-02-12|       1|
|          Brownie|2017-02-13|       1|
|            Bread|2017-02-27|       5|
+-----------------+----------+--------+
only showing top 20 rows



In [0]:
##### QUESTION 2: Show the top 3 (by qty) items bought by Daypart, by DayType #####
#udf to define weekend/weekday
def weekend(is_weekend):
    """Translate a weekend flag into a day-type label.

    Returns "Weekend" when ``is_weekend`` is truthy and "Weekday" otherwise
    (which includes ``None``).
    """
    return "Weekend" if is_weekend else "Weekday"

# Spark UDF wrapper around weekend(); produces a string column
weekend_udf = udf(weekend, returnType=StringType())


# udf to define part of day
def day_part(date):
    """Classify a timestamp into a part of the day based on its hour.

    Args:
        date: any object exposing an integer ``hour`` attribute (e.g. a
            ``datetime``), or ``None``.

    Returns:
        "Morning" for hours 7-11, "Afternoon" for hours 12-16 and "Night" for
        every other hour. ``None`` is returned unchanged so that a null input
        does not raise inside a UDF.
    """
    if date is None:
        return None

    current_hour = date.hour
    if 7 <= current_hour < 12:
        return "Morning"
    # the lower bound is explicit: without it hours 0-6 would be labelled
    # "Afternoon" instead of falling through to "Night"
    if 12 <= current_hour < 17:
        return "Afternoon"
    return "Night"

# Spark UDF wrapper around day_part(); produces a string column
day_part_udf = udf(day_part, returnType=StringType())

In [0]:
# add the Daytype column: dayofweek() yields 1 for Sunday and 7 for Saturday,
# so membership in [1, 7] is the weekend flag handed to weekend_udf
bakery_weekend = bakery_data.withColumn(
    'Daytype', weekend_udf(dayofweek(col('Date')).isin([1, 7]))
)

# add the Daypart column from the transaction time via day_part_udf
bakery_daypart = bakery_weekend.withColumn('Daypart', day_part_udf(col('Time')))

# count purchases of each Item inside every Daypart/Daytype combination
bakery_purchases = (
    bakery_daypart
    .groupBy('Daypart', 'Daytype', 'Item')
    .count()
    .withColumnRenamed('count', 'Purchases')
)

# rank the items of each Daypart/Daytype partition by purchase count; Item is a
# secondary sort key so that ties are broken the same way on every run
windowBakery = Window.partitionBy('Daypart', 'Daytype').orderBy(
    col('Purchases').desc(), col('Item')
)
bakery_top_purchases = (
    bakery_purchases
    .withColumn('row', row_number().over(windowBakery))
    .filter(col('row') <= 3)
)

# collect_list() gives no guarantee about element order, so gather (row, Item)
# structs, sort them by rank, and only then keep the Item field of each struct
bakery_q2_res = (
    bakery_top_purchases
    .groupBy('Daypart', 'Daytype')
    .agg(sort_array(collect_list(struct('row', 'Item'))).alias('ranked'))
    .select('Daypart', 'Daytype', col('ranked.Item').alias('Top_3_Items'))
)

# show columns in order Daypart/Top_3_Items/Daytype, rows in a stable order
(
    bakery_q2_res
    .select(col('Daypart'), col('Top_3_Items'), col('Daytype'))
    .orderBy('Daypart', 'Daytype')
    .show(truncate=False)
)


+---------+------------------------------------------+-------+
|Daypart  |Top_3_Items                               |Daytype|
+---------+------------------------------------------+-------+
|Afternoon|[Coffee, Bread, Tea]                      |Weekday|
|Afternoon|[Coffee, Bread, Tea]                      |Weekend|
|Morning  |[Coffee, Bread, Pastry]                   |Weekday|
|Morning  |[Coffee, Bread, Pastry]                   |Weekend|
|Night    |[Coffee, Bread, Tea]                      |Weekday|
|Night    |[Coffee, Tshirt, Afternoon with the baker]|Weekend|
+---------+------------------------------------------+-------+



In [0]:
##### QUESTION 3: The total number of entities by “rpt_area_desc” #####
# the restaurant file is semicolon-delimited, has a header row, and its column
# types are inferred while reading
rest_data_raw = (
    spark.read
    .option('header', True)
    .option('delimiter', ';')
    .option('inferSchema', True)
    .csv(REST_PATH)
)

In [0]:
# number of rows per Rpt_Area_Desc value, exposed as 'Total', largest first
rest_q3_res = (
    rest_data_raw
    .groupBy('Rpt_Area_Desc')
    .count()
    .withColumnRenamed('count', 'Total')
    .orderBy(col('Total').desc())
)

# display the three largest groups
rest_q3_res.show(3)

+--------------+-----+
| Rpt_Area_Desc|Total|
+--------------+-----+
|  Food Service| 1093|
|Swimming Pools|  420|
|   Summer Food|  242|
+--------------+-----+
only showing top 3 rows



In [0]:
#####  QUESTION 4: Show the top 10 regions with the biggest percentage decrease in population, for the years 1990-2000 #####

pop_data_raw = (
    spark.read
    .option('header', True)
    .option('inferSchema', True)
    .csv(POP_PATH)
)

# the first column has no name in the file, so look it up by position and
# expose it as 'Region' next to the two years of interest
cols = pop_data_raw.columns
pop_data_clean_header = pop_data_raw.select(
    col(cols[0]).alias('Region'), col('1990'), col('2000')
)

# drop regions whose 1990 or 2000 value is one of the missing-data markers
# 'NA' / '--' (e.g. Antarctica, Wake Island, Croatia, Former U.S.S.R.)
pop_data_clean_pop_nums = pop_data_clean_header.filter(
    ~col('1990').isin('NA', '--') & ~col('2000').isin('NA', '--')
)

# drop the remaining rows that should not be ranked as individual regions
pop_data_clean_regions = pop_data_clean_pop_nums.filter(
    ~col('Region').isin(
        'World',
        'North America',
        'Central & South America',
        'Eurasia',
        'Western Sahara',
        'Asia & Oceania',
    )
)

In [0]:
# absolute change 1990 -> 2000, keeping only regions that did not grow
pop_decrease_gross = (
    pop_data_clean_regions
    .withColumn('gross_increase', col('2000') - col('1990'))
    .where(col('gross_increase') <= 0)
)

# change relative to the 1990 population (a fraction, negative for a decrease)
pop_decrease_perc = pop_decrease_gross.withColumn(
    'perc_increase', col('gross_increase') / col('1990')
)

# most negative change first; only Region and perc_increase are kept
pop_q4_res = (
    pop_decrease_perc
    .orderBy(col('perc_increase').asc())
    .select(col('Region'), col('perc_increase'))
)

# show top 10 results
pop_q4_res.show(10)

+-------------+--------------------+
|       Region|       perc_increase|
+-------------+--------------------+
|   Montserrat| -0.6318732525629077|
|     Bulgaria|-0.12092718374010437|
| Cook Islands|-0.11310494834148986|
| Sierra Leone|-0.09912965328035572|
|       Kuwait|-0.07841983884671756|
|    Gibraltar| -0.0632085194091378|
|Faroe Islands|-0.03478077571669489|
|      Albania|-0.02668162333239...|
|      Hungary|-0.02164024265610...|
|      Romania|-0.01830669620112...|
+-------------+--------------------+
only showing top 10 rows



In [0]:
##### QUESTION 5 #####
"""
Do word count exercise using pyspark. Ignore punctuation and normalize to lower case. Replace characters NOT in this set: [0-9a-z] with space.
"""
# read every file under WORD_DIR_PATH as text into a single 'value' column
word_df = spark.read.text(WORD_DIR_PATH)

# lower-case the text and expose it under the column name 'words'
word_df_lower = word_df.select(lower(word_df['value']).alias('words'))

# swap every character outside [a-z0-9] for a space
word_df_parsed = word_df_lower.select(
    regexp_replace(col('words'), '[^a-z0-9]', ' ').alias('words')
)

In [0]:
# split each line on runs of spaces and emit one row per token
word_df_explode = (
    word_df_parsed
    .withColumn('words', explode(split('words', " +")))
    # blank lines, and lines that begin or end with a replaced character,
    # produce empty-string tokens; those are not words, so drop them
    .filter(col('words') != '')
)

# group by word and count the occurrences of each word
word_q5_res = word_df_explode.groupBy(col('words')).count()

# show result
word_q5_res.show()

+------------+-----+
|       words|count|
+------------+-----+
|       trail|   57|
|       those| 3409|
|    medicare|   32|
|        some| 4335|
|         few| 1057|
|   connected|  162|
| herzegovina|    7|
|   involving|   99|
|    randomly|   10|
|     clinics|   78|
|       still| 2139|
| transmitted|   92|
|      travel| 1367|
|vicissitudes|    1|
|      online| 1357|
|         wto|   17|
|  paramedics|   26|
|          07|   90|
|   traveling|   97|
|   recognize|   66|
+------------+-----+
only showing top 20 rows



In [0]:
##### QUESTION 6: Find the 10 most common bigrams #####

# strip leading/trailing spaces before splitting: otherwise split() emits an
# empty-string token at the edge of the line and NGram pairs it with the
# neighbouring word, producing bogus bigrams such as ' word' or 'word '
word_df_trimmed = word_df_parsed.withColumn(
    'words', regexp_replace('words', '^ +| +$', '')
)

# convert strings to arrays of strings for the NGram transformation
word_df_str_arrays = word_df_trimmed.withColumn('words', split('words', ' +'))

# NGram(n=2) turns each token array into an array of adjacent-token pairs
bigram = NGram(n=2).setInputCol('words').setOutputCol('bigrams')

# one row per bigram occurrence, then the number of occurrences per bigram
bigram_df = (
    bigram.transform(word_df_str_arrays)
    .select(explode(col('bigrams')).alias('bigrams'))
    .groupBy(col('bigrams'))
    .count()
)

# sort on bigram count for the question 6 result
# (the variable keeps its original name, bigram_q5_res)
bigram_q5_res = bigram_df.sort('count', ascending=False)

# show top 10 bigrams
bigram_q5_res.show(10)

+--------+-----+
| bigrams|count|
+--------+-----+
|  of the|17484|
|  in the|12808|
|   p the|10363|
|covid 19| 8762|
|  to the| 8372|
| for the| 5588|
|     n t| 5393|
|  on the| 5032|
|   to be| 4581|
| will be| 4177|
+--------+-----+
only showing top 10 rows



In [0]:
##### Question 7 #####
"""
a) Find the food service and active restaurant closest to the following coordinates: 35.994914, -78.897133
b) With that restaurant as your center point, find the number of foreclosures within a 1 mile radius
"""
# user function that accepts 2 strings of coordinates and outputs float distance in miles for part a
def hs_with_str(coord_str1, coord_str2):
    """Return the haversine distance in miles between two coordinate strings.

    Args:
        coord_str1: comma-separated coordinate pair such as
            '35.994914, -78.897133'; whitespace around the numbers is ignored.
        coord_str2: second coordinate pair in the same format.

    Returns:
        The distance in miles as computed by ``haversine`` with ``unit='mi'``,
        or ``None`` when either argument is ``None``.

    Raises:
        ValueError: if a component of either string cannot be parsed as float.
    """
    # a null coordinate has no distance; return None instead of failing on
    # None.split() inside the UDF
    if coord_str1 is None or coord_str2 is None:
        return None

    # split on the bare comma: float() tolerates surrounding whitespace, so
    # both 'a, b' and 'a,b' are accepted
    coord_tup1 = tuple(float(part) for part in coord_str1.split(','))
    coord_tup2 = tuple(float(part) for part in coord_str2.split(','))

    # plug float tuples into the haversine function, specify unit as miles
    return hs(coord_tup1, coord_tup2, unit='mi')

# Spark UDF wrapper around hs_with_str(); produces a float column
hs_with_str_udf = udf(hs_with_str, returnType=FloatType())

In [0]:
## PART A: Find the food service and active restaurant closest to the following coordinates: 35.994914, -78.897133

# origin coordinate string specified in prompt
ORIGIN_COORD_STR = '35.994914, -78.897133'

# active food service entries only, and only those that carry a geolocation
rest_data_active_fs = (
    rest_data_raw
    .where((col('Status') == 'ACTIVE') & (col('Rpt_Area_Desc') == 'Food Service'))
    .dropna(subset=['geolocation'])
)

# distance in miles from every remaining entry to the origin point
rest_data_distance = rest_data_active_fs.withColumn(
    'Distance(mi)',
    hs_with_str_udf(col('geolocation'), lit(ORIGIN_COORD_STR)),
)

# nearest entry first
rest_q7a_res = rest_data_distance.orderBy('Distance(mi)')

# show the closest restaurant, restricted to the relevant columns
closest_view = rest_q7a_res.select(
    col('Premise_Name'),
    col('Status'),
    col('Rpt_Area_Desc'),
    col('geolocation'),
    col('Distance(mi)'),
)
closest_view.show(1, truncate=False)

+------------------------+------+-------------+-----------------------+------------+
|Premise_Name            |Status|Rpt_Area_Desc|geolocation            |Distance(mi)|
+------------------------+------+-------------+-----------------------+------------+
|OLD HAVANA SANDWICH SHOP|ACTIVE|Food Service |35.9932826, -78.8981331|0.1258222   |
+------------------------+------+-------------+-----------------------+------------+
only showing top 1 row



In [0]:
## PART B: With that restaurant as your center point, find the number of foreclosures within a 1 mile radius

# fetch only the closest restaurant's row: first() brings a single row to the
# driver, whereas collect()[0] would materialise the entire sorted DataFrame
closest_rest = rest_q7a_res.first()
if closest_rest is None:
    raise ValueError('no restaurant available to use as the center point')

# save coordinates of restaurant from Q7a to variable COORD_STR
COORD_STR = closest_rest['geolocation']

# read in foreclosure dataset
fc_rest_raw = spark.read.json(REST_FORECLOSED_PATH)

# keep only the geocode subfield and drop entries where it is null
fc_rest_coords = (
    fc_rest_raw
    .select(col('fields').getItem('geocode').alias('geocodes'))
    .na.drop()
)

# cast the geocode column to string and strip the surrounding brackets so the
# value has the 'a, b' form expected by hs_with_str_udf; a raw string keeps
# the regex backslashes from being read as (invalid) Python escape sequences
fc_coords_str = fc_rest_coords.withColumn(
    'geocodes',
    regexp_replace(col('geocodes').cast(StringType()), r'[\[\]]', ''),
)

# calculate Distance(mi) column using hs_with_str_udf
fc_coords_distance = fc_coords_str.withColumn(
    'Distance(mi)', hs_with_str_udf(col('geocodes'), lit(COORD_STR))
)

# keep only the foreclosures within 1 mile of the center point
fc_coords_q7b_res = fc_coords_distance.filter(col('Distance(mi)') <= 1)

# count and print the number of foreclosures within 1 mile of center point
print("RESULT")
print('Number of foreclosures within 1 mile of center point: %d' % fc_coords_q7b_res.count() )