# Mapping External Data to Site Location

This notebook links crash data and precipitation data to site locations through a series of transformations and joins. It maps each site to the counties it covers via zip codes and joins that site/county mapping with monthly county crash data. In parallel, it joins each site's zip codes with daily precipitation measurements. Finally, it aggregates crash data by site and month (total crashes, fatalities, injuries, vehicles) and precipitation data by site ID and date (mean, median, standard deviation, and IQR).

In [0]:
import os
import re

from pyspark.sql import SparkSession
# Note: sum, min and max below shadow the Python builtins of the same name
from pyspark.sql.functions import (
    col, collect_set, count, explode, expr, format_string, lower, lpad,
    max, mean, min, split, stddev, substring, sum, to_date, udf, when,
)
from pyspark.sql.types import ArrayType, StringType

In [0]:
# Setting up Spark Session
spark = SparkSession.builder.appName("Data to Site Location Mapping").getOrCreate()

## Workflow

**Cartesian Product of Sites and Months:**
* The pipeline begins by creating a Cartesian product of sites and months. The months_df DataFrame, which lists every month from January 2023 through January 2025 (25 months), is cross-joined with the site_data_df DataFrame. This yields one row for every combination of site and month, preparing the data for later joins and aggregations.
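Instead of hard-coding the month list, the (year, month) pairs and the site-by-month grid can be generated. This is a minimal pure-Python sketch of the idea; the `month_range` helper and the site IDs are hypothetical, and in the notebook itself the grid comes from `site_data_df.crossJoin(months_df)`.

```python
from itertools import product


def month_range(start, end):
    """Return every (year, month) tuple from start to end, inclusive."""
    year, month = start
    months = []
    while (year, month) <= end:
        months.append((year, month))
        # Roll over to January of the next year after December
        year, month = (year + 1, 1) if month == 12 else (year, month + 1)
    return months


months = month_range((2023, 1), (2025, 1))
site_ids = ["site_a", "site_b", "site_c"]  # hypothetical site IDs

# Cartesian product: one (site, (year, month)) pair per combination
site_months = list(product(site_ids, months))

print(len(months))       # 25
print(len(site_months))  # 75
```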

**Exploding Site Data:**
* The site_data_df is then "exploded" to create a new DataFrame where each row represents a single zip code associated with a specific site. This step is crucial for later joins with both precipitation data and crash data, as a site may cover multiple zip codes.
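The explode step turns one row holding a list of zip codes into one row per zip code. A small plain-Python sketch of the same reshaping, using hypothetical site IDs and zip lists:

```python
# Hypothetical sites, each covering several zip codes
sites = {
    "site_a": ["08057", "08054"],
    "site_b": ["28144"],
}

# "Explode": emit one (site_id, zip) row per zip code in each site's list
exploded = [
    (site_id, zip_code)
    for site_id, zip_codes in sites.items()
    for zip_code in zip_codes
]

for row in exploded:
    print(row)
```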

**Joining Site Data with Crash Data:**
* Mapping Sites to Counties: The exploded site data is joined with the crash data to establish a relationship between each site and the counties it covers based on zip codes.
* County-Crash Join: The site-to-county mapping is then joined with the crash data (unique_county_crashes) to associate each site-month combination with the corresponding crash statistics.
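The site-to-county mapping is an inner join on zip code followed by a distinct. The notebook keys counties on the 3-digit County Code alone; because county codes repeat across states, this sketch instead keys on a (state FIPS, county code) pair, which is an assumption rather than the notebook's behavior. All zips and codes below are hypothetical inputs.

```python
from collections import defaultdict

# Hypothetical exploded (site_id, zip) rows
site_zips = [
    ("site_a", "08057"),
    ("site_a", "08054"),
    ("site_a", "08002"),
    ("site_a", "99999"),  # no county match in the lookup below
]

# Hypothetical zip -> county lookup, keyed by (state FIPS, county code)
zip_to_county = {
    "08057": ("34", "005"),
    "08054": ("34", "005"),
    "08002": ("34", "007"),
}

# Inner join on zip, keeping each county once per site (like .distinct())
site_to_counties = defaultdict(set)
for site_id, zip_code in site_zips:
    county = zip_to_county.get(zip_code)
    if county is not None:  # unmatched zips drop out, as in an inner join
        site_to_counties[site_id].add(county)

print(dict(site_to_counties))
```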

**Aggregating Crash Data:**
Once the site and crash data are joined, the pipeline aggregates the data by site and month. Aggregation includes calculating various crash-related statistics, such as:
* Total crashes
* Fatalities
* Injuries
* Vehicles involved
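The aggregation sums each county's monthly statistics across all counties a site covers. A plain-Python sketch of that group-by-and-sum, with hypothetical joined rows standing in for `site_county_crashes`:

```python
from collections import defaultdict

metrics = ["crash_count", "total_fatalities", "total_injuries", "total_vehicles"]

# Hypothetical joined rows: one per (site, county, year, month)
joined_rows = [
    {"site": "site_a", "year": 2023, "month": 1,
     "crash_count": 10, "total_fatalities": 1, "total_injuries": 4, "total_vehicles": 18},
    {"site": "site_a", "year": 2023, "month": 1,
     "crash_count": 5, "total_fatalities": 0, "total_injuries": 2, "total_vehicles": 9},
    {"site": "site_a", "year": 2023, "month": 2,
     "crash_count": 7, "total_fatalities": 0, "total_injuries": 3, "total_vehicles": 12},
]

# Group by (site, year, month) and sum each metric across the site's counties
totals = defaultdict(lambda: dict.fromkeys(metrics, 0))
for row in joined_rows:
    key = (row["site"], row["year"], row["month"])
    for metric in metrics:
        totals[key][metric] += row[metric]

for key, values in totals.items():
    print(key, values)
```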


**Precipitation Data Transformation:**
* Unpivoting Precipitation Data: The precipitation data, which I saved in a wide format with one column per day, is transformed into a long format using the stack() function. Unpivoting makes it easier to analyze precipitation by date.
* Handling Precipitation Data by Year: Separate precipitation DataFrames for the years 2023, 2024, and 2025 are processed and then combined into one unified DataFrame, preparing the precipitation data for integration with site data.
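Spark SQL's `stack(n, k1, v1, k2, v2, ...)` emits n rows from n key/value pairs, so unpivoting reduces to building that expression string from the column names. The sketch below mirrors the string that `transform_precipitation_df` builds, as a standalone function; the function name and the example column names are hypothetical, and it assumes each PRISM column name contains an 8-digit `YYYYMMDD` date between underscores.

```python
import re


def build_stack_expr(columns):
    """Build a Spark SQL stack() expression that unpivots PRISM_* day columns."""
    prism_cols = [c for c in columns if c.startswith("PRISM_")]
    pairs = []
    for c in prism_cols:
        # Assumes the YYYYMMDD date sits between underscores in the column name
        d = re.search(r"_(\d{8})_", c).group(1)
        pairs.append(f"'{d[:4]}-{d[4:6]}-{d[6:]}', `{c}`")
    return f"stack({len(prism_cols)}, {', '.join(pairs)}) as (date_str, precipitation)"


cols = ["ZCTA5CE20", "PRISM_ppt_20230101_bil", "PRISM_ppt_20230102_bil"]  # hypothetical names
print(build_stack_expr(cols))
```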

**Joining Site Data with Precipitation Data:**

* Zip Code Mapping: A distinct mapping between site IDs and their zip codes is created. This mapping will be used to link site locations with the corresponding precipitation data.
* Joining with Precipitation Data: The site data is joined with the precipitation data based on zip codes using an inner join. This ensures that only zip codes with corresponding precipitation data are retained.
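An inner join keeps only zip codes present on both sides. This plain-Python sketch, with hypothetical mappings and readings, shows both kinds of rows that drop out: a site zip with no precipitation data and a precipitation zip that no site covers.

```python
# Hypothetical distinct site-zip mapping and (zip, date, precipitation) records
site_zip_mapping = {("site_a", "08057"), ("site_a", "08054"), ("site_b", "28144")}
precipitation = [
    ("08057", "2023-01-01", 0.5),
    ("08054", "2023-01-01", 0.7),
    ("19050", "2023-01-01", 1.1),  # no site covers this zip
]

# Index precipitation by zip so each site zip becomes a dictionary lookup
by_zip = {}
for zip_code, date, value in precipitation:
    by_zip.setdefault(zip_code, []).append((date, value))

# Inner join: site_b's 28144 (no readings) and 19050 (no site) both drop out
joined = sorted(
    (site_id, zip_code, date, value)
    for site_id, zip_code in site_zip_mapping
    for date, value in by_zip.get(zip_code, [])
)

for row in joined:
    print(row)
```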

**Aggregating Precipitation Data:**
* After the join, the precipitation data is aggregated by site ID and date to calculate key precipitation statistics:
  * Mean precipitation
  * Median precipitation
  * Standard deviation of precipitation
  * Interquartile range (IQR) of precipitation (Q3 minus Q1)
  * Minimum and maximum precipitation
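For one site on one date, the statistics reduce to a handful of calculations over that site's zip-level readings. This sketch uses Python's `statistics` module on hypothetical values; note that Spark's `percentile_approx` returns an actual value from the data rather than interpolating, so its quartiles can differ slightly from `statistics.quantiles`. `stdev` is the sample standard deviation, as is Spark's `stddev`.

```python
import statistics


def precipitation_stats(values):
    """Summary statistics for one site's precipitation readings on one date."""
    ordered = sorted(values)
    q1, _, q3 = statistics.quantiles(ordered, n=4, method="inclusive")
    return {
        "mean": statistics.mean(ordered),
        "median": statistics.median(ordered),
        "stddev": statistics.stdev(ordered),  # sample standard deviation
        "q1": q1,
        "q3": q3,
        "iqr": q3 - q1,
        "min": ordered[0],
        "max": ordered[-1],
    }


stats = precipitation_stats([0.0, 0.0, 1.2, 3.4, 5.0])  # hypothetical readings
print(stats)
```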


## Reading in Data and Basic Preprocessing

In [0]:
# Read site data
site_data_df = spark.read.csv("/FileStore/tables/fusionsite_regional_df.csv", header=True)

In [0]:
# Cast Zip to an integer and back to a zero-padded five-character string, so 8057 becomes "08057"
site_data_df = site_data_df.withColumn("Zip", format_string("%05d", col("Zip").cast("int")))

In [0]:
state_crash_monthly_county_counts = spark.read.csv("/FileStore/tables/state_crash_monthly_county_counts.csv", header=True)

files = os.listdir("/dbfs/FileStore/intermediate_output")
table_names = [file for file in files if re.search(r"\d{5}$", file)]

# Keep only tables whose name matches a zip code in site_data_df (collect the zips once, not per table)
site_zips = set(site_data_df.select("Zip").rdd.flatMap(lambda x: x).collect())
table_names = [table_name for table_name in table_names if table_name in site_zips]

dataframes = {
    table_name: spark.read.option("header", "false").csv(f"/FileStore/intermediate_output/{table_name}")
    for table_name in table_names
}

# Read the US cities reference data (county FIPS codes and zip code lists)
us_city_states = spark.read.csv("/FileStore/tables/uscities.csv", header=True)

# Load the 2023, 2024 and 2025 precipitation data (Parquet)
precipitation_data_2023 = spark.read.parquet("/FileStore/intermediate_output/2023_precipitation_data")
precipitation_data_2024 = spark.read.parquet("/FileStore/intermediate_output/2024_precipitation_data")
precipitation_data_2025 = spark.read.parquet("/FileStore/intermediate_output/2025_precipitation_data")

In [0]:
# Rename State3 -> State, State26 -> State_ID, City4 -> City and City25 -> Full_City_Name
site_data_df = site_data_df.withColumnRenamed("State3", "State") 
site_data_df = site_data_df.withColumnRenamed("State26", "State_ID")

site_data_df = site_data_df.withColumnRenamed("City4", "City") 
site_data_df = site_data_df.withColumnRenamed("City25", "Full_City_Name")

In [0]:
# Select only the required columns
site_data_df = site_data_df.select(
    "ID", "Brand", "Region", "State", "City", "Address",
    "Latitude", "Longitude", "County", "Zip", "Country",
)

In [0]:
# rename the columns in each dataframe to be zip and distance 
for table_name, df in dataframes.items():
    dataframes[table_name] = df.withColumnRenamed("_c0", "zip").withColumnRenamed("_c1", "distance")

In [0]:
# Build a dictionary mapping each table name (a site zip code) to the zip codes listed in that table
zip_codes_dict = {}
for key, df in dataframes.items():
    zip_codes_dict[key] = df.select("zip").rdd.flatMap(lambda x: x).collect()

# Define a UDF that simply returns the value from the dictionary
def get_zip_codes(key):
    # Make sure the key is treated as a string
    str_key = str(key) if key is not None else ""
    return zip_codes_dict.get(str_key, [])

# Register the UDF with the correct return type
get_zip_codes_udf = udf(get_zip_codes, ArrayType(StringType()))

# Apply the UDF to the column
site_data_df = site_data_df.withColumn("Zip Codes", get_zip_codes_udf(col("Zip")))

In [0]:
site_data_df.printSchema()

root
 |-- ID: string (nullable = true)
 |-- Brand: string (nullable = true)
 |-- Region: string (nullable = true)
 |-- State: string (nullable = true)
 |-- City: string (nullable = true)
 |-- Address: string (nullable = true)
 |-- Latitude: string (nullable = true)
 |-- Longitude: string (nullable = true)
 |-- County: string (nullable = true)
 |-- Zip: string (nullable = false)
 |-- Country: string (nullable = true)
 |-- Zip Codes: array (nullable = true)
 |    |-- element: string (containsNull = true)



In [0]:
# Check if any rows are missing value for county
site_data_df.filter(col("County").isNull()).show()

+----+----------+-------------+--------------+---------+--------------------+--------+---------+------+-----+-------+--------------------+
|  ID|     Brand|       Region|         State|     City|             Address|Latitude|Longitude|County|  Zip|Country|           Zip Codes|
+----+----------+-------------+--------------+---------+--------------------+--------+---------+------+-----+-------+--------------------+
|52.0|A Sani-Can|South Central|North Carolina|Salisbury|1882 Briggs Rd Sa...|    NULL|     NULL|  NULL|28147|   NULL|[28144, 28159, 28...|
+----+----------+-------------+--------------+---------+--------------------+--------+---------+------+-----+-------+--------------------+



In [0]:
# Fill the missing county with Rowan County (the site's address is in Salisbury, NC)
site_data_df = site_data_df.withColumn("County", when(col("County").isNull(), "Rowan County").otherwise(col("County")))

In [0]:
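# Strip the trailing " County" suffix (7 characters), e.g. "Rowan County" -> "Rowan".
# This assumes every County value ends with " County".
site_data_df = site_data_df.withColumn(
    "County",
    expr("substring(County, 1, length(County) - 7)")
)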
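Because the substring call always drops the last seven characters, a value without the " County" suffix (for example "Baltimore city") would be truncated to "Baltimo". One alternative is to remove the suffix only when it is present; in Spark, `regexp_replace(col("County"), r"\s+County$", "")` applies the same pattern. A pure-Python sketch of that behavior, with a hypothetical helper name:

```python
import re

COUNTY_SUFFIX = re.compile(r"\s+County$")


def strip_county_suffix(name):
    """Remove a trailing ' County' if present; leave other names unchanged."""
    return COUNTY_SUFFIX.sub("", name)


for name in ["Rowan County", "Baltimore city"]:
    print(f"{name!r} -> {strip_county_suffix(name)!r}")
```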

In [0]:
# Convert County Code column to numeric, handle errors, fill nulls, and format as 3-digit string
state_crash_monthly_county_counts = state_crash_monthly_county_counts \
    .withColumn("County Code", col("County Code").cast("double")) \
    .withColumn("County Code", when(col("County Code").isNull(), 0).otherwise(col("County Code"))) \
    .withColumn("County Code", lpad(col("County Code").cast("int").cast("string"), 3, "0")) \
    .withColumn("State", lower(col("State")))

# Extract first two digits of county fips for state fips and last 3 for county code
us_city_states = us_city_states \
    .withColumn("state_fips", substring(col("county_fips"), 1, 2)) \
    .withColumn("county_code", substring(col("county_fips"), -3, 3))

# Convert county_code to numeric, handle errors, fill nulls, and format as 3-digit string
us_city_states = us_city_states \
    .withColumn("county_code", col("county_code").cast("double")) \
    .withColumn("county_code", when(col("county_code").isNull(), 0).otherwise(col("county_code"))) \
    .withColumn("county_code", lpad(col("county_code").cast("int").cast("string"), 3, "0")) \
    .withColumn("state_id", lower(col("state_id")))
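The substring calls above assume `county_fips` is a full five-character string. If a leading zero has been lost (for example 1001 instead of "01001" for Autauga County, Alabama), the two-character state slice would be wrong. A small sketch, with a hypothetical helper, that pads to five digits before splitting into state FIPS and county code:

```python
def split_county_fips(county_fips):
    """Split a county FIPS code into (state FIPS, 3-digit county code)."""
    # Zero-pad to 5 digits first in case a leading zero was lost (1001 -> "01001")
    fips = str(int(float(county_fips))).zfill(5)
    return fips[:2], fips[-3:]


print(split_county_fips("37159"))  # ('37', '159')
print(split_county_fips("1001"))   # ('01', '001')
```

Keeping both parts also gives a state-qualified county key, since 3-digit county codes repeat from state to state.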

In [0]:
# Select county identifiers and city-level zip lists (one row per city, so a county can appear more than once)
county_data = us_city_states.select("county_code", "county_name", "state_id", "zips")

In [0]:
state_crash_monthly_county_counts = state_crash_monthly_county_counts.join(
    county_data,
    (state_crash_monthly_county_counts["County Code"]== county_data["county_code"]) & 
    (state_crash_monthly_county_counts["State"]== county_data["state_id"]),
    "left"
)

In [0]:
# Prepare crash data by extracting zip codes into an array
crash_df_with_zip_array = state_crash_monthly_county_counts.withColumn(
    "zip_array", split(col("zips"), " ")
)

In [0]:
months_df = spark.createDataFrame(
    [(2023, 1), (2023, 2), (2023, 3), (2023, 4), (2023, 5), (2023, 6),
     (2023, 7), (2023, 8), (2023, 9), (2023, 10), (2023, 11), (2023, 12),
     (2024, 1), (2024, 2), (2024, 3), (2024, 4), (2024, 5), (2024, 6),
     (2024, 7), (2024, 8), (2024, 9), (2024, 10), (2024, 11), (2024, 12),
     (2025, 1)],
    ["Crash Year", "Crash Month"]
)

# Create a cartesian product of sites and months
sites_months_df = site_data_df.crossJoin(months_df)

In [0]:
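# Get unique crash statistics per county-month combination.
# Note: "County Code" is a 3-digit code that is only unique within a state, so keying on it alone
# assumes same-numbered counties in different states do not collide.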
unique_county_crashes = crash_df_with_zip_array.select(
    "Crash Year", 
    "Crash Month", 
    "County Code",
    "crash_count", 
    "total_fatalities", 
    "total_injuries", 
    "total_vehicles"
).distinct()

In [0]:
exploded_sites = site_data_df.select(
    "ID", "Brand", "State", explode("Zip Codes").alias("site_zip")
)

In [0]:
# Create a mapping between site IDs and the unique counties they cover
site_to_counties = exploded_sites.select(
    "ID", 
    "Brand",
    "site_zip"
).join(
    crash_df_with_zip_array.select(
        "County Code",
        explode("zip_array").alias("site_zip")
    ).distinct(),
    on="site_zip",
    how="inner"
).select(
    "ID", 
    "Brand", 
    "County Code"
).distinct()

In [0]:
# Now join the sites with their covered counties and then with crash data
site_county_crashes = sites_months_df.join(
    site_to_counties,
    on=["ID", "Brand"],
    how="inner"
).join(
    unique_county_crashes,
    on=["Crash Year", "Crash Month", "County Code"],
    how="inner"
)

# Aggregate to get totals per site-month
aggregated_site_radius_crash_df = site_county_crashes.groupBy(
    "ID", "Brand", "Crash Year", "Crash Month"
).agg(
    sum("crash_count").alias("total_crashes"),
    sum("total_fatalities").alias("total_fatalities"),
    sum("total_injuries").alias("total_injuries"),
    sum("total_vehicles").alias("total_vehicles")
)

In [0]:
aggregated_site_radius_crash_df.cache()

DataFrame[ID: string, Brand: string, Crash Year: bigint, Crash Month: bigint, total_crashes: double, total_fatalities: double, total_injuries: double, total_vehicles: double]

In [0]:
aggregated_site_radius_crash_df.count()

1175

In [0]:
display(aggregated_site_radius_crash_df.limit(5))

ID,Brand,Crash Year,Crash Month,total_crashes,total_fatalities,total_injuries,total_vehicles
9.0,Bullitt Septic Service,2023,1,2550.0,99.0,1144.0,5151.0
6.0,Georgia Container Inc./ Portable Toilets of Georgia,2023,7,2752.0,93.0,1365.0,5672.0
46.0,J Bar,2023,8,1135.0,37.0,585.0,2422.0
17.0,Ace Disposal,2023,9,3092.0,108.0,1582.0,6478.0
44.0,Forzas Site Services,2024,5,221.0,7.0,95.0,507.0


In [0]:
# Save aggregated_site_radius_crash_df as a csv
aggregated_site_radius_crash_df.write.csv(
    "/FileStore/intermediate_output/aggregated_site_radius_crash_df.csv",
    header=True,
    mode="overwrite"
    )

In [0]:
# Validation 
# Group by ID and count number of observations 
display(aggregated_site_radius_crash_df.groupBy("ID").count().orderBy("count", ascending=False))

ID,count
50.0,25
44.0,25
17.0,25
9.0,25
18.0,25
46.0,25
6.0,25
30.0,25
15.0,25
26.0,25
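The check above eyeballs sorted counts. Each site should have exactly 25 rows (January 2023 through January 2025), and a site with no matching crash data would be missing from the table entirely. A plain-Python sketch of both checks, using hypothetical rows and site IDs:

```python
from collections import Counter

EXPECTED_MONTHS = 25  # January 2023 through January 2025

# Hypothetical (site_id, year, month) rows standing in for the aggregated crash table
rows = [("site_a", y, m) for y in (2023, 2024) for m in range(1, 13)]
rows.append(("site_a", 2025, 1))
rows += [("site_b", 2023, 1), ("site_b", 2023, 2)]

all_sites = {"site_a", "site_b", "site_c"}  # hypothetical full list of site IDs

counts = Counter(site_id for site_id, _, _ in rows)
# Sites that have some months but not all 25
incomplete = {site_id: n for site_id, n in counts.items() if n != EXPECTED_MONTHS}
# Sites with no rows at all (for example, no matching crash data)
missing = all_sites - set(counts)

print(incomplete)  # {'site_b': 2}
print(missing)     # {'site_c'}
```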


In [0]:
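def transform_precipitation_df(precipitation_df, year):
    # Unpivot the wide PRISM_* daily columns into long rows with one date per row.
    # The `year` argument is currently unused; each date is parsed from its column name.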
    # Get all the precipitation column names 
    prism_cols = [col_name for col_name in precipitation_df.columns if col_name.startswith("PRISM_")]
    
    # Prepare for unpivoting: create a list of expressions to select
    stack_expr = []
    for prism_col in prism_cols:
        # Extract date from column name using regex
        date_str = re.search(r"_(\d{8})_", prism_col).group(1)
        year_str = date_str[0:4]
        month = date_str[4:6]
        day = date_str[6:8]
        
        # Add to stack expression
        stack_expr.append(f"'{year_str}-{month}-{day}', `{prism_col}`")
    
    # Create the unpivot expression
    stack_expr_str = ", ".join(stack_expr)
    unpivot_expr = f"stack({len(prism_cols)}, {stack_expr_str}) as (date_str, precipitation)"
    
    # Apply the unpivot
    precipitation_unpivoted = precipitation_df.selectExpr(
        "ZCTA5CE20", 
        "latitude", 
        "longitude",
        unpivot_expr
    ).withColumn("date", to_date("date_str", "yyyy-MM-dd"))
    
    return precipitation_unpivoted

# Process each year's data separately
precipitation_2023_unpivoted = transform_precipitation_df(precipitation_data_2023, 2023)
precipitation_2024_unpivoted = transform_precipitation_df(precipitation_data_2024, 2024)
precipitation_2025_unpivoted = transform_precipitation_df(precipitation_data_2025, 2025)

In [0]:
# Combine all three years into one long-format DataFrame (union matches columns by position)
all_precipitation_unpivoted = precipitation_2023_unpivoted.union(
    precipitation_2024_unpivoted
).union(
    precipitation_2025_unpivoted
)

# Create a distinct mapping between site IDs and the zip codes each site covers
site_zip_mapping = site_data_df.select(
    "ID", "Brand", explode("Zip Codes").alias("site_zip")
).distinct()

# Find unique zip code to precipitation mappings
# This ensures we only use each (zip, date) precipitation value once
unique_precipitation = all_precipitation_unpivoted.select(
    "ZCTA5CE20", "date", "precipitation"
).distinct()

# Join using the mapping
joined_df = site_zip_mapping.join(
    unique_precipitation,
    site_zip_mapping["site_zip"] == unique_precipitation["ZCTA5CE20"],
    "inner"
)

# Aggregate statistics by site and date
agg_precipitation = joined_df.groupBy("ID", "date").agg(
    mean("precipitation").alias("mean_precipitation"),
    expr("percentile_approx(precipitation, 0.5)").alias("median_precipitation"),
    stddev("precipitation").alias("stddev_precipitation"),
    expr("percentile_approx(precipitation, 0.75)").alias("q3_precipitation"),
    expr("percentile_approx(precipitation, 0.25)").alias("q1_precipitation"),
    min("precipitation").alias("min_precipitation"),
    max("precipitation").alias("max_precipitation")
)

# Derive the IQR from the Q1 and Q3 columns computed above
final_precipitation_stats = agg_precipitation.withColumn(
    "iqr_precipitation", expr("q3_precipitation - q1_precipitation")
)

In [0]:
final_precipitation_stats.cache()

DataFrame[ID: string, date: date, mean_precipitation: double, median_precipitation: double, stddev_precipitation: double, q3_precipitation: double, q1_precipitation: double, min_precipitation: double, max_precipitation: double, iqr_precipitation: double]

In [0]:
# group by ID and count 
display(final_precipitation_stats.groupBy("ID").count())

ID,count
1.0,809
50.0,809
22.0,809
15.0,809
45.0,809
47.0,809
44.0,809
25.0,809
26.0,809
17.0,809


In [0]:
# Latest date in the precipitation statistics
final_precipitation_stats.select(max("date")).collect()[0][0]

datetime.date(2025, 3, 19)

In [0]:
# Save the final results 
final_precipitation_stats.write.mode("overwrite").parquet("/FileStore/intermediate_output/site_daily_radius_precipitation_stats")

In [0]:
## Zip Code Coverage 
precip_2023_zips = precipitation_data_2023.select("ZCTA5CE20").distinct()
precip_2024_zips = precipitation_data_2024.select("ZCTA5CE20").distinct()
precip_2025_zips = precipitation_data_2025.select("ZCTA5CE20").distinct()

# Union all zip codes
all_precip_zips = precip_2023_zips.union(precip_2024_zips).union(precip_2025_zips).distinct()

# Calculate the total number of unique zip codes in precipitation data
total_precip_zips = all_precip_zips.count()
print(f"Total unique zip codes in precipitation data: {total_precip_zips}")

# Explode the site data to get all site-zipcode pairs
exploded_sites = site_data_df.select(
    "ID", "Brand", "State", explode("Zip Codes").alias("site_zip")
)

Total unique zip codes in precipitation data: 33791


In [0]:
# Count unique zip codes for each site
site_zip_counts = exploded_sites.groupBy("ID").agg(
    count("site_zip").alias("total_zips"),
    collect_set("site_zip").alias("all_site_zips")
)

# Collect all precipitation zip codes into a set for fast membership checks inside the UDF
all_precip_zips_set = {row.ZCTA5CE20 for row in all_precip_zips.collect()}

# Return (covered count, coverage percent, covered zips) for one site's zip list
def calculate_coverage(site_zips):
    if not site_zips:
        # Return 0.0 (not 0) so the value matches the DoubleType field in the UDF schema
        return 0, 0.0, []

    covered_zips = [z for z in site_zips if z in all_precip_zips_set]
    coverage_percent = len(covered_zips) / len(site_zips) * 100
    return len(covered_zips), coverage_percent, covered_zips

In [0]:
from pyspark.sql.types import IntegerType, DoubleType, ArrayType, StringType, StructType, StructField

In [0]:
coverage_schema = StructType([
    StructField("covered_count", IntegerType(), False),
    StructField("coverage_percent", DoubleType(), False),
    StructField("covered_zips", ArrayType(StringType()), False)
])

coverage_udf = udf(calculate_coverage, coverage_schema)

# Apply the UDF to calculate coverage
coverage_df = site_zip_counts.withColumn(
    "coverage", coverage_udf("all_site_zips")
).select(
    "ID", 
    "total_zips",
    col("coverage.covered_count").alias("covered_zips"),
    col("coverage.coverage_percent").alias("coverage_percent"),
    col("coverage.covered_zips").alias("covered_zip_list")
)

In [0]:
# Show the results
coverage_df.filter(col("coverage_percent") < 100).orderBy(col("coverage_percent").desc()).show()

+----+----------+------------+-----------------+--------------------+
|  ID|total_zips|covered_zips| coverage_percent|    covered_zip_list|
+----+----------+------------+-----------------+--------------------+
|18.0|       462|         461|99.78354978354979|[08901, 19050, 18...|
|17.0|       438|         437|99.77168949771689|[19050, 19806, 19...|
|39.0|       112|         111|99.10714285714286|[29687, 29321, 28...|
+----+----------+------------+-----------------+--------------------+



In [0]:
coverage_df.orderBy(col("covered_zips").desc()).limit(10).show()

+----+----------+------------+-----------------+--------------------+
|  ID|total_zips|covered_zips| coverage_percent|    covered_zip_list|
+----+----------+------------+-----------------+--------------------+
|18.0|       462|         461|99.78354978354979|[08901, 19050, 18...|
|17.0|       438|         437|99.77168949771689|[19050, 19806, 19...|
|12.0|       288|         288|            100.0|[18340, 12420, 10...|
|15.0|       140|         140|            100.0|[43571, 48159, 48...|
| 9.0|       133|         133|            100.0|[47142, 40207, 40...|
|16.0|       133|         133|            100.0|[45368, 43748, 43...|
| 8.0|       132|         132|            100.0|[40006, 47174, 40...|
|13.0|       130|         130|            100.0|[28244, 28208, 28...|
|32.0|       116|         116|            100.0|[25922, 24311, 24...|
|52.0|       114|         114|            100.0|[28071, 27262, 28...|
+----+----------+------------+-----------------+--------------------+



In [0]:
# Save coverage df
coverage_df.write.mode("overwrite").parquet("/FileStore/intermediate_output/site_coverage_df")

In [0]:
spark.stop()