In [0]:
%run ../config/load_config

In [0]:
%run ../common/transformations

In [0]:
%run ../common/data_quality

In [0]:
from pyspark.sql.functions import *

# Name of the silver table and the storage location backing it.
# get_storage_path, catalog and schema_silver are not defined in this cell;
# presumably they come from the %run'd config notebook — verify there.
target_table = "arrivals_sv"
silver_table_path = get_storage_path("silver", target_table)

# Ensure silver table
# IF NOT EXISTS makes this statement a no-op on reruns once the table exists.
# The column list covers the projected bronze columns plus dq_flag and
# transformation_timestamp (presumably added by the data-quality and
# metadata helpers applied further down — confirm against those helpers).
create_silver_table_sql = f"""
CREATE TABLE IF NOT EXISTS {catalog}.{schema_silver}.{target_table}(
    arrival_id STRING,
    operation_type BIGINT,
    vehicle_id STRING,
    naptan_id STRING,
    station_name STRING,
    line_id STRING,
    platform_name STRING,
    direction STRING,
    bearing BIGINT,
    trip_id BIGINT,
    base_version BIGINT,
    destination_naptan_id STRING,
    destination_name STRING,
    event_timestamp TIMESTAMP,
    time_to_station BIGINT,
    current_location STRING,
    towards STRING,
    expected_arrival TIMESTAMP,
    time_to_live TIMESTAMP,
    ingestion_timestamp TIMESTAMP,
    dq_flag STRING,
    transformation_timestamp TIMESTAMP
)
LOCATION '{silver_table_path}'
"""
spark.sql(create_silver_table_sql)

# Transform and load silver table
# Resolve the bronze source table and the base checkpoint directory used by
# the silver-layer streams.
source_table = get_table_name(schema_bronze, "arrivals_bz")
silver_checkpoint = get_storage_path("checkpoints", "silver")

# Incremental (streaming) read of the bronze table.
# NOTE(review): "ignoreDeletes" is a Delta streaming-source option; this
# assumes the bronze table is a Delta table — confirm.
bronze_stream = (
    spark.readStream
    .option("ignoreDeletes", True)
    .table(source_table)
)

# Projection from bronze (camelCase) to silver (snake_case) column names.
# The order matches the column order declared for the silver table.
silver_columns = [
    col("id").alias("arrival_id"),
    col("operationType").alias("operation_type"),
    col("vehicleId").alias("vehicle_id"),
    col("naptanId").alias("naptan_id"),
    col("stationName").alias("station_name"),
    col("lineId").alias("line_id"),
    col("platformName").alias("platform_name"),
    col("direction"),
    col("bearing"),
    col("tripId").alias("trip_id"),
    col("baseVersion").alias("base_version"),
    col("destinationNaptanId").alias("destination_naptan_id"),
    col("destinationName").alias("destination_name"),
    # The three time-valued fields are cast explicitly to TIMESTAMP.
    col("timestamp").cast("timestamp").alias("event_timestamp"),
    col("timeToStation").alias("time_to_station"),
    col("currentLocation").alias("current_location"),
    col("towards"),
    col("expectedArrival").cast("timestamp").alias("expected_arrival"),
    col("timeToLive").cast("timestamp").alias("time_to_live"),
    col("_ingest_time").alias("ingestion_timestamp"),
]

df_transformed = bronze_stream.select(*silver_columns)

# Clean data
# trim_strings, add_quality_flag and add_transformation_metadata are helpers
# defined outside this cell (presumably in the %run'd common notebooks).
df_cleaned = trim_strings(df_transformed)

# Drop repeated (arrival_id, event_timestamp) pairs. The watermark on
# event_timestamp is declared before the deduplication so the streaming
# dedup state can be bounded by event time.
dedup_keys = ["arrival_id", "event_timestamp"]
df_deduped = (
    df_cleaned
    .withWatermark("event_timestamp", "30 seconds")
    .dropDuplicates(dedup_keys)
)

# Columns passed to the quality helper as "must not be null".
required_columns = ["arrival_id", "naptan_id", "line_id", "event_timestamp"]
df_quality = add_quality_flag(df_deduped, not_null_columns=required_columns)

df_silver = add_transformation_metadata(df_quality)

# Write data to silver table
# Append the cleaned stream to the silver table. The checkpoint lives in a
# per-table sub-directory of the silver checkpoint root, and
# trigger(availableNow=True) processes the data available at start and then
# stops the query instead of running continuously.
query = (
    df_silver.writeStream
    .outputMode("append")
    .option("checkpointLocation", f"{silver_checkpoint}/{target_table}")
    .trigger(availableNow=True)
    .toTable(f"{catalog}.{schema_silver}.{target_table}")
)

# toTable() starts the query in the background and returns immediately.
# Block until the available-now run has finished so that (a) any failure in
# the stream is raised here instead of being silently lost, and (b) whatever
# runs after this cell sees a fully loaded silver table.
query.awaitTermination()