In [0]:
%sql
-- Reset the silver trip table before it is rebuilt by the next cell.
-- IF EXISTS keeps this cell from failing on a first run (or a fresh workspace)
-- where silver_trip_data has not been created yet.
DROP TABLE IF EXISTS silver_trip_data

In [0]:
%sql
-- First part of the silver transformation (CTEs inside a CTAS) --> silver_trip_data
--   T1         : typed / null-defaulted projection of the bronze trip rows
--   Calculated : adds trip duration (minutes), haversine distance (km) and the trip date
-- CREATE OR REPLACE (the same pattern the silver_weather_data cell uses) makes this
-- cell re-runnable on its own, without depending on a preceding DROP TABLE.
CREATE OR REPLACE TABLE silver_trip_data
USING DELTA
AS
WITH T1 AS (
    SELECT
        ride_id,
        rideable_type,
        member_casual,
        start_lat,
        start_lng,
        end_lat,
        end_lng,
        CAST(started_at AS TIMESTAMP) AS trip_start_ts,
        CAST(ended_at AS TIMESTAMP) AS trip_end_ts,
        -- Missing station attributes are replaced by the literal 'N/A'.
        COALESCE(start_station_name, 'N/A') AS start_station_name,
        COALESCE(start_station_id, 'N/A') AS start_station_id,
        -- NOTE(review): end_station_name is prepared here but not projected by the
        -- final SELECT (start_station_name is) -- confirm whether that is intended.
        COALESCE(end_station_name, 'N/A') AS end_station_name,
        COALESCE(end_station_id, 'N/A') AS end_station_id
    FROM
        divvy.default.bronze_trip_data
    WHERE
        -- Keep only rows with nothing in _rescued_data
        -- (presumably the ingestion's rescued-data column -- confirm).
        _rescued_data IS NULL
        -- ride_id is set NOT NULL and declared the primary key in the next cell;
        -- a NULL ride_id would make that ALTER fail, so reject such rows here.
        AND ride_id IS NOT NULL
        AND started_at IS NOT NULL
        AND ended_at IS NOT NULL
),

Calculated AS (
    SELECT
        *,
        -- Duration in minutes (2 decimals); NULL unless the trip ends after it starts.
        CASE
            WHEN trip_end_ts > trip_start_ts
            THEN ROUND(TIMESTAMPDIFF(SECOND, trip_start_ts, trip_end_ts) / 60.0, 2)
            ELSE NULL
        END AS Trip_Duration_Min,
        -- Haversine great-circle distance between start and end coordinates,
        -- using 6371 as the Earth radius in km, rounded to 2 decimals.
        ROUND(
            6371 * 2 * ASIN(
                SQRT(
                    POW(SIN(RADIANS(end_lat - start_lat) / 2), 2) +
                    COS(RADIANS(start_lat)) * COS(RADIANS(end_lat)) *
                    POW(SIN(RADIANS(end_lng - start_lng) / 2), 2)
                )
            ),
            2
        ) AS Trip_Distance_Km,
        CAST(trip_start_ts AS DATE) AS Full_Date
    FROM
        T1
)

SELECT
    T2.trip_start_ts,
    T2.trip_end_ts,
    T2.start_station_name,
    T2.ride_id,
    T2.rideable_type,
    T2.member_casual,
    T2.Full_Date,
    T2.start_station_id,
    T2.end_station_id,
    T2.Trip_Duration_Min,
    T2.Trip_Distance_Km,
    T2.start_lat,
    T2.start_lng,
    T2.end_lat,
    T2.end_lng
FROM
    Calculated AS T2
WHERE
    -- Keep trips longer than 0.05 km. A NULL distance (any missing coordinate)
    -- does not satisfy the comparison, so those rows are dropped as well.
    T2.Trip_Distance_Km > 0.05
    -- Drops trips whose end is not strictly after their start (duration is NULL).
    AND T2.Trip_Duration_Min IS NOT NULL;

In [0]:
%sql
-- silver_trip_data_pk
-- The key column has to be NOT NULL before it can be declared as the primary key.
ALTER TABLE silver_trip_data
ALTER COLUMN ride_id SET NOT NULL;

-- Remove a constraint left behind by an earlier run so that re-executing this
-- cell does not fail with "constraint already exists". No-op when it is absent.
ALTER TABLE silver_trip_data
DROP CONSTRAINT IF EXISTS silver_trip_data_pk;

-- NOT ENFORCED is spelled out to match the silver_weather_data_pk definition.
ALTER TABLE silver_trip_data
ADD CONSTRAINT silver_trip_data_pk PRIMARY KEY (ride_id)
NOT ENFORCED;

In [0]:
%sql
-- Silver Weather Data Transformation
-- Builds, from the YEAR / MO / DY / HR columns of the bronze weather table:
--   weather_ts             : timestamp at the start of the hour
--   silver_weather_data_pk : string key in the form yyyyMMddHH
--   Date_Key               : integer in the form yyyyMMdd
-- and casts the measurement columns to DOUBLE.

CREATE OR REPLACE TABLE silver_weather_data
USING DELTA
AS
WITH parts AS (
    -- Zero-padded string pieces of the observation time, computed once and
    -- reused below so every derived column is built from the same values.
    SELECT
        CAST(YEAR AS STRING) AS year_str,
        lpad(CAST(MO AS STRING), 2, '0') AS month_str,
        lpad(CAST(DY AS STRING), 2, '0') AS day_str,
        lpad(CAST(HR AS STRING), 2, '0') AS hour_str,
        TEMP,
        PRCP,
        HMDT,
        WND_SPD,
        ATM_PRESS
    FROM
        divvy.default.bronze_weather_data
    WHERE
        _rescued_data IS NULL
        -- Every date part must be present: concat() returns NULL if any argument
        -- is NULL, which would produce a NULL silver_weather_data_pk and make the
        -- SET NOT NULL in the next cell fail.
        AND YEAR IS NOT NULL
        AND MO IS NOT NULL
        AND DY IS NOT NULL
        AND HR IS NOT NULL
        AND TEMP IS NOT NULL
)
SELECT
    to_timestamp(
        concat(year_str, '-', month_str, '-', day_str, ' ', hour_str, ':00:00'),
        'yyyy-MM-dd HH:mm:ss'
    ) AS weather_ts,

    concat(year_str, month_str, day_str, hour_str) AS silver_weather_data_pk,

    CAST(TEMP AS DOUBLE) AS temp_celsius,
    CAST(PRCP AS DOUBLE) AS prcp_mm,
    CAST(HMDT AS DOUBLE) AS hmdt_percent,
    CAST(WND_SPD AS DOUBLE) AS wnd_spd_kph,
    CAST(ATM_PRESS AS DOUBLE) AS atm_press_hpa,
    CAST(concat(year_str, month_str, day_str) AS INT) AS Date_Key
FROM
    parts

In [0]:
%sql
-- silver_weather_data_pk
-- The key column has to be NOT NULL before it can be declared as the primary key.

ALTER TABLE silver_weather_data
ALTER COLUMN silver_weather_data_pk SET NOT NULL;

-- Remove a constraint left behind by an earlier run so that re-executing this
-- cell does not fail with "constraint already exists". No-op when it is absent.
ALTER TABLE silver_weather_data
DROP CONSTRAINT IF EXISTS silver_weather_data_pk;

ALTER TABLE silver_weather_data
ADD CONSTRAINT silver_weather_data_pk PRIMARY KEY (silver_weather_data_pk)
NOT ENFORCED;

In [0]:
# GEOLOCALISATION ENRICHMENT SCRIPT 

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, h3_longlatash3, lit, monotonically_increasing_id

# --- Configuration ---
H3_RESOLUTION = 10
# NOTE(review): the SQL cells read the bronze layer as divvy.default.bronze_trip_data;
# this unqualified name resolves against the session's current catalog/schema -- confirm.
TRIP_DATA_SOURCE_TABLE = "bronze_trip_data"
COMMUNITY_H3_TABLE = "dim_community_h3_index"
TARGET_DIM_TABLE = "gold.Dim_Station_Final"

# --- 1. Union and Deduplicate All Station Points ---

df_trip = spark.table(TRIP_DATA_SOURCE_TABLE)

# 1a. Select all START stations (rows without coordinates are skipped)
df_start_stations = df_trip.select(
    col("start_station_name").alias("Station_Name"),
    col("start_lat").alias("Latitude"),
    col("start_lng").alias("Longitude")
).filter(col("start_lat").isNotNull() & col("start_lng").isNotNull())

# 1b. Select all END stations (rows without coordinates are skipped)
df_end_stations = df_trip.select(
    col("end_station_name").alias("Station_Name"),
    col("end_lat").alias("Latitude"),
    col("end_lng").alias("Longitude")
).filter(col("end_lat").isNotNull() & col("end_lng").isNotNull())

# 1c. Combine and deduplicate on (Station_Name, Latitude, Longitude)
df_station_base = df_start_stations.union(df_end_stations).distinct()
df_station_base.cache()  # Reused by the count below and by the enrichment pipeline

try:
    print(f"✅ Extracted {df_station_base.count()} unique station records from trip data.")

    # --- 2. Calculate H3 Index and Prepare for Join ---

    # Calculate the H3 Index for each station point
    # NOTE: h3_longlatash3 requires Longitude, then Latitude!
    df_station_indexed = df_station_base.withColumn(
        "h3_index_10",
        h3_longlatash3(col("Longitude"), col("Latitude"), lit(H3_RESOLUTION))
    ).filter(col("h3_index_10").isNotNull())

    # Load the H3 cell -> Community Name mapping table
    df_community_h3 = spark.table(COMMUNITY_H3_TABLE).select(
        col("h3_index_10").alias("c_h3_index"),
        col("community").alias("neighborhood_name")
    )

    # --- 3. Perform the H3-Based Spatial Join and Final Select ---

    # Left join: stations whose H3 cell has no mapping are kept with a NULL
    # neighborhood_name and is_in_community_area = False.
    df_station_enriched = df_station_indexed.join(
        df_community_h3,
        df_station_indexed["h3_index_10"] == df_community_h3["c_h3_index"],
        "left"
    ).withColumn(
        # Generate the surrogate key for the dimension table.
        # NOTE(review): monotonically_increasing_id() is unique but neither consecutive
        # nor stable between runs, so keys change every time the table is overwritten.
        "Station_Key", monotonically_increasing_id()
    ).select(
        "Station_Key",
        col("Station_Name"),
        col("Latitude"),
        col("Longitude"),
        col("h3_index_10"),
        col("neighborhood_name"),
        (col("neighborhood_name").isNotNull()).alias("is_in_community_area")
    )

    print("\n✅ Station-Community Join Complete. Schema of final dim table:")
    df_station_enriched.printSchema()

    # --- 4. Write to the Final Delta Table (Gold Layer) ---

    (df_station_enriched.write
      .format("delta")
      .mode("overwrite")
      .option("overwriteSchema", "true")
      .saveAsTable(TARGET_DIM_TABLE)
    )

    print(f"\n🚀 Successfully wrote enriched dimension table to: {TARGET_DIM_TABLE}")

    # --- 5. Optimization (Mandatory for Spatial Queries) ---

    spark.sql(f"OPTIMIZE {TARGET_DIM_TABLE} ZORDER BY (h3_index_10)")

    print("\n✨ Z-Ordering complete on h3_index_10 for optimal query performance.")
finally:
    # Release the cached station set whether or not the pipeline succeeded, so it
    # does not keep occupying cluster memory after this cell finishes.
    df_station_base.unpersist()