# Big Data HW 2
## Ahhyun Moon - am12180@nyu.edu

In [1]:
import os
import pyspark

conf = pyspark.SparkConf()
conf = conf.setAppName("<my-app-name>")
conf.set('spark.ui.proxyBase', '/user/' + os.environ['JUPYTERHUB_USER'] + '/proxy/4040') ## to setup SPARK UI
conf = conf.set('spark.jars', os.environ['GRAPHFRAMES_PATH']) ## graphframes in spark configuration
sc = pyspark.SparkContext(conf=conf)
spark = pyspark.SQLContext(sc)
spark

24/10/26 00:23:31 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


<pyspark.sql.context.SQLContext at 0x7b10c40df390>

In [2]:
# Import relevant functinos from Pyspark SQL Library
import pyspark.sql.functions as F
from pyspark.sql.types import IntegerType, DoubleType
from pyspark.sql.window import Window

## Question 1

In [13]:
# Prepare bakery data frame
bakeryData = spark\
  .read\
  .option("inferSchema", "true")\
  .option("header", "true")\
  .csv("./data/Bakery.csv") 
# bakeryData.show()

#### Showing best-selling items for EVERY Monday (7 - 11 am)

In [14]:
# Update data frame with necessary columns
# Add weekday column based on date, 
# Add hour-period based on time,
# Change transaction data type to Int.
bakeryData1 = bakeryData.withColumn("weekday", F.date_format("date", "EEEE")) \
                        .withColumn("hour-period", F.date_format("Time", "h").cast(IntegerType())) \
                        .withColumn("transaction", bakeryData["transaction"].cast(IntegerType()))

# Filter transactions between 7 AM and 11:59 AM on Mondays
filtered_data = bakeryData1.filter((F.col("hour-period").between(7, 11)) &(F.col("weekday") == "Monday"))

# Perform the grouping and aggregation for each item and hour
grouped_bakeryData1 = filtered_data.groupBy("date","weekday", "hour-period", "item") \
                                   .agg(F.sum("Transaction").alias("quantity"))

# Use window function to get the highest selling item per hour
window_spec = Window.partitionBy("date","weekday","hour-period").orderBy(F.desc("quantity"))
ranked_data = grouped_bakeryData1.withColumn("rank", F.row_number().over(window_spec))

# Filter to get only the highest-selling item per hour and order the results by Date and Hour
top_selling_items = ranked_data.filter(F.col("rank") == 1).orderBy("date")

# Select the required cols and show results
print("Show the highest selling item for Mondays, per hour, for the 7AM to 11AM hours. Note that 'weekday', 'period' have to be computed.")
top_selling_items.select("item", "quantity", "weekday", "date", "hour-period").show()

Show the highest selling item for Mondays, per hour, for the 7AM to 11AM hours. Note that 'weekday', 'period' have to be computed.
+---------+--------+-------+----------+-----------+
|     item|quantity|weekday|      date|hour-period|
+---------+--------+-------+----------+-----------+
|    Bread|     165| Monday|2016-10-31|          8|
|   Coffee|    1008| Monday|2016-10-31|          9|
|   Coffee|    1060| Monday|2016-10-31|         10|
|   Coffee|    1587| Monday|2016-10-31|         11|
|   Pastry|     739| Monday|2016-11-07|          8|
|      Tea|    2237| Monday|2016-11-07|          9|
|   Coffee|    5273| Monday|2016-11-07|         10|
|   Coffee|    7668| Monday|2016-11-07|         11|
|   Coffee|    1275| Monday|2016-11-14|          7|
|Medialuna|    2555| Monday|2016-11-14|          8|
|   Coffee|    6404| Monday|2016-11-14|          9|
|   Coffee|    6459| Monday|2016-11-14|         10|
|    Bread|    6493| Monday|2016-11-14|         11|
|   Coffee|    1800| Monday|2016-11-2

#### Showing best-selling items for all Mondays (7 - 11 am) 
Added this solution just in case for different interpretation of the question.

In [15]:
# Perform the grouping and aggregation for each item and hour
grouped_bakeryData1_2 = filtered_data.groupBy("weekday", "hour-period", "item") \
                                   .agg(F.sum("Transaction").alias("quantity"))

# Use window function to get the highest selling item per hour
window_spec_2 = Window.partitionBy("weekday","hour-period").orderBy(F.desc("quantity"))
ranked_data_2 = grouped_bakeryData1_2.withColumn("rank", F.row_number().over(window_spec_2))

# Filter to get only the highest-selling item per hour and order the results by Date and Hour
top_selling_items_2 = ranked_data_2.filter(F.col("rank") == 1).orderBy("hour-period")

# Select the required cols and show results
print("Show the highest selling item for Mondays, per hour, for the 7AM to 11AM hours. Note that 'weekday', 'period' have to be computed.")
top_selling_items_2.select("item", "quantity", "weekday", "hour-period").show()

Show the highest selling item for Mondays, per hour, for the 7AM to 11AM hours. Note that 'weekday', 'period' have to be computed.
+------+--------+-------+-----------+
|  item|quantity|weekday|hour-period|
+------+--------+-------+-----------+
|Coffee|    5292| Monday|          7|
|Coffee|  129124| Monday|          8|
|Coffee|  293503| Monday|          9|
|Coffee|  543844| Monday|         10|
|Coffee|  523336| Monday|         11|
+------+--------+-------+-----------+



### Question 2

#### Show the top 2 (by qty) items bought by Daypart, by DayType.

In [16]:
# Add Weekday column based on the Date value
# Add Hour-period column based on the Time value
# Change transaction data type to Int.
bakeryData2 = bakeryData.withColumn("Weekday", F.date_format("date", "EEEE")) \
                        .withColumn("Hour-period", F.date_format("Time", "h").cast(IntegerType())) \
                        .withColumn("Transaction", bakeryData["Transaction"].cast(IntegerType()))

# Add DayType by categorizing Weekday into Weekday or Weekends
bakeryData2 = bakeryData2.withColumn("DayType", 
                                     F.when(F.col("Weekday").isin("Saturday", "Sunday"), "Weekend")\
                                     .otherwise("Weekday"))

# Add DayPart by categorizing Hour-period in to Breakfast, Lunch, and Dinner
bakeryData2 = bakeryData2.withColumn("DayPart", 
                                     F.when((F.col("Hour-period").between(6, 10)), "Breakfast")\
                                     .when((F.col("Hour-period").between(11, 15)), "Lunch")\
                                     .otherwise("Dinner"))

# Group by same DayType, DayPart, Item and aggregate transaction as total sum 
grouped_bakeryData2 = bakeryData2.groupBy("DayType", "DayPart", "Item")\
                                    .agg(F.sum("Transaction").alias("quantity"))

# Create a window specification to order items within each DayType and Daypart
window_spec_q2 = Window.partitionBy("DayType", "DayPart").orderBy(F.desc("quantity"))

# Give row number for ranking 
ranked_bakeryData_q2 = grouped_bakeryData2.withColumn("rank", F.row_number().over(window_spec_q2))

# Filter only top 2 items in the ranking
top2_items = ranked_bakeryData_q2.filter(F.col("rank") <= 2)

# Put top 2 items into a single column
grouped_top2_items = top2_items.groupBy("DayType", "DayPart")\
                                .agg(F.collect_list(F.col("Item")).alias("top2"))
# Show result
grouped_top2_items.show()


+-------+---------+---------------+
|DayType|  DayPart|           top2|
+-------+---------+---------------+
|Weekday|Breakfast|[Coffee, Bread]|
|Weekday|   Dinner|[Coffee, Bread]|
|Weekday|    Lunch|[Coffee, Bread]|
|Weekend|Breakfast|[Coffee, Bread]|
|Weekend|   Dinner|[Coffee, Bread]|
|Weekend|    Lunch|[Coffee, Bread]|
+-------+---------+---------------+



### Question 3

#### Show the number of entities by “fields.rpt_area_desc”

In [17]:
# Read json data
restaurantData = spark.read.json("./data/Restaurants_in_Durham_County_NC.json") 

# Group data by fields.rpt_area_desc and get the total count
restaurantData_entity = restaurantData.groupBy(F.col("fields.rpt_area_desc"))\
                                        .agg(F.count("*").alias("count"))
# Show result
restaurantData_entity.show(truncate=False)

+---------------------+-----+
|rpt_area_desc        |count|
+---------------------+-----+
|Bed&Breakfast Home   |4    |
|Summer Camps         |4    |
|Institutions         |30   |
|Local Confinement    |2    |
|Mobile Food          |147  |
|School Buildings     |89   |
|Summer Food          |242  |
|Swimming Pools       |420  |
|Day Care             |173  |
|Tattoo Establishments|36   |
|Residential Care     |154  |
|Bed&Breakfast Inn    |2    |
|Adult Day Care       |5    |
|Lodging              |62   |
|Food Service         |1093 |
+---------------------+-----+



### Question 4

#### Show the country or region with the biggest percentage increase in population AND the country with biggest percentage decrease in population, between the years 1990 and 2000. Use only the countries, not ‘World’.

In [18]:
# Read the csv data
populationData = spark\
  .read\
  .option("header", "true")\
  .option("inferSchema", "true")\
  .csv("./data/populationbycountry19802010millions.csv") 

# Rename the first column (unnamed) to Country
# Change the population (millions) of 1990 and 2000 to float
populationData = populationData.withColumnRenamed("_c0", "Country")\
                                .withColumn("1990", F.col("1990").cast("float"))\
                                .withColumn("2000", F.col("2000").cast("float"))\
                                
# Filter out the row with World population
# Drop rows where either '1990' or '2000' columns are null (not numeric e.g. NULL NA or --)
cleaned_populationData = populationData.filter(F.col("Country") != "World")\
                                        .na.drop(subset=["1990", "2000"])

# Add a column with the value of the percentage change between 1990 and 2000
growth_rate = cleaned_populationData.withColumn("Change_%", (F.col("2000") - F.col("1990")) / F.col("1990") * 100)

# Order by change % in descending order to find max
# Order by change % in ascending order to find min
max_growth = growth_rate.select("Country","Change_%").orderBy(F.desc("Change_%")).limit(1)
min_growth = growth_rate.select("Country","Change_%").orderBy(F.asc("Change_%")).limit(1)

# Show both results as one table
(max_growth.union(min_growth)).show()

+--------------------+-----------------+
|             Country|         Change_%|
+--------------------+-----------------+
|United Arab Emirates|76.27926665641841|
|          Montserrat|-63.1873277639145|
+--------------------+-----------------+



24/10/26 00:26:35 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: , 1990, 2000
 Schema: _c0, 1990, 2000
Expected: _c0 but found: 
CSV file: file:///home/jovyan/am12180/am12180-hw2/data/populationbycountry19802010millions.csv
24/10/26 00:26:35 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: , 1990, 2000
 Schema: _c0, 1990, 2000
Expected: _c0 but found: 
CSV file: file:///home/jovyan/am12180/am12180-hw2/data/populationbycountry19802010millions.csv


### Question 5

In [19]:
from pyspark.ml.feature import Tokenizer, StopWordsRemover, RegexTokenizer
import pyspark.sql.functions as F
from pyspark.ml.feature import Tokenizer, NGram

hw1Data = spark.read.text("./data/hw1text/*.txt")

# Set up a tokenizer to split each row of texts into words
regex_tokenizer = RegexTokenizer(inputCol="value", outputCol="words",
                                 pattern="\\W",  # Split by non-word characters (punctuation, spaces, etc.)
                                 toLowercase=True ) # Convert text to lowercase
words = regex_tokenizer.transform(hw1Data)

# Create unigram transformer and apply it to words
unigram = NGram(n=1, inputCol="words", outputCol="unigram")
unigram_data = unigram.transform(words)

# Use explode function to transform the unigrams column to rows
unigram_exploded = unigram_data.withColumn("word", F.explode(F.col("unigram")))

# Group by word for total counts
unigram_count = unigram_exploded.groupBy("word").count().orderBy(F.col("count").desc())

# Show result of top 10 counts
unigram_count.show(10)



+----+------+
|word| count|
+----+------+
| the|163547|
|  to| 89046|
|   p| 78664|
|  of| 75568|
| and| 72730|
|  in| 56782|
|   a| 53198|
| for| 29770|
|that| 28852|
|  is| 27601|
+----+------+
only showing top 10 rows



                                                                                

### Question 6

In [20]:
# Create bigram transformer and apply it to words
bigram = NGram(n=2, inputCol="words", outputCol="bigrams")
bigram_data = bigram.transform(words)

# Use explode function to transform the bigram column to rows
bigram_exploded = bigram_data.withColumn("bigram", F.explode(F.col("bigrams")))

# Group by word for total counts
bigram_count = bigram_exploded.groupBy("bigram").count().orderBy(F.col("count").desc())

# Show result of top 10 counts
bigram_count.show(10)

                                                                                

+--------+-----+
|  bigram|count|
+--------+-----+
|  of the|17484|
|  in the|12808|
|   p the|10363|
|covid 19| 8762|
|  to the| 8372|
| for the| 5588|
|     n t| 5393|
|  on the| 5032|
|   to be| 4581|
| will be| 4177|
+--------+-----+
only showing top 10 rows



### Question 7

#### a) Find food service and active restaurants (“status” = “ACTIVE” and “"rpt_area_desc" = "Food Service”) closest to the following coordinate: of 35.994914, -78.897133, and show it.

In [21]:
# Use haversine lib to calculate distance
from haversine import haversine, Unit

# Given coordinates
coords = (35.994914, -78.897133)

# Distance function that takes in coordinates,
# and calculates the distance to the coords given
def distance(coordinates):
    lat = coordinates[1]
    long = coordinates[0]
    return haversine(coords, (lat, long), unit=Unit.MILES)

# Set distance function as a UDF
distance_udf = F.udf(distance, DoubleType())

# Filter only Active Food Services with Coordinates
active_restaurant = restaurantData.filter(
                    (F.col("fields.status") == "ACTIVE") & 
                    (F.col("fields.rpt_area_desc") == "Food Service") &
                    (F.col("geometry.coordinates").isNotNull()))

# Add distance column with its value calculated from udf
# Order by closest - farthest
restaurant_with_distance = active_restaurant.withColumn("distance", distance_udf(F.col("geometry.coordinates")))                     

# Show closest restaurant result
closest_restaurant = restaurant_with_distance.orderBy(F.col("distance"))\
                            .select("fields.premise_name", "fields.status", 
                                    "fields.rpt_area_desc","geometry.coordinates", 
                                    "distance")\
                            .limit(1)
closest_restaurant.show(truncate=False)

+------------------------+------+-------------+-------------------------+-------------------+
|premise_name            |status|rpt_area_desc|coordinates              |distance           |
+------------------------+------+-------------+-------------------------+-------------------+
|OLD HAVANA SANDWICH SHOP|ACTIVE|Food Service |[-78.8981331, 35.9932826]|0.12582219684232088|
+------------------------+------+-------------+-------------------------+-------------------+



                                                                                

#### b) With that restaurant in (a) as your center point, find the number of foreclosures within a 1 mile radius

In [12]:
# Read foreclosure data
foreclosures = spark.read.json("./data/durham-nc-foreclosure-2006-2016.json") 
# Closest restaurant's coordinates from 7(a)
a_coords = tuple(closest_restaurant.collect()[0].coordinates)

# Distance function that takes in coordinates,
# and calculates the distance to the closest restaurant's coords
def b_distance(coordinates):
    lat = coordinates[0]
    long = coordinates[1]
    return haversine(a_coords, (lat, long), unit=Unit.MILES)
b_udf = F.udf(b_distance, DoubleType())

# Filter out rows with no coordinates
foreclosures_filtered = foreclosures.filter(F.col("geometry.coordinates").isNotNull())

# Add distance column and filter <= 1 mile
foreclosure_with_distance = foreclosures_filtered.withColumn("distance", b_udf(F.col("geometry.coordinates")))\
                                                    .filter(F.col("distance") <= 1)

# Count the number of distinct foreclosures within the radius
number_of_foreclosures = foreclosure_with_distance.select("fields.parcel_number","fields.address","distance")\
                                                    .distinct().count()

# Print the result
print(f"Coords of closest restaurant: {a_coords}")
print(f"Number of foreclosures within 1 mile radius: {number_of_foreclosures}")
foreclosure_with_distance.select("fields.parcel_number","fields.address", "geometry.coordinates", "distance").show(truncate=True)

                                                                                

Coords of closest restaurant: (-78.8981331, 35.9932826)
Number of foreclosures within 1 mile radius: 283
+-------------+--------------------+--------------------+-------------------+
|parcel_number|             address|         coordinates|           distance|
+-------------+--------------------+--------------------+-------------------+
|       110138|217 E CORPORATION ST|[-78.8922549, 36....|0.42018153731654434|
|       110535|      401 N QUEEN ST|[-78.895396, 35.9...| 0.1920521426225851|
|       110536|      403 N QUEEN ST|[-78.8950321, 35....| 0.2161257772521711|
|       111324|      918 GILBERT ST|[-78.8873774, 35....| 0.7438842496954408|
|       111399|      721 LIBERTY ST|[-78.888343, 35.9...| 0.6764401088260837|
|       111426|      729 HOPKINS ST|[-78.888092, 35.9...| 0.6939319675911833|
|       112166|      1302 E MAIN ST|[-78.886681, 35.9...| 0.7962788643952043|
|       115974|       209 NELSON ST|[-78.9041979, 35....| 0.4984384851724365|
|       116301|    2721 ATLANTIC ST|[