In [0]:
from pyspark.dbutils import DBUtils

dbutils = DBUtils(spark)

# Notebook parameters as (widget name, default value) pairs, registered in this order.
# The trailing comment on each entry is an example of what to type into the widget.
_WIDGET_DEFAULTS = (
    ("catalog", ""),           # gk_los_angeles
    ("schema", ""),            # default
    ("table", ""),             # events
    ("start_ts", ""),          # 2025-04-21 00:00:00
    ("end_ts", ""),            # 2025-05-31 00:00:00
    ("start_orders", ""),      # 100
    ("end_orders", ""),        # 200
    ("metastore_root", ""),    # only if you need to create the catalog with this notebook (see cell 3)
    ("gk_location", ""),       # 115 Penn St, El Segundo, CA 90245
    # 1 means real time, 60 means 60 simulated minutes in 1 real minute; make this high to get
    # fast flowing data. The notebook terminates when `end_ts` is hit (does not run indefinitely).
    ("speed_up_factor", "60"),
)

for _widget_name, _widget_default in _WIDGET_DEFAULTS:
    dbutils.widgets.text(_widget_name, _widget_default)

In [0]:
"""
Ghost-kitchen menu generator
————————————
• Creates four relational datasets: brands, menus, categories, items
• Adds deterministic, “reasonable” USD prices to each item
"""

import pandas as pd
import hashlib
from pathlib import Path

# ---------- 1. SOURCE DATA (renamed real-world menus) ----------
# Structure: a list of (brand_name, menu) tuples, where `menu` maps a category name
# to the list of item names sold under that category. Each brand gets exactly one menu
# when the rows are built below. The category name (not the item name) is what the
# pricing rules are matched against, so renaming a category can change its price band.
brands_data = [
    ("McDoodles", {
        "Burgers": ["Big Stack", "Quarter Poundish", "Double Quarter Poundish", "Cheese Burgerette"],
        "Chicken & Fish Sandwiches": ["McCrisper", "Spicy McCrisper", "Filet-O-Fishish", "McChicklet"],
        "Nuggets": ["Chick Nuggies 6pc"],
        "Fries & Sides": ["Famous Frites", "Apple Slice Bites"],
        "Breakfast": ["Egg McMuffle", "Bacon Biscuit"],
        "Coffees & Beverages": ["Iced Brew", "Fizzy Cola"]
    }),
    ("ChipoLot", {
        "Burritos": ["Chicken Burrito", "Steak Burrito", "Barbacoa Burrito", "Veggie Burrito"],
        "Bowls": ["Chicken Bowl", "Keto Bowl", "High Protein Bowl"],
        "Tacos": ["Chicken Tacos", "Carnitas Tacos"],
        "Salads": ["Supergreens Salad"],
        "Kids": ["Build-Your-Own Kid Meal"],
        "Sides & Extras": ["Chips & Guac", "Chips & Queso"]
    }),
    ("Shack Stack", {
        "Burgers": ["ShackBurger Single", "ShackBurger Double", "Smoke Stack Single", "Veggie Stack"],
        "Chicken": ["Chicken Shack", "Avocado Bacon Chicken"],
        "Dogs": ["Flat-Top Dog"],
        "Fries": ["Crinkle Fries", "Cheese Fries"],
        "Shakes": ["Vanilla Shake", "Chocolate Shake"],
        "Drinks": ["Shack-made Lemonade"]
    }),
    ("GreenSprout", {
        "Bowls": ["Harvest Bowl", "Crispy Rice Bowl", "Shroomami"],
        "Salads": ["Kale Caesar", "Guac Greens", "Buffalo Chicken Salad"],
        "Protein Plates": ["Garlic Steak Plate", "Hot Honey Chicken Plate"],
        "Kids": ["Little Harvest"],
        "Sides": ["Rosemary Focaccia", "Hummus & Focaccia"],
        "Drinks": ["Jasmine Green Tea"]
    }),
    ("Pando Dash", {
        "Entrees": ["Orange Chickadee", "Kung Pow Chicken", "Beijing Beefy", "Honey Walnut Shrimpish"],
        "Sides": ["Chow Mein Noodles", "Fried Rice"],
        "Appetizers": ["Cream Cheese Rangoon", "Chicken Egg Roll"],
        "Drinks": ["Fountain Soda"],
        "Bundles": ["Bowl Combo"]
    }),
    ("Kava Kitchen", {
        "Bowls": ["Spicy Lamb & Avo Bowl", "Falafel Crunch Bowl", "Chicken + Rice Bowl"],
        "Pitas": ["Steak & Feta Pita", "Crispy Falafel Pita"],
        "Kids": ["Mini Pita Meal"],
        "Drinks": ["Strawberry Citrus Lemonade"],
        "Sides": ["Hot Harissa Chips"],
        "Desserts": ["Salted Choco Cookie"]
    }),
    ("MaruNoodle", {
        "Udon": ["Beef Udon", "Chicken Katsu Udon", "Curry Udon"],
        "Rice Bowls": ["Gyudon Rice", "Yakitori Rice"],
        "Tempura": ["Shrimp Tempura", "Sweet Potato Tempura"],
        "Onigiri": ["Spam Musubi Onigiri"],
        "Add-Ons": ["BK Sauce"]
    }),
    ("Jinya Noodle Bar", {
        "Ramen": ["Tonkotsu Ramen", "Spicy Chicken Ramen", "Vegan Ramen"],
        "Small Plates": ["Pork Gyoza", "Takoyaki"],
        "Rice Bowls": ["Chicken Karaage Rice Bowl"],
        "Toppings": ["Extra Chashu", "Spicy Sauce"],
        "Drinks": ["Green Tea"]
    }),
    ("Fo Ho", {
        "Pho": ["Pho Combo", "Pho Chicken"],
        "Rolls": ["Fried Spring Roll", "Summer Roll"],
        "Rice Plates": ["Grilled Pork Rice Plate"],
        "Vermicelli": ["Lemongrass Chicken Vermicelli"],
        "Drinks": ["Vietnamese Iced Coffee"]
    }),
    ("Yo! Sushii", {
        "Maki": ["California Roll", "Spicy Tuna Roll"],
        "Nigiri": ["Salmon Nigiri", "Ebi Nigiri"],
        "Sashimi": ["Tuna Sashimi"],
        "Street Food": ["Chicken Katsu Curry", "Yakisoba", "Miso Soup"],
        "Desserts": ["Mochi Ice Cream"],
        "Drinks": ["Matcha Soda"]
    }),
    ("Taco Ring", {
        "Tacos": ["Crunchy Taco", "Soft Taco Supreme"],
        "Burritos": ["Beefy 5-Layer Burrito"],
        "Specialties": ["Doritos Locos Taco"],
        "Quesadillas": ["Chicken Quesadilla"],
        "Nachos": ["Nacho Bellgrande"],
        "Sides": ["Cinnamon Twists"],
        "Drinks": ["Baja Blast Soda"]
    }),
    ("Starbrews", {
        "Hot Coffee": ["Caffè Latte", "Pike Place Brew"],
        "Cold Coffee": ["Cold Brew"],
        "Refreshers": ["Strawberry Açaí Refresher"],
        "Frappuccinos": ["Caramel Frappuccino"],
        "Bakery": ["Plain Bagel"],
        "Breakfast Sandwiches": ["Sausage Cheddar Egg Sandwich"],
        "Tea": ["Green Tea"]
    }),
    ("Dominni Pizza", {
        "Specialty Pizzas": ["ExtravaganZZza Pizza", "MeatZZa Pizza", "Pacific Veggie Pizza"],
        "Build Your Own Pizza": ["Custom Pizza"],
        "Pasta": ["Chicken Alfredo Pasta"],
        "Chicken": ["Boneless Chicken"],
        "Sides": ["Bread Twists"],
        "Desserts": ["Chocolate Lava Cake"],
        "Drinks": ["Coke"]
    }),
    ("Five Gals", {
        "Burgers": ["Bacon Cheeseburger", "Little Cheeseburger"],
        "Dogs": ["Kosher Style Hot Dog"],
        "Sandwiches": ["BLT Sandwich"],
        "Fries": ["Cajun Fries"],
        "Milkshakes": ["Vanilla Milkshake"],
        "Drinks": ["Bottled Soda"]
    }),
    ("PokeCraft", {
        "Poke Bowls": ["Signature Hawaiian Bowl", "Spicy Ahi Bowl"],
        "Burritos": ["Chicken Katsu Burrito"],
        "Salads": ["Vegan Salad"],
        "Sides": ["Miso Soup Side"],
        "Drinks": ["Japanese Soda"]
    })
]

# ---------- 2. PRICE BANDS BY CATEGORY KEYWORD ----------
# Each rule is (keywords, (low_usd, high_usd)). `get_price` lower-cases the CATEGORY name and
# walks this list top to bottom, using the first rule with any keyword that is a SUBSTRING of
# the category name. Order therefore matters: e.g. "Rice Bowls" hits the "bowl" rule and
# "Rice Plates" hits the "plate" rule long before the generic "rice" rule at the bottom.
# Categories that match no rule fall back to the default band inside `get_price`.
pricing_rules = [
    (["pizza"], (12, 18)),
    (["burger"], (6, 11)),
    (["burrito"], (7, 10)),
    (["bowl"], (9, 12)),
    (["udon", "ramen"], (10, 14)),
    (["salad"], (8, 11)),
    (["pho", "rice plate", "plate"], (9, 13)),
    (["kids"], (4, 6)),
    (["fries"], (2, 4)),
    (["side", "sides"], (2, 5)),
    (["drink", "coffee", "tea", "soda", "lemonade"], (2, 4)),
    (["shake", "frappuccino"], (4, 6)),
    (["dessert", "mochi", "cookie", "cake"], (3, 5)),
    (["sandwich"], (4, 7)),
    (["dog"], (3, 6)),
    (["nugget"], (3, 5)),
    (["taco"], (2, 4)),
    (["quesa", "nacho"], (3, 6)),
    (["roll", "nigiri"], (4, 7)),
    (["sashimi"], (5, 9)),
    (["tempura"], (4, 7)),
    (["onigiri"], (3, 5)),
    (["rice"], (8, 11)),
]

def deterministic_random(item_name: str, low: int, high: int) -> float:
    """Hash an item name to a deterministic price within ``[low, high)``.

    The first 8 hex digits of the MD5 digest of ``item_name`` are reduced modulo the
    number of cents in the band, so the same name always yields the same price for a
    given band, independent of any random seed.

    Args:
        item_name: Text to hash (UTF-8 encoded).
        low: Inclusive lower bound of the band, in whole dollars.
        high: Exclusive upper bound of the band, in whole dollars.

    Returns:
        A price rounded to two decimals. When ``low == high`` the band contains a single
        price and ``float(low)`` is returned.
    """
    cents_range = (high - low) * 100
    if cents_range == 0:
        # Degenerate band: without this guard the modulo below divides by zero.
        return round(float(low), 2)
    digest = hashlib.md5(item_name.encode()).hexdigest()
    return round(low + (int(digest[:8], 16) % cents_range) / 100, 2)

def get_price(item_name: str, category_name: str) -> float:
    """Price an item from the first pricing rule whose keyword occurs in its category.

    The category name is lower-cased and each rule's keywords are tested as substrings,
    in the order the rules are listed. If no rule matches, the 6-10 USD band is used.
    The price within the chosen band is derived deterministically from ``item_name``.
    """
    category_key = category_name.lower()
    band = next(
        (
            bounds
            for words, bounds in pricing_rules
            if any(word in category_key for word in words)
        ),
        (6, 10),  # fallback
    )
    return deterministic_random(item_name, *band)

# ---------- 3. BUILD TABLE ROWS ----------
# Surrogate keys are 1-based and assigned in source order. Every brand owns exactly one
# menu, so menu ids mirror brand ids; category and item ids keep counting across brands.
brands_rows, menus_rows, categories_rows, items_rows = [], [], [], []

for brand_id, (brand, menu_struct) in enumerate(brands_data, start=1):
    menu_id = brand_id
    brands_rows.append({"id": brand_id, "name": brand})
    menus_rows.append({"id": menu_id, "brand_id": brand_id, "name": f"{brand} Main Menu"})

    for cat, items in menu_struct.items():
        category_id = len(categories_rows) + 1
        categories_rows.append(
            {"id": category_id, "menu_id": menu_id, "brand_id": brand_id, "name": cat}
        )

        for it in items:
            items_rows.append(
                {
                    "id": len(items_rows) + 1,
                    "category_id": category_id,
                    "menu_id": menu_id,
                    "brand_id": brand_id,
                    "name": it,
                    "price": get_price(it, cat),
                }
            )

# ---------- 4. TO DATAFRAMES ----------
brands_df, menus_df, categories_df, items_df = (
    pd.DataFrame(rows)
    for rows in (brands_rows, menus_rows, categories_rows, items_rows)
)



In [0]:
%sql
-- Optional bootstrap: creates the catalog named by the `catalog` widget and uses the
-- `metastore_root` widget as its managed storage location. Only needed when the catalog
-- does not exist yet; skip this cell when `metastore_root` is left empty.
CREATE CATALOG IF NOT EXISTS ${catalog} MANAGED LOCATION '${metastore_root}'

In [0]:
%sql
-- Rebuild the target schema from scratch on every run.
-- IF EXISTS keeps the very first run (schema not created yet) from failing.
USE CATALOG ${catalog};
DROP SCHEMA IF EXISTS ${schema} CASCADE;
CREATE SCHEMA ${schema};
-- Make the new schema current so the unqualified table names used by the later Python
-- cells (saveAsTable("brand"), spark.table("item"), ...) resolve to the tables created
-- below rather than to the catalog's default schema.
USE SCHEMA ${schema};

-- Brand dimension: one row per virtual restaurant brand; root of the menu hierarchy.
CREATE TABLE IF NOT EXISTS ${catalog}.${schema}.brand (
  id   BIGINT NOT NULL,
  name STRING NOT NULL,
  CONSTRAINT brand_pk PRIMARY KEY (id)
);

-- Menu dimension: each menu belongs to exactly one brand (brand_id -> brand.id).
CREATE TABLE IF NOT EXISTS ${catalog}.${schema}.menu (
  id       BIGINT NOT NULL,
  name     STRING NOT NULL,
  brand_id BIGINT NOT NULL,

  CONSTRAINT menu_pk        PRIMARY KEY (id),
  CONSTRAINT menu_brand_fk  FOREIGN KEY (brand_id)
    REFERENCES ${catalog}.${schema}.brand(id)
);

-- Category dimension: a named section of a menu. brand_id is carried redundantly
-- alongside menu_id so a category can be joined to its brand without going through menu.
CREATE TABLE IF NOT EXISTS ${catalog}.${schema}.category (
  id       BIGINT NOT NULL,
  name     STRING NOT NULL,
  menu_id  BIGINT NOT NULL,
  brand_id BIGINT NOT NULL,

  CONSTRAINT category_pk         PRIMARY KEY (id),
  CONSTRAINT category_menu_fk    FOREIGN KEY (menu_id)
      REFERENCES ${catalog}.${schema}.menu(id),
  CONSTRAINT category_brand_fk   FOREIGN KEY (brand_id)
      REFERENCES ${catalog}.${schema}.brand(id)
);

-- Item dimension: a sellable menu item with its USD price and links to its category,
-- menu and brand. description and image_data are nullable; the menu generator in this
-- notebook does not populate them.
-- NOTE(review): primary/foreign key constraints are presumably informational only
-- (not enforced on write) in Unity Catalog - confirm before relying on them.
CREATE TABLE IF NOT EXISTS ${catalog}.${schema}.item (
  id          BIGINT NOT NULL,
  name        STRING NOT NULL,
  description STRING,
  price       DOUBLE NOT NULL,
  image_data  STRING,

  brand_id    BIGINT NOT NULL,
  menu_id     BIGINT NOT NULL,
  category_id BIGINT NOT NULL,

  CONSTRAINT item_pk            PRIMARY KEY (id),
  CONSTRAINT item_brand_fk      FOREIGN KEY (brand_id)
      REFERENCES ${catalog}.${schema}.brand(id),
  CONSTRAINT item_menu_fk       FOREIGN KEY (menu_id)
      REFERENCES ${catalog}.${schema}.menu(id),
  CONSTRAINT item_category_fk   FOREIGN KEY (category_id)
      REFERENCES ${catalog}.${schema}.category(id)
);

-- Event log written by the simulator cell: one row per order lifecycle event.
--   sequence : per-order event counter (0 = order_created)
--   body     : JSON-encoded event payload
--   day      : date derived from ts; used as the partition column
-- Created without IF NOT EXISTS: the schema was just recreated, so the table must be new.
CREATE TABLE ${catalog}.${schema}.${table} (
  event_id   STRING,
  event_type STRING,
  ts         TIMESTAMP,
  gk_id      STRING,
  order_id   STRING,
  sequence   INT,
  body       STRING,
  day        DATE
)
USING delta
PARTITIONED BY (day)
TBLPROPERTIES(delta.minReaderVersion=2, delta.minWriterVersion=5);

-- Volume backing the simulator's on-disk cache (road graph pickle and node parquet),
-- which the simulator cell reaches through the /Volumes/<catalog>/<schema>/cache path.
CREATE VOLUME ${catalog}.${schema}.cache

In [0]:
# Load each pandas frame into the table of the same name in the current catalog/schema,
# replacing whatever rows the table held before.
_menu_frames = (
    ("brand", brands_df),
    ("menu", menus_df),
    ("category", categories_df),
    ("item", items_df),
)
for _table_name, _frame in _menu_frames:
    spark.createDataFrame(_frame).write.mode("overwrite").saveAsTable(_table_name)

In [0]:
%pip install osmnx==1.7.1 networkx==3.2.1 geopy==2.4.1 shapely==2.0.3

In [0]:
# ╔═══════════════════════════════════════════════════════════════════════════╗
# ║  Ghost-Kitchen Event Simulator → Delta table                              ║
# ╚═══════════════════════════════════════════════════════════════════════════╝
#
#  ‣ Uses osmnx + networkx for routing
#  ‣ Writes one batch for back-fill, then streams live events in order
#
#  Dependency install (run in its own cell before this one, once per notebook session):
#  %pip install osmnx==1.7.1 networkx==3.2.1 geopy==2.4.1 shapely==2.0.3 nest_asyncio==1.6.0
# ----------------------------------------------------------------------------

import asyncio
import datetime as dt
import json
import math
import pickle
import random
import time
import uuid
from pathlib import Path
from typing import Dict, List, Tuple

import nest_asyncio
import networkx as nx
import numpy as np
import osmnx as ox
import pandas as pd
from pyspark.sql import SparkSession, functions as F
from shapely.geometry import LineString

nest_asyncio.apply()                    # allow nested event loops
spark = SparkSession.builder.getOrCreate()

# ─────────────────────────────────────────────────────────────────────────────
#  1.  READ WIDGETS / CONSTANTS
# ─────────────────────────────────────────────────────────────────────────────
# Target Delta table for the generated events, assembled from three widgets.
CATALOG  : str = dbutils.widgets.get("catalog")      # e.g. "main"
SCHEMA   : str = dbutils.widgets.get("schema")       # e.g. "gk_sim"
TABLE    : str = dbutils.widgets.get("table")        # e.g. "events"
TABLE_FQN          = f"{CATALOG}.{SCHEMA}.{TABLE}"

# Simulation window (parsed with "%Y-%m-%d %H:%M:%S") and the daily order volume at its
# first and last day; volume is interpolated linearly in between. The int() casts raise
# ValueError if the corresponding widget is left empty.
START_TS           = dbutils.widgets.get("start_ts") # "2025-05-12 00:00:00"
END_TS             = dbutils.widgets.get("end_ts")   # "2025-05-14 00:00:00"
START_ORDERS       = int(dbutils.widgets.get("start_orders"))
END_ORDERS         = int(dbutils.widgets.get("end_orders"))

# Kitchen location and delivery model.
GK_ADDRESS         = dbutils.widgets.get("gk_location")
GK_RADIUS_MI       = 2.0                # road graph radius around the kitchen, in miles
GK_DRIVER_MPH      = 25.0               # constant driver speed used to turn route length into drive time

CACHE_DIR          = f"/Volumes/{CATALOG}/{SCHEMA}/cache"
RANDOM_SEED        = 42
SPEED_UP           = int(dbutils.widgets.get("speed_up_factor"))                 # simulated seconds per real second (60 → 1 sim-minute per real second); must be non-zero
PING_INTERVAL_SEC  = 60                 # simulated seconds between driver_ping events
BATCH_ROWS         = 200                # flush threshold
BATCH_SECONDS      = 1.0                # flush when this many real seconds passed since the last write

# Seeds both RNGs used by the simulator; uuid4-based ids are not affected by these seeds.
random.seed(RANDOM_SEED); np.random.seed(RANDOM_SEED)

# ─────────────────────────────────────────────────────────────────────────────
#  2.  SIMULATION PARAMETERS THAT REMAIN VARIABLE DAY-TO-DAY
# ─────────────────────────────────────────────────────────────────────────────
# Tunable simulation parameters, looked up by key in the scheduler and order lifecycle.
C = {
    # day-to-day volume noise (±10 % by default); applied as a uniform multiplier
    # drawn from [1 - noise, 1 + noise] on each day's expected order count
    "noise": 0.10,

    # service-stage mean/deviation in minutes:
    #   created→started, started→finished, finished→ready, ready→pickup
    # each (mean, sd) pair feeds a Gaussian draw that is floored at 0.1 minutes
    "svc": {
        "cs": (2, 1),     # chef start
        "sf": (10, 3),    # cooking
        "fr": (2, 1),     # boxing
        "rp": (6, 2)      # wait for driver
    }
}

# ─────────────────────────────────────────────────────────────────────────────
#  3.  BUILD / LOAD ROAD GRAPH + CUSTOMER NODE CANDIDATES
# ─────────────────────────────────────────────────────────────────────────────
# Cache directory (created on demand) and the two artefacts stored inside it.
CACHE_PATH = Path(CACHE_DIR)
CACHE_PATH.mkdir(parents=True, exist_ok=True)
GRAPH_PKL = CACHE_PATH.joinpath("roadGraph.pkl")
NODES_PARQ = CACHE_PATH.joinpath("nodes.parquet")

def load_road_graph() -> nx.MultiDiGraph:
    """Return the drivable road network around the kitchen, using the pickle cache.

    On a cache miss the kitchen address is geocoded, the "drive" network within
    GK_RADIUS_MI miles is downloaded through osmnx, and the graph is pickled to
    GRAPH_PKL for later runs. On a cache hit the pickle is loaded as-is, so it is
    not refreshed when the address or radius changes.
    """
    if not GRAPH_PKL.exists():
        ox.settings.log_console = False
        kitchen_point = ox.geocoder.geocode(GK_ADDRESS)
        radius_m = GK_RADIUS_MI * 1609.34          # miles → metres
        graph = ox.graph_from_point(kitchen_point, dist=radius_m, network_type="drive")
        GRAPH_PKL.write_bytes(pickle.dumps(graph))
        return graph

    # The cache file is written by this notebook; never point GRAPH_PKL at untrusted data,
    # since unpickling can execute arbitrary code.
    return pickle.loads(GRAPH_PKL.read_bytes())

def load_candidate_nodes(graph: nx.MultiDiGraph) -> pd.DataFrame:
    """Return one row per graph node (node_id, lat, lon, addr), using the parquet cache.

    A node's address label is "<housenumber> <street>" when both ``addr:`` attributes are
    present on the node; otherwise a made-up "<1-9999> Main St" label is drawn from the
    ``random`` module. No connectivity filtering happens here; all nodes are returned.
    """
    if NODES_PARQ.exists():
        return pd.read_parquet(NODES_PARQ)

    def _node_row(node_id, attrs) -> Dict:
        house = attrs.get("addr:housenumber")
        street = attrs.get("addr:street")
        if house and street:
            label = f"{house} {street}"
        else:
            label = f"{random.randint(1,9999)} Main St"
        return {"node_id": node_id, "lat": attrs["y"], "lon": attrs["x"], "addr": label}

    nodes_df = pd.DataFrame(
        [_node_row(node_id, attrs) for node_id, attrs in graph.nodes(data=True)]
    )
    nodes_df.to_parquet(NODES_PARQ, index=False)
    return nodes_df

GRAPH = load_road_graph()
CANDIDATE_NODES = load_candidate_nodes(GRAPH)

# Snap the geocoded kitchen address to its nearest graph node (osmnx takes x=lon, y=lat).
GK_LAT, GK_LON = ox.geocoder.geocode(GK_ADDRESS)
GK_NODE = ox.distance.nearest_nodes(GRAPH, GK_LON, GK_LAT)

# keep nodes in same undirected component as the kitchen
component_id = {}
for _cid, _members in enumerate(nx.connected_components(GRAPH.to_undirected())):
    component_id.update(dict.fromkeys(_members, _cid))

_shares_kitchen_component = CANDIDATE_NODES["node_id"].map(component_id) == component_id[GK_NODE]
CANDIDATE_NODES = CANDIDATE_NODES[_shares_kitchen_component].reset_index(drop=True)

def random_customer() -> Tuple[int, float, float, str]:
    """Draw one row from CANDIDATE_NODES and return it as (node_id, lat, lon, addr)."""
    drawn = CANDIDATE_NODES.sample(1)
    customer = drawn.iloc[0]
    return int(customer["node_id"]), customer["lat"], customer["lon"], customer["addr"]

# ─────────────────────────────────────────────────────────────────────────────
#  4.  MENU SNAPSHOT (simplified)
# ─────────────────────────────────────────────────────────────────────────────
# Pull the whole item table to the driver once and index it by brand for basket sampling.
ITEMS_DF = spark.table("item").toPandas()        # expects pre-populated table
ITEMS_BY_BRAND = {}
for _brand_key, _brand_items in ITEMS_DF.groupby("brand_id"):
    ITEMS_BY_BRAND[_brand_key] = _brand_items.to_dict("records")
BRAND_IDS = list(ITEMS_BY_BRAND.keys())

def random_basket(single_brand_share: float = 0.7,
                  max_brands: int = 4,
                  items_range: Tuple[int,int] = (1,4)) -> List[Dict]:
    """Build a random order basket from the menu snapshot.

    Args:
        single_brand_share: Probability that the basket draws from exactly one brand.
        max_brands: Upper bound on the number of brands in a multi-brand basket
            (a multi-brand basket asks for between 2 and ``max_brands`` brands).
        items_range: Inclusive (min, max) number of distinct items picked per brand.

    Returns:
        A list of item dicts (copies of the ITEMS_BY_BRAND records) each extended with
        a ``qty`` key between 1 and 3.

    Requested counts are capped at what is actually available, so a brand with fewer
    items than the drawn count (or a table with fewer brands than requested) yields a
    smaller basket instead of raising ``ValueError`` from ``random.sample``.
    """
    if random.random() < single_brand_share:
        brands = [random.choice(BRAND_IDS)]
    else:
        wanted_brands = random.randint(2, max_brands)
        brands = random.sample(BRAND_IDS, min(wanted_brands, len(BRAND_IDS)))

    items: List[Dict] = []
    for brand in brands:
        brand_menu = ITEMS_BY_BRAND[brand]
        wanted_items = random.randint(*items_range)
        for item in random.sample(brand_menu, min(wanted_items, len(brand_menu))):
            record = item.copy()            # never mutate the shared snapshot
            record["qty"] = random.randint(1, 3)
            items.append(record)
    return items

# ─────────────────────────────────────────────────────────────────────────────
#  5.  VOLUME MODELS (daily trend + minute curve)
# ─────────────────────────────────────────────────────────────────────────────
def minute_weight_vector() -> np.ndarray:
    """Return 1440 per-minute demand weights with sin² bumps over lunch and dinner.

    Every minute starts at weight 1. Inside each rush window the weight rises smoothly
    from 1 at the window start to the window's multiplier at its midpoint and back
    towards 1 at the window end (the end minute itself is outside the window).
    """
    def _minute_of_day(hhmm: str) -> int:
        parsed = dt.datetime.strptime(hhmm, "%H:%M")
        return parsed.hour * 60 + parsed.minute

    rush_windows = (
        ("11:00", "13:30", 3.0),     # lunch
        ("17:00", "20:00", 3.5),     # dinner
    )

    weights = np.ones(1440)
    for opens, closes, mult in rush_windows:
        first_minute = _minute_of_day(opens)
        span = _minute_of_day(closes) - first_minute
        for offset in range(span):
            x = offset / span
            weights[first_minute + offset] += (mult-1) * math.sin(math.pi*x)**2
    return weights

MINUTE_WEIGHTS = minute_weight_vector()

def orders_per_day(day_index: int, total_days: int) -> float:
    """Expected order count for a day, growing linearly from START_ORDERS to END_ORDERS.

    Args:
        day_index: Zero-based offset of the day from the start of the simulation.
        total_days: Number of days between the start and end dates.

    Returns:
        START_ORDERS at ``day_index == 0`` and END_ORDERS at ``day_index == total_days``.
        When the window covers a single date (``total_days <= 0``) there is nothing to
        interpolate, so START_ORDERS is returned instead of dividing by zero.
    """
    if total_days <= 0:
        return float(START_ORDERS)
    return START_ORDERS + (END_ORDERS - START_ORDERS) * (day_index / total_days)

def weekday_multiplier(date_obj: dt.date) -> float:
    """Return the demand multiplier for the weekday of ``date_obj``.

    The lookup is indexed by ``date.weekday()`` (Monday == 0) rather than by the
    abbreviated day name, because ``strftime("%a")`` follows the process locale and
    would miss the English keys on a non-English cluster.
    """
    #              Mon  Tue   Wed   Thu   Fri   Sat   Sun
    multipliers = (1,   1.05, 1.08, 1.10, 1.25, 1.35, 1.15)
    return multipliers[date_obj.weekday()]

# ─────────────────────────────────────────────────────────────────────────────
#  6.  ROUTING HELPERS
# ─────────────────────────────────────────────────────────────────────────────
def shortest_route(lat: float, lon: float) -> Tuple[List[Tuple[float,float]], float]:
    """
    Route from the kitchen node to the graph node nearest to (lat, lon).

    Returns (polyline, distance_m): the polyline is the list of (lat, lon) pairs of the
    nodes along the length-weighted shortest path, and distance_m sums the shortest
    parallel edge between each consecutive pair of nodes. If the directed graph has no
    path, the search is repeated on an undirected copy of the graph.
    """
    destination = ox.distance.nearest_nodes(GRAPH, lon, lat)

    network = GRAPH
    try:
        node_path = nx.shortest_path(network, GK_NODE, destination, weight="length")
    except nx.NetworkXNoPath:
        network = GRAPH.to_undirected()
        node_path = nx.shortest_path(network, GK_NODE, destination, weight="length")

    polyline = []
    for node in node_path:
        attrs = network.nodes[node]
        polyline.append((attrs["y"], attrs["x"]))

    total_m = 0.0
    for idx in range(len(node_path) - 1):
        here, there = node_path[idx], node_path[idx + 1]
        try:
            candidates = [edge["length"] for edge in network[here][there].values()]
        except KeyError:
            # edge only stored in the opposite direction
            candidates = [edge["length"] for edge in network[there][here].values()]
        total_m += min(candidates)

    return polyline, total_m

# ─────────────────────────────────────────────────────────────────────────────
#  7.  DELTA WRITER UTILITIES
# ─────────────────────────────────────────────────────────────────────────────
EVENT_QUEUE : asyncio.PriorityQueue = asyncio.PriorityQueue()
COUNTER = 0                          # unique tiebreaker
GHOST_KITCHEN_ID = uuid.uuid4().hex

def enqueue_event(ts: dt.datetime,
                  event_type: str,
                  order_id: str,
                  sequence: int,
                  payload: Dict):
    """
    Queue one event row for the Delta writer.

    Entries are (epoch seconds of ts, running COUNTER, row) tuples, so the priority
    queue hands rows out by timestamp and, for equal timestamps, in insertion order.
    The row carries a fresh event id, ts formatted with microseconds, the kitchen id
    and the JSON-encoded payload.
    """
    global COUNTER
    COUNTER += 1

    row = dict(
        event_id=uuid.uuid4().hex,
        event_type=event_type,
        ts=ts.strftime("%Y-%m-%d %H:%M:%S.%f"),
        gk_id=GHOST_KITCHEN_ID,
        order_id=order_id,
        sequence=sequence,
        body=json.dumps(payload),
    )
    priority = ts.timestamp()
    EVENT_QUEUE.put_nowait((priority, COUNTER, row))

def flush_batch(rows: List[Dict]):
    """Append a list of event rows to the Delta table TABLE_FQN; no-op for an empty list.

    The string ``ts`` is parsed into a timestamp, ``day`` is derived from it, and
    ``sequence`` is cast to INT before the append.
    """
    if rows:
        events = spark.createDataFrame(rows)
        events = events.withColumn("ts",  F.to_timestamp("ts", "yyyy-MM-dd HH:mm:ss.SSSSSS"))
        events = events.withColumn("day", F.to_date("ts"))
        events = events.withColumn("sequence", F.col("sequence").cast("INT"))
        events.write.format("delta").mode("append").saveAsTable(TABLE_FQN)

async def queue_consumer():
    """Continuously flush the priority queue to Delta in small batches.

    A batch is written when it reaches BATCH_ROWS rows, when BATCH_SECONDS have passed
    since the previous write, or when the queue has just run dry. The last condition
    matters because this coroutine only wakes up when an event arrives: without it the
    final partial batch would sit in memory and be lost when the task is cancelled.

    On cancellation, anything still queued or buffered is written before the
    ``CancelledError`` is re-raised. Runs until cancelled.
    """
    buffer: List[Dict] = []
    last_flush = time.time()
    try:
        while True:
            _, _, row = await EVENT_QUEUE.get()
            buffer.append(row)

            batch_full = len(buffer) >= BATCH_ROWS
            batch_stale = (time.time() - last_flush) >= BATCH_SECONDS
            about_to_idle = EVENT_QUEUE.empty()   # next get() would block
            if batch_full or batch_stale or about_to_idle:
                flush_batch(buffer)
                buffer.clear()
                last_flush = time.time()
    except asyncio.CancelledError:
        # Shutdown path: drain whatever arrived but was not consumed yet, then write it.
        while not EVENT_QUEUE.empty():
            buffer.append(EVENT_QUEUE.get_nowait()[2])
        flush_batch(buffer)
        buffer.clear()
        raise

# ─────────────────────────────────────────────────────────────────────────────
#  8.  ORDER LIFECYCLE COROUTINE
# ─────────────────────────────────────────────────────────────────────────────
def MICRO(n):
    """Return an ``n``-microsecond timedelta, added to event times to avoid collisions."""
    return dt.timedelta(microseconds=n)


def NORM(mean_sd):
    """Draw a Gaussian sample for a ``(mean, sd)`` pair, floored at 0.1."""
    sample = random.gauss(*mean_sd)
    return sample if sample > 0.1 else 0.1

async def play_order(created_at: dt.datetime):
    """Simulate one order placed at ``created_at`` and enqueue its full event trail.

    Event order: order_created, gk_started, gk_finished, gk_ready, driver_picked_up,
    zero or more driver_ping events (one per PING_INTERVAL_SEC of drive time), delivered.
    Each event's timestamp is nudged by its sequence number in microseconds so that no
    two events of the same order share a timestamp.
    """
    order_id = uuid.uuid4().hex

    _node_id, lat, lon, addr = random_customer()
    items = random_basket()

    route_points, dist_m = shortest_route(lat, lon)
    drive_minutes = (dist_m / 1609.34) / GK_DRIVER_MPH * 60

    # Kitchen stages happen back to back; each duration is an independent draw.
    milestones = [created_at]
    for stage in ("cs", "sf", "fr", "rp"):
        milestones.append(milestones[-1] + dt.timedelta(minutes=NORM(C["svc"][stage])))
    _, t_start, t_finish, t_ready, t_pickup = milestones
    t_deliver = t_pickup + dt.timedelta(minutes=drive_minutes)

    seq_num = 0

    def emit(at: dt.datetime, event_type: str, payload: Dict):
        """Enqueue one event with the next sequence number."""
        nonlocal seq_num
        enqueue_event(at + MICRO(seq_num), event_type, order_id, seq_num, payload)
        seq_num += 1

    emit(created_at, "order_created",
         {"customer_lat": lat, "customer_lon": lon, "customer_addr": addr, "items": items})
    emit(t_start, "gk_started", {})
    emit(t_finish, "gk_finished", {})
    emit(t_ready, "gk_ready", {})
    emit(t_pickup, "driver_picked_up",
         {"route_points": route_points, "eta_mins": round(drive_minutes, 1)})

    # driver pings
    hops = max(1, int(drive_minutes*60 // PING_INTERVAL_SEC))
    last_index = len(route_points) - 1
    for hop in range(1, hops):
        progress = hop / hops
        lat_h, lon_h = route_points[int(progress*last_index)]
        emit(t_pickup + dt.timedelta(seconds=hop*PING_INTERVAL_SEC),
             "driver_ping",
             {"progress_pct": round(progress*100, 1), "loc_lat": lat_h, "loc_lon": lon_h})

    emit(t_deliver, "delivered", {"delivered_lat": lat, "delivered_lon": lon})

# ─────────────────────────────────────────────────────────────────────────────
#  9.  BACK-FILL BATCH  +  REAL-TIME SCHEDULER
# ─────────────────────────────────────────────────────────────────────────────
async def run_scheduler():
    """Generate back-fill instantly, then stream future events in real time.

    Orders are drawn per minute from a Poisson process whose daily mean follows the
    linear growth curve, the weekday multiplier and the configured noise. Orders whose
    creation time is already in the past are played immediately (back-fill); later ones
    are scheduled with an accelerated sleep. Only orders created inside
    ``[START_TS, END_TS)`` are kept, so the time-of-day parts of both widgets are honoured.

    The consumer task is started after the back-fill has been queued, and is cancelled
    and awaited at the end so that its shutdown completes before this coroutine returns.

    Raises:
        Exception: whatever the consumer task failed with (e.g. a Delta write error).
    """
    start_dt = dt.datetime.strptime(START_TS, "%Y-%m-%d %H:%M:%S")
    end_dt   = dt.datetime.strptime(END_TS,   "%Y-%m-%d %H:%M:%S")
    # Naive UTC wall clock; same value as the deprecated datetime.utcnow().
    now_dt   = dt.datetime.now(dt.timezone.utc).replace(tzinfo=None)

    total_days = (end_dt.date() - start_dt.date()).days

    async def schedule_future(ts: dt.datetime):
        """Sleep (accelerated) then play the order."""
        delay = (ts - now_dt).total_seconds() / SPEED_UP
        await asyncio.sleep(max(0, delay))
        await play_order(ts)

    future_tasks: List[asyncio.Task] = []

    for day_offset in range(total_days + 1):
        current_date = start_dt.date() + dt.timedelta(days=day_offset)

        # expected mean orders for this day
        mean_orders = (orders_per_day(day_offset, total_days)
                       * weekday_multiplier(current_date)
                       * random.uniform(1-C["noise"], 1+C["noise"]))

        # spread the daily mean over the 1440 minutes in proportion to the demand curve
        minute_lambdas = mean_orders / MINUTE_WEIGHTS.sum() * MINUTE_WEIGHTS

        midnight = dt.datetime.combine(current_date, dt.time.min)

        for minute, lam in enumerate(minute_lambdas):
            count = np.random.poisson(lam)
            for _ in range(count):
                timestamp = midnight + dt.timedelta(minutes=minute,
                                                    seconds=random.randint(0,59))

                # The day loop covers whole calendar days; drop orders that fall before
                # the requested start time or at/after the requested end time.
                if not (start_dt <= timestamp < end_dt):
                    continue

                if timestamp < now_dt:
                    await play_order(timestamp)               # back-fill
                else:
                    future_tasks.append(asyncio.create_task(schedule_future(timestamp)))

    # start the consumer only AFTER back-fill has filled the queue once
    consumer_task = asyncio.create_task(queue_consumer())

    if future_tasks:
        await asyncio.gather(*future_tasks)

    # Give the consumer the chance to pull everything that is still queued. Stop waiting
    # if it died, otherwise a failed writer would leave this loop spinning forever.
    while not EVENT_QUEUE.empty() and not consumer_task.done():
        await asyncio.sleep(0.05)

    # allow remaining live events to flush
    await asyncio.sleep(5/SPEED_UP)
    consumer_task.cancel()

    # Wait for the cancellation to be processed and surface a real consumer failure
    # instead of leaving it as a never-retrieved task exception.
    (outcome,) = await asyncio.gather(consumer_task, return_exceptions=True)
    if isinstance(outcome, Exception) and not isinstance(outcome, asyncio.CancelledError):
        raise outcome

# ─────────────────────────────────────────────────────────────────────────────
# 10. MAIN ENTRY
# ─────────────────────────────────────────────────────────────────────────────
# Announce the target table and speed factor, then run the simulation to completion.
# Top-level `await` relies on the notebook's already-running event loop
# (nest_asyncio.apply() is called at the top of this cell to allow nested loops).
print(f"👻  Ghost-Kitchen simulator → {TABLE_FQN}  (speed×{SPEED_UP})")
await run_scheduler()