In [None]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt

plt.rcParams["figure.figsize"] = (8, 5)
plt.rcParams["axes.grid"] = True

# Load data
political_events = pd.read_csv("data/political_events.csv")
sp500 = pd.read_csv("data/sp500.csv")

# Clean up sp500 (same as in Stochastic.ipynb)
if "Unnamed: 0" in sp500.columns:
    sp500 = sp500.drop(columns="Unnamed: 0")

sp500 = sp500.copy()
political_events = political_events.copy()

sp500["Date"] = pd.to_datetime(sp500["Date"])
sp500 = sp500.sort_values("Date").reset_index(drop=True)

political_events["date"] = pd.to_datetime(political_events["date"])
political_events = political_events.sort_values("date").reset_index(drop=True)

print(sp500.head())
print(political_events.head())


In [None]:
# Helpers

def add_event_windows(events: pd.DataFrame, pre_days=3, post_days=3):
    """
    Attach calendar windows [date - pre_days, date + post_days] to each event.
    """
    ev = events.copy()
    ev["win_start"] = ev["date"] - pd.Timedelta(days=pre_days)
    ev["win_end"]   = ev["date"] + pd.Timedelta(days=post_days)
    return ev


def mark_event_days(sp: pd.DataFrame, ev_windows: pd.DataFrame):
    """
    Mark SP500 trading days that fall in *any* event window.
    """
    sp = sp.copy()
    sp["is_event_window"] = False
    for _, row in ev_windows.iterrows():
        mask = (sp["Date"] >= row["win_start"]) & (sp["Date"] <= row["win_end"])
        sp.loc[mask, "is_event_window"] = True
    return sp


def realized_vol(returns: pd.Series, window: int):
    """
    Rolling standard deviation of daily returns (simple realized volatility).
    """
    return returns.rolling(window).std()


# Compute log returns and realized volatility

sp500["log_close"] = np.log(sp500["Close"])
sp500["r"] = sp500["log_close"].diff()

# Remove first NaN return
sp500 = sp500.dropna(subset=["r"]).reset_index(drop=True)

# Realized volatility with m = 5 (matches RV_5 in the report)
sp500["rv_5"] = realized_vol(sp500["r"], 5)

ev_windows = add_event_windows(political_events, pre_days=3, post_days=3)
sp500 = mark_event_days(sp500, ev_windows)

print("Trading days:", len(sp500))
print("Event-window trading days:", sp500["is_event_window"].sum())
print("Non-event trading days:", (~sp500["is_event_window"]).sum())


In [None]:
# Use non-event days to fit the null model
non_event = sp500.loc[~sp500["is_event_window"]].copy()

# Time-varying volatility sigma_t via rolling std of returns
sigma_window = 20
sp500["sigma_t"] = sp500["r"].rolling(sigma_window).std()

# Fill early/missing sigma_t with expanding std, then global std
sp500["sigma_t"] = sp500["sigma_t"].fillna(
    sp500["r"].expanding(min_periods=5).std()
)
sp500["sigma_t"] = sp500["sigma_t"].fillna(sp500["r"].std())

# Estimate drift mu from non-event days
mu_hat = non_event["r"].mean()

# Merge sigma_t onto non-event days and compute standardized residuals eps
non_event = non_event.merge(
    sp500[["Date", "sigma_t"]],
    on="Date",
    how="left",
    suffixes=("", "_sp")
)

non_event["eps"] = (non_event["r"] - mu_hat) / non_event["sigma_t"]

# Pool of empirical shocks under the null
eps_pool = non_event["eps"].replace([np.inf, -np.inf], np.nan).dropna().values

print("mu_hat:", mu_hat)
print("eps_pool size:", eps_pool.size)


In [None]:
def simulate_null_paths(n_paths: int,
                        dates: pd.Series,
                        sigma_t: np.ndarray,
                        mu: float,
                        eps_pool: np.ndarray,
                        seed: int = 0):
    """
    Simulate return paths under the null:
        r_t = mu + sigma_t * eps_t,
    where eps_t are sampled i.i.d. from the empirical eps_pool.
    """
    rng = np.random.default_rng(seed)
    T = len(dates)
    eps = rng.choice(eps_pool, size=(n_paths, T), replace=True)
    r_sim = mu + eps * sigma_t[None, :]
    return r_sim


def rolling_std_matrix(r_mat: np.ndarray, window: int):
    """
    Rolling std along axis 1 (time) for a matrix of paths (N_paths x T).
    Returns an (N_paths x T) array of realized volatility.
    """
    N, T = r_mat.shape
    out = np.full((N, T), np.nan)
    for t in range(window - 1, T):
        seg = r_mat[:, t - window + 1:t + 1]
        out[:, t] = np.std(seg, axis=1, ddof=1)
    return out


In [None]:
def mc_pvalue_for_event(event_date: pd.Timestamp,
                        sp500_df: pd.DataFrame,
                        mu: float,
                        sigma_t: np.ndarray,
                        eps_pool: np.ndarray,
                        B: int = 2000,
                        vol_window: int = 5,
                        pre_days: int = 3,
                        post_days: int = 3,
                        seed: int = 123):
    """
    Monte Carlo p-value for the event-window volatility statistic T(d)
    around a single event_date.

    Returns: (T_obs, sim_stats, p_right)
    """
    rng = np.random.default_rng(seed)

    # 1. Observed T(d): mean RV_5 in [d - pre_days, d + post_days]
    win_start = event_date - pd.Timedelta(days=pre_days)
    win_end   = event_date + pd.Timedelta(days=post_days)
    window_df = sp500_df[(sp500_df["Date"] >= win_start) &
                         (sp500_df["Date"] <= win_end)]

    T_obs = window_df["rv_5"].mean()

    # 2. Simulate B null paths
    r_sim = simulate_null_paths(
        n_paths=B,
        dates=sp500_df["Date"],
        sigma_t=sigma_t,
        mu=mu,
        eps_pool=eps_pool,
        seed=seed
    )

    # 3. Compute realized volatility (RV_5) for each simulated path
    rv_sim = rolling_std_matrix(r_sim, window=vol_window)  # shape (B, T)

    # 4. Indices of the event window in trading-day time
    mask = (sp500_df["Date"] >= win_start) & (sp500_df["Date"] <= win_end)
    idxs = sp500_df.index[mask].values

    # 5. T*_b(d): mean RV_5 in that window for each path
    block = rv_sim[:, idxs]     # (B, |W(d)|)
    T_sim = np.nanmean(block, axis=1)

    # 6. Right-tailed Monte Carlo p-value with small-sample correction
    p_right = (1 + np.sum(T_sim >= T_obs)) / (B + 1)

    return T_obs, T_sim, p_right


# Quick sanity check on a real event, e.g., first presidential election in table
example_event = political_events.iloc[0]["date"]
T_obs_ex, T_sim_ex, p_ex = mc_pvalue_for_event(
    event_date=example_event,
    sp500_df=sp500,
    mu=mu_hat,
    sigma_t=sp500["sigma_t"].values,
    eps_pool=eps_pool,
    B=2000,
    vol_window=5,
    pre_days=3,
    post_days=3,
    seed=123
)

print("Example event:", example_event.date())
print("Observed T(d):", T_obs_ex)
print("Monte Carlo p_right:", p_ex)

plt.hist(T_sim_ex, bins=40, density=True, alpha=0.7)
plt.axvline(T_obs_ex, color="red", linewidth=2, label="Observed T(d)")
plt.title(f"Null distribution of T(d) around {example_event.date()}")
plt.xlabel("Simulated T*(d)")
plt.ylabel("Density")
plt.legend()
plt.show()


In [None]:
def estimate_type1_error(sp500_df: pd.DataFrame,
                         mu: float,
                         sigma_t: np.ndarray,
                         eps_pool: np.ndarray,
                         alpha: float = 0.05,
                         num_experiments: int = 200,
                         pre_days: int = 3,
                         post_days: int = 3,
                         B: int = 1000,
                         seed: int = 999):
    """
    Monte Carlo study of Type I error: treat randomly chosen non-event dates
    as 'fake events' and see how often the Monte Carlo test rejects at level alpha.
    """
    rng = np.random.default_rng(seed)

    # Build event windows so we can exclude them from "null" dates
    ev_win = add_event_windows(political_events, pre_days=pre_days, post_days=post_days)
    sp_marked = mark_event_days(sp500_df, ev_win)

    # Candidate dates that are not in any event window
    candidate_dates = sp_marked.loc[~sp_marked["is_event_window"], "Date"].values

    rejections = 0
    pvals = []

    for _ in range(num_experiments):
        fake_date = rng.choice(candidate_dates)
        T_obs, T_sim, p_right = mc_pvalue_for_event(
            event_date=fake_date,
            sp500_df=sp_marked,
            mu=mu,
            sigma_t=sigma_t,
            eps_pool=eps_pool,
            B=B,
            vol_window=5,
            pre_days=pre_days,
            post_days=post_days,
            seed=rng.integers(0, 10**9)
        )
        pvals.append(p_right)
        if p_right <= alpha:
            rejections += 1

    type1_est = rejections / num_experiments
    return type1_est, np.array(pvals)


type1_est, pvals_null = estimate_type1_error(
    sp500_df=sp500,
    mu=mu_hat,
    sigma_t=sp500["sigma_t"].values,
    eps_pool=eps_pool,
    alpha=0.05,
    num_experiments=200,
    pre_days=3,
    post_days=3,
    B=1000,
    seed=2025
)

print(f"Estimated Type I error (alpha=0.05): {type1_est:.3f}")

plt.hist(pvals_null, bins=20, range=(0, 1), edgecolor="k")
plt.title("Empirical distribution of Monte Carlo p-values under the null")
plt.xlabel("p_MC")
plt.ylabel("Count")
plt.show()


In [None]:
# Choose one real event to study p-value stability, e.g. 2016 election if present
# Here we'll just pick the event with the smallest p-value from Drake's table
# but for now use the earliest presidential election as an example.
target_event = political_events.iloc[0]["date"]

Bs = [200, 500, 1000, 2000, 5000]
p_estimates = []

for B in Bs:
    _, _, pB = mc_pvalue_for_event(
        event_date=target_event,
        sp500_df=sp500,
        mu=mu_hat,
        sigma_t=sp500["sigma_t"].values,
        eps_pool=eps_pool,
        B=B,
        vol_window=5,
        pre_days=3,
        post_days=3,
        seed=123  # fixed seed so differences are from B only
    )
    p_estimates.append(pB)

for B, p in zip(Bs, p_estimates):
    print(f"B = {B:5d}  =>  p_MC = {p:.4f}")

plt.plot(Bs, p_estimates, marker="o")
plt.xscale("log")
plt.xlabel("Number of Monte Carlo simulations B (log scale)")
plt.ylabel("Monte Carlo p-value")
plt.title(f"Stability of p_MC for event {target_event.date()}")
plt.grid(True)
plt.show()
