In [None]:
# --- STATIC SIMULATION FOR THRESHOLD EXCEEDANCE ---
# Works with sklearn .pkl or Keras .h5 models
# Assumes you already have a dataframe `df` with weather/time/polarization features
# and a raw fidelity column for ground-truth lookahead.

import os
import math
import numpy as np
import pandas as pd

from sklearn.metrics import confusion_matrix, classification_report, r2_score, mean_squared_error
from sklearn.preprocessing import StandardScaler

# If you plan to load .pkl models:
import joblib

# If you plan to load .h5 models:
try:
    from tensorflow.keras.models import load_model as keras_load_model
except Exception:
    keras_load_model = None  # only needed if you actually load a .h5

# =========================
# CONFIG (edit these)
# =========================
# All names below are read as module-level globals by the helper functions and
# by the main simulation section of this cell.
MODEL_PATH = "model.pkl"     # model file; the extension (.pkl or .h5) selects the loader
FEATS      = [               # exact feature list used in training, in order
    # e.g., "temp_x", "temp_y", "tod_sin", "tod_cos", "wind_u_x", ...
    # Fill with your training-time feature names; while this list is empty the
    # feature matrix passed to the model has zero columns.
]
FID_COL    = "Fidelity"      # raw polarization fidelity column for ground-truth lookahead
TARGET_KIND = "rstdev"       # one of: "rstdev", "rextdiff", "absdiff"
THRESH     = 0.10            # absolute fidelity-drift tolerance (example)
HORIZON_H  = 300             # how far ahead to look (in samples) for a true exceedance
RSTDEV_WINDOW = 300          # model's intended window length for std; not read by the code visible in this cell
GAUSS_P_EXCEED_THRESH = 0.05 # for rstdev: flag when the predicted Gaussian exceedance probability (0..1) is >= this value

# Optional: standardize X before prediction if the model was trained on
# standardized inputs. When enabled it is applied to whichever model is loaded.
APPLY_SCALER = False
SCALER_MEAN = None  # set np.array of per-feature means if you saved them
SCALER_STD  = None  # set np.array of per-feature stds if you saved them

# Optional: post-processing for model outputs (if you trained on a scaled y).
# Both must be set (and Y_STD non-zero) for the inverse transform to be applied.
Y_MEAN = None
Y_STD  = None

# =========================
# UTILITIES
# =========================
def load_any_model(path):
    """Load a model from ``path``, picking the loader from the file extension.

    Args:
        path: Location of the serialized model. The extension is compared
            case-insensitively.

    Returns:
        A ``(model, kind)`` tuple where ``kind`` is ``"sklearn"`` for ``.pkl``
        files (loaded with joblib) and ``"keras"`` for ``.h5`` files.

    Raises:
        RuntimeError: A ``.h5`` file was requested but the Keras loader could
            not be imported.
        ValueError: The extension is neither ``.pkl`` nor ``.h5``.
    """
    suffix = os.path.splitext(path)[1].lower()

    if suffix == ".pkl":
        return joblib.load(path), "sklearn"

    if suffix != ".h5":
        raise ValueError(f"Unsupported model extension: {suffix}")

    # Only the .h5 case remains; it needs the optional TensorFlow import.
    if keras_load_model is None:
        raise RuntimeError("TensorFlow/Keras not available to load .h5 models.")
    return keras_load_model(path), "keras"

def predict_any(model, kind, X):
    """Run ``model.predict`` on ``X`` and return the result as a float array.

    Args:
        model: Object exposing a ``predict`` method.
        kind: ``"sklearn"`` calls ``predict(X)`` as-is; any other value takes
            the Keras path, which calls ``predict(X, verbose=0)`` and flattens
            the output.
        X: Feature matrix handed straight to the model. The Keras path expects
            2D ``(N, F)`` input; adapt here if the network expects sequences.

    Returns:
        ``numpy.ndarray`` of dtype float holding the predictions.
    """
    raw = (
        model.predict(X)
        if kind == "sklearn"
        else model.predict(X, verbose=0).ravel()
    )
    return np.asarray(raw, dtype=float)

def maybe_unscale_y(yhat):
    """Map predictions back to the original target scale when configured.

    Reads the module-level ``Y_MEAN`` and ``Y_STD``. The inverse transform
    ``yhat * Y_STD + Y_MEAN`` is applied only when both are set and ``Y_STD``
    is non-zero; otherwise ``yhat`` is returned unchanged.
    """
    if Y_MEAN is None or Y_STD is None:
        return yhat
    if Y_STD == 0:
        # A zero std carries no scale information; leave predictions as-is.
        return yhat
    offset = float(Y_MEAN)
    scale = float(Y_STD)
    return yhat * scale + offset

def gaussian_exceed_prob(sigma, tol):
    """Two-sided exceedance probability P(|X| > tol) for X ~ N(0, sigma^2).

    Args:
        sigma: Standard deviation of the zero-mean Gaussian.
        tol: Absolute tolerance.

    Returns:
        A float in [0, 1]. A non-positive ``sigma`` is treated as "no spread"
        and yields 0.0 (this check runs first). A non-positive ``tol`` yields
        1.0, because |X| exceeds a negative bound with certainty and the
        formula already evaluates to 1.0 at ``tol == 0``.
    """
    if sigma <= 0:
        return 0.0
    if tol <= 0:
        # Without this guard a negative tol makes erfc exceed 1, which is not
        # a valid probability.
        return 1.0
    z = tol / sigma
    # 2 * (1 - Phi(z)) == erfc(z / sqrt(2)) for the standard normal CDF Phi.
    return math.erfc(z / math.sqrt(2.0))

def chebyshev_upper_bound(sigma, tol):
    """Chebyshev upper bound on P(|X - mu| >= tol), i.e. min(1, (sigma/tol)^2).

    Args:
        sigma: Standard deviation of X.
        tol: Absolute deviation tolerance.

    Returns:
        A float in [0, 1]. A non-positive ``sigma`` yields 0.0 (this check
        runs first). A non-positive ``tol`` yields the trivial bound 1.0,
        which also avoids dividing by zero when ``tol == 0``.
    """
    if sigma <= 0:
        return 0.0
    if tol <= 0:
        # |X - mu| >= tol always holds for tol <= 0, so only the trivial
        # bound applies; squaring a negative ratio would understate it.
        return 1.0
    r = sigma / tol
    return float(min(1.0, r * r))

def ground_truth_exceed_in_horizon(fid, idx, tol, H):
    """Label one sample: does fidelity drift past ``tol`` within ``H`` steps?

    The reference value is ``fid[idx]``; the following ``H`` samples (fewer
    when the array ends sooner) are compared against it.

    Args:
        fid: Array of fidelity values.
        idx: Position of the reference sample.
        tol: Absolute drift tolerance (strict ``>`` comparison).
        H: Number of samples to look ahead.

    Returns:
        1 if any ``|fid[idx + h] - fid[idx]| > tol`` for ``h`` in 1..H,
        otherwise 0 (including when there are no samples to look at).
    """
    baseline = fid[idx]
    first_ahead = idx + 1
    stop = min(len(fid), first_ahead + H)
    if stop <= first_ahead:
        # Nothing lies ahead of idx inside the horizon.
        return 0
    drift = np.abs(fid[first_ahead:stop] - baseline)
    return int(np.any(drift > tol))

def split_train_val_test(n, tr=0.70, va=0.15):
    """Return positional slices for a train / validation / test split.

    Args:
        n: Total number of rows.
        tr: Fraction of rows assigned to training.
        va: Fraction of rows assigned to validation.

    Returns:
        Three contiguous ``slice`` objects ``(train, val, test)``. Train and
        validation sizes are truncated to integers; the test slice takes
        whatever remains up to ``n``.
    """
    train_end = int(n * tr)
    val_end = train_end + int(n * va)
    return slice(0, train_end), slice(train_end, val_end), slice(val_end, n)

# =========================
# MAIN SIMULATION
# =========================
# 0) Basic checks
# `df` must already exist in the session. If it does not, the reference below
# raises NameError before the assertion message is ever shown.
assert isinstance(df, pd.DataFrame), "Expected a DataFrame named `df` to be present."
# Every configured feature column and the fidelity column must be present.
for c in FEATS + [FID_COL]:
    if c not in df.columns:
        raise KeyError(f"Column `{c}` not in df.")

# 1) Prep features/labels (test split only)
X_all = df[FEATS].copy()
fid   = df[FID_COL].astype(float).to_numpy()

# Positional split by row order using the helper's default fractions; only the
# last (test) slice is used below.
idx_tr, idx_va, idx_te = split_train_val_test(len(df))
X_te = X_all.iloc[idx_te].reset_index(drop=True)
fid_te = fid[idx_te]

# Standardization is applied whenever APPLY_SCALER is set, whichever model is
# loaded. Either way X_te ends up as a float ndarray.
if APPLY_SCALER:
    if SCALER_MEAN is None or SCALER_STD is None:
        raise RuntimeError("APPLY_SCALER=True but SCALER_MEAN/SCALER_STD not provided.")
    X_te = (X_te.to_numpy(dtype=float) - SCALER_MEAN) / SCALER_STD
else:
    X_te = X_te.to_numpy(dtype=float)

# 2) Load model and predict on test features
model, model_kind = load_any_model(MODEL_PATH)
yhat_te = predict_any(model, model_kind, X_te)
# Undo target scaling only if Y_MEAN / Y_STD were configured.
yhat_te = maybe_unscale_y(yhat_te)

# 3) Interpretation layer -> predicted exceed flag (and extras for rstdev)
# gauss_p / cheb_p stay NaN unless TARGET_KIND is "rstdev".
pred_flag = np.zeros(len(yhat_te), dtype=int)
gauss_p   = np.full(len(yhat_te), np.nan)
cheb_p    = np.full(len(yhat_te), np.nan)

if TARGET_KIND == "rstdev":
    # yhat = predicted sigma over a window; convert to exceedance probabilities
    for i, s in enumerate(yhat_te):
        gp = gaussian_exceed_prob(s, THRESH)
        cb = chebyshev_upper_bound(s, THRESH)
        gauss_p[i] = gp
        cheb_p[i]  = cb
        # Only the Gaussian probability drives the flag; the Chebyshev bound is
        # kept for reporting.
        pred_flag[i] = int(gp >= GAUSS_P_EXCEED_THRESH)

elif TARGET_KIND == "rextdiff":
    # yhat = predicted peak-to-peak over a window
    pred_flag = (yhat_te > THRESH).astype(int)

elif TARGET_KIND == "absdiff":
    # yhat = predicted absolute first-difference at the next step
    pred_flag = (yhat_te > THRESH).astype(int)

else:
    raise ValueError("TARGET_KIND must be one of {'rstdev','rextdiff','absdiff'}.")

# 4) Ground-truth labels via lookahead in the *actual fidelity*
# The lookahead only sees fid_te, so samples within HORIZON_H of the end of the
# test split are labelled from a shortened horizon (the final sample is always 0).
true_flag = np.zeros(len(fid_te), dtype=int)
for i in range(len(fid_te)):
    true_flag[i] = ground_truth_exceed_in_horizon(fid_te, i, THRESH, HORIZON_H)

# 5) Align lengths (if model produced fewer predictions for any reason)
# Truncation keeps the first m entries of each array.
m = min(len(pred_flag), len(true_flag))
pred_flag = pred_flag[:m]
true_flag = true_flag[:m]
if TARGET_KIND == "rstdev":
    gauss_p = gauss_p[:m]
    cheb_p  = cheb_p[:m]

# 6) Metrics
# labels=[0,1] forces a 2x2 matrix even if one class is absent.
cm = confusion_matrix(true_flag, pred_flag, labels=[0,1])
# The four counts are unpacked here but not used again in this section.
tn, fp, fn, tp = cm.ravel()
print("=== Confusion Matrix (rows=true, cols=pred) ===")
print(pd.DataFrame(cm, index=["True 0","True 1"], columns=["Pred 0","Pred 1"]))
print("\n=== Classification Report ===")
print(classification_report(true_flag, pred_flag, digits=3))

if TARGET_KIND == "rstdev":
    print(f"\nAvg predicted Gaussian exceedance P(|X|>{THRESH}): {np.nanmean(gauss_p):.4f}")
    print(f"Avg predicted Chebyshev upper bound:            {np.nanmean(cheb_p):.4f}")

# 7) Optional: regression diagnostics of raw predictions vs simple targets
#    (only meaningful if your model target is comparable to a numeric truth)
# NOTE(review): this section defines an `rmse` helper and prints the sample size
# and a caveat, but no RMSE or R^2 is actually computed. Any exception raised
# inside the block is silently discarded.
try:
    rmse = lambda a,b: float(np.sqrt(mean_squared_error(a,b)))
    print("\n=== Regression Diagnostics (if applicable) ===")
    print(f"Sample size used: {m}")
    print("Note: These RMSE/R^2 values are meaningful only if yhat is directly comparable to a numeric truth.")
except Exception:
    pass


In [None]:
class StaticSim():
    """Turn model predictions into threshold-exceedance flags for a test window.

    The constructor reads its configuration from ``args`` (a mapping indexed
    by string keys), loads a pickled model, and loads the rows of a CSV file
    whose POSIX column is at or after ``args["test_start_posix"]``.

    Required ``args`` keys: ``pred_method``, ``pred_targ``, ``thresh``,
    ``tol``, ``sr``, ``plot``, ``poincare``, ``output_scaled``,
    ``input_scaled``, ``model_path``, ``data_path``, ``posix_feat_name`` and
    ``test_start_posix``. ``output_scale_mean`` / ``output_scale_stdev`` are
    read only when ``output_scaled`` is truthy, and ``input_scale_mean`` /
    ``input_scale_stdev`` only when ``input_scaled`` is truthy.

    ``generate_preds``, ``apply_output_scaling`` and ``run`` are placeholders
    that do nothing yet.
    """

    def __init__(self, args):
        """Store the configuration, then load the model and the test data.

        Raises:
            KeyError: A required key is missing from ``args``.
        """
        self.pred_method = args["pred_method"]
        self.target = args["pred_targ"]
        self.thresh = args["thresh"]
        # NOTE(review): the probability cut-off used by update_flags_rstdev is
        # derived from the fidelity tolerance (thresh / 2) -- confirm that
        # coupling is intended rather than a separate setting.
        self.gauss_thresh = self.thresh / 2
        # NOTE(review): the window size is read from the "tol" key -- confirm.
        self.win_size = args["tol"]
        self.sr = args["sr"]
        self.plot = args["plot"]
        self.poincare = args["poincare"]
        self.output_scale = args["output_scaled"]
        self.input_scale = args["input_scaled"]

        # Define the scaling attributes up front so they exist (as None) even
        # when the corresponding scaling option is disabled.
        self.output_scale_mean = None
        self.output_scale_stdev = None
        self.input_scale_mean = None
        self.input_scale_stdev = None

        self.load_model(args["model_path"])
        self.load_data(args["data_path"], args["posix_feat_name"], args["test_start_posix"])

        if self.output_scale:
            self.output_scale_mean = args["output_scale_mean"]
            self.output_scale_stdev = args["output_scale_stdev"]

        if self.input_scale:
            self.apply_input_scaling(args["input_scale_mean"], args["input_scale_stdev"])

    def load_model(self, model_path):
        """Unpickle the model stored at ``model_path`` into ``self.model``."""
        # Imported here because this cell has no module-level `import pickle`.
        import pickle

        # TODO add error checking and checking to see if its an sklearn model or a tf/keras model
        # SECURITY: unpickling can execute arbitrary code; only load model
        # files that come from a trusted source.
        with open(model_path, 'rb') as f:
            self.model = pickle.load(f)

    def load_data(self, data_path, posix_name, posix_val):
        """Read the CSV at ``data_path`` and keep rows with ``posix_name >= posix_val``.

        The filtered frame is stored in ``self.df`` with a fresh 0-based index.
        """
        df = pd.read_csv(data_path)
        # TODO add error checking
        self.df = df[df[posix_name] >= posix_val].reset_index(drop=True)

    def generate_preds(self):
        """Placeholder: prediction generation is not implemented yet."""
        pass

    def update_flags(self, yhat):
        """Dispatch to ``update_flags_<pred_method>`` and return its flags.

        Raises:
            ValueError: No method named ``update_flags_<pred_method>`` exists.
        """
        # NOTE(review): dispatch uses pred_method although the handler
        # suffixes look like target names -- confirm this is the right key.
        method_name = f"update_flags_{self.pred_method}"
        if not hasattr(self, method_name):
            raise ValueError(f"No update method exists for {method_name}")
        method = getattr(self, method_name)
        return method(yhat)

    def update_flags_rstdev(self, yhat):
        """Flag samples whose predicted sigma implies a likely exceedance.

        Each entry of ``yhat`` is treated as a standard deviation. For
        positive values the two-sided Gaussian exceedance probability of
        ``self.thresh`` and the Chebyshev bound ``min(1, (s / thresh)^2)`` are
        computed; non-positive values get 0.0 for both. A sample is flagged
        when its Gaussian probability is at least ``self.gauss_thresh``.

        Side effects: stores the per-sample probabilities in ``self.gauss_p``
        and ``self.cheby_p``.

        Returns:
            Integer ndarray of 0/1 flags, one per prediction.
        """
        # Accept lists as well as arrays, and flatten column vectors.
        sigmas = np.asarray(yhat, dtype=float).ravel()
        pred_flag = np.zeros(len(sigmas), dtype=int)
        gauss_p = np.full(len(sigmas), np.nan)
        cheby_p = np.full(len(sigmas), np.nan)
        for i, s in enumerate(sigmas):
            if s <= 0:
                gp = 0.0
                cb = 0.0
            else:
                z = self.thresh / s
                tail = 0.5 * math.erfc(z / math.sqrt(2.0))
                gp = 2.0 * tail
                r = s / self.thresh
                cb = float(min(1.0, r * r))
            gauss_p[i] = gp
            cheby_p[i] = cb
            pred_flag[i] = int(gp >= self.gauss_thresh)

        # TODO figure out what to do with these variables later
        self.gauss_p = gauss_p
        self.cheby_p = cheby_p

        return pred_flag

    def update_flags_rexd(self, yhat):
        """Flag predictions strictly greater than ``self.thresh``."""
        # np.asarray lets plain sequences work, not only ndarrays.
        pred_flag = (np.asarray(yhat) > self.thresh).astype(int)
        return pred_flag

    def update_flags_fder(self, yhat):
        """Flag predictions strictly greater than ``self.thresh``."""
        pred_flag = (np.asarray(yhat) > self.thresh).astype(int)
        return pred_flag

    def apply_input_scaling(self, mean=None, stdev=None):
        """Record the input-scaling statistics passed by the constructor.

        The constructor calls this with the configured mean and standard
        deviation, so the method has to accept them. They are stored on
        ``self.input_scale_mean`` / ``self.input_scale_stdev``.

        TODO: the transform itself is not implemented yet; no data is
        modified here.
        """
        self.input_scale_mean = mean
        self.input_scale_stdev = stdev

    def apply_output_scaling(self):
        """Placeholder: output (un)scaling is not implemented yet."""
        pass

    def run(self):
        """Placeholder: the simulation loop is not implemented yet."""
        return
    