# 1. Geospatial Analytics - Data Ingestion

**Mobility and Automotive - Road Safety and Risk Prevention**

## Overview

This notebook delivers the ingestion pipeline to build the medallion architecture for the geospatial use case. To make it easily reproducible, it downloads raw datasets from APIs (or a repo) into UC Volumes (bronze). All subsequent steps in the pipeline then read raw files from the Volume to build the silver and gold layers.

We provide sample data from public APIs in the USA, primarily NYC data sources given their availability.

Customers can plug in any dataset as long as it provides the following:

- **Collisions**: A set of road incidents, including contributing factors
- **Traffic Volume**: Historical traffic volume for a set of latitude/longitude points
- **Road Condition**: Road and traffic conditions
- **Rides**: A set of rides/drives with distance, pick and droff locations.
- **Weather**: Historical weather conditions for a set of latitude/longitude points
- **Telematics**: A set of rides/drives with driving metrics like acceleration and speed


⚙️ **Note**: Ensure to configure `./Config.ipynb` to set the destination catalog and volume.

### Load Depedencies

In [None]:
# Un-comment the following lines to generate the requirements.txt file from requirements.in file
# %pip install pip-tools
# !pip-compile requirements.in

In [None]:
%pip install -r requirements.txt

try:
    dbutils.library.restartPython()
except:
    pass


### Set Configs and Create Spark Session

In [None]:
%run ./Config


In [None]:
%run ./Utils

### Data Design



We build a medallion architecture pipeline to populate a final gold schema providing the following schema. Provided you can populate this gold schema, you should reuse much of our code.



In [None]:

mm("""
erDiagram

COLLISION_BRONZE {
  STRING crash_date
  STRING crash_time
  STRING borough
  STRING zip_code
  DOUBLE latitude
  DOUBLE longitude
  STRING location
  STRING on_street_name
  STRING cross_street_name
  STRING off_street_name
  STRING number_of_persons_injured
  INT number_of_persons_killed
  INT number_of_pedestrians_injured
  INT number_of_pedestrians_killed
  INT number_of_cyclist_injured
  STRING number_of_cyclist_killed
  STRING number_of_motorist_injured
  INT number_of_motorist_killed
  STRING contributing_factor_vehicle_1
  STRING contributing_factor_vehicle_2
  STRING contributing_factor_vehicle_3
  STRING contributing_factor_vehicle_4
  STRING contributing_factor_vehicle_5
  INT collision_id
  STRING vehicle_type_code_1
  STRING vehicle_type_code_2
  STRING vehicle_type_code_3
  STRING vehicle_type_code_4
  STRING vehicle_type_code_5
}

COLLISION_SILVER {
  DATE crash_date
  TIMESTAMP crash_time
  STRING borough
  STRING zip_code
  DOUBLE latitude
  DOUBLE longitude
  STRING on_street_name
  STRING cross_street_name
  STRING off_street_name
  INT number_of_persons_injured
  INT number_of_persons_killed
  INT number_of_pedestrians_injured
  INT number_of_pedestrians_killed
  INT number_of_cyclist_injured
  INT number_of_cyclist_killed
  INT number_of_motorist_injured
  INT number_of_motorist_killed
  STRING contributing_factor_vehicle_1
  STRING contributing_factor_vehicle_2
  STRING contributing_factor_vehicle_3
  STRING contributing_factor_vehicle_4
  STRING contributing_factor_vehicle_5
  INT collision_id
  STRING vehicle_type_code_1
  STRING vehicle_type_code_2
  STRING vehicle_type_code_3
  STRING vehicle_type_code_4
  STRING vehicle_type_code_5
}

TRAFFIC_VOLUME_COUNTS_BRONZE {
  BIGINT requestid
  STRING boro
  INT yr
  INT m
  INT d
  INT hh
  INT mm
  INT vol
  BIGINT segmentid
  STRING wktgeom
  STRING street
  STRING fromst
  STRING tost
  STRING direction
}

TRAFFIC_VOLUME_COUNTS_SILVER {
  INT vol
  DOUBLE lat
  DOUBLE long
  TIMESTAMP timestamp
  BIGINT h3_index
}

ROAD_CONDITION {
  DOUBLE latitude
  DOUBLE longitude
  STRING id
  STRING region_name
  STRING county_name
  STRING severity
  STRING roadway_name
  STRING direction_of_travel
  STRING description
  STRING lanes_affected
  STRING lanes_status
  STRING navteq_link_id
  STRING primary_location
  STRING secondary_location
  STRING first_article_city
  STRING second_city
  STRING event_type
  STRING event_sub_type
  TIMESTAMP reported_date
  TIMESTAMP start_date
  TIMESTAMP planned_end_date
}

TELEMATICS_BRONZE {
  BIGINT vehicle_id
  STRING vin_number
  TIMESTAMP timestamp
  DOUBLE latitude
  DOUBLE longitude
  BOOLEAN abs
  BOOLEAN tpms
  BOOLEAN brake_pad
  BOOLEAN accelerometer
  BOOLEAN blind_spot
  BOOLEAN lane_departure
  BOOLEAN fcw
  BOOLEAN parking_sensor
  BOOLEAN rear_view_camera
  BOOLEAN engine_light
  BOOLEAN rain_sensor
  BOOLEAN wiper_blades
  BOOLEAN fog_light
  BOOLEAN seatbelt_off
  BOOLEAN door_ajar
  DOUBLE total_fuel_consumption
  DOUBLE fuel_level_percent
  DOUBLE fuel_level_liters
  DOUBLE vehicle_mileage_dashboard
  DOUBLE vehicle_mileage_computed
  INT engine_speed_rpm
  BOOLEAN oil_level_status
  DOUBLE engine_temperature
  DOUBLE vehicle_speed
  DOUBLE accelerator_pedal_position
  BOOLEAN lights_parking
  BOOLEAN lights_dipped
  BOOLEAN lights_full_beam
  BOOLEAN lights_fog_front
  BOOLEAN lights_fog_rear
  BOOLEAN door_front_left
  BOOLEAN door_front_right
  BOOLEAN door_rear_right
  BOOLEAN door_rear_left
  BOOLEAN trunk_cover
  BOOLEAN engine_hood
  BOOLEAN charging_cable_status
  BOOLEAN charging_status
  DOUBLE battery_level
  DOUBLE vehicle_range_km
  DOUBLE battery_temperature
  DOUBLE cng_level
  DOUBLE total_cng_consumption
  BOOLEAN engine_on_cng
  DOUBLE adblue_level_dashboard
  BOOLEAN seatbelt_driver
  BOOLEAN seatbelt_passenger
  BOOLEAN check_engine
  BOOLEAN lights_failure
  BOOLEAN low_tire_pressure
}

TRIPS_BRONZE {
  TIMESTAMP tpep_pickup_datetime
  TIMESTAMP tpep_dropoff_datetime
  DOUBLE trip_distance
  DOUBLE fare_amount
  INT pickup_zip
  INT dropoff_zip
  STRING trip_type
}

TRIPS_SILVER {
  TIMESTAMP tpep_pickup_datetime
  TIMESTAMP tpep_dropoff_datetime
  DOUBLE trip_distance
  INT pickup_zip
  INT dropoff_zip
  DOUBLE pickup_latitude
  DOUBLE pickup_longitude
  DOUBLE dropoff_latitude
  DOUBLE dropoff_longitude
  STRING trip_type
}

WEATHER_HISTORY_BRONZE {
  TIMESTAMP datetime
  DATE date
  DOUBLE latitude
  DOUBLE longitude
  FLOAT temperature_2m
  FLOAT precipitation
  FLOAT rain
  FLOAT snowfall
  FLOAT snow_depth
  FLOAT weather_code
  FLOAT wind_speed_10m
  FLOAT wind_direction_10m
  FLOAT wind_gusts_10m
}

COLLISION_BRONZE ||--|| COLLISION_SILVER : collision_id
TRAFFIC_VOLUME_COUNTS_BRONZE ||--|| TRAFFIC_VOLUME_COUNTS_SILVER : request_id
TRIPS_BRONZE ||--o| TRIPS_SILVER : maps_to
""")

### Public datasets used for this example pipeline



| Dataset       | Source Used                                                                                  | Description                                       | Note                                                        |
|----------------|---------------------------------------------------------------------------------------------|---------------------------------------------------|-------------------------------------------------------------|
| Collisions     | [NYC Open Data](http://data.cityofnewyork.us/Public-Safety/Motor-Vehicle-Collisions-Crashes/h9gi-nx95/about_data) | A set of road incidents, including contributing factors. | Refer to [Terms of Use](https://opendata.cityofnewyork.us/overview/#termsofuse) for more information.                                    |
| Traffic Volume | [NYC Open Data](https://data.cityofnewyork.us/Transportation/Automated-Traffic-Volume-Counts/7ym2-wayt/about_data) | Historical traffic volume for a set of lat/long.   | Refer to [Terms of Use](https://opendata.cityofnewyork.us/overview/#termsofuse) for more information.                                  |
| Road Condition | [511ny](https://511ny.org)                                                                  | Road and traffic conditions.                       | Refer to [Developer Access Agreement](https://511ny.org/developers/daa) for more information                  |
| Weather        | [Open-Meteo Weather API](http://pypi.org/project/openmeteo-requests/)                        | Historical weather data.                           | Licensed under [Attribution 4.0 International (CC BY 4.0)](https://creativecommons.org/licenses/by/4.0/)      |
| Telematics     | [Synthetic](source)                                                                         | A set of rides/drives with driving metrics like acceleration, speed. | Uses [dbldatagen](https://github.com/databrickslabs/dbldatagen) for synthetic generation. |

Please view [./1A-Data Download.ipynb](./1A-Data%20Download.ipynb) to understand how the Weather and Road Condition CSV datasets were created.
  


## Pipeline Implementation



### Upload Sample Datasets into UC Volume


**Collisions**

In [None]:
local_path = "./data/Motor_Vehicle_Collisions_-_Crashes_20250328.csv"


volume_path = f"/Volumes/{catalog_name}/{schema_name}/{volume_name}/data/collisions"

upload_file_to_volume(local_path, volume_path, "Motor_Vehicle_Collisions_-_Crashes_20250328.csv"  )


**Traffic Volume** 

In [None]:

local_path = "./data/Automated_Traffic_Volume_Counts_20240903.csv.gz"

# Destination in the Volume
volume_path = f"/Volumes/{catalog_name}/{schema_name}/{volume_name}/data/traffic_volume"

upload_file_to_volume(local_path, volume_path, "Automated_Traffic_Volume_Counts_20240903.csv.gz"  )

**Road Condition and Events**

In [None]:
local_path = "./data/road_condition_2024.csv.gz"

# Destination in the Volume
volume_path = f"/Volumes/{catalog_name}/{schema_name}/{volume_name}/data/road_condition"

upload_file_to_volume(local_path, volume_path, "road_condition_2024.csv.gz"  )

**Contributing Factors**

In [None]:

local_path = "./data/contributing_factor_tags_lkp_gold.csv"

# Destination in the Volume
volume_path = f"/Volumes/{catalog_name}/{schema_name}/{volume_name}/data/contributing_factor"

upload_file_to_volume(local_path, volume_path, "contributing_factor_tags_lkp_gold.csv"  )

**Weather - Contains weather condition for all 2024 crashes**

In [None]:
local_path = "./data/historical_weather_crashes_2024.csv"

# Destination in the Volume
volume_path = f"/Volumes/{catalog_name}/{schema_name}/{volume_name}/data/weather"

upload_file_to_volume(local_path, volume_path, "historical_weather_crashes.csv"  )

### Create Data Model

Collisions

In [None]:
sql_ddl = f'''
CREATE OR REPLACE TABLE {catalog_name}.{schema_name}.collision_bronze (
  crash_date STRING COMMENT 'Represents the date when the collision occurred.',
  crash_time STRING COMMENT 'Represents the time when the collision occurred.',
  borough STRING COMMENT 'Represents the borough where the collision took place.',
  zip_code STRING COMMENT 'Represents the zip code where the collision took place.',
  latitude DOUBLE COMMENT 'Represents the latitude of the collision location.',
  longitude DOUBLE COMMENT 'Represents the longitude of the collision location.',
  location STRING COMMENT 'Represents the human-readable description of the collision location.',
  on_street_name STRING COMMENT 'Represents the name of the street where the collision took place.',
  cross_street_name STRING COMMENT 'Represents the name of the cross street where the collision took place.',
  off_street_name STRING COMMENT 'Represents the name of the off-street location where the collision took place.',
  number_of_persons_injured STRING COMMENT 'Represents the number of people who were injured in the collision.',
  number_of_persons_killed INT COMMENT 'Represents the number of people who were killed in the collision.',
  number_of_pedestrians_injured INT COMMENT 'Represents the number of pedestrians who were injured in the collision.',
  number_of_pedestrians_killed INT COMMENT 'Represents the number of pedestrians who were killed in the collision.',
  number_of_cyclist_injured INT COMMENT 'Represents the number of cyclists who were injured in the collision.',
  number_of_cyclist_killed STRING COMMENT 'Represents the number of cyclists who were killed in the collision.',
  number_of_motorist_injured STRING COMMENT 'Represents the number of motorists who were injured in the collision.',
  number_of_motorist_killed INT COMMENT 'Represents the number of motorists who were killed in the collision.',
  contributing_factor_vehicle_1 STRING COMMENT 'Represents the first contributing factor of the collision, related to one of the vehicles involved.',
  contributing_factor_vehicle_2 STRING COMMENT 'Represents the second contributing factor of the collision, related to one of the vehicles involved.',
  contributing_factor_vehicle_3 STRING COMMENT 'Represents the third contributing factor of the collision, if applicable.',
  contributing_factor_vehicle_4 STRING COMMENT 'Represents the fourth contributing factor of the collision, if applicable.',
  contributing_factor_vehicle_5 STRING COMMENT 'Represents the fifth contributing factor of the collision, if applicable.',
  collision_id INT COMMENT 'Unique identifier for the collision, allowing easy reference and tracking.',
  vehicle_type_code_1 STRING COMMENT 'First vehicle type code associated with the collision.',
  vehicle_type_code_2 STRING COMMENT 'Second vehicle type code associated with the collision.',
  vehicle_type_code_3 STRING COMMENT 'Third vehicle type code associated with the collision.',
  vehicle_type_code_4 STRING COMMENT 'Fourth vehicle type code associated with the collision.',
  vehicle_type_code_5 STRING COMMENT 'Fifth vehicle type code associated with the collision.'
)
COMMENT 'This table contains data on traffic collisions. It includes details such as the date, time, location, and contributing factors. This data can be used to analyze traffic patterns, identify high-risk areas, and understand the causes of accidents. It can also help in planning traffic management strategies, allocating resources, and improving road safety.'
'''

spark.sql(sql_ddl)

In [None]:
sql_ddl = f'''
CREATE OR REPLACE TABLE {catalog_name}.{schema_name}.collision_silver (
  crash_date DATE COMMENT 'Represents the date of the collision.',
  crash_time TIMESTAMP COMMENT 'Represents the time of the collision.',
  borough STRING COMMENT 'Identifies the borough where the collision occurred.',
  zip_code STRING COMMENT 'Represents the zip code where the collision took place.',
  latitude DOUBLE COMMENT 'Represents the latitude of the collision location.',
  longitude DOUBLE COMMENT 'Represents the longitude of the collision location.',
  on_street_name STRING COMMENT 'Identifies the name of the street where the collision occurred.',
  cross_street_name STRING COMMENT 'Represents the name of the cross street where the collision took place.',
  off_street_name STRING COMMENT 'Identifies the name of the off-street location where the collision occurred.',
  number_of_persons_injured INT COMMENT 'Represents the number of people who were injured in the collision.',
  number_of_persons_killed INT COMMENT 'Represents the number of people who were killed in the collision.',
  number_of_pedestrians_injured INT COMMENT 'Identifies the number of pedestrians who were injured in the collision.',
  number_of_pedestrians_killed INT COMMENT 'Represents the number of pedestrians who were killed in the collision.',
  number_of_cyclist_injured INT COMMENT 'Identifies the number of cyclists who were injured in the collision.',
  number_of_cyclist_killed INT COMMENT 'Represents the number of cyclists who were killed in the collision.',
  number_of_motorist_injured INT COMMENT 'Identifies the number of motorists who were injured in the collision.',
  number_of_motorist_killed INT COMMENT 'Represents the number of motorists who were killed in the collision.',
  contributing_factor_vehicle_1 STRING COMMENT 'Represents the first contributing factor of the collision involving a vehicle.',
  contributing_factor_vehicle_2 STRING COMMENT 'Identifies the second contributing factor of the collision involving a vehicle.',
  contributing_factor_vehicle_3 STRING COMMENT 'Represents the third contributing factor of the collision involving a vehicle.',
  contributing_factor_vehicle_4 STRING COMMENT 'Represents the fourth contributing factor of the collision, if applicable.',
  contributing_factor_vehicle_5 STRING COMMENT 'Represents the fifth contributing factor of the collision, if applicable.',
  collision_id INT COMMENT 'Unique identifier for the collision.',
  vehicle_type_code_1 STRING COMMENT 'Represents the first vehicle type code involved in the collision.',
  vehicle_type_code_2 STRING COMMENT 'Represents the second vehicle type code involved in the collision.',
  vehicle_type_code_3 STRING COMMENT 'Represents the third vehicle type code involved in the collision.',
  vehicle_type_code_4 STRING COMMENT 'Represents the fourth vehicle type code involved in the collision.',
  vehicle_type_code_5 STRING COMMENT 'Represents the fifth vehicle type code involved in the collision.'
)
COMMENT 'The table contains data related to traffic accidents. It includes details such as the date and time of the accident, the location (including borough, zip code, latitude, and longitude), and the number of people involved (pedestrians, cyclists, and motorists). The table also captures the contributing factors of the accidents. This data can be used to analyze traffic patterns, identify accident-prone areas, and assess the impact of various factors on accidents. It can also help in developing strategies to reduce accidents and improve road safety in the city.'
'''

spark.sql(sql_ddl)

Traffic Volume

In [None]:
sql_ddl = f'''
CREATE OR REPLACE TABLE {catalog_name}.{schema_name}.traffic_volume_counts_bronze (
  requestid BIGINT COMMENT 'A unique ID that is generated for each counts request.',
  boro STRING COMMENT 'Lists which of the five administrative divisions of New York City the location is within, written as a word.',
  yr INT COMMENT 'The two-digit year portion of the date when the count was conducted.',
  m INT COMMENT 'The two-digit month portion of the date when the count was conducted.',
  d INT COMMENT 'The two-digit day portion of the date when the count was conducted.',
  hh INT COMMENT 'The two-digit hour portion of the time when the count was conducted.',
  mm INT COMMENT 'The two-digit start minute portion of the time when the count was conducted.',
  vol INT COMMENT 'The total sum of counts collected within a 15-minute increment.',
  segmentid BIGINT COMMENT 'The ID that identifies each segment of a street in the LION street network version 14.',
  wktgeom STRING COMMENT 'A text markup language for representing vector geometry objects on a map and spatial reference systems of spatial objects.',
  street STRING COMMENT 'The On Street where the count took place.',
  fromst STRING COMMENT 'The From Street where the count took place.',
  tost STRING COMMENT 'The To Street where the count took place.',
  direction STRING COMMENT 'The text-based direction of traffic where the count took place.'
)
COMMENT 'Traffic volume count data. Schema based on https://data.cityofnewyork.us/Transportation/Automated-Traffic-Volume-Counts/7ym2-wayt/about_data.';
'''

spark.sql(sql_ddl)

In [None]:

spark.sql(f"""
CREATE OR REPLACE TABLE {catalog_name}.{schema_name}.traffic_volume_counts_silver (
  vol INT COMMENT 'The total sum of counts collected within a 15-minute increment.',
  lat DOUBLE COMMENT 'Latitude derived from geometry.',
  long DOUBLE COMMENT 'Longitude derived from geometry.',
  timestamp TIMESTAMP COMMENT 'Timestamp of the count event.',
  h3_index BIGINT COMMENT 'H3 index (resolution 9) based on lat/long.'
)
CLUSTER BY (h3_index)
COMMENT 'Traffic volume count data with lat/log and H3 index.';
""")

Road Condition

In [None]:
spark.sql(f"""
CREATE OR REPLACE TABLE {catalog_name}.{schema_name}.road_condition_bronze (
    latitude DOUBLE COMMENT 'Latitude coordinate of the road event.',
    longitude DOUBLE COMMENT 'Longitude coordinate of the road event.',
    id STRING COMMENT 'Unique identifier for the road condition event.',
    region_name STRING COMMENT 'Name of the region where the event occurred.',
    county_name STRING COMMENT 'County in which the road event was reported.',
    severity STRING COMMENT 'Severity level of the road condition or event.',
    roadway_name STRING COMMENT 'Name of the affected roadway.',
    direction_of_travel STRING COMMENT 'Direction of travel on the affected roadway.',
    description STRING COMMENT 'Description of the road event or condition.',
    lanes_affected STRING COMMENT 'Information about lanes impacted by the event.',
    lanes_status STRING COMMENT 'Current status of affected lanes (e.g., closed, open, restricted).',
    navteq_link_id STRING COMMENT 'Navteq link ID used for mapping the roadway segment.',
    primary_location STRING COMMENT 'Primary location detail for the road event.',
    secondary_location STRING COMMENT 'Secondary location reference related to the event.',
    first_article_city STRING COMMENT 'Primary city associated with the road event.',
    second_city STRING COMMENT 'Secondary city or neighboring city reference.',
    event_type STRING COMMENT 'General type of road event (e.g., construction, incident, closure).',
    event_sub_type STRING COMMENT 'More specific classification of the road event.',
    reported_date TIMESTAMP COMMENT 'Timestamp when the event was first reported.',
    start_date TIMESTAMP COMMENT 'Planned or actual start date of the event.',
    planned_end_date TIMESTAMP COMMENT 'Planned end date of the road event.'
)
COMMENT 'This table contains geospatial road condition data, including planned events such as closures, construction, or hazards. Each record captures key location, time, and severity details to support mapping, traffic analysis, and operational decision-making.';
""")

Trips

In [None]:
spark.sql(f"""
CREATE OR REPLACE TABLE {catalog_name}.{schema_name}.trips_bronze (
  tpep_pickup_datetime TIMESTAMP COMMENT 'Timestamp when the passenger was picked up.',
  tpep_dropoff_datetime TIMESTAMP COMMENT 'Timestamp when the passenger was dropped off.',
  trip_distance DOUBLE COMMENT 'Total distance of the trip in miles.',
  fare_amount DOUBLE COMMENT 'Fare charged for the trip in USD.',
  pickup_zip INT COMMENT 'ZIP code of the pickup location.',
  dropoff_zip INT COMMENT 'ZIP code of the dropoff location.',
  trip_type STRING COMMENT 'Type of trip (e.g., Taxi, Ride Sharing).'
)
COMMENT 'Contains car trips. Raw trip data including pickup/dropoff times, trip distance, fare amount, and ZIP codes for pickup and dropoff locations.';""")
 


In [None]:
spark.sql(f"""
CREATE OR REPLACE TABLE {catalog_name}.{schema_name}.trips_silver (
  id BIGINT GENERATED ALWAYS AS IDENTITY COMMENT 'Auto populated unique identifier for the trip.',
  tpep_pickup_datetime TIMESTAMP COMMENT 'The timestamp of the taxi trip pickup.',
  tpep_dropoff_datetime TIMESTAMP COMMENT 'The timestamp of the taxi trip dropoff.',
  trip_distance DOUBLE COMMENT 'The distance of the taxi trip, measured in miles.',
  pickup_zip INT COMMENT 'The zip code of the trip pickup location.',
  dropoff_zip INT COMMENT 'The zip code of the trip dropoff location.',
  pickup_latitude DOUBLE COMMENT 'The latitude of the trip pickup location.',
  pickup_longitude DOUBLE COMMENT 'The longitude of the trip pickup location.',
  dropoff_latitude DOUBLE COMMENT 'The latitude of the trip dropoff location.',
  dropoff_longitude DOUBLE COMMENT 'The longitude of the trip dropoff location.',
  trip_type STRING COMMENT 'Type of trip (e.g., Taxi, Ride Sharing).'
)
COMMENT 'Contains car trips. It includes details such as the trip duration, pick-up and drop-off locations, and latitude/longitude coordinates. This data can be used to analyze taxi usage patterns, identify high-traffic areas, and track the movement of taxis throughout the city. It can also be used to optimize taxi routes and improve overall taxi service efficiency.'
""")

In [None]:
spark.sql(f"""
CREATE OR REPLACE TABLE {catalog_name}.{schema_name}.trips_route_gold (
  trip_id BIGINT COMMENT 'Reference to the trip id in trips_silver.',
  path_latitude DOUBLE COMMENT 'Latitude of the trip route, allowing tracking of the trip path.',
  path_longitude DOUBLE COMMENT 'Longitude of the trip route, allowing tracking of the trip path.',
  order LONG COMMENT 'Unique identifier for the trip order, allowing easy reference and tracking.',
  tpep_pickup_datetime TIMESTAMP COMMENT 'The timestamp of the taxi trip pickup.',
  tpep_dropoff_datetime TIMESTAMP COMMENT 'The timestamp of the taxi trip dropoff.'
)
COMMENT 'Contains information about trips with their route. It includes latitude and longitude of the path taken. This data can be used to analyze taxi/ride sharing routes, identify congested areas, and optimize taxi routes for improved efficiency and reduced travel times. Additionally, it can help in understanding taxi demand patterns and identifying high-demand areas for taxi services.'
""")

Telematics

In [None]:

spark.sql(f"""
CREATE OR REPLACE TABLE {catalog_name}.{schema_name}.telematics_bronze (
vehicle_id BIGINT COMMENT 'Unique identifier for the vehicle',
vin_number STRING COMMENT 'Vehicle Identification Number (VIN)',
timestamp TIMESTAMP COMMENT 'Represents the time of data capture',
-- Location Data
latitude DOUBLE COMMENT 'Latitude coordinate of the vehicle at the time of data capture',
longitude DOUBLE COMMENT 'Longitude coordinate of the vehicle at the time of data capture',
-- Safety & Diagnostics
abs BOOLEAN COMMENT 'Indicates if the Anti-lock Braking System (ABS) was activated.',
tpms BOOLEAN COMMENT 'Indicates if the tire pressure monitoring system (TPMS) detected low tire pressure.',
brake_pad BOOLEAN COMMENT 'Indicates if the brake pad needs to be changed.',
accelerometer BOOLEAN COMMENT 'Indicates if the vehicles acceleration level was high.',
blind_spot BOOLEAN COMMENT 'Indicates the presence of a vehicle in the drivers blind spot.',
lane_departure BOOLEAN COMMENT 'Indicates if the vehicle departed from its lane.',
fcw BOOLEAN COMMENT 'Indicates a potential forward collision detected by the FCW system.',
parking_sensor BOOLEAN COMMENT 'Indicates presence of obstacles in the vehicles parking path.',
rear_view_camera BOOLEAN COMMENT 'Indicates presence of obstacles detected by the rear-view camera.',
engine_light BOOLEAN COMMENT 'Indicates if the engine light is on.',
rain_sensor BOOLEAN COMMENT 'Indicates presence of rain detected by the rain sensor.',
wiper_blades BOOLEAN COMMENT 'Indicates if the wipers were on.',
fog_light BOOLEAN COMMENT 'Indicates if fog lights were activated.',
seatbelt_off BOOLEAN COMMENT 'Indicates if a seatbelt was unfastened during the trip.',
door_ajar BOOLEAN COMMENT 'Indicates if any of the vehicle doors were open.',
-- Fuel & Engine Metrics
total_fuel_consumption DOUBLE COMMENT 'Total fuel consumption in liters',
fuel_level_percent DOUBLE COMMENT 'Fuel level as a percentage',
fuel_level_liters DOUBLE COMMENT 'Fuel level in liters',
vehicle_mileage_dashboard DOUBLE COMMENT 'Mileage reading from vehicle dashboard (in kilometers)',
vehicle_mileage_computed DOUBLE COMMENT 'Mileage calculated since adapter installation (in kilometers)',
engine_speed_rpm INT COMMENT 'Engine speed in RPM',
oil_level_status BOOLEAN COMMENT 'Indicates oil level/pressure status',
engine_temperature DOUBLE COMMENT 'Engine temperature in Celsius',
vehicle_speed DOUBLE COMMENT 'Vehicle speed in km/h',
accelerator_pedal_position DOUBLE COMMENT 'Accelerator pedal position as a percentage',
-- Lights
lights_parking BOOLEAN COMMENT 'Parking lights status',
lights_dipped BOOLEAN COMMENT 'Dipped beam headlights status',
lights_full_beam BOOLEAN COMMENT 'Full beam headlights status',
lights_fog_front BOOLEAN COMMENT 'Front fog lights status',
lights_fog_rear BOOLEAN COMMENT 'Rear fog lights status',
-- Door & Covers
door_front_left BOOLEAN COMMENT 'Front left door open status',
door_front_right BOOLEAN COMMENT 'Front right door open status',
door_rear_right BOOLEAN COMMENT 'Rear right door open status',
door_rear_left BOOLEAN COMMENT 'Rear left door open status',
trunk_cover BOOLEAN COMMENT 'Trunk cover open status',
engine_hood BOOLEAN COMMENT 'Engine cover (hood) open status',
-- Electric Vehicle (EV) Metrics
charging_cable_status BOOLEAN COMMENT 'Indicates whether the charging cable is connected',
charging_status BOOLEAN COMMENT 'Indicates whether the vehicle is currently charging',
battery_level DOUBLE COMMENT 'Battery level as a percentage',
vehicle_range_km DOUBLE COMMENT 'Estimated remaining driving range on battery (in kilometers)',
battery_temperature DOUBLE COMMENT 'Battery temperature in Celsius',
-- CNG & Emissions
cng_level DOUBLE COMMENT 'CNG level in percentage or volume',
total_cng_consumption DOUBLE COMMENT 'Total compressed natural gas consumed in liters',
engine_on_cng BOOLEAN COMMENT 'Indicates if engine is currently running on CNG',
adblue_level_dashboard DOUBLE COMMENT 'AdBlue level from dashboard',
-- Other Systems
seatbelt_driver BOOLEAN COMMENT 'Drivers seat belt status',
seatbelt_passenger BOOLEAN COMMENT 'Passengers seat belt status',
-- Warning Indicators
check_engine BOOLEAN COMMENT 'Check engine warning status',
lights_failure BOOLEAN COMMENT 'Light failure warning status',
low_tire_pressure BOOLEAN COMMENT 'Low tire pressure warning status'

)
COMMENT 'This table stores raw telematics data collected from vehicles, including location, fuel and engine metrics, electric vehicle parameters, CNG usage, system diagnostics, and safety indicators. It provides a comprehensive view of vehicle behavior and condition over time, supporting downstream analytics such as predictive maintenance, driver behavior analysis, and geospatial insights.'
""")

Contributing Factors

In [None]:

spark.sql(f"""
CREATE OR REPLACE TABLE {catalog_name}.{schema_name}.contributing_factor_tags_lkp_gold (
  contributing_factor_grouping STRING COMMENT 'Represents the category or grouping of contributing factors.',
  contributing_factor STRING COMMENT 'Identifies the specific contributing factor within the grouping.',
  abs BOOLEAN COMMENT 'Represents the Anti-lock Braking System status, indicating if the ABS was activated',
  tpms BOOLEAN COMMENT 'Represents the tire pressure monitoring system (TPMS) status, indicating the tire pressure was low',
  brake_pad BOOLEAN COMMENT 'Represents the brake pad condition, indicating if brake pad need to be changed',
  accelerometer BOOLEAN COMMENT 'Represents the accelerometer data, indicating the vehicles acceleration level was high',
  blind_spot BOOLEAN COMMENT 'Represents the blind spot sensor data, indicating the presence or absence of vehicles in the driver blind spot.',
  lane_departure BOOLEAN COMMENT 'Represents the lane departure warning system (LDWS) data, indicating the presence or absence of the vehicleposition in its lane.',
  fcw BOOLEAN COMMENT 'Represents the forward collision warning (FCW) system data, indicating the presence or absence of potential collisions.',
  parking_sensor BOOLEAN COMMENT 'Represents the parking sensor data, indicating the presence or absence of obstacles in the vehicle parking path.',
  rear_view_camera BOOLEAN COMMENT 'Represents the rear-view camera data, indicating the presence or absence of obstacles behind the vehicle.',
  engine_light BOOLEAN COMMENT 'Represents the engine light status, indicating the presence or absence of engine issues.',
  rain_sensor BOOLEAN COMMENT 'Represents the rain sensor data, indicating the presence or absence of rain.',
  wiper_baldes BOOLEAN COMMENT 'Represents the wiper status, indicating the wiper was on or off',
  fog_light BOOLEAN COMMENT 'Represents the fog light status, indicating the presence or absence of fog.',
  seatbelt_off BOOLEAN COMMENT 'Represents the seatbelt status, indicating the presence or absence of seatbelt usage.',
  door_ajar BOOLEAN COMMENT 'Represents the door status, indicating the presence or absence of open doors.'
)
COMMENT 'Contains data on various contributing factors that can impact the performance of a vehicle. It includes information on factors like tire pressure, brake pads, and sensor data. This data can be used to identify potential issues that may affect the safety and reliability of the vehicle, enabling proactive maintenance and repair. Additionally, it can help in tracking the performance of the vehicle over time and identifying trends in the data.'

""")

Weather

In [None]:

spark.sql(f"""
CREATE OR REPLACE TABLE {catalog_name}.{schema_name}.weather_history_bronze (
  datetime TIMESTAMP COMMENT 'Represents the time when the weather data was recorded.',
  date DATE COMMENT 'Represents the date when the weather data was recorded.',
  latitude DOUBLE COMMENT 'Represents the latitude of the location where the weather data was recorded.',
  longitude DOUBLE COMMENT 'Represents the longitude of the location where the weather data was recorded.',
  temperature_2m FLOAT COMMENT 'Represents the temperature at 2 meters above the ground level.',
  precipitation FLOAT COMMENT 'Represents the precipitation level, measured in millimeters.',
  rain FLOAT COMMENT 'Represents the rainfall level, measured in millimeters.',
  snowfall FLOAT COMMENT 'Represents the snowfall level, measured in millimeters.',
  snow_depth FLOAT COMMENT 'Represents the snow depth at the location, measured in centimeters.',
  weather_code FLOAT COMMENT 'Represents the weather code, which can be used to identify the weather type.',
  wind_speed_10m FLOAT COMMENT 'Represents the wind speed at 10 meters above the ground level.',
  wind_direction_10m FLOAT COMMENT 'Represents the wind direction at 10 meters above the ground level.',
  wind_gusts_10m FLOAT COMMENT 'Represents the wind gusts at 10 meters above the ground level.'
)
COMMENT 'The table contains historical weather. It includes information such as temperature, precipitation, wind speed, and wind direction. This data can be useful for various purposes, including weather forecasting, historical weather analysis, and climate modeling. For instance, it can help in understanding weather patterns, predicting future weather trends, and assessing the impact of climate change.'
""")

Final Gold Table - Trip Analytics Synthesis

In [None]:
spark.sql(f"""
CREATE OR REPLACE TABLE {catalog_name}.{schema_name}.trip_analyics_synthesis_gold (
  trip_trip_id LONG COMMENT 'Reference to the trip id in trips_silver.',
  trip_start_datetime TIMESTAMP COMMENT 'The timestamp of the taxi trip pickup.',
  trip_end_datetime TIMESTAMP COMMENT 'The timestamp of the taxi trip dropoff.',
  trip_latitude DOUBLE COMMENT 'Latitude of the trip route, allowing tracking of the trip path.',
  trip_longitude DOUBLE COMMENT 'Longitude of the trip route, allowing tracking of the trip path.',
  trip_path_order LONG COMMENT 'Unique identifier for the trip order, allowing easy reference and tracking.',
  trip_path_description STRING,
  trip_h3_index LONG,
  collision_crash_date DATE,
  collision_crash_time TIMESTAMP,
  collision_borough STRING COMMENT 'Represents the borough where the collision took place.',
  collision_zip_code STRING COMMENT 'Represents the zip code where the collision took place.',
  collision_latitude DOUBLE COMMENT 'Represents the latitude of the collision location.',
  collision_longitude DOUBLE COMMENT 'Represents the longitude of the collision location.',
  collision_on_street_name STRING COMMENT 'Represents the name of the street where the collision took place.',
  collision_cross_street_name STRING COMMENT 'Represents the name of the cross street where the collision took place.',
  collision_off_street_name STRING COMMENT 'Represents the name of the off-street location where the collision took place.',
  collision_number_of_persons_injured INT,
  collision_number_of_persons_killed INT COMMENT 'Represents the number of people who were killed in the collision.',
  collision_number_of_pedestrians_injured INT COMMENT 'Represents the number of pedestrians who were injured in the collision.',
  collision_number_of_pedestrians_killed INT COMMENT 'Represents the number of pedestrians who were killed in the collision.',
  collision_number_of_cyclist_injured INT COMMENT 'Represents the number of cyclists who were injured in the collision.',
  collision_number_of_cyclist_killed INT,
  collision_number_of_motorist_injured INT,
  collision_number_of_motorist_killed INT COMMENT 'Represents the number of motorists who were killed in the collision.',
  collision_contributing_factor_vehicle_1 STRING COMMENT 'Represents the first contributing factor of the collision, related to one of the vehicles involved.',
  collision_contributing_factor_vehicle_2 STRING COMMENT 'Represents the second contributing factor of the collision, related to one of the vehicles involved.',
  collision_contributing_factor_vehicle_3 STRING COMMENT 'Represents the third contributing factor of the collision, if applicable.',
  collision_contributing_factor_vehicle_4 STRING COMMENT 'Represents the fourth contributing factor of the collision, if applicable.',
  collision_contributing_factor_vehicle_5 STRING COMMENT 'Represents the fifth contributing factor of the collision, if applicable.',
  collision_collision_id INT COMMENT 'Unique identifier for the collision, allowing easy reference and tracking.',
  collision_vehicle_type_code_1 STRING COMMENT 'First vehicle type code associated with the collision.',
  collision_vehicle_type_code_2 STRING COMMENT 'Second vehicle type code associated with the collision.',
  collision_vehicle_type_code_3 STRING COMMENT 'Third vehicle type code associated with the collision.',
  collision_vehicle_type_code_4 STRING COMMENT 'Fourth vehicle type code associated with the collision.',
  collision_vehicle_type_code_5 STRING COMMENT 'Fifth vehicle type code associated with the collision.',
  contributing_factor STRING,
  weather_temperature_2m FLOAT COMMENT 'Represents the temperature at 2 meters above the ground level.',
  weather_precipitation FLOAT COMMENT 'Represents the precipitation level, measured in millimeters.',
  weather_rain FLOAT COMMENT 'Represents the rainfall level, measured in millimeters.',
  weather_snowfall FLOAT COMMENT 'Represents the snowfall level, measured in millimeters.',
  weather_snow_depth FLOAT COMMENT 'Represents the snow depth at the location, measured in centimeters.',
  weather_weather_code FLOAT COMMENT 'Represents the weather code, which can be used to identify the weather type.',
  weather_wind_speed_10m FLOAT COMMENT 'Represents the wind speed at 10 meters above the ground level.',
  weather_wind_direction_10m FLOAT COMMENT 'Represents the wind direction at 10 meters above the ground level.',
  weather_wind_gusts_10m FLOAT COMMENT 'Represents the wind gusts at 10 meters above the ground level.',
  traffic_volume INT COMMENT 'The total sum of counts collected within a 15-minute increment.',
  road_severity STRING COMMENT 'Severity level of the road condition or event.',
  road_roadway_name STRING COMMENT 'Name of the affected roadway.',
  road_direction_of_travel STRING COMMENT 'Direction of travel on the affected roadway.',
  road_description STRING COMMENT 'Description of the road event or condition.',
  road_lanes_affected STRING COMMENT 'Information about lanes impacted by the event.',
  road_lanes_status STRING COMMENT 'Current status of affected lanes (e.g., closed, open, restricted).',
  road_navteq_link_id STRING COMMENT 'Navteq link ID used for mapping the roadway segment.',
  road_primary_location STRING COMMENT 'Primary location detail for the road event.',
  road_secondary_location STRING COMMENT 'Secondary location reference related to the event.',
  road_first_article_city STRING COMMENT 'Primary city associated with the road event.',
  road_second_city STRING COMMENT 'Secondary city or neighboring city reference.',
  road_event_type STRING COMMENT 'General type of road event (e.g., construction, incident, closure).',
  road_event_sub_type STRING COMMENT 'More specific classification of the road event.',
  road_reported_date TIMESTAMP COMMENT 'Timestamp when the event was first reported.',
  road_start_date TIMESTAMP COMMENT 'Planned or actual start date of the event.',
  road_planned_end_date TIMESTAMP COMMENT 'Planned end date of the road event.',
  telematics_abs INT,
  telematics_accelerometer INT,
  telematics_blind_spot INT,
  telematics_brake_pad INT,
  telematics_door_ajar INT,
  telematics_engine_light INT,
  telematics_fcw INT,
  telematics_fog_light INT,
  telematics_lane_departure INT,
  telematics_parking_sensor INT,
  telematics_rain_sensor INT,
  telematics_rear_view_camera INT,
  telematics_seatbelt_off INT,
  telematics_tpms INT,
  telematics_wiper_blades INT,
  vehicle_id LONG,
  is_collision BOOLEAN
)
COMMENT 'Contains a summary of trip analytics, including vehicle information, trip details, weather conditions, and contributing factors to collisions. This data can be used for traffic analysis, safety assessments, and operational decision-making. It provides insights into the relationship between weather conditions and traffic incidents, helping to improve road safety and inform transportation planning.'
""")

### Implement Medallion Pipeline

#### Bronze Tables

##### Load files with Auto Loader
Simply load from bronze tables using Auto Loader

Note: We process compressed files directly, but for large-scale deployments, it’s recommended to decompress files before using AutoLoader.

In [None]:
volume_path = f"/Volumes/{catalog_name}/{schema_name}/{volume_name}"

df_casted = (
  spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "csv")
    .option("cloudFiles.validateOptions", "false")
    .option("header", "true")
    .option("cloudFiles.schemaLocation", f"{volume_path}/schema/collision_bronze")
    .load(f"{volume_path}/data/collisions")
        .selectExpr(
        "`CRASH DATE` AS crash_date",
        "`CRASH TIME` AS crash_time",
        "`BOROUGH` AS borough",
        "`ZIP CODE` AS zip_code",
        "CAST(`LATITUDE` AS DOUBLE) AS latitude",
        "CAST(`LONGITUDE` AS DOUBLE) AS longitude",
        "`LOCATION` AS location",
        "`ON STREET NAME` AS on_street_name",
        "`CROSS STREET NAME` AS cross_street_name",
        "`OFF STREET NAME` AS off_street_name",
        "`NUMBER OF PERSONS INJURED` AS number_of_persons_injured",
        "CAST(`NUMBER OF PERSONS KILLED` AS INT) AS number_of_persons_killed",
        "CAST(`NUMBER OF PEDESTRIANS INJURED` AS INT) AS number_of_pedestrians_injured",
        "CAST(`NUMBER OF PEDESTRIANS KILLED` AS INT) AS number_of_pedestrians_killed",
        "CAST(`NUMBER OF CYCLIST INJURED` AS INT) AS number_of_cyclist_injured",
        "`NUMBER OF CYCLIST KILLED` AS number_of_cyclist_killed",
        "`NUMBER OF MOTORIST INJURED` AS number_of_motorist_injured",
        "CAST(`NUMBER OF MOTORIST KILLED` AS INT) AS number_of_motorist_killed",
        "`CONTRIBUTING FACTOR VEHICLE 1` AS contributing_factor_vehicle_1",
        "`CONTRIBUTING FACTOR VEHICLE 2` AS contributing_factor_vehicle_2",
        "`CONTRIBUTING FACTOR VEHICLE 3` AS contributing_factor_vehicle_3",
        "`CONTRIBUTING FACTOR VEHICLE 4` AS contributing_factor_vehicle_4",
        "`CONTRIBUTING FACTOR VEHICLE 5` AS contributing_factor_vehicle_5",
        "CAST(`COLLISION_ID` AS INT) AS collision_id",
        "`VEHICLE TYPE CODE 1` AS vehicle_type_code_1",
        "`VEHICLE TYPE CODE 2` AS vehicle_type_code_2",
        "`VEHICLE TYPE CODE 3` AS vehicle_type_code_3",
        "`VEHICLE TYPE CODE 4` AS vehicle_type_code_4",
        "`VEHICLE TYPE CODE 5` AS vehicle_type_code_5"
    )
)

df_casted.writeStream \
    .format("delta") \
    .option("checkpointLocation", f"{volume_path}/checkpoints/collisions_bronze{checkpoint_suffix}") \
    .trigger(availableNow=True) \
    .toTable(f"{catalog_name}.{schema_name}.collision_bronze").awaitTermination()



In [None]:
display(spark.read.table(f"{catalog_name}.{schema_name}.collision_bronze").summary())

In [None]:
volume_path = f"/Volumes/{catalog_name}/{schema_name}/{volume_name}"

df_casted = (
  spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "csv")
    .option("header", "true")
    .option("cloudFiles.schemaLocation", f"{volume_path}/schema/road_condition_bronze")
    .load(f"{volume_path}/data/road_condition")
    .selectExpr(
        "CAST(latitude AS DOUBLE) AS latitude",
        "CAST(longitude AS DOUBLE) AS longitude",
        "CAST(ID AS STRING) AS id",
        "CAST(RegionName AS STRING) AS region_name",
        "CAST(CountyName AS STRING) AS county_name",
        "CAST(Severity AS STRING) AS severity",
        "CAST(RoadwayName AS STRING) AS roadway_name",
        "CAST(DirectionOfTravel AS STRING) AS direction_of_travel",
        "CAST(Description AS STRING) AS description",
        "CAST(LanesAffected AS STRING) AS lanes_affected",
        "CAST(LanesStatus AS STRING) AS lanes_status",
        "CAST(NavteqLinkId AS STRING) AS navteq_link_id",
        "CAST(PrimaryLocation AS STRING) AS primary_location",
        "CAST(SecondaryLocation AS STRING) AS secondary_location",
        "CAST(FirstArticleCity AS STRING) AS first_article_city",
        "CAST(SecondCity AS STRING) AS second_city",
        "CAST(EventType AS STRING) AS event_type",
        "CAST(EventSubType AS STRING) AS event_sub_type",
        "CAST(Reported_Date AS TIMESTAMP) AS reported_date",
        "CAST(Start_Date AS TIMESTAMP) AS start_date",
        "CAST(Planned_End_Date AS TIMESTAMP) AS planned_end_date"
    )
)

df_casted.writeStream \
    .format("delta") \
    .option("checkpointLocation", f"{volume_path}/checkpoints/road_condition_bronze{checkpoint_suffix}") \
    .trigger(availableNow=True) \
    .toTable(f"{catalog_name}.{schema_name}.road_condition_bronze").awaitTermination()

In [None]:
display(spark.read.table(f"{catalog_name}.{schema_name}.road_condition_bronze").summary())

In [None]:
volume_path = f"/Volumes/{catalog_name}/{schema_name}/{volume_name}"

df_casted = (
  spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "csv")
    .option("header", "true")
    .option("cloudFiles.inferColumnTypes", "false")
    .option("cloudFiles.schemaLocation", f"{volume_path}/schema/traffic_volume_counts_bronze")
    .load(f"{volume_path}/data/traffic_volume")
    .selectExpr(
        "CAST(`RequestID` AS BIGINT) AS requestid",
        "`Boro` AS boro",
        "CAST(`Yr` AS INT) AS yr",
        "CAST(`M` AS INT) AS m",
        "CAST(`D` AS INT) AS d",
        "CAST(`HH` AS INT) AS hh",
        "CAST(`MM` AS INT) AS mm",
        "CAST(`Vol` AS INT) AS vol",
        "CAST(`SegmentID` AS BIGINT) AS segmentid",
        "`WktGeom` AS wktgeom",
        "`street` AS street",
        "`fromSt` AS fromst",
        "`toSt` AS tost",
        "`Direction` AS direction"
    )
)

df_casted.writeStream \
    .format("delta") \
    .option("checkpointLocation", f"{volume_path}/checkpoints/traffic_volume_counts_bronze{checkpoint_suffix}") \
    .trigger(availableNow=True) \
    .toTable(f"{catalog_name}.{schema_name}.traffic_volume_counts_bronze").awaitTermination()

display(spark.read.table(f"{catalog_name}.{schema_name}.traffic_volume_counts_bronze").summary())


In [None]:
volume_path = f"/Volumes/{catalog_name}/{schema_name}/{volume_name}"

df_casted = (
  spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "csv")
    .option("header", "true")
    .option("cloudFiles.inferColumnTypes", "false")
    .option("cloudFiles.schemaLocation", f"{volume_path}/schema/contributing_factor")
    .load(f"{volume_path}/data/contributing_factor")
    .selectExpr(
        "`contributing_factor_grouping` AS contributing_factor_grouping",
        "`contributing_factor` AS contributing_factor",
        "CAST(`abs` AS BOOLEAN) AS abs",
        "CAST(`tpms` AS BOOLEAN) AS tpms",
        "CAST(`brake_pad` AS BOOLEAN) AS brake_pad",
        "CAST(`accelerometer` AS BOOLEAN) AS accelerometer",
        "CAST(`blind_spot` AS BOOLEAN) AS blind_spot",
        "CAST(`lane_departure` AS BOOLEAN) AS lane_departure",
        "CAST(`fcw` AS BOOLEAN) AS fcw",
        "CAST(`parking_sensor` AS BOOLEAN) AS parking_sensor",
        "CAST(`rear_view_camera` AS BOOLEAN) AS rear_view_camera",
        "CAST(`engine_light` AS BOOLEAN) AS engine_light",
        "CAST(`rain_sensor` AS BOOLEAN) AS rain_sensor",
        "CAST(`wiper_baldes` AS BOOLEAN) AS wiper_baldes",
        "CAST(`fog_light` AS BOOLEAN) AS fog_light",
        "CAST(`seatbelt_off` AS BOOLEAN) AS seatbelt_off",
        "CAST(`door_ajar` AS BOOLEAN) AS door_ajar"
    )
)

df_casted.writeStream \
    .format("delta") \
    .option("checkpointLocation", f"{volume_path}/checkpoints/contributing_factor{checkpoint_suffix}") \
    .trigger(availableNow=True) \
    .toTable(f"{catalog_name}.{schema_name}.contributing_factor_tags_lkp_gold").awaitTermination()

display(spark.read.table(f"{catalog_name}.{schema_name}.contributing_factor_tags_lkp_gold").summary())


In [None]:
volume_path = f"/Volumes/{catalog_name}/{schema_name}/{volume_name}"

df_casted = (
  spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "csv")
    .option("header", "true")
    .option("cloudFiles.schemaLocation", f"{volume_path}/schema/weather_history_bronze")
    .load(f"{volume_path}/data/weather")
    .selectExpr(
        "CAST(datetime AS TIMESTAMP) AS datetime",
        "CAST(date AS DATE) AS date",
        "CAST(latitude AS DOUBLE) AS latitude",
        "CAST(longitude AS DOUBLE) AS longitude",
        "CAST(temperature_2m AS FLOAT) AS temperature_2m",
        "CAST(precipitation AS FLOAT) AS precipitation",
        "CAST(rain AS FLOAT) AS rain",
        "CAST(snowfall AS FLOAT) AS snowfall",
        "CAST(snow_depth AS FLOAT) AS snow_depth",
        "CAST(weather_code AS FLOAT) AS weather_code",
        "CAST(wind_speed_10m AS FLOAT) AS wind_speed_10m",
        "CAST(wind_direction_10m AS FLOAT) AS wind_direction_10m",
        "CAST(wind_gusts_10m AS FLOAT) AS wind_gusts_10m"
    )
)

df_casted.writeStream \
    .format("delta") \
    .option("checkpointLocation", f"{volume_path}/checkpoints/weather_history_bronze{checkpoint_suffix}") \
    .trigger(availableNow=True) \
    .toTable(f"{catalog_name}.{schema_name}.weather_history_bronze").awaitTermination()


In [None]:

display(spark.read.table(f"{catalog_name}.{schema_name}.weather_history_bronze").summary())

##### **Load Rides from samples catalog**

Populates directly from Databricks sample catalog (NYC Taxi Rides Sample). 

In [None]:


spark.sql(f"""
  MERGE INTO {catalog_name}.{schema_name}.trips_bronze AS target
  USING (
    SELECT 
      MAKE_TIMESTAMP(2024, MONTH(tpep_pickup_datetime), DAY(tpep_pickup_datetime), HOUR(tpep_pickup_datetime), MINUTE(tpep_pickup_datetime), SECOND(tpep_pickup_datetime)) AS tpep_pickup_datetime,
      MAKE_TIMESTAMP(2024, MONTH(tpep_dropoff_datetime), DAY(tpep_dropoff_datetime), HOUR(tpep_dropoff_datetime), MINUTE(tpep_dropoff_datetime), SECOND(tpep_dropoff_datetime)) AS tpep_dropoff_datetime,
      trip_distance,
      NULL AS fare_amount,
      pickup_zip,
      dropoff_zip,
      'Taxi' AS trip_type
    FROM samples.nyctaxi.trips
  ) AS source
  ON FALSE
  WHEN NOT MATCHED THEN INSERT (
    tpep_pickup_datetime,
    tpep_dropoff_datetime,
    trip_distance,
    fare_amount,
    pickup_zip,
    dropoff_zip,
    trip_type
  )
  VALUES (
    source.tpep_pickup_datetime,
    source.tpep_dropoff_datetime,
    source.trip_distance,
    source.fare_amount,
    source.pickup_zip,
    source.dropoff_zip,
    source.trip_type
  );
  """)

##### Build synthetic Telematics data

Generate 1M synthetic records using  [Databricks Labs Data Generator (dbldatagen)](https://github.com/databrickslabs/dbldatagen)




In [None]:
from dbldatagen import DataGenerator
from pyspark.sql.functions import col, when, rand

data_gen = (DataGenerator(spark, rows=1000000, partitions=8)
    .withColumn("vehicle_id", "bigint", minValue=1, maxValue=1000000)
    .withColumn("vin_number", "string", format="vin")
    .withColumn("timestamp", "timestamp", begin="2024-01-01 00:00:00", end="2024-12-31 23:59:59", interval="1 second")
    .withColumn("latitude", "double", minValue=40.477399, maxValue=45.01585)
    .withColumn("longitude", "double", minValue=-79.76259, maxValue=-71.7517)
    
    .withColumn("abs", "boolean", expr="floor(rand() * 2)")
    .withColumn("tpms", "boolean", expr="floor(rand() * 2)")
    .withColumn("brake_pad", "boolean", expr="floor(rand() * 2)")
    .withColumn("accelerometer", "boolean", expr="floor(rand() * 2)")
    .withColumn("blind_spot", "boolean", expr="floor(rand() * 2)")
    .withColumn("lane_departure", "boolean", expr="floor(rand() * 2)")
    .withColumn("fcw", "boolean", expr="floor(rand() * 2)")
    .withColumn("parking_sensor", "boolean", expr="floor(rand() * 2)")
    .withColumn("rear_view_camera", "boolean", expr="floor(rand() * 2)")
    .withColumn("engine_light", "boolean", expr="floor(rand() * 2)")
    .withColumn("rain_sensor", "boolean", expr="floor(rand() * 2)")
    .withColumn("wiper_blades", "boolean", expr="floor(rand() * 2)")
    .withColumn("fog_light", "boolean", expr="floor(rand() * 2)")
    .withColumn("seatbelt_off", "boolean", expr="floor(rand() * 2)")
    .withColumn("door_ajar", "boolean", expr="floor(rand() * 2)")
    
    .withColumn("total_fuel_consumption", "double", minValue=0, maxValue=1000)
    .withColumn("fuel_level_percent", "double", minValue=0, maxValue=100)
    .withColumn("fuel_level_liters", "double", minValue=0, maxValue=50)
    .withColumn("vehicle_mileage_dashboard", "double", minValue=0, maxValue=300000)
    .withColumn("vehicle_mileage_computed", "double", minValue=0, maxValue=300000)
    .withColumn("engine_speed_rpm", "integer", minValue=0, maxValue=8000)
    .withColumn("oil_level_status", "boolean", expr="floor(rand() * 2)")
    .withColumn("engine_temperature", "double", minValue=-40, maxValue=150)
    .withColumn("vehicle_speed", "double", minValue=0, maxValue=200)
    .withColumn("accelerator_pedal_position", "double", minValue=0, maxValue=100)

    .withColumn("lights_parking", "boolean", expr="floor(rand() * 2)")
    .withColumn("lights_dipped", "boolean", expr="floor(rand() * 2)")
    .withColumn("lights_full_beam", "boolean", expr="floor(rand() * 2)")
    .withColumn("lights_fog_front", "boolean", expr="floor(rand() * 2)")
    .withColumn("lights_fog_rear", "boolean", expr="floor(rand() * 2)")

    .withColumn("door_front_left", "boolean", expr="floor(rand() * 2)")
    .withColumn("door_front_right", "boolean", expr="floor(rand() * 2)")
    .withColumn("door_rear_right", "boolean", expr="floor(rand() * 2)")
    .withColumn("door_rear_left", "boolean", expr="floor(rand() * 2)")
    .withColumn("trunk_cover", "boolean", expr="floor(rand() * 2)")
    .withColumn("engine_hood", "boolean", expr="floor(rand() * 2)")

    .withColumn("charging_cable_status", "boolean", expr="floor(rand() * 2)")
    .withColumn("charging_status", "boolean", expr="floor(rand() * 2)")
    .withColumn("battery_level", "double", minValue=0, maxValue=100)
    .withColumn("vehicle_range_km", "double", minValue=0, maxValue=500)
    .withColumn("battery_temperature", "double", minValue=-40, maxValue=60)
    
    .withColumn("cng_level", "double", minValue=0, maxValue=100)
    .withColumn("total_cng_consumption", "double", minValue=0, maxValue=1000)
    .withColumn("engine_on_cng", "boolean", expr="floor(rand() * 2)")
    .withColumn("adblue_level_dashboard", "double", minValue=0, maxValue=100)
    
    .withColumn("seatbelt_driver", "boolean", expr="floor(rand() * 2)")
    .withColumn("seatbelt_passenger", "boolean", expr="floor(rand() * 2)")
    
    .withColumn("check_engine", "boolean", expr="floor(rand() * 2)")
    .withColumn("lights_failure", "boolean", expr="floor(rand() * 2)")
    .withColumn("low_tire_pressure", "boolean", expr="floor(rand() * 2)")
)

# Build the DataFrame
df_telematics = data_gen.build()

# Apply rare condition simulations (1% high acceleration, speed, rpm)
df_telematics = df_telematics.withColumn(
    "accelerator_pedal_position",
    when(rand() < 0.01, 100.0).otherwise(col("accelerator_pedal_position"))
)

df_telematics = df_telematics.withColumn(
    "vehicle_speed",
    when(rand() < 0.01, 200.0).otherwise(col("vehicle_speed"))
)

df_telematics = df_telematics.withColumn(
    "engine_speed_rpm",
    when(rand() < 0.01, 8000).otherwise(col("engine_speed_rpm"))
)

In [None]:
display(df_telematics.summary())

In [None]:
# Write the data to the Delta table
df_telematics.write.mode("append").insertInto(f"{catalog_name}.{schema_name}.telematics_bronze")

#### Silver Tables

##### Collisions 

Perform clean up and basic transformations.

In [None]:
from pyspark.sql.functions import to_date, col, concat_ws, unix_timestamp, regexp_replace, trim

df = spark.sql(f"select * from {catalog_name}.{schema_name}.collision_bronze")
df = df.drop("location")

# Clean date
df = df.withColumn("crash_date", regexp_replace("crash_date", "[a-zA-Z.]", ""))
df = df.withColumn("crash_date", to_date(col("crash_date"), "MM/dd/yyyy"))

# Clean and sanitize crash_time
df = df.withColumn("crash_time", trim(col("crash_time")))

# Filter out rows where crash_time doesn't look like a time (e.g., missing digits or non-numeric)
df = df.filter(col("crash_time").rlike("^[0-9]{1,2}:[0-9]{2}$"))

# Safely parse timestamp
df = df.withColumn(
    "crash_time",
    unix_timestamp(concat_ws(" ", col("crash_date"), col("crash_time")), "yyyy-MM-dd H:mm").cast("timestamp")
)

# Cast columns
df = df.withColumn("number_of_persons_injured", col("number_of_persons_injured").cast("int"))
df = df.withColumn("number_of_cyclist_killed", col("number_of_cyclist_killed").cast("int"))
df = df.withColumn("number_of_motorist_injured", col("number_of_motorist_injured").cast("int"))

# Final filter
df = df.filter(col("crash_date").isNotNull())

# Write to table
df.write.format("delta").mode("overwrite").option("overwriteSchema", "true").saveAsTable(
    f"{catalog_name}.{schema_name}.collision_silver"
)

##### **Volume**

Build Silver Table for Traffic Volume using Databricks Built-in Geospatial Function

For demo purposes, to speed up the pipeline run, we’re populating the silver layer with events from 2024 only.



In [None]:
spark.sql(f"""
WITH computed AS (
  SELECT 
    vol,
    ST_Transform(ST_Centroid(ST_GeomFromText(wktgeom, 2263)), 4326) AS geom,
    MAKE_TIMESTAMP(yr, m, d, hh, mm, 0) AS timestamp
  FROM {catalog_name}.{schema_name}.traffic_volume_counts_bronze
  WHERE wktgeom IS NOT NULL AND vol IS NOT NULL and yr = 2024
)
MERGE INTO {catalog_name}.{schema_name}.traffic_volume_counts_silver AS target
USING (
  SELECT
    vol,
    ST_Y(geom) AS lat,
    ST_X(geom) AS long,
    timestamp,
    h3_longlatash3(ST_X(geom), ST_Y(geom), 9) AS h3_index
  FROM computed
) AS source
ON target.h3_index = source.h3_index AND target.timestamp = source.timestamp
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *
""")

##### **Trips**

Compute latitude and longitude from zip_code

Example using UDF in UC

In [None]:
#spark.sql(f"""
#CREATE OR REPLACE FUNCTION get_lat_long_from_zipcode(zipcode STRING)
#RETURNS STRUCT<latitude: DOUBLE, longitude: DOUBLE>
#LANGUAGE PYTHON
#ENVIRONMENT (
#  dependencies = '["requests"]',
#  environment_version = "None"
#)
#AS $$
#import requests
#url = f"http://api.zippopotam.us/us/{zipcode}"
#response = requests.get(url)
#if response.status_code != 200:
#    return None
#data = response.json()
#try:
#    latitude = float(data['places'][0]['latitude'])
#    longitude = float(data['places'][0]['longitude'])
#except (KeyError, ValueError):
#    return None
#return (latitude, longitude)
#$$;
#""")

In [None]:
sql_query = f"""
MERGE INTO {catalog_name}.{schema_name}.trips_silver AS target
USING (
  SELECT
    tpep_pickup_datetime,
    tpep_dropoff_datetime,
    trip_distance,
    pickup_zip,
    dropoff_zip,
    {catalog_name}.{schema_name}.get_lat_long_from_zipcode(pickup_zip).latitude AS pickup_latitude,
    {catalog_name}.{schema_name}.get_lat_long_from_zipcode(pickup_zip).longitude AS pickup_longitude,
    {catalog_name}.{schema_name}.get_lat_long_from_zipcode(dropoff_zip).latitude AS dropoff_latitude,
    {catalog_name}.{schema_name}.get_lat_long_from_zipcode(dropoff_zip).longitude AS dropoff_longitude,
    trip_type
  FROM {catalog_name}.{schema_name}.trips_bronze
) AS source
ON target.tpep_pickup_datetime = source.tpep_pickup_datetime
   AND target.tpep_dropoff_datetime = source.tpep_dropoff_datetime
   AND target.pickup_zip = source.pickup_zip
   AND target.dropoff_zip = source.dropoff_zip
WHEN MATCHED THEN UPDATE SET
  target.trip_distance = source.trip_distance,
  target.pickup_latitude = source.pickup_latitude,
  target.pickup_longitude = source.pickup_longitude,
  target.dropoff_latitude = source.dropoff_latitude,
  target.dropoff_longitude = source.dropoff_longitude,
  target.trip_type = source.trip_type
WHEN NOT MATCHED THEN INSERT (
  tpep_pickup_datetime,
  tpep_dropoff_datetime,
  trip_distance,
  pickup_zip,
  dropoff_zip,
  pickup_latitude,
  pickup_longitude,
  dropoff_latitude,
  dropoff_longitude,
  trip_type
) VALUES (
  source.tpep_pickup_datetime,
  source.tpep_dropoff_datetime,
  source.trip_distance,
  source.pickup_zip,
  source.dropoff_zip,
  source.pickup_latitude,
  source.pickup_longitude,
  source.dropoff_latitude,
  source.dropoff_longitude,
  source.trip_type
);
"""

#spark.sql(sql_query)

Compute latitude/longitude using traditional UDFs.


In [None]:
from pyspark.sql.functions import pandas_udf
import pgeocode
import pandas as pd
from pyspark.sql.types import StructType, StructField, DoubleType


# Define the schema for the output
schema = StructType([
    StructField("latitude", DoubleType(), True),
    StructField("longitude", DoubleType(), True)
])
nomi = pgeocode.Nominatim('US')

# Define the Pandas UDF
@pandas_udf(schema)
def get_lat_long_from_zipcode(zipcodes: pd.Series) -> pd.DataFrame:
    results = zipcodes.apply(lambda x: nomi.query_postal_code(x).to_dict())
    latitudes = results.apply(lambda x: x['latitude'] if not pd.isna(x['latitude']) else None)
    longitudes = results.apply(lambda x: x['longitude'] if not pd.isna(x['longitude']) else None)
    return pd.DataFrame({'latitude': latitudes, 'longitude': longitudes})

trips_df = spark.sql(f"SELECT * FROM {catalog_name}.{schema_name}.trips_bronze")

# Apply the Pandas UDF to the pickup_zip and dropoff_zip columns
trips_df_with_coords = trips_df.withColumn(
    "pickup", get_lat_long_from_zipcode(trips_df.pickup_zip)
).withColumn(
    "dropoff", get_lat_long_from_zipcode(trips_df.dropoff_zip)
)

# Select the original columns along with the new latitude and longitude columns
result_df = trips_df_with_coords.select(
    "tpep_pickup_datetime", "tpep_dropoff_datetime", "trip_distance", "pickup_zip", "dropoff_zip", trips_df_with_coords["pickup.latitude"].alias("pickup_latitude"), trips_df_with_coords["pickup.longitude"].alias("pickup_longitude") , trips_df_with_coords["dropoff.latitude"].alias("dropoff_latitude"), trips_df_with_coords["dropoff.longitude"].alias("dropoff_longitude"), "trip_type"
)



In [None]:
display(result_df.head(5))
result_df.write.mode("overwrite").saveAsTable(f"{catalog_name}.{schema_name}.trips_silver")

#### Gold Tables


##### Compute Routes for trips

We need to compute routes since our trips only have pickup and drop-off. We want to correlate any collision, road volume, and weather along the way.



In [None]:
class RouteProcessor:
    def __init__(self):
        import osmnx as ox
        self.ox = ox
        self.nx = __import__('networkx')
        self.graph = None

    def load_graph_from_location(self, location):
        self.graph = self.ox.graph_from_place(location, network_type="drive")

    def set_graph(self, graph):
        self.graph = graph

    def load_graph_from_dataset(self, trips_df):
        bounds = trips_df.selectExpr(
            "min(pickup_latitude) as min_lat",
            "max(pickup_latitude) as max_lat",
            "min(pickup_longitude) as min_lon",
            "max(pickup_longitude) as max_lon",
            "min(dropoff_latitude) as min_dlat",
            "max(dropoff_latitude) as max_dlat",
            "min(dropoff_longitude) as min_dlon",
            "max(dropoff_longitude) as max_dlon"
        ).collect()[0]

        min_lat = min(bounds.min_lat, bounds.min_dlat) - 0.01
        max_lat = max(bounds.max_lat, bounds.max_dlat) + 0.01
        min_lon = min(bounds.min_lon, bounds.min_dlon) - 0.01
        max_lon = max(bounds.max_lon, bounds.max_dlon) + 0.01

        bbox = (min_lon, min_lat, max_lon, max_lat)
        logger.info(f"Loading graph for bbox (west, south, east, north): {bbox}")
        self.graph = self.ox.graph_from_bbox(bbox=bbox, network_type="drive")

    def _get_route_path(self, pickup_lat, pickup_lon, dropoff_lat, dropoff_lon):
        try:
            orig_node = self.ox.nearest_nodes(self.graph, pickup_lon, pickup_lat)
            dest_node = self.ox.nearest_nodes(self.graph, dropoff_lon, dropoff_lat)
            route = self.nx.shortest_path(self.graph, orig_node, dest_node, weight='length')
            lats, lons = zip(*[(self.graph.nodes[node]['y'], self.graph.nodes[node]['x']) for node in route])
            return list(lats), list(lons)
        except Exception as e:
            logger.info(f"Routing failed: {e}")
            logger.info(f"Pickup: ({pickup_lat}, {pickup_lon}) | Dropoff: ({dropoff_lat}, {dropoff_lon})")
            return [], []

    def compute_routes_single_node(self, trips_df):
        import pandas as pd
        import concurrent.futures

        pandas_df = trips_df.select(
            "id", "tpep_pickup_datetime", "tpep_dropoff_datetime",
            "pickup_zip", "dropoff_zip", 
            "pickup_latitude", "pickup_longitude", 
            "dropoff_latitude", "dropoff_longitude"
        ).dropna().distinct().toPandas()

        def process_row(row):
            # Compute the route path (list of latitudes and longitudes)
            path_lat, path_lon = self._get_route_path(
                row['pickup_latitude'], row['pickup_longitude'],
                row['dropoff_latitude'], row['dropoff_longitude']
            )
            exploded_rows = []
            for i, (lat, lon) in enumerate(zip(path_lat, path_lon)):
                # *** Change 2: Populate the output with new schema columns ***
                exploded_rows.append({
                    "trip_id": row["id"],
                    "tpep_pickup_datetime": row["tpep_pickup_datetime"],
                    "tpep_dropoff_datetime": row["tpep_dropoff_datetime"],
                    "path_latitude": lat,
                    "path_longitude": lon,
                    "order": i
                })
            return exploded_rows

        with concurrent.futures.ThreadPoolExecutor() as executor:
            nested_results = list(executor.map(process_row, [row for _, row in pandas_df.iterrows()]))

        # Flatten nested list of exploded routes
        all_points = [point for sublist in nested_results for point in sublist]
        enriched_df = pd.DataFrame(all_points)
        
        return enriched_df


    def compute_routes_multi_node(self,trips_df,logger, graph_path):
        import pandas as pd
        from pyspark.sql.functions import col
        from pyspark.sql.types import (
            StructType, StructField, LongType, DoubleType, IntegerType, TimestampType
        )

        # Define output schema for the result DataFrame
        route_schema = StructType([
            StructField("trip_id", LongType()),
            StructField("tpep_pickup_datetime", TimestampType()),
            StructField("tpep_dropoff_datetime", TimestampType()),
            StructField("path_latitude", DoubleType()),
            StructField("path_longitude", DoubleType()),
            StructField("order", IntegerType())
        ])

        # Load the graph once and broadcast it
        import osmnx as ox
        import networkx as nx
        graph = ox.load_graphml(graph_path)
        broadcast_graph = trips_df.sparkSession.sparkContext.broadcast(graph)

        logger.info(f"Broadcasting graph finished.")

        # Define the route processor as a Pandas function
        def route_processor(pdf: pd.DataFrame) -> pd.DataFrame:
            import networkx as nx
            # Retrieve the broadcast graph instead of reloading it every time.
            graph = broadcast_graph.value

            results = []
            for _, row in pdf.iterrows():
                try:
                    orig_node = ox.nearest_nodes(graph, row['pickup_longitude'], row['pickup_latitude'])
                    dest_node = ox.nearest_nodes(graph, row['dropoff_longitude'], row['dropoff_latitude'])
                    route = nx.shortest_path(graph, orig_node, dest_node, weight='length')
                    lats, lons = zip(*[(graph.nodes[n]['y'], graph.nodes[n]['x']) for n in route])
                    for i, (lat, lon) in enumerate(zip(lats, lons)):
                        results.append({
                            "trip_id": row["id"],
                            "tpep_pickup_datetime": row["tpep_pickup_datetime"],
                            "tpep_dropoff_datetime": row["tpep_dropoff_datetime"],
                            "path_latitude": lat,
                            "path_longitude": lon,
                            "order": i
                        })
                except Exception as Ex:
                    print(f"Error processing trip {row['id']}: {Ex}")
                    continue
            return pd.DataFrame(results)

        # Prepare and clean the input DataFrame
        selected_df = trips_df.select(
            col("id"),
            col("tpep_pickup_datetime"),
            col("tpep_dropoff_datetime"),
            col("pickup_latitude"),
            col("pickup_longitude"),
            col("dropoff_latitude"),
            col("dropoff_longitude")
        ).dropna().distinct()

        
        # Apply the route processor using applyInPandas with groupBy("id")

        # Repartition the DataFrame to ensure applyInPandas will autoscale on a small data set (250K rows).
        result_df = selected_df.repartition(50000).groupBy("id").applyInPandas(route_processor, schema=route_schema).coalesce(20)

        return result_df
   
   

Load and save graph to volume because all Spark nodes need access to Graph file.

In [None]:

processor = RouteProcessor()
processor.load_graph_from_location("New York City, New York, USA") 

graph_path = None


local_file_path = "./cache/graph.graphml"
volume_path = f"/Volumes/{catalog_name}/{schema_name}/{volume_name}"

processor.ox.save_graphml(processor.graph, filepath=local_file_path)

upload_file_to_volume(local_file_path, volume_path, "graph.graphml")

graph_path = f"{volume_path}/graph.graphml"



Compute the route

_**Note:**_ Based on the sample dataset, this can take an average of one hour to compute routes for 250K records on a 2-node small cluster. Scale up the cluster to process faster.
  

In [None]:
trips_df = spark.table(f"{catalog_name}.{schema_name}.trips_silver")

In [None]:
logger.info(f"multi_node_route: {multi_node_route}")
logger.info(f"is_spark_connect: {is_spark_connect}")

logger.info(f"Computing route for {trips_df.count()} trips")

trips_df = processor.compute_routes_multi_node(trips_df, logger, graph_path)

#our multi node implementation does not work on Spark Connect sessions yet.

#if multi_node_route and multi_node_route == True and is_spark_connect == False :

#    trips_df = processor.compute_routes_multi_node(trips_df,logger, graph_path)
#else:
    
#    pandas_df = processor.compute_routes_single_node(trips_df)
    
#    trips_df = spark.createDataFrame(pandas_df)

logger.info(f"Generated {trips_df.count()} route points")



In [None]:
from pyspark.sql.functions import col
trips_df = trips_df.select(
    col("trip_id").cast("bigint"),
    col("path_latitude").cast("double"),
    col("path_longitude").cast("double"),
    col("order").cast("long"),
    col("tpep_pickup_datetime"),
    col("tpep_dropoff_datetime")
)

# Write the final DataFrame to the trips_route_gold table
trips_df.write.mode("overwrite").saveAsTable(f"{catalog_name}.{schema_name}.trips_route_gold")

##### Plot a Route

In [None]:
import pandas as pd
import json
from keplergl import KeplerGl

def get_route_geojson(route_df: pd.DataFrame):
    route_df = route_df.sort_values("order")
    coordinates = list(zip(route_df['path_longitude'], route_df['path_latitude']))
    return {
        "type": "FeatureCollection",
        "features": [{
            "type": "Feature",
            "geometry": {
                "type": "LineString",
                "coordinates": coordinates
            },
            "properties": {}
        }]
    }

single_route_df = spark.table(f"{catalog_name}.{schema_name}.trips_route_gold").filter("trip_id = 10000").orderBy("order")
pandas_route = single_route_df.toPandas()

geojson_data = get_route_geojson(pandas_route)

kepler_map = KeplerGl(height=500)
kepler_map.add_data(data=geojson_data, name="Route Path")
kepler_map

##### Generate final table - trip analytics 

Build final table with several features, combining trips, road condition, weather, traffic volume, and telematics.

_Note: We use rounding to two decimal places when joining tables on latitude and longitude, which results in approximately 800 meters of precision loss. We’re intentionally doing this, as our analytics is not at street-level granularity._

In [None]:
# Build trip analytics synthesis gold table
sql_query = f"""
CREATE OR REPLACE TABLE {catalog_name}.{schema_name}.trip_analyics_synthesis_gold
CLUSTER BY (trip_h3_index, trip_start_datetime)
AS
WITH ordered_routes AS (
  SELECT
    trip_id,
    tpep_pickup_datetime,
    tpep_dropoff_datetime,
    path_latitude AS trip_latitude,
    path_longitude AS trip_longitude,
    order as path_order,
    COUNT(*) OVER (
      PARTITION BY trip_id
    ) AS total_points
  FROM {catalog_name}.{schema_name}.trips_route_gold
),
route_with_labels AS (
  SELECT DISTINCT
    trip_id,
    tpep_pickup_datetime,
    tpep_dropoff_datetime,
    trip_latitude,
    trip_longitude,
    path_order,
    CASE
      WHEN path_order <= total_points * 0.30 THEN 'init'
      WHEN path_order > total_points * 0.30 
         AND path_order < total_points * 0.80 THEN 'mid'
      ELSE 'end'
    END AS path_description,
    h3_longlatash3(trip_longitude, trip_latitude, 9) AS trip_h3_index
  FROM ordered_routes
),
trip_with_collision AS (
  SELECT
    t.trip_id AS trip_trip_id,
    t.tpep_pickup_datetime AS trip_start_datetime,
    t.tpep_dropoff_datetime AS trip_end_datetime,
    t.trip_latitude AS trip_latitude,
    t.trip_longitude AS trip_longitude,
    t.path_order AS trip_path_order,
    t.path_description AS trip_path_description,
    t.trip_h3_index AS trip_h3_index,
    c.crash_date AS collision_crash_date,
    c.crash_time AS collision_crash_time,
    c.borough AS collision_borough,
    c.zip_code AS collision_zip_code,
    c.latitude AS collision_latitude,
    c.longitude AS collision_longitude,
    c.on_street_name AS collision_on_street_name,
    c.cross_street_name AS collision_cross_street_name,
    c.off_street_name AS collision_off_street_name,
    c.number_of_persons_injured AS collision_number_of_persons_injured,
    c.number_of_persons_killed AS collision_number_of_persons_killed,
    c.number_of_pedestrians_injured AS collision_number_of_pedestrians_injured,
    c.number_of_pedestrians_killed AS collision_number_of_pedestrians_killed,
    c.number_of_cyclist_injured AS collision_number_of_cyclist_injured,
    c.number_of_cyclist_killed AS collision_number_of_cyclist_killed,
    c.number_of_motorist_injured AS collision_number_of_motorist_injured,
    c.number_of_motorist_killed AS collision_number_of_motorist_killed,
    c.contributing_factor_vehicle_1 AS collision_contributing_factor_vehicle_1,
    c.contributing_factor_vehicle_2 AS collision_contributing_factor_vehicle_2,
    c.contributing_factor_vehicle_3 AS collision_contributing_factor_vehicle_3,
    c.contributing_factor_vehicle_4 AS collision_contributing_factor_vehicle_4,
    c.contributing_factor_vehicle_5 AS collision_contributing_factor_vehicle_5,
    c.collision_id AS collision_collision_id,
    c.vehicle_type_code_1 AS collision_vehicle_type_code_1,
    c.vehicle_type_code_2 AS collision_vehicle_type_code_2,
    c.vehicle_type_code_3 AS collision_vehicle_type_code_3,
    c.vehicle_type_code_4 AS collision_vehicle_type_code_4,
    c.vehicle_type_code_5 AS collision_vehicle_type_code_5
  FROM route_with_labels t
  LEFT JOIN {catalog_name}.{schema_name}.collision_silver c
    ON ROUND(t.trip_longitude,2) = ROUND(c.longitude,2)
    AND ROUND(t.trip_latitude,2) = ROUND(c.latitude,2)
    AND c.crash_time BETWEEN CAST(t.tpep_pickup_datetime AS DATE) AND CAST(t.tpep_dropoff_datetime AS DATE)
),
trip_with_cf AS (
  SELECT
    t.*,
    CASE
      WHEN collision_contributing_factor_vehicle_1 IS NOT NULL AND collision_contributing_factor_vehicle_1 <> 'Unspecified'
      THEN collision_contributing_factor_vehicle_1
      WHEN collision_contributing_factor_vehicle_2 IS NOT NULL AND collision_contributing_factor_vehicle_2 <> 'Unspecified'
      THEN collision_contributing_factor_vehicle_2
      WHEN collision_contributing_factor_vehicle_3 IS NOT NULL AND collision_contributing_factor_vehicle_3 <> 'Unspecified'
      THEN collision_contributing_factor_vehicle_3
      WHEN collision_contributing_factor_vehicle_4 IS NOT NULL AND collision_contributing_factor_vehicle_4 <> 'Unspecified'
      THEN collision_contributing_factor_vehicle_4
      ELSE collision_contributing_factor_vehicle_5
    END AS contributing_factor
  FROM trip_with_collision t
),
trip_with_weather AS (
  SELECT
    t.*,
    w.temperature_2m AS weather_temperature_2m,
    w.precipitation AS weather_precipitation,
    w.rain AS weather_rain,
    w.snowfall AS weather_snowfall,
    w.snow_depth AS weather_snow_depth,
    w.weather_code AS weather_weather_code,
    w.wind_speed_10m AS weather_wind_speed_10m,
    w.wind_direction_10m AS weather_wind_direction_10m,
    w.wind_gusts_10m AS weather_wind_gusts_10m
  FROM trip_with_cf t
  LEFT JOIN {catalog_name}.{schema_name}.weather_history_bronze w
    ON ROUND(t.trip_longitude,2) = ROUND(w.longitude,2)
    AND ROUND(t.trip_latitude,2) = ROUND(w.latitude,2)
    AND CAST(w.datetime AS DATE) BETWEEN CAST(t.trip_start_datetime AS DATE) AND CAST(t.trip_end_datetime AS DATE)
),
trip_with_volume AS (
  SELECT
    t.*,
    v.vol AS traffic_volume
  FROM trip_with_weather t
  LEFT JOIN {catalog_name}.{schema_name}.traffic_volume_counts_silver v
    ON ROUND(t.trip_longitude,2) = ROUND(v.long,2)
    AND ROUND(t.trip_latitude,2) = ROUND(v.lat,2)
    AND v.timestamp BETWEEN CAST(t.trip_start_datetime AS DATE) AND CAST(t.trip_end_datetime AS DATE)
),
trip_with_road_conditions AS (
  SELECT
    t.*,
    r.severity AS road_severity,
    r.roadway_name AS road_roadway_name,
    r.direction_of_travel AS road_direction_of_travel,
    r.description AS road_description,
    r.lanes_affected AS road_lanes_affected,
    r.lanes_status AS road_lanes_status,
    r.navteq_link_id AS road_navteq_link_id,
    r.primary_location AS road_primary_location,
    r.secondary_location AS road_secondary_location,
    r.first_article_city AS road_first_article_city,
    r.second_city AS road_second_city,
    r.event_type AS road_event_type,
    r.event_sub_type AS road_event_sub_type,
    r.reported_date AS road_reported_date,
    r.start_date AS road_start_date,
    r.planned_end_date AS road_planned_end_date
  FROM trip_with_volume t
  LEFT JOIN {catalog_name}.{schema_name}.road_condition_bronze r
    ON ROUND(t.trip_longitude,2) = ROUND(r.longitude,2)
    AND ROUND(t.trip_latitude,2) = ROUND(r.latitude,2)
    AND t.trip_start_datetime BETWEEN r.start_date AND r.planned_end_date
    AND t.trip_end_datetime BETWEEN r.start_date AND r.planned_end_date
),
trip_with_telematics_tags AS (
  SELECT
    t.*,
    CAST(tag.abs AS INT) AS telematics_abs,
    CAST(tag.accelerometer AS INT) AS telematics_accelerometer,
    CAST(tag.blind_spot AS INT) AS telematics_blind_spot,
    CAST(tag.brake_pad AS INT) AS telematics_brake_pad,
    CAST(tag.door_ajar AS INT) AS telematics_door_ajar,
    CAST(tag.engine_light AS INT) AS telematics_engine_light,
    CAST(tag.fcw AS INT) AS telematics_fcw,
    CAST(tag.fog_light AS INT) AS telematics_fog_light,
    CAST(tag.lane_departure AS INT) AS telematics_lane_departure,
    CAST(tag.parking_sensor AS INT) AS telematics_parking_sensor,
    CAST(tag.rain_sensor AS INT) AS telematics_rain_sensor,
    CAST(tag.rear_view_camera AS INT) AS telematics_rear_view_camera,
    CAST(tag.seatbelt_off AS INT) AS telematics_seatbelt_off,
    CAST(tag.tpms AS INT) AS telematics_tpms,
    CAST(tag.wiper_baldes AS INT) AS telematics_wiper_blades
  FROM trip_with_road_conditions t
  LEFT JOIN {catalog_name}.{schema_name}.contributing_factor_tags_lkp_gold tag
    ON t.contributing_factor = tag.contributing_factor
),
final_with_vehicle_id AS (
  SELECT
    *,
    monotonically_increasing_id() AS vehicle_id
  FROM trip_with_telematics_tags
)
SELECT
  *,
  collision_collision_id IS NOT NULL AS is_collision
FROM final_with_vehicle_id;
"""

logger.info(sql_query)

spark.sql(sql_query)
