# Synthetic Potential Failures Data Generator

This notebook generates scalable, configurable synthetic data for `customer_success.app_potential_failures` with the exact output schema requested. It supports:

- Parameterized generation size (target ~15k rows; adjustable)
- KPI code filtering (all or selected); optional non-KPI tasks
- Duration profiles (short/medium/long) per KPI with random start/end times
- Tasks that span rail-period boundaries and financial-year boundaries (at least one cross-financial-year task per KPI)
- Overlapping tasks by station and time window for duplicate-tracking tests
- All jobs in COMP status by default; optional status history (WAPPR→APPR→COMP)
- Station distribution across all non-null sections from `customer_success.dimStation`
- Date enrichment by joining to `core_dimdate` for Period, PeriodWeek, PeriodYear
- Start date baseline 25/05/2025 (GTS EL) over a 2-year window
- Output is written to the lakehouse/local directory first for validation; writing to SQL Server after validation is optional

All configuration is controlled through the single `CONFIG` dict defined in the first code cell below.


In [None]:
from __future__ import annotations
import os
import math
import random
import string
from datetime import datetime, timedelta
from typing import Optional, Sequence, Dict, Any, List, Tuple

import numpy as np
import pandas as pd

try:
    import pyodbc  # For SQL Server optional write
except Exception:  # keep notebook runnable without driver
    pyodbc = None

# ---------------------------
# Configuration
# ---------------------------
# Single source of settings for the notebook. The cells below read these keys
# directly (CONFIG["..."]), so a missing key surfaces as a KeyError at use time.
CONFIG: Dict[str, Any] = {
    # Generation controls
    "seed": 42,  # seeds both `random` and `numpy.random` below; also reused as random_state when trimming
    "target_row_count": 15000,  # dial up/down
    "start_date": "2025-05-25",  # inclusive; ISO date string parsed by to_dt()
    "months_span": 24,  # ~2 years window; added to start_date as calendar months

    # KPI controls
    "kpi_include": "all",  # "all" or list of codes
    "kpi_exclude": [],
    "allow_non_kpi": False,  # nice-to-have optional

    # Duration profile per KPI (minutes). Will randomize within buckets.
    # You can override by providing a mapping {kpi_code: {"short":(min,max),"medium":(...),"long":(...)}}
    "duration_profiles": {},
    # Probability weights for picking short/medium/long durations
    "duration_weights": {"short": 0.4, "medium": 0.4, "long": 0.2},

    # Overlap control (duplicates testing): number of clusters and tightness (minutes)
    "overlap_clusters_per_kpi": 3,
    "overlap_window_minutes": 120,  # events within +/- 2 hours (used as the +/- offset around a cluster centre)
    "overlap_events_per_cluster": (2, 6),  # min/max per cluster (both inclusive, passed to random.randint)

    # Cross-boundary controls
    "ensure_cross_fin_year_per_kpi": True,
    # Parsed with datetime.fromisoformat, so a date-only value means midnight at the START of that day.
    "financial_year_end": "2026-03-31",  # UK rail FY can differ; configurable
    "ensure_period_crossing_ratio": 0.1,  # ~10% of tasks will cross period boundaries

    # Station distribution
    "station_distribution_bias": "uniform",  # or 'by_volume' if dimStation has volumes

    # Statuses
    "all_comp_status": True,  # required default
    "include_status_history": False,  # nice-to-have
    "status_history_steps": ["WAPPR", "APPR", "COMP"],

    # Output controls
    "local_validate_dir": "./data/outputs",
    "write_parquet": True,
    "write_csv": False,

    # SQL Server write (post-validate)
    "sqlserver_write": False,
    "sqlserver_dsn": None,  # e.g., "Driver={ODBC Driver 17 for SQL Server};Server=...;Database=...;Trusted_Connection=yes;"
    "sqlserver_table": "customer_success.app_potential_failures_synth",
}

# Seed both RNGs so repeated runs with the same CONFIG draw the same random values.
random.seed(CONFIG["seed"])
np.random.seed(CONFIG["seed"])

# Make output dir
os.makedirs(CONFIG["local_validate_dir"], exist_ok=True)

print("Config loaded. Seeded RNGs.")

In [None]:
# ---------------------------
# Schema and helpers
# ---------------------------
# Ordered (column name, dtype tag) pairs describing the output table.
# generate_tasks() walks this list to cast each column and to fix the final
# column order. The tag is interpreted there as follows: tags starting with
# "datetime" go through pd.to_datetime, "bool" is cast via the nullable
# "boolean" dtype with missing values filled as True, "Int64" is the pandas
# nullable integer, and anything else is cast to the pandas "string" dtype.
SCHEMA_COLUMNS = [
    ("TaskId", "string"),
    ("RecordID", "string"),
    ("Instruction_Code", "string"),
    ("Building", "string"),
    ("BuildingName", "string"),
    ("LocationName", "string"),
    ("ShortDescription", "string"),
    ("LongDescription", "string"),
    ("Reporter", "string"),
    ("ReporterEmail", "string"),
    ("Notes", "string"),
    ("ReportedDate", "datetime64[ns]"),
    ("DueBy", "datetime64[ns]"),
    ("ScheduledFor", "datetime64[ns]"),
    ("Finished", "datetime64[ns]"),
    ("Status", "string"),
    ("LoggedBy", "string"),
    ("LoggedOn", "datetime64[ns]"),
    ("ModifiedOn", "datetime64[ns]"),
    ("SLAStatus", "string"),
    ("CreatedTimestamp", "datetime64[ns]"),
    ("LastUploaded", "datetime64[ns]"),
    ("IsCurrent", "bool"),
    ("Period", "string"),
    ("PeriodWeek", "Int64"),  # pandas nullable int
    ("PeriodYear", "Int64"),
    ("StationSection", "string"),
    ("KPIDescription", "string"),
    ("KPICategory", "string"),
]

# Some generic helpers

def rand_string(prefix: str, n: int = 8) -> str:
    """Return ``prefix`` + "-" + ``n`` random uppercase letters/digits.

    Uses the module-level ``random`` generator, so results follow its seed.
    """
    alphabet = string.ascii_uppercase + string.digits
    suffix = "".join(random.choices(alphabet, k=n))
    return f"{prefix}-" + suffix


def choose_weighted(options: Sequence[str], weights: Sequence[float]) -> str:
    """Pick one entry of ``options`` at random, proportionally to ``weights``.

    Draws from the module-level ``random`` generator.
    """
    picks = random.choices(options, weights=weights, k=1)
    return picks[0]


def to_dt(x: str | datetime) -> datetime:
    """Return ``x`` unchanged if it is already a datetime, else parse it as ISO-8601."""
    if isinstance(x, datetime):
        return x
    return datetime.fromisoformat(x)


# Generation window and financial-year boundary, all derived from CONFIG.
START_DATE: datetime = to_dt(CONFIG["start_date"])  # baseline (inclusive)
# Add the month offset to the start date FIRST and convert the resulting
# Timestamp afterwards: a DateOffset has no to_pydatetime() of its own, so
# calling it on the offset (as attribute access binds tighter than "+") fails.
END_DATE: datetime = (
    pd.Timestamp(START_DATE) + pd.DateOffset(months=CONFIG["months_span"])
).to_pydatetime()
# A date-only ISO string parses to midnight at the start of that day.
FY_END: datetime = to_dt(CONFIG["financial_year_end"])

In [None]:
# ---------------------------
# Load dimensions (KPI, Station, Date)
# ---------------------------
# Each loader in this cell resolves its dimension in this order:
# 1) Read a CSV from INPUT_DIR (./data/inputs) when the expected file exists
# 2) Otherwise fall back to a small synthetic dimension with reasonable defaults
# NOTE(review): earlier notes also listed loading from the source database via
# pyodbc; no such path is implemented by the loaders in this cell - confirm
# whether it is still planned.

INPUT_DIR = "./data/inputs"
# Ensure the input folder exists (no error if it is already there).
os.makedirs(INPUT_DIR, exist_ok=True)


def load_kpi_dim() -> pd.DataFrame:
    """Load the KPI classification dimension.

    Reads ``fms_dimkpiclassification.csv`` from ``INPUT_DIR`` when it exists;
    otherwise returns a fabricated, representative set of KPI codes
    (e.g. Graffiti Removal, Track Side Cleaning).

    Returns:
        DataFrame with columns ``Instruction_Code``, ``KPIDescription`` and
        ``KPICategory``, holding one row per non-null code.

    Raises:
        ValueError: if the CSV lacks a column that maps to a required field.
    """
    required = ["Instruction_Code", "KPIDescription", "KPICategory"]

    csv_path = os.path.join(INPUT_DIR, "fms_dimkpiclassification.csv")
    if os.path.exists(csv_path):
        df = pd.read_csv(csv_path)

        # Accepted header spellings (case-insensitive) for each required field.
        aliases = {
            "code": "Instruction_Code",
            "kpicode": "Instruction_Code",
            "instruction_code": "Instruction_Code",
            "kpidescription": "KPIDescription",
            "description": "KPIDescription",
            "kpicategory": "KPICategory",
            "category": "KPICategory",
        }
        # Keep only the FIRST source column per target so that a file carrying
        # e.g. both "Code" and "KPICode" does not yield duplicate column names.
        source_for: Dict[str, Any] = {}
        for c in df.columns:
            target = aliases.get(str(c).strip().lower())
            if target is not None and target not in source_for:
                source_for[target] = c

        for r in required:
            if r not in source_for:
                raise ValueError(f"KPI dim missing required column: {r}")

        df = pd.DataFrame({r: df[source_for[r]] for r in required})
        # A null code can never be matched again by the per-row KPI lookup, and
        # a repeated code would be generated twice, so drop both here.
        df = df[df["Instruction_Code"].notna()]
        df = df.drop_duplicates(subset="Instruction_Code", keep="first")
        return df.reset_index(drop=True)

    # Fallback: fabricate a representative set of KPI codes
    data = [
        ("GRF", "Graffiti Removal", "Cleaning"),
        ("TSC", "Track Side Cleaning", "Cleaning"),
        ("LIT", "Litter Removal", "Cleaning"),
        ("LGT", "Lighting Fault", "Asset"),
        ("LFT", "Lift Fault", "Asset"),
        ("ESC", "Escalator Fault", "Asset"),
        ("WC", "Toilet Outage", "Facilities"),
        ("SIG", "Signage Damage", "Asset"),
        ("PA", "Public Address Fault", "Asset"),
        ("VND", "Vending Machine Issue", "Retail"),
    ]
    return pd.DataFrame(data, columns=required).drop_duplicates()


def load_station_dim() -> pd.DataFrame:
    """Load the station dimension, keeping only rows with a StationSection.

    Reads ``dimStation.csv`` from ``INPUT_DIR`` when present, normalising the
    recognised header spellings; otherwise returns five synthetic stations.
    Building/BuildingName/LocationName are optional in the CSV.

    Raises:
        ValueError: if the CSV has no column that maps to ``StationSection``.
    """
    output_order = ("StationSection", "Building", "BuildingName", "LocationName")
    source_file = os.path.join(INPUT_DIR, "dimStation.csv")

    if not os.path.exists(source_file):
        # Fallback synthetic stations
        synthetic = [
            ("STN001", "STN001-BLD", "Central Station", "Platform 1"),
            ("STN002", "STN002-BLD", "Riverside", "Platform 2"),
            ("STN003", "STN003-BLD", "Hilltop", "Concourse"),
            ("STN004", "STN004-BLD", "Parkway", "Entrance A"),
            ("STN005", "STN005-BLD", "West End", "Platform 4"),
        ]
        fallback = pd.DataFrame(synthetic, columns=list(output_order))
        return fallback.drop_duplicates()

    # Lower-cased header spelling -> canonical column name.
    canonical = {
        "stationsection": "StationSection",
        "section": "StationSection",
        "station_code": "StationSection",
        "station": "StationSection",
        "building": "Building",
        "buildingname": "BuildingName",
        "locationname": "LocationName",
        "location": "LocationName",
    }
    frame = pd.read_csv(source_file)
    renames = {
        header: canonical[header.lower()]
        for header in frame.columns
        if header.lower() in canonical
    }
    frame = frame.rename(columns=renames)

    if "StationSection" not in frame.columns:
        raise ValueError("Station dim missing StationSection")

    populated = frame[frame["StationSection"].notna()].copy()
    # keep only needed for generation, allow missing building/name
    present = [name for name in output_order if name in populated.columns]
    return populated[present].drop_duplicates()


def load_date_dim(start: datetime, end: datetime) -> pd.DataFrame:
    """Load the date dimension (Period, PeriodWeek, PeriodYear per calendar day).

    Reads ``core_dimdate.csv`` from ``INPUT_DIR`` when present and filters it to
    the days between ``start`` and ``end`` (both inclusive, time of day ignored).
    Otherwise builds a simplified synthetic calendar for that range.

    Args:
        start: first day of the range.
        end: last day of the range.

    Returns:
        DataFrame with columns ``Date`` (midnight timestamps), ``Period``,
        ``PeriodWeek`` and ``PeriodYear``, ordered by date. The columns are
        present even when the range is empty.

    Raises:
        ValueError: if the CSV lacks a column that maps to a required field.
    """
    required = ["Date", "Period", "PeriodWeek", "PeriodYear"]
    # Compare like with like: datetime64 columns cannot be ordered against
    # datetime.date objects, so the bounds are normalised Timestamps.
    first_day = pd.Timestamp(start).normalize()
    last_day = pd.Timestamp(end).normalize()

    csv_path = os.path.join(INPUT_DIR, "core_dimdate.csv")
    if os.path.exists(csv_path):
        df = pd.read_csv(csv_path)
        # normalize
        aliases = {
            "date": "Date",
            "calendardate": "Date",
            "period": "Period",
            "railperiod": "Period",
            "periodweek": "PeriodWeek",
            "railperiodweek": "PeriodWeek",
            "period_week": "PeriodWeek",
            "periodyear": "PeriodYear",
            "railperiodyear": "PeriodYear",
            "period_year": "PeriodYear",
        }
        colmap = {c: aliases[c.lower()] for c in df.columns if c.lower() in aliases}
        df = df.rename(columns=colmap)
        for r in required:
            if r not in df.columns:
                raise ValueError(f"Date dim missing required column: {r}")
        df = df[required].copy()
        # Parse after renaming so every accepted header spelling (not only
        # "Date"/"date") ends up as real timestamps.
        df["Date"] = pd.to_datetime(df["Date"]).dt.normalize()
        # filter to range
        mask = (df["Date"] >= first_day) & (df["Date"] <= last_day)
        return df.loc[mask].drop_duplicates().sort_values("Date").reset_index(drop=True)

    # Fallback synthetic: build daily rows with simple 4-week periods and 13 periods per FY
    def rail_period(d: pd.Timestamp) -> Tuple[str, int, int]:
        # Simplified: FY starts April 1, 13 periods of 28 days each
        fy_year = d.year if d >= pd.Timestamp(year=d.year, month=4, day=1) else d.year - 1
        epoch = pd.Timestamp(year=fy_year, month=4, day=1)
        delta_days = (d - epoch).days
        # 13 * 28 = 364, so a financial year has 1-2 days left over. Keep them
        # in period 13 (as week 5) instead of wrapping back to P01 of the SAME
        # financial year, which would collide with the real first period.
        period_index = min(delta_days // 28, 12) + 1
        week_in_period = (delta_days - (period_index - 1) * 28) // 7 + 1
        period_label = f"P{period_index:02d}/{fy_year}"
        return period_label, week_in_period, fy_year

    rows = []
    for d in pd.date_range(start=first_day, end=last_day, freq="D"):
        p, w, y = rail_period(d)
        rows.append({"Date": d.normalize(), "Period": p, "PeriodWeek": w, "PeriodYear": y})
    # Pass the columns explicitly so an empty range still has the expected schema.
    return pd.DataFrame(rows, columns=required)


# Materialise the three dimensions once at module level; the generator
# functions in later cells read kpi_dim, station_dim and date_dim as globals.
kpi_dim = load_kpi_dim()
station_dim = load_station_dim()
# Date rows are limited to the generation window START_DATE..END_DATE.
date_dim = load_date_dim(START_DATE, END_DATE)

print(f"Loaded KPI: {len(kpi_dim)} codes; Stations: {len(station_dim)}; Dates: {len(date_dim)}")

In [None]:
# ---------------------------
# KPI selection and duration profiles
# ---------------------------
# Filter to KPI codes only, as requested (exclude non-KPI)

# De-duplicate while keeping dimension order, so a code that appears twice in
# the dimension is not given a double share of the generated rows.
_available_kpis = list(dict.fromkeys(kpi_dim["Instruction_Code"].tolist()))

_kpi_include = CONFIG["kpi_include"]
if isinstance(_kpi_include, str) and _kpi_include == "all":
    selected_kpis = list(_available_kpis)
else:
    # A bare string other than "all" is ONE code; set("GRF") would otherwise
    # split it into the characters {"G", "R", "F"} and match nothing.
    _wanted_kpis = {_kpi_include} if isinstance(_kpi_include, str) else set(_kpi_include)
    selected_kpis = [k for k in _available_kpis if k in _wanted_kpis]
    _unknown_kpis = sorted(str(k) for k in _wanted_kpis.difference(_available_kpis))
    if _unknown_kpis:
        # Surface typos instead of silently generating fewer KPI codes.
        print(f"Warning: requested KPI codes not found in KPI dim: {_unknown_kpis}")

_excluded_kpis = set(CONFIG["kpi_exclude"])  # built once, not per element
selected_kpis = [k for k in selected_kpis if k not in _excluded_kpis]

# Build duration profiles for each KPI code
DEFAULT_DURATION_PROFILES = {
    # minutes: ranges are inclusive of variability
    "short": (15, 120),
    "medium": (121, 480),
    "long": (481, 2880),  # up to 2 days for a single task
}

_profile_overrides = CONFIG.get("duration_profiles")
if not isinstance(_profile_overrides, dict):
    _profile_overrides = {}

kpi_to_profiles: Dict[str, Dict[str, Tuple[int, int]]] = {}
for code in selected_kpis:
    override = _profile_overrides.get(code)
    # Start from the defaults and layer the override on top: an override that
    # only redefines some buckets must still leave the other buckets available,
    # because any bucket named in CONFIG["duration_weights"] can be drawn.
    prof = DEFAULT_DURATION_PROFILES.copy()
    if override:
        prof.update({k: tuple(v) for k, v in override.items()})
    kpi_to_profiles[code] = prof

print(f"Selected KPI codes: {len(selected_kpis)}")

In [None]:
# ---------------------------
# Core generators
# ---------------------------

def random_duration_minutes(code: str) -> int:
    """Draw a task duration in minutes for KPI ``code``.

    First picks a duration bucket according to CONFIG["duration_weights"],
    then draws uniformly (bounds inclusive) from that bucket's range in
    ``kpi_to_profiles[code]``.
    """
    weight_by_bucket = CONFIG["duration_weights"]
    names = list(weight_by_bucket)
    probabilities = [weight_by_bucket[name] for name in names]
    chosen = choose_weighted(names, probabilities)
    bounds = kpi_to_profiles[code][chosen]
    shortest, longest = bounds
    return random.randint(shortest, longest)


def random_datetime_within_window(start: datetime, end: datetime) -> datetime:
    """Return a random moment between ``start`` and ``end`` at whole-second steps.

    Both ends are reachable. When the window is empty or shorter than one
    second, ``start`` is returned and no random number is drawn.
    """
    span_seconds = int((end - start).total_seconds())
    if span_seconds > 0:
        return start + timedelta(seconds=random.randint(0, span_seconds))
    return start


def generate_task_row(code: str, station: pd.Series, date_dim: pd.DataFrame) -> Dict[str, Any]:
    """Build one ordinary task record for KPI ``code`` at ``station``.

    The reported time is drawn from START_DATE..END_DATE; work starts up to
    four hours later and lasts a KPI-specific random duration. Period fields
    come from the ``date_dim`` row for the reported day; if that day is not in
    the dimension, a randomly chosen dimension row is used instead.

    Args:
        code: KPI instruction code; must exist in ``kpi_dim``.
        station: station record providing ``StationSection`` and, optionally,
            ``Building``, ``BuildingName`` and ``LocationName``.
        date_dim: date dimension with Date/Period/PeriodWeek/PeriodYear.

    Returns:
        A dict keyed by the output schema column names.
    """

    def _int_or_none(value: Any) -> Optional[int]:
        return None if pd.isna(value) else int(value)

    # Timeline. The order of the random draws is part of the seeded behaviour.
    reported = random_datetime_within_window(START_DATE, END_DATE)
    duration_min = random_duration_minutes(code)
    work_started = reported + timedelta(minutes=random.randint(0, 240))  # start after reported
    finished = work_started + timedelta(minutes=duration_min)
    due_by = reported + timedelta(hours=random.randint(6, 72))
    scheduled_for = reported + timedelta(hours=random.randint(1, 48))

    # Period fields are keyed on the calendar day of the reported time.
    reported_day = pd.Timestamp(reported.date())
    matches = date_dim.loc[date_dim["Date"] == reported_day]
    if matches.empty:
        matches = date_dim.iloc[[random.randrange(0, len(date_dim))]]
    calendar = matches.iloc[0]
    period = calendar["Period"]
    period_week = _int_or_none(calendar["PeriodWeek"])
    period_year = _int_or_none(calendar["PeriodYear"])

    # Descriptions are derived from the KPI dimension row for this code.
    kpi = kpi_dim.loc[kpi_dim["Instruction_Code"] == code].iloc[0]
    short_desc = f"{kpi['KPIDescription']} at {station['StationSection']}"
    long_desc = f"{short_desc}. Duration approx {duration_min} minutes."

    if CONFIG["all_comp_status"]:
        status = "COMP"
    else:
        status = choose_weighted(["WAPPR", "APPR", "COMP"], [0.05, 0.15, 0.8])

    return {
        "TaskId": rand_string("TASK"),
        "RecordID": rand_string("REC"),
        "Instruction_Code": code,
        "Building": station.get("Building", None),
        "BuildingName": station.get("BuildingName", None),
        "LocationName": station.get("LocationName", None),
        "ShortDescription": short_desc,
        "LongDescription": long_desc,
        "Reporter": random.choice(["John Smith", "Jane Doe", "Ops Bot", "Station Manager", "Anonymous"]),
        "ReporterEmail": random.choice(["john@example.com", "jane@example.com", "ops@example.com", None]),
        "Notes": random.choice([None, "Urgent", "Follow-up required", "Photographic evidence attached"]),
        "ReportedDate": reported,
        "DueBy": due_by,
        "ScheduledFor": scheduled_for,
        "Finished": finished,
        "Status": status,
        "LoggedBy": random.choice(["OpsConsole", "MobileApp", "WebPortal"]),
        "LoggedOn": reported,
        "ModifiedOn": finished,
        "SLAStatus": random.choice(["MET", "BREACH", "AT_RISK"]),
        "CreatedTimestamp": reported,
        "LastUploaded": finished,
        "IsCurrent": True,
        "Period": period,
        "PeriodWeek": period_week,
        "PeriodYear": period_year,
        "StationSection": station["StationSection"],
        "KPIDescription": kpi["KPIDescription"],
        "KPICategory": kpi["KPICategory"],
    }


def force_cross_fin_year_case(code: str, station: pd.Series) -> Dict[str, Any]:
    """Build one COMP task that is scheduled before FY_END and finishes after it.

    The task is scheduled 1-14 days before FY_END, reported up to 48 hours
    before that, and finished 1-30 days (plus 0-23 hours) after FY_END. Period
    fields come from the global ``date_dim`` row for the reported day, or from
    a randomly chosen row when that day is not in the dimension.

    Args:
        code: KPI instruction code; must exist in ``kpi_dim``.
        station: station record providing ``StationSection`` and, optionally,
            ``Building``, ``BuildingName`` and ``LocationName``.

    Returns:
        A dict keyed by the output schema column names.
    """

    def _int_or_none(value: Any) -> Optional[int]:
        return None if pd.isna(value) else int(value)

    # Random draws are kept in this exact order for seeded reproducibility.
    lead_days = random.randint(1, 14)
    start_before = FY_END - timedelta(days=lead_days)
    # ensure spanning many hours/days
    overrun_days = random.randint(1, 30)
    overrun_hours = random.randint(0, 23)
    finish_after = FY_END + timedelta(days=overrun_days, hours=overrun_hours)

    # Use reported/logged around start_before to keep consistency
    reported = start_before - timedelta(hours=random.randint(0, 48))

    # Join Period by reported date
    reported_day = pd.Timestamp(reported.date())
    matches = date_dim.loc[date_dim["Date"] == reported_day]
    if matches.empty:
        matches = date_dim.iloc[[random.randrange(0, len(date_dim))]]
    calendar = matches.iloc[0]

    kpi = kpi_dim.loc[kpi_dim["Instruction_Code"] == code].iloc[0]

    record: Dict[str, Any] = {
        "TaskId": rand_string("TASK"),
        "RecordID": rand_string("REC"),
        "Instruction_Code": code,
        "Building": station.get("Building", None),
        "BuildingName": station.get("BuildingName", None),
        "LocationName": station.get("LocationName", None),
        "ShortDescription": f"{kpi['KPIDescription']} cross-FY test",
        "LongDescription": f"Spans FY end at {FY_END.date()} for rollover testing.",
        "Reporter": "Station Manager",
        "ReporterEmail": "manager@example.com",
        "Notes": "Cross-year scenario",
        "ReportedDate": reported,
        "DueBy": start_before + timedelta(days=7),
        "ScheduledFor": start_before,
        "Finished": finish_after,
        "Status": "COMP",
        "LoggedBy": "OpsConsole",
        "LoggedOn": reported,
        "ModifiedOn": finish_after,
        "SLAStatus": random.choice(["MET", "BREACH", "AT_RISK"]),
        "CreatedTimestamp": reported,
        "LastUploaded": finish_after,
        "IsCurrent": True,
        "Period": calendar["Period"],
        "PeriodWeek": _int_or_none(calendar["PeriodWeek"]),
        "PeriodYear": _int_or_none(calendar["PeriodYear"]),
        "StationSection": station["StationSection"],
        "KPIDescription": kpi["KPIDescription"],
        "KPICategory": kpi["KPICategory"],
    }
    return record


def generate_overlap_cluster(code: str, station: pd.Series, center: datetime) -> List[Dict[str, Any]]:
    """Build a cluster of COMP tasks for one KPI and station, close together in time.

    The cluster size is drawn from CONFIG["overlap_events_per_cluster"]
    (inclusive bounds). Each event is reported within
    +/- CONFIG["overlap_window_minutes"] of ``center``, starts up to 90 minutes
    later and lasts a KPI-specific random duration. Period fields come from the
    global ``date_dim`` row for the reported day, or from a randomly chosen row
    when that day is not in the dimension.

    Args:
        code: KPI instruction code; must exist in ``kpi_dim``.
        station: station record shared by every event in the cluster.
        center: moment the reported times are scattered around.

    Returns:
        One dict per event, keyed by the output schema column names.
    """
    n = random.randint(*CONFIG["overlap_events_per_cluster"])
    if n <= 0:
        return []

    # Everything below is identical for every event in the cluster, so it is
    # looked up once instead of once per event. None of it consumes random
    # numbers, so the seeded output is unchanged.
    half = CONFIG["overlap_window_minutes"]
    kpi_row = kpi_dim.loc[kpi_dim["Instruction_Code"] == code].iloc[0]
    kpi_description = kpi_row["KPIDescription"]
    kpi_category = kpi_row["KPICategory"]
    building = station.get("Building", None)
    building_name = station.get("BuildingName", None)
    location_name = station.get("LocationName", None)
    station_section = station["StationSection"]

    rows: List[Dict[str, Any]] = []
    for _ in range(n):
        # reported around the center time within overlap window
        reported = center + timedelta(minutes=random.randint(-half, half))
        duration_min = random_duration_minutes(code)
        start_time = reported + timedelta(minutes=random.randint(0, 90))
        finish_time = start_time + timedelta(minutes=duration_min)

        dd = date_dim.loc[date_dim["Date"] == pd.Timestamp(reported.date())]
        if dd.empty:
            dd = date_dim.iloc[[random.randrange(0, len(date_dim))]]
        cal = dd.iloc[0]

        rows.append({
            "TaskId": rand_string("TASK"),
            "RecordID": rand_string("REC"),
            "Instruction_Code": code,
            "Building": building,
            "BuildingName": building_name,
            "LocationName": location_name,
            "ShortDescription": f"{kpi_description} possible duplicate",
            "LongDescription": "Intentional temporal overlap for duplicate testing",
            "Reporter": random.choice(["Ops Bot", "Station Manager"]),
            "ReporterEmail": random.choice(["ops@example.com", "manager@example.com", None]),
            "Notes": "overlap-cluster",
            "ReportedDate": reported,
            "DueBy": reported + timedelta(hours=random.randint(1, 48)),
            "ScheduledFor": reported + timedelta(hours=random.randint(0, 24)),
            "Finished": finish_time,
            "Status": "COMP",
            "LoggedBy": "MobileApp",
            "LoggedOn": reported,
            "ModifiedOn": finish_time,
            "SLAStatus": random.choice(["MET", "BREACH", "AT_RISK"]),
            "CreatedTimestamp": reported,
            "LastUploaded": finish_time,
            "IsCurrent": True,
            "Period": cal["Period"],
            "PeriodWeek": int(cal["PeriodWeek"]) if not pd.isna(cal["PeriodWeek"]) else None,
            "PeriodYear": int(cal["PeriodYear"]) if not pd.isna(cal["PeriodYear"]) else None,
            "StationSection": station_section,
            "KPIDescription": kpi_description,
            "KPICategory": kpi_category,
        })
    return rows

In [None]:
# ---------------------------
# Main generation routine
# ---------------------------

def _period_end_candidates(dim: pd.DataFrame) -> List[Tuple[pd.Timestamp, pd.Timestamp]]:
    """Return ``(start_day, boundary)`` pairs for the last two days of each period.

    A period is a run of consecutive rows (ordered by Date) sharing the same
    PeriodYear and Period, so labels that repeat in different years are never
    merged. ``boundary`` is midnight after the run's final day. The final run
    in ``dim`` is skipped because the dimension does not show where it ends.
    """
    ordered = dim.sort_values("Date")
    days = [pd.Timestamp(d).normalize() for d in ordered["Date"]]
    keys = [f"{y}|{p}" for y, p in zip(ordered["PeriodYear"], ordered["Period"])]

    pairs: List[Tuple[pd.Timestamp, pd.Timestamp]] = []
    for i, (key, next_key) in enumerate(zip(keys, keys[1:])):
        if key == next_key:
            continue
        boundary = days[i] + timedelta(days=1)
        if i > 0 and keys[i - 1] == key:
            pairs.append((days[i - 1], boundary))
        pairs.append((days[i], boundary))
    return pairs


def _period_crossing_row(
    code: str,
    station: pd.Series,
    day: pd.Timestamp,
    boundary: Optional[pd.Timestamp],
) -> Dict[str, Any]:
    """Build one COMP task reported on ``day`` that finishes at or after ``boundary``.

    The duration is 1-3 days; when ``boundary`` is known, the lower bound is
    raised as needed so the finish time really lands beyond the period end.
    """
    reported = datetime.combine(day.date(), datetime.min.time()) + timedelta(hours=random.randint(0, 20))
    start_time = reported + timedelta(minutes=random.randint(0, 180))

    min_minutes, max_minutes = 24 * 60, 3 * 24 * 60
    if boundary is not None:
        needed = math.ceil((boundary - start_time).total_seconds() / 60)
        # Never exceed the upper bound, even if the dimension has date gaps.
        min_minutes = min(max(min_minutes, needed), max_minutes)
    finish_time = start_time + timedelta(minutes=random.randint(min_minutes, max_minutes))

    dd = date_dim.loc[date_dim["Date"] == day]
    if dd.empty:
        dd = date_dim.iloc[[random.randrange(0, len(date_dim))]]
    cal = dd.iloc[0]
    kpi_row = kpi_dim.loc[kpi_dim["Instruction_Code"] == code].iloc[0]

    return {
        "TaskId": rand_string("TASK"),
        "RecordID": rand_string("REC"),
        "Instruction_Code": code,
        "Building": station.get("Building", None),
        "BuildingName": station.get("BuildingName", None),
        "LocationName": station.get("LocationName", None),
        "ShortDescription": f"{kpi_row['KPIDescription']} period-crossing",
        "LongDescription": "Intentionally spans rail period boundary",
        "Reporter": "Ops Bot",
        "ReporterEmail": "ops@example.com",
        "Notes": "period-cross",
        "ReportedDate": reported,
        "DueBy": reported + timedelta(days=3),
        "ScheduledFor": reported,
        "Finished": finish_time,
        "Status": "COMP",
        "LoggedBy": "OpsConsole",
        "LoggedOn": reported,
        "ModifiedOn": finish_time,
        "SLAStatus": random.choice(["MET", "BREACH", "AT_RISK"]),
        "CreatedTimestamp": reported,
        "LastUploaded": finish_time,
        "IsCurrent": True,
        "Period": cal["Period"],
        "PeriodWeek": int(cal["PeriodWeek"]) if not pd.isna(cal["PeriodWeek"]) else None,
        "PeriodYear": int(cal["PeriodYear"]) if not pd.isna(cal["PeriodYear"]) else None,
        "StationSection": station["StationSection"],
        "KPIDescription": kpi_row["KPIDescription"],
        "KPICategory": kpi_row["KPICategory"],
    }


def generate_tasks() -> pd.DataFrame:
    """Generate the full synthetic task table described by CONFIG.

    Steps:
      1. Ordinary tasks, spread as evenly as possible across the selected KPIs.
      2. One cross-financial-year task per KPI (if enabled).
      3. Period-crossing tasks, sized by CONFIG["ensure_period_crossing_ratio"].
      4. Overlap clusters per KPI for duplicate-detection tests.

    The result is trimmed to CONFIG["target_row_count"] rows. Trimming removes
    ordinary tasks first, so the scenario rows from steps 2-4 survive unless
    they alone exceed the target.

    Returns:
        DataFrame with exactly the SCHEMA_COLUMNS columns, in order, cast to
        the dtypes named there.

    Raises:
        ValueError: if the target row count is negative, or if no KPI codes,
            stations or dates are available to generate from.
    """
    total = CONFIG["target_row_count"]
    if total < 0:
        raise ValueError(f"target_row_count must be >= 0, got {total}")
    if not selected_kpis:
        raise ValueError("No KPI codes selected; check CONFIG['kpi_include'] and CONFIG['kpi_exclude'].")
    if station_dim.empty:
        raise ValueError("Station dimension is empty; cannot assign tasks to stations.")
    if date_dim.empty:
        raise ValueError("Date dimension is empty; cannot derive Period fields.")

    # Distribute roughly evenly across KPI codes
    per_kpi_base, surplus = divmod(total, len(selected_kpis))

    # Pre-calc some stations index for speed
    stations_idx = station_dim.index.tolist()

    def pick_station() -> pd.Series:
        return station_dim.loc[random.choice(stations_idx)]

    # Ordinary and scenario rows are tracked separately so trimming can
    # protect the scenario rows.
    base_rows: List[Dict[str, Any]] = []
    special_rows: List[Dict[str, Any]] = []

    # 1) Base distribution
    for i, code in enumerate(selected_kpis):
        count = per_kpi_base + (1 if i < surplus else 0)
        for _ in range(count):
            base_rows.append(generate_task_row(code, pick_station(), date_dim))

    # 2) Ensure cross-financial-year per KPI if requested
    if CONFIG["ensure_cross_fin_year_per_kpi"]:
        for code in selected_kpis:
            special_rows.append(force_cross_fin_year_case(code, pick_station()))

    # 3) Period crossing ratio: long tasks that start at the end of a period
    if CONFIG["ensure_period_crossing_ratio"] > 0:
        generated_so_far = len(base_rows) + len(special_rows)
        n_cross = max(1, int(generated_so_far * CONFIG["ensure_period_crossing_ratio"]))
        # The candidate start days do not change between iterations: compute once.
        candidates = _period_end_candidates(date_dim)
        for _ in range(n_cross):
            code = random.choice(selected_kpis)
            st = pick_station()
            if candidates:
                day, boundary = random.choice(candidates)
            else:
                # No period boundary inside the dimension: fall back to any day.
                day = pd.Timestamp(random_datetime_within_window(START_DATE, END_DATE).date())
                boundary = None
            special_rows.append(_period_crossing_row(code, st, day, boundary))

    # 4) Overlap clusters by station/time for selected KPIs
    for code in selected_kpis:
        for _ in range(CONFIG["overlap_clusters_per_kpi"]):
            st = pick_station()
            center = random_datetime_within_window(START_DATE, END_DATE)
            special_rows.extend(generate_overlap_cluster(code, st, center))

    # Trim to exact target if overshot, dropping ordinary rows first
    if len(base_rows) + len(special_rows) > total:
        # Dedicated RNG so the trim depends only on the seed, not on how many
        # numbers the generators above happened to draw.
        trim_rng = random.Random(CONFIG["seed"])
        if len(special_rows) > total:
            special_rows = trim_rng.sample(special_rows, total)
        base_rows = trim_rng.sample(base_rows, total - len(special_rows))
        rows = base_rows + special_rows
        trim_rng.shuffle(rows)
    else:
        rows = base_rows + special_rows

    # Passing the columns guarantees the schema even when there are no rows.
    columns = [c for c, _ in SCHEMA_COLUMNS]
    df = pd.DataFrame(rows, columns=columns)

    # Enforce dtypes and column order
    for col, dtype in SCHEMA_COLUMNS:
        if dtype.startswith("datetime"):
            df[col] = pd.to_datetime(df[col], errors="coerce")
        elif dtype == "bool":
            df[col] = df[col].astype("boolean").fillna(True)
        elif dtype == "Int64":
            df[col] = df[col].astype("Int64")
        else:
            df[col] = df[col].astype("string")

    return df[columns]


# Run the generator once and keep the result at module level.
df_tasks = generate_tasks()
print(df_tasks.shape)
# Trailing bare expression: shown as the cell output when run in a notebook.
df_tasks.head(3)