# Casper's Kitchens - Simple Canonical Data Replay

**State management:**
- Watermark: last processed virtual simulation timestamp (epoch seconds), stored in `_watermark` file
- Sim start time: stored in `_sim_start` file

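**Exactly-once (best effort):** If the job fails before updating the watermark, the next run processes the same window again. Events already written for that window are appended again with new `event_id`s, so the replay is only idempotent if downstream consumers deduplicate.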

## Configuration

In [None]:
import os
from pyspark.sql import functions as F
from datetime import datetime, timedelta
import pandas as pd

# Create widgets if running interactively.
# Creation is best-effort: an ordinary error here is ignored and the
# `dbutils.widgets.get` calls below run regardless.
try:
    dbutils.widgets.text("CATALOG", "caspersdev")
    dbutils.widgets.text("SCHEMA", "simulator")
    dbutils.widgets.text("VOLUME", "events")
    dbutils.widgets.text("START_DAY", "70")
    dbutils.widgets.text("SPEED_MULTIPLIER", "60.0")
except Exception:
    # Catch Exception rather than using a bare `except:` so that
    # KeyboardInterrupt / SystemExit are never swallowed.
    pass

# Get parameters (widget values are strings; numeric ones are parsed here)
CATALOG = dbutils.widgets.get("CATALOG")
SCHEMA = dbutils.widgets.get("SCHEMA")
VOLUME = dbutils.widgets.get("VOLUME")
START_DAY = int(dbutils.widgets.get("START_DAY"))
SPEED_MULTIPLIER = float(dbutils.widgets.get("SPEED_MULTIPLIER"))

# Paths: events are written under VOLUME_PATH; the two state files live in
# the sibling `misc` volume of the same catalog/schema.
VOLUME_PATH = f"/Volumes/{CATALOG}/{SCHEMA}/{VOLUME}"
WATERMARK_PATH = f"/Volumes/{CATALOG}/{SCHEMA}/misc/_watermark"
SIM_START_PATH = f"/Volumes/{CATALOG}/{SCHEMA}/misc/_sim_start"

# Constants
# NOTE(review): datetime(2024, 1, 1) is naive, so .timestamp() interprets it
# in the process's local timezone, while NOW below is UTC. Confirm the
# cluster timezone is UTC.
DATASET_EPOCH = int(datetime(2024, 1, 1).timestamp())
DATASET_DAYS = 90
CYCLE_SECONDS = DATASET_DAYS * 86400  # length of one full pass over the dataset
NOW = datetime.utcnow()  # naive UTC wall-clock time, captured once per run

print(f"Config: START_DAY={START_DAY}, SPEED={SPEED_MULTIPLIER}x")
print(f"Output: {VOLUME_PATH}")
print(f"Dataset cycle: {DATASET_DAYS} days ({CYCLE_SECONDS} seconds)")


## Load Canonical Dataset (Pandas)

In [None]:
# The canonical events live next to the notebook in the workspace.
# spark.read doesn't work with workspace paths, so pandas does the load.
canonical_events_path = "./canonical_dataset/events.parquet"

print("Loading canonical dataset...")
events_pdf = pd.read_parquet(canonical_events_path)
loaded_event_count = len(events_pdf)
print(f"Loaded {loaded_event_count:,} events")


## Read State Files

In [None]:
# Try to read watermark (last processed virtual simulation timestamp).
# A missing or unreadable watermark is treated as a first run; the exception
# type is printed so a genuine read failure is not mistaken for "no state".
try:
    watermark_data = spark.read.text(WATERMARK_PATH).first()[0]
    last_sim_seconds = int(watermark_data)
except Exception as exc:
    last_sim_seconds = DATASET_EPOCH
    is_first_run = True
    print("No watermark - first run")
    print(f"   (watermark not loaded: {type(exc).__name__})")
else:
    # Success path kept outside the `try` so only the read/parse is guarded.
    is_first_run = False
    virtual_day = int((last_sim_seconds - DATASET_EPOCH) / 86400)
    loop_index = int((last_sim_seconds - DATASET_EPOCH) / CYCLE_SECONDS)
    print(f"Watermark: {last_sim_seconds} (virtual day {virtual_day}, loop {loop_index})")

# Try to read simulation start time (ISO-8601 text).
# When it cannot be loaded, the current run's NOW becomes the sim start.
try:
    sim_start_data = spark.read.text(SIM_START_PATH).first()[0]
    sim_start_time = datetime.fromisoformat(sim_start_data)
except Exception as exc:
    sim_start_time = NOW
    print(f"Establishing sim start: {sim_start_time.isoformat()}")
    print(f"   (sim start not loaded: {type(exc).__name__})")
else:
    print(f"Sim started: {sim_start_time.isoformat()}")


## Calculate New Position

In [None]:
def _time_of_day_seconds(moment):
    """Return the whole seconds elapsed since midnight of *moment*."""
    return (moment.hour * 3600) + (moment.minute * 60) + moment.second


# Virtual timestamp of midnight on START_DAY; both branches build on it.
start_day_midnight = DATASET_EPOCH + (START_DAY * 86400)

if not is_first_run:
    # Subsequent runs: real time since sim start, scaled by the multiplier,
    # is added to the position the simulation started from.
    elapsed_real = (NOW - sim_start_time).total_seconds()
    elapsed_sim = int(elapsed_real * SPEED_MULTIPLIER)

    sim_start_tod = _time_of_day_seconds(sim_start_time)
    start_position = int(start_day_midnight + sim_start_tod)
    new_end_seconds = start_position + elapsed_sim

    print("\nSPEED MODE:")
    print(f"   Real elapsed: {elapsed_real:.0f}s ({elapsed_real/60:.1f} min)")
    print(f"   Sim elapsed: {elapsed_sim}s ({elapsed_sim/3600:.1f} hours)")
else:
    # First run: replay everything from day 0 up to START_DAY at the
    # current time of day.
    current_tod = _time_of_day_seconds(NOW)
    new_end_seconds = int(start_day_midnight + current_tod)
    print(f"\nFIRST RUN: day 0 -> day {START_DAY} @ {NOW.strftime('%H:%M:%S')}")

# Express both ends of the window as day numbers and dataset-loop numbers.
window_start_offset = last_sim_seconds - DATASET_EPOCH
window_end_offset = new_end_seconds - DATASET_EPOCH
start_virtual_day = int(window_start_offset / 86400)
end_virtual_day = int(window_end_offset / 86400)
start_loop = int(window_start_offset / CYCLE_SECONDS)
end_loop = int(window_end_offset / CYCLE_SECONDS)

print(f"   Processing: virtual day {start_virtual_day} -> {end_virtual_day}")
print(f"   Loops touched: {start_loop} -> {end_loop}")

# Nothing to do when the cursor has not moved past the watermark.
if new_end_seconds <= last_sim_seconds:
    print("No new data")
    dbutils.notebook.exit("No new data")


## Filter Events

In [None]:
# Collect the events of the virtual window (last_sim_seconds, new_end_seconds],
# one slice per 90-day loop that the window touches.
segments = []
start_loop = int((last_sim_seconds - DATASET_EPOCH) / CYCLE_SECONDS)
end_loop = int((new_end_seconds - DATASET_EPOCH) / CYCLE_SECONDS)

for loop_idx in range(start_loop, end_loop + 1):
    loop_shift = loop_idx * CYCLE_SECONDS
    loop_start_virtual = DATASET_EPOCH + loop_shift
    loop_end_virtual = loop_start_virtual + CYCLE_SECONDS

    # Clip the requested window to this loop's extent.
    segment_start_virtual = max(last_sim_seconds, loop_start_virtual)
    segment_end_virtual = min(new_end_seconds, loop_end_virtual)
    if segment_start_virtual >= segment_end_virtual:
        continue

    # Translate the clipped window back into base-dataset timestamps.
    segment_start_dataset = DATASET_EPOCH + (segment_start_virtual - loop_start_virtual)
    segment_end_dataset = DATASET_EPOCH + (segment_end_virtual - loop_start_virtual)

    # Start is exclusive and end is inclusive.
    event_ts = events_pdf["ts_seconds"]
    in_window = event_ts.gt(segment_start_dataset) & event_ts.le(segment_end_dataset)
    segment_pdf = events_pdf.loc[in_window].copy()
    if segment_pdf.empty:
        continue

    # Shift by whole cycles so virtual timestamps keep increasing across loops.
    segment_pdf["virtual_ts_seconds"] = segment_pdf["ts_seconds"] + loop_shift

    # From the second loop on, suffix order IDs so they stay unique across
    # loops without adding schema columns.
    if loop_idx > 0:
        segment_pdf["order_id"] = segment_pdf["order_id"].astype(str) + f"-L{loop_idx}"

    segments.append(segment_pdf)

if not segments:
    print("No events in window")
    dbutils.notebook.exit("No events")

new_events_pdf = pd.concat(segments, ignore_index=True)
event_count = len(new_events_pdf)
print(f"Processing {event_count:,} events")

# Hand the pandas result over to Spark for the transform step.
new_events = spark.createDataFrame(new_events_pdf)


## Transform

In [None]:
# Time shift (seconds) that moves dataset day START_DAY onto today's date:
# dataset day 0 is placed START_DAY days before today's midnight.
today_midnight = datetime(NOW.year, NOW.month, NOW.day)
dataset_day_0 = today_midnight - timedelta(days=START_DAY)
TIME_SHIFT = int((dataset_day_0 - datetime(2024, 1, 1)).total_seconds())

# event_type: numeric id -> name. The chain has no `otherwise`, so ids
# outside this map produce a null event_type.
event_type_names = {
    1: "order_created",
    2: "gk_started",
    3: "gk_finished",
    4: "gk_ready",
    5: "driver_arrived",
    6: "driver_picked_up",
    7: "driver_ping",
    8: "delivered",
}
event_type_col = None
for type_id, type_name in event_type_names.items():
    is_this_type = F.col("event_type_id") == type_id
    if event_type_col is None:
        event_type_col = F.when(is_this_type, type_name)
    else:
        event_type_col = event_type_col.when(is_this_type, type_name)

# ts: shifted virtual timestamp rendered as text.
shifted_epoch = F.col("virtual_ts_seconds") + F.lit(TIME_SHIFT)
ts_col = F.date_format(F.from_unixtime(shifted_epoch), "yyyy-MM-dd HH:mm:ss.SSS")

# body: per-event-type JSON payload; "{}" whenever there is nothing to report.
empty_body = F.lit("{}")

order_created_body = F.to_json(F.struct(
    F.col("customer_lat").cast("double").alias("customer_lat"),
    F.col("customer_lon").cast("double").alias("customer_lon"),
    F.col("customer_addr"),
    F.from_json(
        F.col("items_json"),
        "array<struct<id:int,category_id:int,menu_id:int,brand_id:int,name:string,price:double,qty:int>>",
    ).alias("items"),
))

route_points = F.from_json(F.col("route_json"), "array<array<double>>").alias("route_points")
picked_up_body = F.when(
    F.col("route_json").isNotNull(),
    F.to_json(F.struct(route_points)),
).otherwise(empty_body)

ping_body = F.when(
    F.col("ping_lat").isNotNull(),
    F.to_json(F.struct(
        F.col("ping_progress").cast("double").alias("progress_pct"),
        F.col("ping_lat").cast("double").alias("loc_lat"),
        F.col("ping_lon").cast("double").alias("loc_lon"),
    )),
).otherwise(empty_body)

delivered_body = F.when(
    F.col("customer_lat").isNotNull(),
    F.to_json(F.struct(
        F.col("customer_lat").cast("double").alias("delivered_lat"),
        F.col("customer_lon").cast("double").alias("delivered_lon"),
    )),
).otherwise(empty_body)

event_type_name = F.col("event_type")
body_col = (
    F.when(event_type_name == "order_created", order_created_body)
    .when(event_type_name == "driver_picked_up", picked_up_body)
    .when(event_type_name == "driver_ping", ping_body)
    .when(event_type_name == "delivered", delivered_body)
    .otherwise(empty_body)
)

# Transform: add the derived columns in order (body reads the new event_type
# column), attach a generated event_id, and keep only the output columns.
final_df = (
    new_events
    .withColumn("event_type", event_type_col)
    .withColumn("ts", ts_col)
    .withColumn("body", body_col)
    .withColumn("event_id", F.expr("uuid()"))
    .select("event_id", "event_type", "ts", "location_id", "order_id", "sequence", "body")
)

print("Transformed")


## Write & Update State

In [None]:
# Write the transformed events as JSON, appending to earlier output.
final_df.write.mode("append").json(VOLUME_PATH)
print(f"Wrote {event_count:,} events")

# Save sim start time (first run only).
# This is persisted BEFORE the watermark on purpose: a watermark without a
# sim-start file is never repaired, because the sim start is only written on
# a first run. A sim-start file without a watermark simply leads to another
# first run that picks the saved start time up again.
if is_first_run:
    spark.createDataFrame([(sim_start_time.isoformat(),)], ["value"]).write.mode("overwrite").text(SIM_START_PATH)
    print("Saved sim start time")

# Update watermark (virtual simulation cursor) last, so it only advances once
# the rest of the state is in place.
spark.createDataFrame([(str(new_end_seconds),)], ["value"]).write.mode("overwrite").text(WATERMARK_PATH)
end_virtual_day = int((new_end_seconds - DATASET_EPOCH) / 86400)
end_loop = int((new_end_seconds - DATASET_EPOCH) / CYCLE_SECONDS)
print(f"Watermark: {new_end_seconds} (virtual day {end_virtual_day}, loop {end_loop})")

print("\nComplete!")
