# US Accidents ETL Pipeline (Bronze → Silver → Gold)

## Project Objective
This notebook demonstrates how to build a production-grade ETL pipeline using Databricks (PySpark + Delta Lake).

We follow the medallion (Bronze/Silver/Gold) pattern of the Lakehouse architecture:
- **Bronze Layer**: Raw ingestion of CSV files
- **Silver Layer**: Cleaned, curated Delta table
- **Gold Layer**: Aggregated business-ready insights

Dataset: [US Accidents (5M+ rows)](https://www.kaggle.com/sobhanmoosavi/us-accidents)

## Bronze Layer – Raw Ingestion

### Objective:
Ingest raw accident data from CSV files into the Bronze layer (raw storage) on Databricks. This layer is left **unaltered** and serves as the source of truth.


In [0]:
file_paths = [
    "/FileStore/tables/US_Accidents_March23_part1.csv",
    "/FileStore/tables/US_Accidents_March23_part2.csv",
    "/FileStore/tables/US_Accidents_March23_part3.csv",
    "/FileStore/tables/US_Accidents_March23_part4.csv",
    "/FileStore/tables/US_Accidents_March23_part7.csv",
    "/FileStore/tables/US_Accidents_March23_part8.csv",
    "/FileStore/tables/US_Accidents_March23_part11.csv",
    "/FileStore/tables/US_Accidents_March23_part12.csv",
    "/FileStore/tables/US_Accidents_March23_part13.csv",
    "/FileStore/tables/US_Accidents_March23_part14.csv",
    "/FileStore/tables/US_Accidents_March23_part15.csv",
    "/FileStore/tables/US_Accidents_March23_part16.csv"
]

# Load all CSV parts into a single DataFrame.
# inferSchema=True makes Spark take an extra pass over the data to detect column types.
bronze_df = spark.read.csv(file_paths, header=True, inferSchema=True)

display(bronze_df.limit(5))

ID,Source,Severity,Start_Time,End_Time,Start_Lat,Start_Lng,End_Lat,End_Lng,Distance(mi),Description,Street,City,County,State,Zipcode,Country,Timezone,Airport_Code,Weather_Timestamp,Temperature(F),Wind_Chill(F),Humidity(%),Pressure(in),Visibility(mi),Wind_Direction,Wind_Speed(mph),Precipitation(in),Weather_Condition,Amenity,Bump,Crossing,Give_Way,Junction,No_Exit,Railway,Roundabout,Station,Stop,Traffic_Calming,Traffic_Signal,Turning_Loop,Sunrise_Sunset,Civil_Twilight,Nautical_Twilight,Astronomical_Twilight
A-1,Source2,3,2016-02-08T05:46:00Z,2016-02-08T11:00:00Z,39.865147,-84.058723,,,0.01,Right lane blocked due to accident on I-70 Eastbound at Exit 41 OH-235 State Route 4.,I-70 E,Dayton,Montgomery,OH,45424,US,US/Eastern,KFFO,2016-02-08T05:58:00Z,36.9,,91.0,29.68,10.0,Calm,,0.02,Light Rain,False,False,False,False,False,False,False,False,False,False,False,False,False,Night,Night,Night,Night
A-2,Source2,2,2016-02-08T06:07:59Z,2016-02-08T06:37:59Z,39.92805900000001,-82.831184,,,0.01,Accident on Brice Rd at Tussing Rd. Expect delays.,Brice Rd,Reynoldsburg,Franklin,OH,43068-3402,US,US/Eastern,KCMH,2016-02-08T05:51:00Z,37.9,,100.0,29.65,10.0,Calm,,0.0,Light Rain,False,False,False,False,False,False,False,False,False,False,False,False,False,Night,Night,Night,Day
A-3,Source2,2,2016-02-08T06:49:27Z,2016-02-08T07:19:27Z,39.063148,-84.032608,,,0.01,Accident on OH-32 State Route 32 Westbound at Dela Palma Rd. Expect delays.,State Route 32,Williamsburg,Clermont,OH,45176,US,US/Eastern,KI69,2016-02-08T06:56:00Z,36.0,33.3,100.0,29.67,10.0,SW,3.5,,Overcast,False,False,False,False,False,False,False,False,False,False,False,True,False,Night,Night,Day,Day
A-4,Source2,3,2016-02-08T07:23:34Z,2016-02-08T07:53:34Z,39.747753,-84.20558199999998,,,0.01,Accident on I-75 Southbound at Exits 52 52B US-35. Expect delays.,I-75 S,Dayton,Montgomery,OH,45417,US,US/Eastern,KDAY,2016-02-08T07:38:00Z,35.1,31.0,96.0,29.64,9.0,SW,4.6,,Mostly Cloudy,False,False,False,False,False,False,False,False,False,False,False,False,False,Night,Day,Day,Day
A-5,Source2,2,2016-02-08T07:39:07Z,2016-02-08T08:09:07Z,39.627781,-84.188354,,,0.01,Accident on McEwen Rd at OH-725 Miamisburg Centerville Rd. Expect delays.,Miamisburg Centerville Rd,Dayton,Montgomery,OH,45459,US,US/Eastern,KMGY,2016-02-08T07:53:00Z,36.0,33.3,89.0,29.65,6.0,SW,3.5,,Mostly Cloudy,False,False,False,False,False,False,False,False,False,False,False,True,False,Day,Day,Day,Day
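
The hand-written path list can also be generated from the part numbers. This is a minimal sketch, assuming the part numbers are exactly those listed in the cell above; `BASE_DIR`, `PART_NUMBERS`, and `build_part_paths` are illustrative names that do not appear in the original notebook.

```python
# Illustrative names; the directory and file-name pattern are copied from the cell above.
BASE_DIR = "/FileStore/tables"
PART_NUMBERS = [1, 2, 3, 4, 7, 8, 11, 12, 13, 14, 15, 16]


def build_part_paths(part_numbers, base_dir=BASE_DIR):
    """Return one CSV path per part number, in the order given."""
    return [f"{base_dir}/US_Accidents_March23_part{n}.csv" for n in part_numbers]


file_paths = build_part_paths(PART_NUMBERS)
```

The resulting list can be passed to `spark.read.csv` in the same way as the literal list.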


In [0]:
# List the uploaded files (absolute DBFS path, matching the paths used for the read)
display(dbutils.fs.ls("/FileStore/tables"))

path,name,size,modificationTime
dbfs:/FileStore/tables/US_Accidents_March23_part1.csv,US_Accidents_March23_part1.csv,185948083,1753308084000
dbfs:/FileStore/tables/US_Accidents_March23_part11.csv,US_Accidents_March23_part11.csv,214631434,1753311641000
dbfs:/FileStore/tables/US_Accidents_March23_part12.csv,US_Accidents_March23_part12.csv,214689170,1753311641000
dbfs:/FileStore/tables/US_Accidents_March23_part13.csv,US_Accidents_March23_part13.csv,211670187,1753311764000
dbfs:/FileStore/tables/US_Accidents_March23_part14.csv,US_Accidents_March23_part14.csv,202748332,1753311764000
dbfs:/FileStore/tables/US_Accidents_March23_part15.csv,US_Accidents_March23_part15.csv,185251537,1753311858000
dbfs:/FileStore/tables/US_Accidents_March23_part16.csv,US_Accidents_March23_part16.csv,85796042,1753311834000
dbfs:/FileStore/tables/US_Accidents_March23_part2.csv,US_Accidents_March23_part2.csv,188601922,1753308085000
dbfs:/FileStore/tables/US_Accidents_March23_part3.csv,US_Accidents_March23_part3.csv,188754231,1753308223000
dbfs:/FileStore/tables/US_Accidents_March23_part4.csv,US_Accidents_March23_part4.csv,188003058,1753308224000


In [0]:
# Number of rows (count() triggers a full scan of the CSV files)
num_rows = bronze_df.count()
# Number of columns
num_col = len(bronze_df.columns)
# Display the shape as (rows, columns)
displayHTML(f"<h3>Shape of the DataFrame: ({num_rows}, {num_col})</h3>")

In [0]:
%python
# Convert only the first 5 rows to a pandas DataFrame.
# limit() runs before toPandas(), so the full dataset is never collected to the driver.
bronze_df_pandas = bronze_df.limit(5).toPandas()

# Display the first 5 rows of the Pandas DataFrame
display(bronze_df_pandas)

ID,Source,Severity,Start_Time,End_Time,Start_Lat,Start_Lng,End_Lat,End_Lng,Distance(mi),Description,Street,City,County,State,Zipcode,Country,Timezone,Airport_Code,Weather_Timestamp,Temperature(F),Wind_Chill(F),Humidity(%),Pressure(in),Visibility(mi),Wind_Direction,Wind_Speed(mph),Precipitation(in),Weather_Condition,Amenity,Bump,Crossing,Give_Way,Junction,No_Exit,Railway,Roundabout,Station,Stop,Traffic_Calming,Traffic_Signal,Turning_Loop,Sunrise_Sunset,Civil_Twilight,Nautical_Twilight,Astronomical_Twilight
A-1,Source2,3,2016-02-08T05:46:00Z,2016-02-08T11:00:00Z,39.865147,-84.058723,,,0.01,Right lane blocked due to accident on I-70 Eastbound at Exit 41 OH-235 State Route 4.,I-70 E,Dayton,Montgomery,OH,45424,US,US/Eastern,KFFO,2016-02-08T05:58:00Z,36.9,,91.0,29.68,10.0,Calm,,0.02,Light Rain,False,False,False,False,False,False,False,False,False,False,False,False,False,Night,Night,Night,Night
A-2,Source2,2,2016-02-08T06:07:59Z,2016-02-08T06:37:59Z,39.92805900000001,-82.831184,,,0.01,Accident on Brice Rd at Tussing Rd. Expect delays.,Brice Rd,Reynoldsburg,Franklin,OH,43068-3402,US,US/Eastern,KCMH,2016-02-08T05:51:00Z,37.9,,100.0,29.65,10.0,Calm,,0.0,Light Rain,False,False,False,False,False,False,False,False,False,False,False,False,False,Night,Night,Night,Day
A-3,Source2,2,2016-02-08T06:49:27Z,2016-02-08T07:19:27Z,39.063148,-84.032608,,,0.01,Accident on OH-32 State Route 32 Westbound at Dela Palma Rd. Expect delays.,State Route 32,Williamsburg,Clermont,OH,45176,US,US/Eastern,KI69,2016-02-08T06:56:00Z,36.0,33.3,100.0,29.67,10.0,SW,3.5,,Overcast,False,False,False,False,False,False,False,False,False,False,False,True,False,Night,Night,Day,Day
A-4,Source2,3,2016-02-08T07:23:34Z,2016-02-08T07:53:34Z,39.747753,-84.20558199999998,,,0.01,Accident on I-75 Southbound at Exits 52 52B US-35. Expect delays.,I-75 S,Dayton,Montgomery,OH,45417,US,US/Eastern,KDAY,2016-02-08T07:38:00Z,35.1,31.0,96.0,29.64,9.0,SW,4.6,,Mostly Cloudy,False,False,False,False,False,False,False,False,False,False,False,False,False,Night,Day,Day,Day
A-5,Source2,2,2016-02-08T07:39:07Z,2016-02-08T08:09:07Z,39.627781,-84.188354,,,0.01,Accident on McEwen Rd at OH-725 Miamisburg Centerville Rd. Expect delays.,Miamisburg Centerville Rd,Dayton,Montgomery,OH,45459,US,US/Eastern,KMGY,2016-02-08T07:53:00Z,36.0,33.3,89.0,29.65,6.0,SW,3.5,,Mostly Cloudy,False,False,False,False,False,False,False,False,False,False,False,True,False,Day,Day,Day,Day


In [0]:
#To display the schema
bronze_df.printSchema()

root
 |-- ID: string (nullable = true)
 |-- Source: string (nullable = true)
 |-- Severity: integer (nullable = true)
 |-- Start_Time: timestamp (nullable = true)
 |-- End_Time: timestamp (nullable = true)
 |-- Start_Lat: double (nullable = true)
 |-- Start_Lng: double (nullable = true)
 |-- End_Lat: double (nullable = true)
 |-- End_Lng: double (nullable = true)
 |-- Distance(mi): double (nullable = true)
 |-- Description: string (nullable = true)
 |-- Street: string (nullable = true)
 |-- City: string (nullable = true)
 |-- County: string (nullable = true)
 |-- State: string (nullable = true)
 |-- Zipcode: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- Timezone: string (nullable = true)
 |-- Airport_Code: string (nullable = true)
 |-- Weather_Timestamp: timestamp (nullable = true)
 |-- Temperature(F): double (nullable = true)
 |-- Wind_Chill(F): double (nullable = true)
 |-- Humidity(%): double (nullable = true)
 |-- Pressure(in): double (nullable = true)
 |-- V

In [0]:
summary_df = bronze_df.describe()
display(summary_df)

summary,ID,Source,Severity,Start_Lat,Start_Lng,End_Lat,End_Lng,Distance(mi),Description,Street,City,County,State,Zipcode,Country,Timezone,Airport_Code,Temperature(F),Wind_Chill(F),Humidity(%),Pressure(in),Visibility(mi),Wind_Direction,Wind_Speed(mph),Precipitation(in),Weather_Condition,Sunrise_Sunset,Civil_Twilight,Nautical_Twilight,Astronomical_Twilight
count,5728394,5728394,5728394.0,5728394.0,5728394.0,3325632.0,3325632.0,5728394.0,5728390,5721098,5728190,5728394,5728394,5726775.0,5728394,5722537,5711947,5603881.0,4411095.0,5595939.0,5621961.0,5595537.0,5595060,5342373.0,4272506.0,5597926,5713631,5713631,5713631,5713631
mean,,,2.216766863452479,36.23718092206087,-95.156002056226,36.33546037597316,-96.33916060607665,0.5425242754032846,1096.6666666666667,,,,,57674.02992961756,,,,61.68364076610453,58.582832357951936,64.91603804115806,29.51167969503889,9.099359158915425,,7.551498669224718,0.0080071555194999,,,,,
stddev,,,0.5043679627320824,5.109432650848612,17.575415368950782,5.287102935632912,18.21158343795104,1.7234630100269808,171.95146900305244,,,,,31051.40346343124,,,,18.827203113849706,21.912866410928963,22.784501464477916,1.0158523090655742,2.65750741012453,,5.469075435368797,0.1123123371784662,,,,,
min,A-1,Source1,1.0,24.5548,-124.623833,24.566013,-124.545748,0.0,1039 GOLDEN BEAR - BOT,1 1/2 Ave,Aaronsburg,Abbeville,AL,1001.0,US,US/Central,K01M,-89.0,-89.0,1.0,0.0,0.0,CALM,0.0,0.0,Blowing Dust,Day,Day,Day,Day
max,A-999999,Source3,4.0,49.002201,-67.113167,49.075,-67.10924200000001,441.75,VEHICLE CRASH I77 SB NEAR LAKEVIEW ROAD THAT HAS THE CENTER AND FAR RIGHT LANE BLOCKED,william Carey Dr,Zwingle,Ziebach,WY,99403.0,US,US/Pacific,KZZV,207.0,207.0,100.0,58.63,140.0,West,1087.0,36.47,Wintry Mix / Windy,Night,Night,Night,Night


## Silver Layer – Clean & Curated

### Objective:
Clean the raw Bronze data (check for duplicates, handle nulls, cast data types) and save the result as a **Delta table** for reliable downstream analytics.


In [0]:
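# Check for fully duplicated rows: group by every column and keep groups that occur more than once.
# An empty result means no exact duplicates exist.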
duplicates_df = bronze_df.groupBy(bronze_df.columns).count().where('count>1')
display(duplicates_df)

ID,Source,Severity,Start_Time,End_Time,Start_Lat,Start_Lng,End_Lat,End_Lng,Distance(mi),Description,Street,City,County,State,Zipcode,Country,Timezone,Airport_Code,Weather_Timestamp,Temperature(F),Wind_Chill(F),Humidity(%),Pressure(in),Visibility(mi),Wind_Direction,Wind_Speed(mph),Precipitation(in),Weather_Condition,Amenity,Bump,Crossing,Give_Way,Junction,No_Exit,Railway,Roundabout,Station,Stop,Traffic_Calming,Traffic_Signal,Turning_Loop,Sunrise_Sunset,Civil_Twilight,Nautical_Twilight,Astronomical_Twilight,count


In [0]:
%python
from pyspark.sql import functions as F

# Check for null values in each column of the DataFrame
null_counts = bronze_df.select([
    F.count(F.when(F.col(c).isNull(), c)).alias(c) 
    for c in bronze_df.columns
])
display(null_counts)


ID,Source,Severity,Start_Time,End_Time,Start_Lat,Start_Lng,End_Lat,End_Lng,Distance(mi),Description,Street,City,County,State,Zipcode,Country,Timezone,Airport_Code,Weather_Timestamp,Temperature(F),Wind_Chill(F),Humidity(%),Pressure(in),Visibility(mi),Wind_Direction,Wind_Speed(mph),Precipitation(in),Weather_Condition,Amenity,Bump,Crossing,Give_Way,Junction,No_Exit,Railway,Roundabout,Station,Stop,Traffic_Calming,Traffic_Signal,Turning_Loop,Sunrise_Sunset,Civil_Twilight,Nautical_Twilight,Astronomical_Twilight
0,0,0,0,0,0,0,2402762,2402762,0,4,7296,204,0,0,1619,0,5857,16447,91066,124513,1317299,132455,106433,132857,133334,386021,1455888,130468,0,0,0,0,0,0,0,0,0,0,0,0,0,14763,14763,14763,14763
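
The null counts above are easier to judge as fractions of the total row count. The following is a small sketch in plain Python, using the 5,728,394 row count reported by `describe()` and the five largest null counts copied from the output above; the names `NULL_COUNTS` and `null_fractions` are illustrative.

```python
# Row count taken from the describe() output; null counts copied from the table above.
TOTAL_ROWS = 5_728_394
NULL_COUNTS = {
    "End_Lat": 2_402_762,
    "End_Lng": 2_402_762,
    "Precipitation(in)": 1_455_888,
    "Wind_Chill(F)": 1_317_299,
    "Wind_Speed(mph)": 386_021,
}


def null_fractions(null_counts, total_rows):
    """Map each column name to the fraction of rows in which it is null."""
    return {name: count / total_rows for name, count in null_counts.items()}


fractions = null_fractions(NULL_COUNTS, TOTAL_ROWS)

# Columns that are null in more than half of the rows
over_half = [name for name, frac in fractions.items() if frac > 0.5]

# Print the columns from most to least sparse
for name, frac in sorted(fractions.items(), key=lambda kv: kv[1], reverse=True):
    print(f"{name}: {frac:.1%}")
```

With these counts no column is null in more than half of the rows (`End_Lat` and `End_Lng` are the sparsest, at roughly 42%), so a 50% null threshold on its own would not remove any column.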


In [0]:
%python
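# List the column names as a one-column DataFrame.
# The loop variable is named c so it does not shadow pyspark's col() function.
columns_list = bronze_df.columns
columns_df = spark.createDataFrame([(c,) for c in columns_list], ["Columns"])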
display(columns_df)

Columns
ID
Source
Severity
Start_Time
End_Time
Start_Lat
Start_Lng
End_Lat
End_Lng
Distance(mi)


In [0]:
%python
# Drop any column that is null in more than 50% of the rows
threshold = 0.5 * bronze_df.count()

# Count nulls for every column in one aggregation instead of running one Spark job per column
null_row = bronze_df.select([
    F.count(F.when(F.col(c).isNull(), c)).alias(c)
    for c in bronze_df.columns
]).first()
columns_to_drop = [c for c in bronze_df.columns if null_row[c] > threshold]

# Add columns that are not needed for the downstream analysis
columns_to_drop.extend(['Source', 'End_Lat', 'End_Lng', 'Zipcode', 'Civil_Twilight', 'Nautical_Twilight', 'Astronomical_Twilight'])

# Drop the identified columns
cleaned_df = bronze_df.drop(*columns_to_drop)

# Display the cleaned DataFrame
display(cleaned_df.limit(5))

ID,Severity,Start_Time,End_Time,Start_Lat,Start_Lng,Distance(mi),Description,Street,City,County,State,Country,Timezone,Airport_Code,Weather_Timestamp,Temperature(F),Wind_Chill(F),Humidity(%),Pressure(in),Visibility(mi),Wind_Direction,Wind_Speed(mph),Precipitation(in),Weather_Condition,Amenity,Bump,Crossing,Give_Way,Junction,No_Exit,Railway,Roundabout,Station,Stop,Traffic_Calming,Traffic_Signal,Turning_Loop,Sunrise_Sunset
A-1,3,2016-02-08T05:46:00Z,2016-02-08T11:00:00Z,39.865147,-84.058723,0.01,Right lane blocked due to accident on I-70 Eastbound at Exit 41 OH-235 State Route 4.,I-70 E,Dayton,Montgomery,OH,US,US/Eastern,KFFO,2016-02-08T05:58:00Z,36.9,,91.0,29.68,10.0,Calm,,0.02,Light Rain,False,False,False,False,False,False,False,False,False,False,False,False,False,Night
A-2,2,2016-02-08T06:07:59Z,2016-02-08T06:37:59Z,39.92805900000001,-82.831184,0.01,Accident on Brice Rd at Tussing Rd. Expect delays.,Brice Rd,Reynoldsburg,Franklin,OH,US,US/Eastern,KCMH,2016-02-08T05:51:00Z,37.9,,100.0,29.65,10.0,Calm,,0.0,Light Rain,False,False,False,False,False,False,False,False,False,False,False,False,False,Night
A-3,2,2016-02-08T06:49:27Z,2016-02-08T07:19:27Z,39.063148,-84.032608,0.01,Accident on OH-32 State Route 32 Westbound at Dela Palma Rd. Expect delays.,State Route 32,Williamsburg,Clermont,OH,US,US/Eastern,KI69,2016-02-08T06:56:00Z,36.0,33.3,100.0,29.67,10.0,SW,3.5,,Overcast,False,False,False,False,False,False,False,False,False,False,False,True,False,Night
A-4,3,2016-02-08T07:23:34Z,2016-02-08T07:53:34Z,39.747753,-84.20558199999998,0.01,Accident on I-75 Southbound at Exits 52 52B US-35. Expect delays.,I-75 S,Dayton,Montgomery,OH,US,US/Eastern,KDAY,2016-02-08T07:38:00Z,35.1,31.0,96.0,29.64,9.0,SW,4.6,,Mostly Cloudy,False,False,False,False,False,False,False,False,False,False,False,False,False,Night
A-5,2,2016-02-08T07:39:07Z,2016-02-08T08:09:07Z,39.627781,-84.188354,0.01,Accident on McEwen Rd at OH-725 Miamisburg Centerville Rd. Expect delays.,Miamisburg Centerville Rd,Dayton,Montgomery,OH,US,US/Eastern,KMGY,2016-02-08T07:53:00Z,36.0,33.3,89.0,29.65,6.0,SW,3.5,,Mostly Cloudy,False,False,False,False,False,False,False,False,False,False,False,True,False,Day


In [0]:
%python
from pyspark.sql import functions as F

# Ensure the time columns are timestamps. inferSchema already parsed them as
# timestamps (see the schema above), so these casts leave the data unchanged.
bronze_df = bronze_df.withColumn("Start_Time", F.to_timestamp("Start_Time")) \
                     .withColumn("End_Time", F.to_timestamp("End_Time")) \
                     .withColumn("Weather_Timestamp", F.to_timestamp("Weather_Timestamp"))

# Display the DataFrame to verify the changes
display(bronze_df.limit(5))

ID,Source,Severity,Start_Time,End_Time,Start_Lat,Start_Lng,End_Lat,End_Lng,Distance(mi),Description,Street,City,County,State,Zipcode,Country,Timezone,Airport_Code,Weather_Timestamp,Temperature(F),Wind_Chill(F),Humidity(%),Pressure(in),Visibility(mi),Wind_Direction,Wind_Speed(mph),Precipitation(in),Weather_Condition,Amenity,Bump,Crossing,Give_Way,Junction,No_Exit,Railway,Roundabout,Station,Stop,Traffic_Calming,Traffic_Signal,Turning_Loop,Sunrise_Sunset,Civil_Twilight,Nautical_Twilight,Astronomical_Twilight
A-1,Source2,3,2016-02-08T05:46:00Z,2016-02-08T11:00:00Z,39.865147,-84.058723,,,0.01,Right lane blocked due to accident on I-70 Eastbound at Exit 41 OH-235 State Route 4.,I-70 E,Dayton,Montgomery,OH,45424,US,US/Eastern,KFFO,2016-02-08T05:58:00Z,36.9,,91.0,29.68,10.0,Calm,,0.02,Light Rain,False,False,False,False,False,False,False,False,False,False,False,False,False,Night,Night,Night,Night
A-2,Source2,2,2016-02-08T06:07:59Z,2016-02-08T06:37:59Z,39.92805900000001,-82.831184,,,0.01,Accident on Brice Rd at Tussing Rd. Expect delays.,Brice Rd,Reynoldsburg,Franklin,OH,43068-3402,US,US/Eastern,KCMH,2016-02-08T05:51:00Z,37.9,,100.0,29.65,10.0,Calm,,0.0,Light Rain,False,False,False,False,False,False,False,False,False,False,False,False,False,Night,Night,Night,Day
A-3,Source2,2,2016-02-08T06:49:27Z,2016-02-08T07:19:27Z,39.063148,-84.032608,,,0.01,Accident on OH-32 State Route 32 Westbound at Dela Palma Rd. Expect delays.,State Route 32,Williamsburg,Clermont,OH,45176,US,US/Eastern,KI69,2016-02-08T06:56:00Z,36.0,33.3,100.0,29.67,10.0,SW,3.5,,Overcast,False,False,False,False,False,False,False,False,False,False,False,True,False,Night,Night,Day,Day
A-4,Source2,3,2016-02-08T07:23:34Z,2016-02-08T07:53:34Z,39.747753,-84.20558199999998,,,0.01,Accident on I-75 Southbound at Exits 52 52B US-35. Expect delays.,I-75 S,Dayton,Montgomery,OH,45417,US,US/Eastern,KDAY,2016-02-08T07:38:00Z,35.1,31.0,96.0,29.64,9.0,SW,4.6,,Mostly Cloudy,False,False,False,False,False,False,False,False,False,False,False,False,False,Night,Day,Day,Day
A-5,Source2,2,2016-02-08T07:39:07Z,2016-02-08T08:09:07Z,39.627781,-84.188354,,,0.01,Accident on McEwen Rd at OH-725 Miamisburg Centerville Rd. Expect delays.,Miamisburg Centerville Rd,Dayton,Montgomery,OH,45459,US,US/Eastern,KMGY,2016-02-08T07:53:00Z,36.0,33.3,89.0,29.65,6.0,SW,3.5,,Mostly Cloudy,False,False,False,False,False,False,False,False,False,False,False,True,False,Day,Day,Day,Day


In [0]:
# Rename columns whose names contain characters that Delta Lake rejects by default (such as parentheses)
renames = {
    "Distance(mi)": "Distance_mi",
    "Temperature(F)": "Temperature_F",
    "Wind_Chill(F)": "Wind_Chill_F",
    "Humidity(%)": "Humidity",
    "Pressure(in)": "Pressure_in",
    "Visibility(mi)": "Visibility_mi",
    "Wind_Speed(mph)": "Wind_Speed_mph",
    "Precipitation(in)": "Precipitation_in",
}
for old_name, new_name in renames.items():
    cleaned_df = cleaned_df.withColumnRenamed(old_name, new_name)

# Save the cleaned DataFrame to a Delta table
cleaned_df.write.format("delta").mode("overwrite").save("/mnt/delta/US_Accidents")
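
The renames in this cell all follow one pattern: characters outside letters, digits, and underscores are replaced, and the unit stays as a suffix. A hedged sketch of a generic version follows; `sanitize_column_name` and `sanitize_columns` are hypothetical helpers, not part of the original notebook, and the regular expression is one possible rule rather than the only valid one.

```python
import re


def sanitize_column_name(name: str) -> str:
    """Replace every run of characters other than letters, digits, or '_' with '_',
    collapse repeated underscores, and trim underscores at both ends."""
    cleaned = re.sub(r"[^0-9A-Za-z_]+", "_", name)
    cleaned = re.sub(r"_+", "_", cleaned)
    return cleaned.strip("_")


def sanitize_columns(df):
    """Return a copy of a Spark DataFrame with sanitized column names.

    Raises ValueError if two original names would collapse to the same new name.
    """
    new_names = [sanitize_column_name(c) for c in df.columns]
    if len(set(new_names)) != len(new_names):
        raise ValueError("sanitized column names are not unique")
    # DataFrame.toDF(*names) renames all columns positionally
    return df.toDF(*new_names)


# Names taken from the dataset header
for original in ["Distance(mi)", "Humidity(%)", "Wind_Speed(mph)"]:
    print(original, "->", sanitize_column_name(original))
```

For the columns in this dataset the rule yields the same names as the explicit renames, for example `Humidity(%)` becomes `Humidity`.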

In [0]:
# Write the Silver table. save() returns None, so its result is not assigned to a variable.
cleaned_df.write.format("delta").mode("overwrite").save("/FileStore/tables/silver/us_accidents")

In [0]:
silver_check = spark.read.format("delta").load("/FileStore/tables/silver/us_accidents")
display(silver_check.limit(5))
print(f"Total Silver rows: {silver_check.count()}")

ID,Severity,Start_Time,End_Time,Start_Lat,Start_Lng,Distance_mi,Description,Street,City,County,State,Country,Timezone,Airport_Code,Weather_Timestamp,Temperature_F,Wind_Chill_F,Humidity,Pressure_in,Visibility_mi,Wind_Direction,Wind_Speed_mph,Precipitation_in,Weather_Condition,Amenity,Bump,Crossing,Give_Way,Junction,No_Exit,Railway,Roundabout,Station,Stop,Traffic_Calming,Traffic_Signal,Turning_Loop,Sunrise_Sunset
A-1009766,3,2021-06-10T06:29:27Z,2021-06-10T07:29:20Z,27.711987,-82.385284,2.5299999713897705,Queueing traffic and right lane blocked due to accident on I-75 Southbound between 19th Ave and FL-674.,I-75 S,Ruskin,Hillsborough,FL,US,US/Eastern,KMCF,2021-06-10T06:56:00Z,75.0,75.0,88.0,30.04,10.0,ESE,6.0,0.0,Fair,False,False,False,False,False,False,False,False,False,False,False,False,False,Night
A-1009767,2,2021-06-10T06:42:37Z,2021-06-10T09:10:52Z,27.878794,-82.659882,0.0,Accident on I-275 Northbound at CR-296.,Roosevelt Blvd N,Saint Petersburg,Pinellas,FL,US,US/Eastern,KPIE,2021-06-10T06:53:00Z,77.0,77.0,88.0,30.05,10.0,SE,8.0,0.0,Fair,False,False,False,False,False,False,False,False,False,False,False,False,False,Day
A-1009768,2,2021-06-10T06:46:34Z,2021-06-10T08:37:22Z,27.85145,-82.346352,0.0,Accident on I-75 Northbound at Gibsonton Dr.,I-75 N,Gibsonton,Hillsborough,FL,US,US/Eastern,KTPF,2021-06-10T06:55:00Z,75.0,75.0,89.0,30.06,10.0,CALM,0.0,0.0,Fair,False,False,False,False,False,False,False,False,False,False,False,False,False,Day
A-1009769,3,2021-06-10T06:55:39Z,2021-06-10T07:40:31Z,27.96545,-82.439545,0.0,Queueing traffic and left hand shoulder blocked due to accident on I-4 Westbound at 14th Ave.,I-4 W,Tampa,Hillsborough,FL,US,US/Eastern,KTPF,2021-06-10T06:55:00Z,75.0,75.0,89.0,30.06,10.0,CALM,0.0,0.0,Fair,False,False,False,False,False,False,False,False,False,False,False,False,False,Day
A-1009770,2,2021-06-10T07:01:19Z,2021-06-10T07:30:10Z,27.95682,-82.45970200000002,0.5899999737739563,Accident on exit ramp from I-275 Southbound to I-4 Westbound.,N Florida Ave,Tampa,Hillsborough,FL,US,US/Eastern,KTPF,2021-06-10T06:55:00Z,75.0,75.0,89.0,30.06,10.0,CALM,0.0,0.0,Fair,False,False,True,False,False,False,False,False,False,False,False,False,False,Day


Total Silver rows: 5728394


In [0]:
%sql
CREATE TABLE IF NOT EXISTS silver_us_accidents
USING DELTA
LOCATION '/FileStore/tables/silver/us_accidents'

## Gold Layer – Aggregated Insights

### Objective:
Generate business-ready tables for analytics and dashboards.


In [0]:
from pyspark.sql.functions import col, count, avg, hour
# Load the Silver (cleaned) Delta table using the same absolute path it was written to
silver_df = spark.read.format('delta').load('/FileStore/tables/silver/us_accidents')
display(silver_df.limit(5))

ID,Severity,Start_Time,End_Time,Start_Lat,Start_Lng,Distance_mi,Description,Street,City,County,State,Country,Timezone,Airport_Code,Weather_Timestamp,Temperature_F,Wind_Chill_F,Humidity,Pressure_in,Visibility_mi,Wind_Direction,Wind_Speed_mph,Precipitation_in,Weather_Condition,Amenity,Bump,Crossing,Give_Way,Junction,No_Exit,Railway,Roundabout,Station,Stop,Traffic_Calming,Traffic_Signal,Turning_Loop,Sunrise_Sunset
A-1009766,3,2021-06-10T06:29:27Z,2021-06-10T07:29:20Z,27.711987,-82.385284,2.5299999713897705,Queueing traffic and right lane blocked due to accident on I-75 Southbound between 19th Ave and FL-674.,I-75 S,Ruskin,Hillsborough,FL,US,US/Eastern,KMCF,2021-06-10T06:56:00Z,75.0,75.0,88.0,30.04,10.0,ESE,6.0,0.0,Fair,False,False,False,False,False,False,False,False,False,False,False,False,False,Night
A-1009767,2,2021-06-10T06:42:37Z,2021-06-10T09:10:52Z,27.878794,-82.659882,0.0,Accident on I-275 Northbound at CR-296.,Roosevelt Blvd N,Saint Petersburg,Pinellas,FL,US,US/Eastern,KPIE,2021-06-10T06:53:00Z,77.0,77.0,88.0,30.05,10.0,SE,8.0,0.0,Fair,False,False,False,False,False,False,False,False,False,False,False,False,False,Day
A-1009768,2,2021-06-10T06:46:34Z,2021-06-10T08:37:22Z,27.85145,-82.346352,0.0,Accident on I-75 Northbound at Gibsonton Dr.,I-75 N,Gibsonton,Hillsborough,FL,US,US/Eastern,KTPF,2021-06-10T06:55:00Z,75.0,75.0,89.0,30.06,10.0,CALM,0.0,0.0,Fair,False,False,False,False,False,False,False,False,False,False,False,False,False,Day
A-1009769,3,2021-06-10T06:55:39Z,2021-06-10T07:40:31Z,27.96545,-82.439545,0.0,Queueing traffic and left hand shoulder blocked due to accident on I-4 Westbound at 14th Ave.,I-4 W,Tampa,Hillsborough,FL,US,US/Eastern,KTPF,2021-06-10T06:55:00Z,75.0,75.0,89.0,30.06,10.0,CALM,0.0,0.0,Fair,False,False,False,False,False,False,False,False,False,False,False,False,False,Day
A-1009770,2,2021-06-10T07:01:19Z,2021-06-10T07:30:10Z,27.95682,-82.45970200000002,0.5899999737739563,Accident on exit ramp from I-275 Southbound to I-4 Westbound.,N Florida Ave,Tampa,Hillsborough,FL,US,US/Eastern,KTPF,2021-06-10T06:55:00Z,75.0,75.0,89.0,30.06,10.0,CALM,0.0,0.0,Fair,False,False,True,False,False,False,False,False,False,False,False,False,False,Day


In [0]:
# Total accidents per state, highest first
gold_state_accidents = (
    silver_df.groupBy('State')
    .agg(count('*').alias('total_accidents'))
    .orderBy(col('total_accidents').desc())
)

# show() prints the table itself and returns None, so it is not wrapped in display()
gold_state_accidents.show(10)

+-----+---------------+
|State|total_accidents|
+-----+---------------+
|   CA|        1348225|
|   FL|         657915|
|   TX|         433062|
|   SC|         264481|
|   NY|         245876|
|   NC|         230607|
|   PA|         222503|
|   VA|         213662|
|   OR|         151406|
|   MN|         141167|
+-----+---------------+
only showing top 10 rows


In [0]:
# Average severity by weather condition, highest first
gold_weather_severity = (
    silver_df.groupBy('Weather_Condition')
    .agg(avg('Severity').alias('avg_severity'))
    .orderBy(col('avg_severity').desc())
)

display(gold_weather_severity.limit(10))

Weather_Condition,avg_severity
Light Blowing Snow,4.0
Patches of Fog / Windy,3.142857142857143
Light Snow Showers,3.111111111111111
Partial Fog / Windy,3.0
Heavy Blowing Snow,3.0
Snow Showers,3.0
Thunder and Hail / Windy,3.0
Heavy Freezing Rain / Windy,3.0
Light Fog,3.0
Heavy Thunderstorms and Snow,2.8
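
Averages such as exactly 4.0 or 3.0 may come from weather conditions with only a handful of records, which would make the ranking above fragile. One way to check is to return the group size next to the average and require a minimum number of accidents per condition. The sketch below builds a Spark SQL query against the `silver_us_accidents` table registered earlier; the function name and the cutoff of 100 are assumptions chosen for illustration, not values from the original notebook.

```python
def build_weather_severity_query(min_accidents: int = 100,
                                 table: str = "silver_us_accidents") -> str:
    """Build a Spark SQL query for average severity per weather condition,
    keeping only conditions with at least `min_accidents` rows."""
    if not isinstance(min_accidents, int) or min_accidents < 1:
        raise ValueError("min_accidents must be a positive integer")
    return f"""
SELECT Weather_Condition,
       AVG(Severity) AS avg_severity,
       COUNT(*) AS total_accidents
FROM {table}
WHERE Weather_Condition IS NOT NULL
GROUP BY Weather_Condition
HAVING COUNT(*) >= {min_accidents}
ORDER BY avg_severity DESC
""".strip()


query = build_weather_severity_query(min_accidents=100)
print(query)

# In the notebook the query could then be run with:
#   display(spark.sql(query).limit(10))
```

Keeping `total_accidents` in the result lets a dashboard show how much data stands behind each average.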


In [0]:
from pyspark.sql import functions as F

# Check for null values in each column of the DataFrame
null_counts = silver_df.select([
    F.count(F.when(F.col(c).isNull(), c)).alias(c) 
    for c in silver_df.columns
])
display(null_counts)

ID,Severity,Start_Time,End_Time,Start_Lat,Start_Lng,Distance_mi,Description,Street,City,County,State,Country,Timezone,Airport_Code,Weather_Timestamp,Temperature_F,Wind_Chill_F,Humidity,Pressure_in,Visibility_mi,Wind_Direction,Wind_Speed_mph,Precipitation_in,Weather_Condition,Amenity,Bump,Crossing,Give_Way,Junction,No_Exit,Railway,Roundabout,Station,Stop,Traffic_Calming,Traffic_Signal,Turning_Loop,Sunrise_Sunset
0,0,0,0,0,0,0,4,7296,204,0,0,0,5857,16447,91066,124513,1317299,132455,106433,132857,133334,386021,1455888,130468,0,0,0,0,0,0,0,0,0,0,0,0,0,14763


In [0]:
# Hourly accident trend: number of accidents by hour of Start_Time
gold_hourly_trend = (
    silver_df.withColumn('Accident_Hour', hour(col('Start_Time')))
    .groupBy('Accident_Hour')
    .agg(count('*').alias('total_accidents'))
    .orderBy('Accident_Hour')
)

display(gold_hourly_trend)

Accident_Hour,total_accidents
0,91671
1,80358
2,76636
3,68699
4,125937
5,173493
6,296969
7,418369
8,413577
9,262156


In [0]:

# Save the Gold tables as Delta tables (under /FileStore, the same root used for the Silver table)
gold_state_accidents.write.format('delta').mode('overwrite').save("/FileStore/tables/gold/gold_state_accidents")
gold_weather_severity.write.format('delta').mode('overwrite').save("/FileStore/tables/gold/gold_weather_severity")
gold_hourly_trend.write.format('delta').mode('overwrite').save("/FileStore/tables/gold/gold_hourly_trend")
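
Because the three writes differ only in the table name, the target paths can be derived in one place, which helps avoid spelling differences between paths. This is a minimal sketch, assuming the Gold tables live under the same `/FileStore/tables` root as the Silver table; `GOLD_BASE`, `gold_path`, and `write_gold_tables` are hypothetical names introduced for illustration.

```python
# Assumed root for Gold tables, spelled like the Silver path used earlier.
GOLD_BASE = "/FileStore/tables/gold"


def gold_path(table_name, base=GOLD_BASE):
    """Return the storage path for a Gold table."""
    return f"{base.rstrip('/')}/{table_name}"


def write_gold_tables(tables, base=GOLD_BASE):
    """Write each DataFrame in `tables` (name -> Spark DataFrame) as a Delta table.

    Existing data at the target path is overwritten.
    Returns a dict mapping each table name to the path it was written to.
    """
    written = {}
    for name, df in tables.items():
        path = gold_path(name, base)
        df.write.format("delta").mode("overwrite").save(path)
        written[name] = path
    return written


# In the notebook this could be called as:
#   write_gold_tables({
#       "gold_state_accidents": gold_state_accidents,
#       "gold_weather_severity": gold_weather_severity,
#       "gold_hourly_trend": gold_hourly_trend,
#   })
```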