### Define Simulation Targets

In [0]:
# Register the text widgets that configure where the simulation writes its output.
# Each entry is (widget name, default value, label passed to dbutils.widgets.text).
for widget_name, widget_default, widget_label in (
    ("volume_path", "/Volumes/gshen_catalog/yvr_airport/bag_files/", "Volume Path"),
    ("passenger_table", "gshen_catalog.yvr_airport.passenger_details_yvr", "Passenger Table"),
    ("flight_table", "gshen_catalog.yvr_airport.flight_details_yvr", "Flight Table"),
    ("gate_table", "gshen_catalog.yvr_airport.gate_assignments_yvr", "Gate Table"),
):
    dbutils.widgets.text(widget_name, widget_default, widget_label)

### Clear Everything (optional — uncomment the cells below to reset the volume and tables)

In [0]:
# dbutils.fs.rm("/Volumes/gshen_catalog/yvr_airport/bag_files", recurse=True)

In [0]:
%sql
-- DROP TABLE IF EXISTS gshen_catalog.yvr_airport.passenger_details_yvr;
-- DROP TABLE IF EXISTS gshen_catalog.yvr_airport.baggage_data_stream;
-- DROP TABLE IF EXISTS gshen_catalog.yvr_airport.flight_details_yvr;
-- DROP TABLE IF EXISTS gshen_catalog.yvr_airport.gate_assignments_yvr;

In [0]:
# Databricks notebook to simulate airport baggage events
# Save as CSV files to a Unity Catalog volume

from pyspark.sql.functions import current_timestamp
import uuid
import time
from datetime import datetime, timedelta
import pandas as pd
import os
import random

class Simulation:
    """Simulate baggage-handling events for a fixed set of flights at one airport.

    The simulation keeps all state in memory:

    * ``generate_flight_data`` creates one flight per airline, a gate assignment
      for each flight, and a pool of passengers per flight.
    * ``generate_baggage_events`` advances every known bag one step through
      ``status_sequence`` and then checks in new bags for passengers that do not
      have one yet, up to ``events_per_batch`` events in total.
    * ``run`` writes flights and gates to Delta tables, then loops over
      ``num_batches`` batches, writing each batch of events as CSV under
      ``volume_path`` and appending newly checked-in passengers to
      ``passenger_table``.

    Args:
        spark: Spark session used by ``run`` / ``write_passenger_data``. The
            ``generate_*`` methods do not touch it.
        volume_path: Directory under which the per-batch CSV output is written.
        passenger_table: Name of the Delta table passengers are appended to.
        flight_table: Name of the Delta table flights are written to (overwrite).
        gate_table: Name of the Delta table gate assignments are written to (overwrite).
        events_per_batch: Target number of events per batch. Existing bags always
            progress; new bags only fill whatever is left of this budget.
        interval_sec: Seconds slept between batches, also used as the per-batch
            offset added to ``event_time``.
        num_batches: Number of batches ``run`` produces.
    """

    def __init__(self, spark, volume_path, passenger_table, flight_table, gate_table, events_per_batch=20, interval_sec=10, num_batches=5):
        self.spark = spark
        self.volume_path = volume_path
        self.passenger_table = passenger_table
        self.flight_table = flight_table
        self.gate_table = gate_table
        self.events_per_batch = events_per_batch
        self.interval_sec = interval_sec
        self.num_batches = num_batches
        self.airport = "YVR"
        self.airlines = [
            {"name": "Air Canada", "code": "AC"},
            {"name": "WestJet", "code": "WS"},
            {"name": "Flair Airlines", "code": "F8"},
            {"name": "KLM Royal Dutch Airlines", "code": "KL"},
            {"name": "United Airlines", "code": "UA"},
            {"name": "Delta Air Lines", "code": "DL"},
            {"name": "American Airlines", "code": "AA"},
            {"name": "Lynx Air", "code": "Y9"},
            {"name": "Porter Airlines", "code": "PD"},
            {"name": "Sunwing Airlines", "code": "WG"},
            {"name": "Condor Airlines", "code": "DE"},
            {"name": "Icelandair", "code": "FI"},
            {"name": "Swoop", "code": "WO"},
            {"name": "Air North", "code": "4N"}
        ]
        self.origins = [
            "Toronto", "Vancouver", "Calgary", "Ottawa", "Montreal", "Halifax",
            "Winnipeg", "Kelowna", "Victoria", "Regina", "Saskatoon", "Fort McMurray",
            "Yellowknife", "Whitehorse", "Las Vegas", "Phoenix", "Los Angeles",
            "Cancun", "Amsterdam", "Frankfurt", "Reykjavik", "Chicago", "Denver",
            "Minneapolis", "San Francisco", "Seattle"
        ]
        self.gates = [
            "A1", "A2", "A3", "A4", "A5",
            "B1", "B2", "B3", "B4",
            "C1", "C2", "C3",
            "D1", "D2", "E1", "E2", "F1"
        ]
        # Ordered lifecycle of a bag; a journey's status_index points at the NEXT status to emit.
        self.status_sequence = [
            "checked_in", "security_screened", "loaded_on_plane",
            "in_transit", "unloaded", "claimed"
        ]
        # bag_id -> journey dict (passenger_id, flight_id, airline, origin, weight_kg, status_index)
        self.baggage_journeys = {}
        # passenger_id -> passenger dict
        self.passenger_details = {}
        # flight_id -> actual arrival datetime
        self.flight_actual_arrival = {}
        # flight_id -> list of passenger dicts on that flight
        self.flight_passengers = {}

    def generate_passenger(self, passenger_id, flight_id, airline_name, origin):
        """Build a passenger record with a random seat (row 1-30, letter A-F).

        The display name is ``Passenger_`` followed by the first eight characters
        of ``passenger_id``.
        """
        return {
            "passenger_id": passenger_id,
            "name": f"Passenger_{str(passenger_id)[:8]}",
            "flight_id": flight_id,
            "airline": airline_name,
            "origin": origin,
            "seat_number": f"{random.randint(1, 30)}{random.choice(['A','B','C','D','E','F'])}"
        }

    def generate_flight_data(self):
        """Create one flight, gate assignment and passenger pool per airline.

        Populates ``flight_actual_arrival``, ``passenger_details`` and
        ``flight_passengers`` as a side effect.

        Returns:
            tuple: ``(flights_df, gates_df)`` as pandas DataFrames, one row per airline.
        """
        flights = []
        gates = []
        for i, airline in enumerate(self.airlines):
            flight_id = f"{airline['code']}{100 + i}"
            origin = self.origins[i % len(self.origins)]
            scheduled_departure = datetime.utcnow() + timedelta(hours=random.randint(1, 12))
            scheduled_arrival = scheduled_departure + timedelta(hours=random.randint(2, 8))
            # Two zeros in the choices: a flight is on time 2 times out of 5.
            delay_minutes = random.choice([0, 0, 15, 30, 45])
            actual_arrival = scheduled_arrival + timedelta(minutes=delay_minutes)
            flight_status = "delayed" if delay_minutes > 0 else "on_time"

            flights.append({
                "flight_id": flight_id,
                "airline": airline['name'],
                "origin": origin,
                "scheduled_departure": scheduled_departure.isoformat(),
                "scheduled_arrival": scheduled_arrival.isoformat(),
                "actual_arrival": actual_arrival.isoformat(),
                "status": flight_status
            })
            self.flight_actual_arrival[flight_id] = actual_arrival

            # Gate opens 45 minutes and closes 5 minutes before scheduled departure.
            gates.append({
                "flight_id": flight_id,
                "gate_number": self.gates[i % len(self.gates)],
                "gate_open_time": (scheduled_departure - timedelta(minutes=45)).isoformat(),
                "gate_close_time": (scheduled_departure - timedelta(minutes=5)).isoformat()
            })

            # Generate 30–150 passengers per flight
            num_passengers = random.randint(30, 150)
            passengers = []
            for _ in range(num_passengers):
                passenger_id = str(uuid.uuid4())
                passenger = self.generate_passenger(passenger_id, flight_id, airline['name'], origin)
                self.passenger_details[passenger_id] = passenger
                passengers.append(passenger)
            self.flight_passengers[flight_id] = passengers

        return pd.DataFrame(flights), pd.DataFrame(gates)

    def generate_baggage_events(self, batch_num):
        """Produce one batch of baggage events.

        Every bag that has not yet emitted its final status emits its next one.
        The remaining budget (``events_per_batch`` minus the progress events) is
        then spent checking in new bags, cycling through the flights in a random
        order and picking a random passenger who has no bag yet. A slot whose
        flight has no such passenger is skipped, so a batch can contain fewer
        than ``events_per_batch`` events.

        Args:
            batch_num: Zero-based batch index; shifts ``event_time`` by
                ``batch_num * interval_sec`` seconds from the current UTC time.

        Returns:
            tuple: ``(events_df, new_passengers_df)`` as pandas DataFrames. Both
            are empty when nothing happened, including when no flights have
            been generated yet.
        """
        events = []
        new_passengers = []
        # All events in a batch share one timestamp.
        # NOTE(review): run() also sleeps interval_sec between batches, so the
        # event_time of consecutive batches ends up roughly 2 * interval_sec
        # apart — confirm this is intended.
        batch_time = datetime.utcnow() + timedelta(seconds=batch_num * self.interval_sec)
        event_time = batch_time.isoformat()

        # Progress existing bags in their journey
        for bag_id, journey in self.baggage_journeys.items():
            if journey['status_index'] >= len(self.status_sequence):
                continue  # Bag already claimed; nothing left to emit
            new_status = self.status_sequence[journey['status_index']]
            event = {
                "bag_id": bag_id,
                "passenger_id": journey['passenger_id'],
                "flight_id": journey['flight_id'],
                "airport": self.airport,
                "airline": journey['airline'],
                "origin": journey['origin'],
                "status": new_status,
                "weight_kg": journey['weight_kg'],
                "event_time": event_time
            }
            if new_status == "claimed":
                # Only the final event carries the flight's actual arrival time.
                actual_arrival = self.flight_actual_arrival.get(journey['flight_id'])
                if actual_arrival:
                    event["actual_arrival"] = actual_arrival.isoformat()
            events.append(event)
            journey['status_index'] += 1

        # Add new bags
        available_flights = list(self.flight_passengers.keys())
        if not available_flights:
            # No flights generated yet: nothing to check in (and avoids modulo by zero).
            return pd.DataFrame(events), pd.DataFrame(new_passengers)
        random.shuffle(available_flights)

        # Passengers that already own a bag; built once and kept current below
        # instead of being recomputed for every candidate passenger.
        assigned_ids = {journey['passenger_id'] for journey in self.baggage_journeys.values()}

        new_bag_slots = self.events_per_batch - len(events)
        for i in range(new_bag_slots):
            flight_id = available_flights[i % len(available_flights)]
            passengers = self.flight_passengers[flight_id]
            unassigned = [p for p in passengers if p['passenger_id'] not in assigned_ids]

            if not unassigned:
                continue  # All passengers on this flight already have bags

            passenger = random.choice(unassigned)
            passenger_id = passenger['passenger_id']
            airline = passenger['airline']
            origin = passenger['origin']
            weight_kg = round(random.uniform(15.0, 32.0), 2)
            bag_id = str(uuid.uuid4())

            events.append({
                "bag_id": bag_id,
                "passenger_id": passenger_id,
                "flight_id": flight_id,
                "airport": self.airport,
                "airline": airline,
                "origin": origin,
                "status": self.status_sequence[0],
                "weight_kg": weight_kg,
                "event_time": event_time
            })

            # status_index starts at 1 because "checked_in" (index 0) was just emitted.
            self.baggage_journeys[bag_id] = {
                "passenger_id": passenger_id,
                "flight_id": flight_id,
                "airline": airline,
                "origin": origin,
                "weight_kg": weight_kg,
                "status_index": 1
            }
            assigned_ids.add(passenger_id)
            new_passengers.append(passenger)

        return pd.DataFrame(events), pd.DataFrame(new_passengers)

    def run(self):
        """Run the full simulation: write flights and gates, then stream event batches.

        Flight and gate tables are overwritten. Each batch of events is written
        as CSV (with header) to a path under ``volume_path`` named after the
        current UTC time and the batch index; passengers who checked in a bag in
        that batch are appended to ``passenger_table``. The method sleeps
        ``interval_sec`` seconds after every batch.
        """
        # Write flight and gate tables
        df_flights, df_gates = self.generate_flight_data()
        spark_flight_df = self.spark.createDataFrame(df_flights)
        spark_flight_df.write.mode("overwrite").format("delta").saveAsTable(self.flight_table)
        print(f"Created flight data in Delta table: {self.flight_table}")

        spark_gate_df = self.spark.createDataFrame(df_gates)
        spark_gate_df.write.mode("overwrite").format("delta").saveAsTable(self.gate_table)
        print(f"Created gate assignments in Delta table: {self.gate_table}")

        for batch in range(self.num_batches):
            df_events, df_passengers = self.generate_baggage_events(batch)

            if df_events.empty:
                # Spark cannot infer a schema from an empty pandas frame, so skip the write.
                print(f"No baggage events in batch {batch+1}; nothing written")
            else:
                timestamp_str = datetime.utcnow().strftime("%Y%m%d_%H%M%S")
                file_name = f"baggage_events_{timestamp_str}_batch{batch}.csv"
                output_path = os.path.join(self.volume_path, file_name)
                spark_df = self.spark.createDataFrame(df_events)
                # coalesce(1) so the batch lands in a single part file under output_path
                spark_df.coalesce(1).write.mode("overwrite").option("header", True).csv(output_path)
                print(f"Wrote baggage batch {batch+1} to {output_path}")

            if not df_passengers.empty:
                self.write_passenger_data(df_passengers)

            time.sleep(self.interval_sec)

        print("Baggage and passenger simulation complete.")

    def write_passenger_data(self, df_passengers):
        """Append a pandas DataFrame of passenger records to ``passenger_table``."""
        spark_passenger_df = self.spark.createDataFrame(df_passengers)
        spark_passenger_df.write.mode("append").format("delta").saveAsTable(self.passenger_table)
        print(f"Appended passenger data to Delta table: {self.passenger_table}")


# Pull the output targets from the notebook widgets, then build and run the simulation
widget_settings = {
    widget_key: dbutils.widgets.get(widget_key)
    for widget_key in ("volume_path", "passenger_table", "flight_table", "gate_table")
}
simulation = Simulation(
    spark=spark,
    events_per_batch=150,
    interval_sec=10,
    num_batches=10,
    **widget_settings,
)
simulation.run()