In [0]:
# Storage account host and Databricks secret scope used by this notebook.
# Centralised here so the account/scope is changed in one place instead of seven.
STORAGE_ACCOUNT_HOST = "livbusdatastore.dfs.core.windows.net"
SECRET_SCOPE = "livbodsbus-keyvault"

silver_path = f"abfss://silver@{STORAGE_ACCOUNT_HOST}/"

# Service-principal credentials come from the secret scope; nothing sensitive is hard-coded.
ServicePrincipalId = dbutils.secrets.get(scope=SECRET_SCOPE, key="dbx-client-ID")
ServicePrincipalKey = dbutils.secrets.get(scope=SECRET_SCOPE, key="dbx-secret")
TenantId = dbutils.secrets.get(scope=SECRET_SCOPE, key="dbx-tenant-ID")


# Configure OAuth client-credentials access to ADLS Gen2 for this storage account.
# Each key is suffixed with the account host so the settings apply to this account only.
# Insertion order of the dict matches the order the settings were originally applied.
_adls_oauth_settings = {
    "fs.azure.account.auth.type": "OAuth",
    "fs.azure.account.oauth.provider.type": "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
    "fs.azure.account.oauth2.client.id": ServicePrincipalId,
    "fs.azure.account.oauth2.client.secret": ServicePrincipalKey,
    "fs.azure.account.oauth2.client.endpoint": f"https://login.microsoftonline.com/{TenantId}/oauth2/token",
}
for _conf_key, _conf_value in _adls_oauth_settings.items():
    spark.conf.set(f"{_conf_key}.{STORAGE_ACCOUNT_HOST}", _conf_value)


**Perform initial batch loading of historical data.**

**Enrich the `silver.bus_activity` table by adding additional identifier columns to prepare for building the `gold.bus_activity` table, which will be used for analysis.**



In [0]:
from pyspark.sql.functions import col, lag, when, unix_timestamp, to_date,sum
from pyspark.sql.window import Window

# Build the enriched gold-layer bus activity frame from silver.bus_activity:
# flag positions inside a Liverpool bounding box, derive per-vehicle deltas
# (previous position, minutes since last report, idle flag, movement label),
# keep only routes that touch Liverpool, then apply the config-driven line rules.
# Output: df_bus_activity (written to gold by the next cell).

# Load the base data. DataFrameReader.table() reads the table in its registered
# format, so an explicit .format("delta") is not needed here.
df = spark.read.table("silver.bus_activity")

# 1. Define bounding box for Liverpool (decimal-degree latitude/longitude).
liverpool_min_lat, liverpool_max_lat = 53.33, 53.50
liverpool_min_lon, liverpool_max_lon = -3.00, -2.86

# 2. Add 'in_liverpool' column. A null coordinate makes the condition null,
#    which when() treats as not-matched, so such rows get False.
df = df.withColumn(
    "in_liverpool",
    when(
        col("latitude").between(liverpool_min_lat, liverpool_max_lat)
        & col("longitude").between(liverpool_min_lon, liverpool_max_lon),
        True,
    ).otherwise(False),
)

# 3. Define window by vehicle and time. recorded_at_time is a secondary sort key:
#    rows that share an ingestion_timestamp (e.g. a bulk historical load) would
#    otherwise be in arbitrary order, making every lag() below nondeterministic.
vehicle_window = Window.partitionBy("vehicle_ref").orderBy("ingestion_timestamp", "recorded_at_time")

# 4. Lag previous values (null on each vehicle's first row).
df = (
    df.withColumn("prev_latitude", lag("latitude").over(vehicle_window))
      .withColumn("prev_longitude", lag("longitude").over(vehicle_window))
      .withColumn("prev_in_liverpool", lag("in_liverpool").over(vehicle_window))
      .withColumn("prev_recorded_time", lag("recorded_at_time").over(vehicle_window))
)

# 5. Minutes elapsed since the vehicle's previous report (second resolution).
df = df.withColumn(
    "dur_min_since_last_recorded",
    (unix_timestamp("recorded_at_time") - unix_timestamp("prev_recorded_time")) / 60,
)

# 6. Detect idle state: same position as the previous report, or a repeated
#    report carrying the same recorded_at_time.
df = df.withColumn(
    "possibly_idle",
    when(
        ((col("latitude") == col("prev_latitude")) & (col("longitude") == col("prev_longitude")))
        | (col("recorded_at_time") == col("prev_recorded_time")),
        True,
    ).otherwise(False),
)

# 7. Label Liverpool movement: first row of a vehicle that starts inside the box,
#    a crossing of the box boundary, or anything else.
df = df.withColumn(
    "liverpool_movement_status",
    when(col("prev_in_liverpool").isNull() & (col("in_liverpool") == True), "Only_In_Liverpool")
    .when(col("prev_in_liverpool") != col("in_liverpool"), "Moved_In_Or_Out")
    .otherwise("Other"),
)

# 8. Infer routes that visit Liverpool (any position inside the box).
liverpool_routes = (
    df.filter(col("in_liverpool") == True)
      .select("line_ref", "operator_ref")
      .distinct()
)

# 9. Join back to get data for vehicles running on Liverpool-related routes.
df_liverpool_routes = df.join(liverpool_routes, on=["line_ref", "operator_ref"], how="inner")


# Filter bus lines inside/outside Liverpool with the rules in config container.
# The rules file is one JSON object; multiline=true lets Spark parse a pretty-printed document.
bus_rules_path = "abfss://config@livbusdatastore.dfs.core.windows.net/bus_line_rules.json"
line_rules = spark.read.option("multiline", "true").json(bus_rules_path).collect()[0]

lines_to_remove = line_rules["lines_to_exclude_completely"]

df_final = df_liverpool_routes.filter(~col("line_ref").isin(lines_to_remove))


# Apply special latitude filters for some lines that pop up in Wirral and Prescot.
# Each rule drops rows on the listed lines whose latitude satisfies the condition.
import operator

_LATITUDE_COMPARATORS = {
    ">": operator.gt,
    "<": operator.lt,
    ">=": operator.ge,
    "<=": operator.le,
}

for rule in line_rules["special_filters"]:
    lines = rule["lines"]
    lat_condition = rule["latitude_condition"]
    lat_value = rule["latitude_value"]

    compare = _LATITUDE_COMPARATORS.get(lat_condition)
    if compare is None:
        # Previously an unknown condition was skipped silently, letting rows that
        # should have been filtered reach the gold table.
        raise ValueError(
            f"Unsupported latitude_condition {lat_condition!r} in {bus_rules_path}; "
            f"expected one of {sorted(_LATITUDE_COMPARATORS)}"
        )

    # NOTE(review): for rows on these lines with a null latitude the predicate is
    # null, so filter() drops them; rows on other lines are unaffected. Confirm intended.
    df_final = df_final.filter(
        ~(col("line_ref").isin(lines) & compare(col("latitude"), lat_value))
    )

# Add date column (used as the partition column of the gold table).
df_bus_activity = df_final.withColumn("ingestion_date", to_date(col("ingestion_timestamp")))

In [0]:
bus_activity_path = "abfss://gold@livbusdatastore.dfs.core.windows.net/bus_activity"

# Replace the gold bus_activity Delta files wholesale, one folder per ingestion_date.
(
    df_bus_activity.write
    .format("delta")
    .mode("overwrite")
    .partitionBy("ingestion_date")
    .save(bus_activity_path)
)


In [0]:
# Expose the gold bus_activity Delta folder as the metastore table gold.bus_activity,
# creating the gold schema first if needed. Both statements are no-ops when the objects exist.
spark.sql("CREATE SCHEMA IF NOT EXISTS gold")

create_bus_activity_ddl = """
  CREATE TABLE IF NOT EXISTS gold.bus_activity
  USING DELTA
  LOCATION "abfss://gold@livbusdatastore.dfs.core.windows.net/bus_activity"
"""
spark.sql(create_bus_activity_ddl)

DataFrame[]


**Perform an initial batch load that counts the daily trips made by each vehicle.**

**Create a Delta table, built from `gold.bus_activity`, to monitor trip activity.**

In [0]:
from pyspark.sql.functions import col, lag, last, when, unix_timestamp, sum
from pyspark.sql.window import Window

# Count trips per vehicle, line and day. A trip is one run in a single direction,
# so trips = (first observed direction) + (number of direction changes).
# NOTE(review): the previous version counted only direction changes (one fewer
# than the number of trips) and returned NULL trip_count for single-row groups.

# Step 1: Read the base table.
df = spark.read.table("gold.bus_activity")

# Step 2: Define a window. recorded_at_time breaks ties between rows that share an
# ingestion_timestamp so the row order (and thus the direction comparison) is deterministic.
trip_window = Window.partitionBy("vehicle_ref", "line_ref", "ingestion_date") \
                    .orderBy("ingestion_timestamp", "recorded_at_time")

# Frame covering every row strictly before the current one in the same group.
prior_rows_window = trip_window.rowsBetween(Window.unboundedPreceding, -1)

# Step 3: Previous direction = last non-null direction_ref seen earlier in the group,
# so rows with a missing direction do not break the comparison. Null on the first row.
df = df.withColumn(
    "prev_direction_ref",
    last("direction_ref", ignorenulls=True).over(prior_rows_window)
)

# Step 4: Flag trip starts: 1 for the first known direction of the day or a change of
# direction; 0 for unchanged direction or a row with no direction.
df = df.withColumn(
    "trip_start",
    when(col("direction_ref").isNull(), 0)
    .when(col("prev_direction_ref").isNull(), 1)
    .when(col("direction_ref") != col("prev_direction_ref"), 1)
    .otherwise(0)
)

# Step 5: Sum the trip_start flags per bus per line per day (never NULL now).
df_trip_counts = df.groupBy("vehicle_ref", "line_ref", "ingestion_date", "operator_ref") \
    .agg(
        sum("trip_start").alias("trip_count")
    )



In [0]:
bus_trips_path = "abfss://gold@livbusdatastore.dfs.core.windows.net/bus_trips_counts"

# Rewrite the whole daily trip-count Delta table, partitioned by ingestion day.
trip_counts_writer = (
    df_trip_counts.write
    .format("delta")
    .mode("overwrite")
    .partitionBy("ingestion_date")
)
trip_counts_writer.save(bus_trips_path)

In [0]:
# Register the gold trip-count Delta folder as the metastore table gold.bus_trip_counts,
# creating the gold schema first if needed. Both statements are no-ops when the objects exist.
# NOTE(review): the table is named bus_trip_counts but its folder is bus_trips_counts;
# the location is left unchanged so it matches the data written by the previous cell.
spark.sql("CREATE SCHEMA IF NOT EXISTS gold")

spark.sql("""
  CREATE TABLE IF NOT EXISTS gold.bus_trip_counts
  USING DELTA
  LOCATION "abfss://gold@livbusdatastore.dfs.core.windows.net/bus_trips_counts"
""")

DataFrame[]