# Market Shock Alarm

We will here develop an end-to-end example of the uses of the code developped so-far (entropy, KL divergence, alphabet dataclass, etc.). The idea is to create a market shock and regime shift alarm. For this example we will focus on european electricity prices. The following workflow will be implemented:
1. Returns (prices) -> quantile bins -> discrete symbols
2. Rolling AlphabetDistribution
3. Shock alarm = surprisal of today's symbol under the last N days distribution
4. Regime shift = divergence between the last 30d vs last 365d distributions
5. Some figures

## Theory

### Why discretize prices into an alphabet?
Continuous price returns are hard to compare in a distributional way day-to-day. If we bin returns into categories (e.g., very negative, slightly negative, positive, etc.), then each day becomes a symbol from a finitie alphabet. That turns the market into a message stream. 

### Rolling distribution = market model
On each day $t$, we estimate the recent market behaviour by building an empirical distribution over symbols from the last $N$ observations. This distribution answers: "How likely is each kind of move in the current regime?"

### Metric 1 - Shock alarm (surprisal)
If today's return falls into a bin that had low probability over the last $N$ days, that day is surprising. Surprisal is
* high when the event was rare in the recent window
* low when the event was common recently

So we get an "alarm score" that spikes on unusual moves relative to the current regime, not relative to a fixed threshold. 

### Metric 2 - Regime shift (distribution divergence)
Even if today isn't a shock, the market can still drift into a different state. To detect that, we will compare
* short-term distribution (last 30 days) vs
* long-term distribution (last 365 days)

If they differ a lot, our short-term return behaviour no longer matches the long-run baseline: A regime shift. A robust choice to compute this metric is the Jensen-Shannon divergence, which is a symmetric, bounded cousin of KL.

## Code

**Imports**

In [None]:
from pathlib import Path

import numpy as np 
import pandas as pd
import matplotlib.pyplot as plt

from entropy_lab.coding.alphabet import AlphabetDistribution

**Load all CSVs from ```./data/european_wholesale_electricity_prices_data_hourly```into one ```df_all```**

In [None]:
data_dir = Path("/Users/fdolci/projects/entropy_lab/data/european_wholesale_electricity_price_data_hourly")
paths = sorted(data_dir.glob("*.csv"))

frames = []
for p in paths:
    d = pd.read_csv(p)

    d = d.rename(columns={
        "ISO3 Code": "iso3",
        "Country": "country",
        "Datetime (UTC)": "dt_utc",
        "Datetime (Local)": "dt_local",
        "Price (EUR/MWhe)": "price",
    })

    required = {"iso3", "dt_utc", "price"}
    missing = required - set(d.columns)
    if missing:
        raise ValueError(f"{p.name}: missing required columns {missing}")
    
    d["dt_utc"] = pd.to_datetime(d["dt_utc"], errors="coerce")
    d["price"] = pd.to_numeric(d["price"], errors="coerce")

    keep_cols = [c for c in ["iso3", "country", "dt_utc", "price"] if c in d.columns]
    d = d[keep_cols].dropna(subset=["iso3", "dt_utc", "price"])

    frames.append(d)

df_all = pd.concat(frames, ignore_index=True)

df_all = df_all.sort_values(["iso3", "dt_utc"])
df_all = df_all.drop_duplicates(subset=["iso3", "dt_utc"], keep="last")

df_all.head(), df_all["iso3"].nunique(), len(df_all) 

**Pick countries with potentially high contrast dynamics**

In [None]:
iso3_list = ["CHE", "DEU", "NOR", "ESP"] 

df_all_sel = df_all[df_all["iso3"].isin(iso3_list)].copy()
df_all_sel["country"].value_counts(dropna=False).head(20)

**Parameter choices**

In [None]:
returns_method = "change"      # "change" often reads better for electricity prices than log returns
quantile_q = 21                # 20 bins
alpha = 1e-3                   # smoothing for probabilities
base = 2.0                     # bits

shock_window_hours = 24 * 60   # 60 days
short_window_hours = 24 * 30   # 30 days
long_window_hours  = 24 * 365  # 365 days

max_window = max(shock_window_hours, long_window_hours)

**Preparation output container**

In [None]:
all_metrics = []

**Loop per country (bins calibrated per-country; metrics computed per-country)**

In [None]:
for iso3 in iso3_list:
    df_c = df_all_sel[df_all_sel["iso3"] == iso3].copy()
    if df_c.empty:
        print(f"Skipping {iso3}: no data")
        continue

    df_c = df_c.sort_values("dt_utc").reset_index(drop=True)

    # Extract series
    dt = pd.to_datetime(df_c["dt_utc"])
    price = df_c["price"].astype(float).values

    # Returns
    if returns_method == "log":
        ret = np.log(price)
        ret = np.concatenate([[np.nan], np.diff(ret)])
    elif returns_method == "simple":
        ret = np.concatenate([[np.nan], np.diff(price) / price[:-1]])
    elif returns_method == "change":
        ret = np.concatenate([[np.nan], np.diff(price)])
    else:
        raise ValueError("returns_method must be one of: 'log', 'simple', 'change'")

    # --- fixed quantile bins for this country ---
    ret_clean = ret[~np.isnan(ret)]
    qs = np.linspace(0.0, 1.0, quantile_q)
    edges = np.quantile(ret_clean, qs)
    edges = np.unique(edges)

    if len(edges) < 3:
        raise ValueError(f"{iso3}: not enough unique quantile edges; reduce bins or inspect data.")

    # Labels for each bin interval
    labels = []
    for i in range(len(edges) - 1):
        a, b = edges[i], edges[i + 1]
        labels.append(f"[{a:.4g}, {b:.4g})")
    labels = tuple(labels)

    # Discretize returns to symbols
    sym = pd.cut(ret, bins=edges, labels=list(labels), include_lowest=True, right=False).astype("object")

    # Build a working dataframe
    out = pd.DataFrame({
        "dt_utc": dt,
        "price": price,
        "ret": ret,
        "symbol": sym,
        "iso3": iso3,
        "country": df_c["country"].iloc[0] if "country" in df_c.columns and len(df_c) else iso3
    })

    # Prepare arrays for metrics
    shock = np.full(len(out), np.nan, dtype=float)
    regime = np.full(len(out), np.nan, dtype=float)

    # For faster access
    symbol_series = out["symbol"]

    # Precompute symbol->index once (alphabet is fixed)
    all_symbols = labels
    sym_to_idx = {s: i for i, s in enumerate(all_symbols)}

    # --- Rolling computation ---
    for t in range(max_window, len(out)):
        s_today = out["symbol"].iloc[t]
        if pd.isna(s_today):
            continue
        s_today = str(s_today)

        # -------- Shock Alarm: surprisal under last N hours distribution --------
        w = symbol_series.iloc[t - shock_window_hours : t]
        counts = w.value_counts(dropna=True).to_dict()
        c = np.array([float(counts.get(s, 0.0)) for s in all_symbols], dtype=np.float64)
        c = c + float(alpha)
        p_hat = AlphabetDistribution.from_probs(all_symbols, c, normalize=True)

        j = sym_to_idx.get(s_today)
        if j is None:
            continue
        prob = float(p_hat.p[j])
        shock[t] = float(-np.log(prob) / np.log(base)) if prob > 0 else float("inf")

        # -------- Regime Shift: JSD(short vs long) --------
        w_short = symbol_series.iloc[t - short_window_hours : t]
        w_long = symbol_series.iloc[t - long_window_hours : t]

        counts_s = w_short.value_counts(dropna=True).to_dict()
        counts_l = w_long.value_counts(dropna=True).to_dict()

        c_s = np.array([float(counts_s.get(s, 0.0)) for s in all_symbols], dtype=np.float64) + float(alpha)
        c_l = np.array([float(counts_l.get(s, 0.0)) for s in all_symbols], dtype=np.float64) + float(alpha)

        p_short = AlphabetDistribution.from_probs(all_symbols, c_s, normalize=True)
        p_long  = AlphabetDistribution.from_probs(all_symbols, c_l, normalize=True)

        # Jensen–Shannon divergence
        m_probs = 0.5 * (p_short.p + p_long.p)
        m = AlphabetDistribution.from_probs(all_symbols, m_probs, normalize=True)

        jsd = 0.5 * p_short.kl_divergence(m, base=base) + 0.5 * p_long.kl_divergence(m, base=base)
        regime[t] = float(jsd)

    out["shock"] = shock
    out["regime"] = regime

    all_metrics.append(out)

df_metrics = pd.concat(all_metrics, ignore_index=True)
df_metrics.head()

**Plot 3-panel figure per country**

In [None]:
for iso3 in iso3_list:
    d = df_metrics[df_metrics["iso3"] == iso3].copy()
    if d.empty:
        continue
    d = d.sort_values("dt_utc")
    x = pd.to_datetime(d["dt_utc"])

    country_name = d["country"].iloc[0] if "country" in d.columns and len(d) else iso3
    title = f"{country_name} ({iso3}) — Price + Shock + Regime Shift"

    fig, axes = plt.subplots(3, 1, figsize=(12, 8), sharex=True)

    axes[0].plot(x, d["price"].values)
    axes[0].set_ylabel("Price")
    axes[0].set_title(title)

    axes[1].plot(x, d["shock"].values)
    axes[1].set_ylabel("Shock\n(surprisal)")

    axes[2].plot(x, d["regime"].values)
    axes[2].set_ylabel("Regime\n(JSD)")
    axes[2].set_xlabel("Date")

    plt.tight_layout()
    plt.show()

In [None]:
import matplotlib.pyplot as plt
import matplotlib.dates as mdates
import matplotlib.ticker as ticker
import numpy as np
import pandas as pd

# ── Style ──────────────────────────────────────────────────────────────────────
plt.rcParams.update({
    "figure.facecolor": "#0f1117",
    "axes.facecolor":   "#0f1117",
    "axes.edgecolor":   "#3a3a4a",
    "axes.labelcolor":  "#ccccdd",
    "xtick.color":      "#ccccdd",
    "ytick.color":      "#ccccdd",
    "text.color":       "#ccccdd",
    "grid.color":       "#2a2a3a",
    "grid.linewidth":   0.5,
    "font.family":      "monospace",
    "legend.framealpha": 0.25,
    "legend.edgecolor": "#3a3a4a",
})

PALETTE = [
    "#4fc3f7", "#81c784", "#ffb74d",
    "#f06292", "#ba68c8", "#4dd0e1",
    "#aed581", "#ff8a65", "#90caf9", "#e57373",
]

# ── Thresholds ─────────────────────────────────────────────────────────────────
SHOCK_SPIKE_PERCENTILE  = 99.5  # stricter — only the sharpest events
REGIME_SHADE_PERCENTILE = 70

# ── Figure layout ──────────────────────────────────────────────────────────────
fig, axes = plt.subplots(
    3, 1, figsize=(14, 9), sharex=True,
    gridspec_kw={"height_ratios": [2, 1.4, 1.4], "hspace": 0.08}
)
ax_price, ax_shock, ax_regime = axes

for ax in axes:
    ax.grid(True, axis="both", linestyle="--", alpha=0.35)
    ax.yaxis.set_minor_locator(ticker.AutoMinorLocator())
    ax.spines[["top", "right"]].set_visible(False)

# ── Global reference ───────────────────────────────────────────────────────────
global_shock_median = df_metrics["shock"].median()

# ── Per-country plots ──────────────────────────────────────────────────────────
for i, iso3 in enumerate(iso3_list):
    d = df_metrics[df_metrics["iso3"] == iso3].copy()
    if d.empty:
        continue
    d = d.sort_values("dt_utc")
    x     = pd.to_datetime(d["dt_utc"])
    color = PALETTE[i % len(PALETTE)]
    label = d["country"].iloc[0] if "country" in d.columns else iso3

    # ── Panel 1 : z-score price ────────────────────────────────────────────────
    price   = d["price"].values.astype(float)
    price_z = (price - np.nanmean(price)) / (np.nanstd(price) + 1e-9)
    ax_price.plot(x, price_z, color=color, lw=1.4, label=label, alpha=0.85)

    # ── Panel 2 : Shock — faint context line + sparse event stems ─────────────
    shock = d["shock"].values.astype(float)

    # Very faint background line just to show the signal shape
    ax_shock.plot(x, shock, color=color, lw=0.8, alpha=0.18)

    # Vertical stems only at p99.5+ — a handful of truly extreme events
    spike_thresh = np.nanpercentile(shock, SHOCK_SPIKE_PERCENTILE)
    spike_mask   = shock >= spike_thresh
    spike_x      = x.values[spike_mask]
    spike_y      = shock[spike_mask]

    if len(spike_x) > 0:
        # Draw stems: vertical line from baseline to value, dot on top
        ax_shock.vlines(
            spike_x, global_shock_median, spike_y,
            color=color, lw=1.0, alpha=0.7, zorder=4,
        )
        ax_shock.scatter(
            spike_x, spike_y,
            color=color, s=30, zorder=5,
            edgecolors="white", linewidths=0.4,
            label=f"{label}",
        )

    # ── Panel 3 : Regime (JSD) + shaded elevated periods ──────────────────────
    regime       = d["regime"].values.astype(float)
    shade_thresh = np.nanpercentile(regime, REGIME_SHADE_PERCENTILE)
    ax_regime.plot(x, regime, color=color, lw=1.3, alpha=0.85, label=label)
    ax_regime.fill_between(
        x, shade_thresh, regime,
        where=(regime >= shade_thresh),
        color=color, alpha=0.15, interpolate=True,
    )

# ── Global reference lines ─────────────────────────────────────────────────────
ax_price.axhline(0, color="white", lw=0.6, ls="--", alpha=0.3, label="z = 0")

ax_shock.axhline(
    global_shock_median,
    color="white", lw=0.9, ls="--", alpha=0.5, label="global median",
)

# ── Annotate dominant regime peak ──────────────────────────────────────────────
peak_row = df_metrics.loc[df_metrics["regime"].idxmax()]
peak_x   = pd.to_datetime(peak_row["dt_utc"])
peak_y   = peak_row["regime"]
peak_lbl = peak_row.get("country", peak_row["iso3"])
ax_regime.annotate(
    f"{peak_lbl}  {peak_x.strftime('%b %Y')}",
    xy=(peak_x, peak_y),
    xytext=(18, 6), textcoords="offset points",
    fontsize=7.5, color="white", alpha=0.85,
    arrowprops=dict(arrowstyle="-", color="white", lw=0.6, alpha=0.45),
)

# ── Labels & legends ───────────────────────────────────────────────────────────
ax_price.set_ylabel("Price\n(z-score)", labelpad=8)
ax_price.legend(loc="upper left", fontsize=8, ncol=2)

ax_shock.set_ylabel("Shock\n(surprisal)", labelpad=8)
ax_shock.legend(
    loc="upper right", fontsize=7.5, ncol=2,
    title=f"p{SHOCK_SPIKE_PERCENTILE}+ spikes", title_fontsize=7,
)

ax_regime.set_ylabel("Regime\n(JSD)", labelpad=8)
ax_regime.set_xlabel("Date", labelpad=8)
ax_regime.legend(loc="upper left", fontsize=8, ncol=2)

# ── X-axis ─────────────────────────────────────────────────────────────────────
ax_regime.xaxis.set_major_formatter(mdates.DateFormatter("%b %Y"))
ax_regime.xaxis.set_major_locator(mdates.AutoDateLocator())
fig.autofmt_xdate(rotation=30, ha="right")

# ── Title ──────────────────────────────────────────────────────────────────────
fig.suptitle(
    "Price · Shock Spikes · Regime Shifts — All Countries",
    fontsize=13, fontweight="bold", color="#e0e0f0", y=0.998,
)

plt.tight_layout()
plt.savefig("regime_shock_overview.png", dpi=180, bbox_inches="tight",
            facecolor=fig.get_facecolor())
plt.show()