# Bronze Layer Ingestion

This notebook ingests raw data from two sources and lands it in Bronze Delta tables.

- **1. Real-time Vehicle Updates**: A stream from Azure Event Hubs, processed incrementally (each run ingests all events available at that time).
- **2. Static GTFS Data**: A daily batch ZIP file from ADLS Gen2.

In [0]:
# Imports
import io
import json
import zipfile

from pyspark.sql import functions as F
from pyspark.sql.types import StringType, MapType
from pyspark.sql.functions import col, length, decode
from pyspark.sql.functions import col, from_json, explode, from_unixtime
from pyspark.sql.types import StructType, StructField, StringType, LongType, DoubleType, ArrayType, TimestampType


In [0]:
# Event body schema
# Schema applied with from_json to each Event Hubs message body.
# Every field is declared nullable, so fields missing from a message parse to null.

# {delay, time} pair shared by a stop's arrival and departure.
# NOTE(review): units are presumably seconds / epoch seconds — confirm against the feed.
time_schema = StructType([
    StructField(field_name, LongType(), True) for field_name in ("delay", "time")
])

# One entry of tripUpdate.stopTimeUpdate.
stop_time_update_schema = StructType([
    StructField("stopId", StringType(), True),
    StructField("arrival", time_schema, True),
    StructField("departure", time_schema, True),
    StructField("stopSequence", LongType(), True),
])

# {"id": ...} vehicle reference; used by both the tripUpdate and vehicle payloads.
vehicle_descriptor_schema = StructType([StructField("id", StringType(), True)])

# Payload under the top-level "tripUpdate" key.
trip_update_schema = StructType([
    StructField("trip", StructType([
        StructField("tripId", StringType(), True),
        StructField("startDate", StringType(), True),
    ]), True),
    StructField("stopTimeUpdate", ArrayType(stop_time_update_schema), True),
    StructField("vehicle", vehicle_descriptor_schema, True),
    StructField("timestamp", LongType(), True),
])

# Payload under the top-level "vehicle" key (vehicle position).
vehicle_position_schema = StructType([
    StructField("vehicle", vehicle_descriptor_schema, True),
    StructField("trip", StructType([StructField("tripId", StringType(), True)]), True),
    StructField("position", StructType([
        StructField("latitude", DoubleType(), True),
        StructField("longitude", DoubleType(), True),
    ]), True),
    StructField("timestamp", LongType(), True),
])

# Top-level message: an id plus the two optional payloads.
event_body_schema = StructType([
    StructField("id", StringType(), True),
    StructField("tripUpdate", trip_update_schema, True),
    StructField("vehicle", vehicle_position_schema, True),
])

In [0]:
# Job parameters: widgets let a Databricks job override the secret scope and key
# used to look up the Event Hubs connection string.
dbutils.widgets.text("secret_scope_name", "delijn-secrets", "Secret Scope Name")
dbutils.widgets.text("eh_conn_str_key", "eh-conn-str", "Event Hub Connection String Key")

# Metastore database and table names
DATABASE_NAME = "delijn_bronze"
REALTIME_TABLE_NAME = DATABASE_NAME + ".realtime_vehicle_updates"
GTFS_TABLE_NAME = DATABASE_NAME + ".gtfs_static_files"

# Storage locations (mount paths)
# NOTE(review): the real-time checkpoint lives under /mnt/bronze while the output
# path is under /mnt/silver, in a notebook described as Bronze ingestion — confirm
# which layer is intended.
BRONZE_REALTIME_CHECKPOINT = "/mnt/bronze/checkpoints/realtime_trips"
SILVER_PATH = "/mnt/silver/realtime_trips"


## 1. Real-Time Data Ingestion (From Event Hubs)

In [0]:
# Load RT stream
# The Event Hubs connection string comes from a Databricks secret scope; the scope
# and key names are taken from the job widgets.
scope = dbutils.widgets.get("secret_scope_name")
key = dbutils.widgets.get("eh_conn_str_key")
EH_CONN_STR = dbutils.secrets.get(scope=scope, key=key)

# Read from the beginning of the retained stream.
# The azure-event-hubs-spark connector takes the starting position from
# "eventhubs.startingPosition" as a JSON-serialised EventPosition. The previous key,
# "eventhubs.eventPosition", is not a documented connector option, so the intended
# "earliest" position never took effect and the connector used its default.
# A starting position only applies when the checkpoint holds no offsets yet.
starting_position = {
    "offset": "-1",        # "-1" means start of stream
    "seqNo": -1,           # unused when an offset is given
    "enqueuedTime": None,  # unused when an offset is given
    "isInclusive": True,
}

eh_conf = {
    # Newer connector versions require the connection string encrypted via EventHubsUtils.
    'eventhubs.connectionString': sc._jvm.org.apache.spark.eventhubs.EventHubsUtils.encrypt(EH_CONN_STR),
    'eventhubs.consumerGroup': '$Default',
    'eventhubs.startingPosition': json.dumps(starting_position),
}

df_stream_raw = (
    spark.readStream
    .format("eventhubs")
    .options(**eh_conf)
    .load()
)

In [0]:
# Transform RT data
# Parse each message body with event_body_schema and flatten it to one row per
# stop-time update. Events that have no stop-time updates (for example
# vehicle-position events) are kept as a single row with null stop columns.
parsed_stream = (
    df_stream_raw
    .withColumn("body", col("body").cast(StringType()))
    .withColumn("event", from_json(col("body"), event_body_schema))
    # Keep only events that parsed to at least one known payload. Malformed bodies
    # yield neither, and they were also dropped before (by explode on a null array).
    .where(col("event.tripUpdate").isNotNull() | col("event.vehicle").isNotNull())
    .select(
        col("event.id").alias("update_id"),
        # Prefer the tripUpdate fields, and fall back to the vehicle-position payload so
        # vehicle-only events still carry trip and vehicle identifiers.
        F.coalesce(
            col("event.tripUpdate.trip.tripId"), col("event.vehicle.trip.tripId")
        ).alias("trip_id"),
        col("event.tripUpdate.trip.startDate").alias("trip_start_date"),
        F.coalesce(
            col("event.tripUpdate.vehicle.id"), col("event.vehicle.vehicle.id")
        ).alias("vehicle_id"),
        col("event.vehicle.position.latitude").alias("latitude"),
        col("event.vehicle.position.longitude").alias("longitude"),
        # timestamp_seconds converts epoch seconds directly. from_unixtime(...).cast(...)
        # round-trips through a session-time-zone string, which is ambiguous during a
        # DST fall-back hour.
        F.timestamp_seconds(
            F.coalesce(col("event.tripUpdate.timestamp"), col("event.vehicle.timestamp"))
        ).alias("event_timestamp"),
        # explode_outer (unlike explode) keeps events whose stopTimeUpdate array is
        # null or empty, emitting one row with a null stop_update.
        F.explode_outer("event.tripUpdate.stopTimeUpdate").alias("stop_update"),
    )
)

# Promote the per-stop fields out of the exploded struct.
final_stream = parsed_stream.select(
    "update_id",
    "trip_id",
    "trip_start_date",
    "vehicle_id",
    "latitude",
    "longitude",
    "event_timestamp",
    col("stop_update.stopId").alias("stop_id"),
    col("stop_update.stopSequence").alias("stop_sequence"),
    col("stop_update.departure.delay").alias("departure_delay_seconds"),
    F.timestamp_seconds(col("stop_update.departure.time")).alias("departure_time"),
)

In [0]:
# Write stream to silver layer
# NOTE(review): this notebook is described as Bronze ingestion, and the config cell
# defines REALTIME_TABLE_NAME ("delijn_bronze.realtime_vehicle_updates"). However, the
# stream lands in SILVER_PATH with a checkpoint under /mnt/bronze. Confirm the intended
# target before changing it, because downstream readers may depend on SILVER_PATH.
realtime_query = (
    final_stream.writeStream
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", BRONZE_REALTIME_CHECKPOINT)
    # availableNow processes all data available at start, possibly across several
    # micro-batches, and then stops. This suits scheduled jobs.
    .trigger(availableNow=True)
    .start(SILVER_PATH)
)

# start() returns immediately. Block until the availableNow run completes so that any
# query failure is re-raised here (failing the cell) instead of being lost.
realtime_query.awaitTermination()

## 2. Static GTFS Batch Data Ingestion (From ADLS Gen2)