In [1]:
import pandas as pd
import numpy as np
from pathlib import Path
import glob

DATA_DIR = Path("../data/raw")
files = list(DATA_DIR.glob("*.csv"))

assert len(files) > 0, "No CSV files found in ingestion directory"

df_raw = pd.concat(
    (pd.read_csv(f) for f in files),
    ignore_index=True
)

df_raw.head()


Unnamed: 0,datetime_beginning_utc,datetime_beginning_ept,area,instantaneous_load
0,12/20/2025 4:05:00 AM,12/19/2025 11:05:00 PM,PJM RTO,104096.547
1,12/20/2025 4:00:00 AM,12/19/2025 11:00:00 PM,PJM RTO,104527.977
2,12/20/2025 3:55:00 AM,12/19/2025 10:55:00 PM,PJM RTO,105064.531
3,12/20/2025 3:50:00 AM,12/19/2025 10:50:00 PM,PJM RTO,105139.883
4,12/20/2025 3:45:00 AM,12/19/2025 10:45:00 PM,PJM RTO,105462.5


In [2]:
df = df_raw.rename(columns={
    "datetime_beginning_ept": "timestamp_ept",
    "datetime_beginning_utc": "timestamp_utc",
    "instantaneous_load": "load_mw",
    "area": "region"
})

df["timestamp_ept"] = pd.to_datetime(df["timestamp_ept"], errors="coerce")
df["timestamp_utc"] = pd.to_datetime(df["timestamp_utc"], errors="coerce")

df = df.dropna(subset=["timestamp_ept", "load_mw"]).copy()
df = df.sort_values("timestamp_ept").reset_index(drop=True)

# Timestamp alignment check

df["timestamp_5min"] = df["timestamp_ept"].dt.floor("5min")
misaligned_rate = (df["timestamp_ept"] != df["timestamp_5min"]).mean()

print(f"Misaligned timestamps: {misaligned_rate:.2%}")

#Interval completeness check

expected_intervals = pd.date_range(
    start=df["timestamp_5min"].min(),
    end=df["timestamp_5min"].max(),
    freq="5min"
)

missing_intervals = expected_intervals.difference(df["timestamp_5min"])
missing_rate = len(missing_intervals) / len(expected_intervals)

print("Missing intervals:", len(missing_intervals))
print("Missing rate:", f"{missing_rate:.2%}")

# Freshness check
now = pd.Timestamp.now(tz=df["timestamp_5min"].dt.tz)
latest_ts = df["timestamp_5min"].max()

lag_minutes = (now - latest_ts).total_seconds() / 60

print("Latest timestamp:", latest_ts)
print("Ingestion lag (minutes):", lag_minutes)

# Plausibility rules
if (df["load_mw"] < 0).any():
    raise ValueError("CRITICAL: Negative load detected")

if (df["load_mw"] == 0).any():
    print("WARNING: Zero load values detected")

# Flatline detection
df["delta_mw"] = df["load_mw"].diff()

flatline = (
    df["delta_mw"].abs() < 10
).rolling(window=6).sum() == 6  # 30 minutes

flatline_count = flatline.sum()
print("Flatline windows detected:", flatline_count)

# Alert table (decision output)
alerts = []

def add_alert(check, severity, metric, value):
    alerts.append({
        "check": check,
        "severity": severity,
        "metric": metric,
        "value": value
    })

if missing_rate > 0:
    add_alert("interval_completeness", "warning", "missing_rate", missing_rate)

if lag_minutes > 10:
    add_alert("freshness", "warning", "lag_minutes", lag_minutes)

if flatline_count > 0:
    add_alert("flatline", "critical", "count", flatline_count)

alert_df = pd.DataFrame(alerts)
alert_df


  df["timestamp_ept"] = pd.to_datetime(df["timestamp_ept"], errors="coerce")


Misaligned timestamps: 11.58%
Missing intervals: 2
Missing rate: 0.02%
Latest timestamp: 2025-12-19 23:05:00
Ingestion lag (minutes): 5804.394512083333
Flatline windows detected: 0


Unnamed: 0,check,severity,metric,value
0,interval_completeness,warning,missing_rate,0.00023
1,freshness,warning,lag_minutes,5804.394512
