<a href="https://colab.research.google.com/github/alan-w25/ese4380-project/blob/main/sims.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import numpy as np
from google.colab import files


Process Simulation Functions

In [None]:
def simulate_cir_paths(kappa, theta, sigma, v0, T, n_steps, n_paths, seed=None):
    """
    Exact CIR simulation via noncentral chi-square transition.
    Returns V with shape (n_paths, n_steps+1), including t=0.
    """
    rng = np.random.default_rng(seed)
    dt = T / n_steps

    # deterministic case (sigma=0)
    if sigma == 0.0:
        times = np.linspace(0, T, n_steps + 1)
        V_det = theta + (v0 - theta) * np.exp(-kappa * times)
        return np.tile(V_det, (n_paths, 1))

    exp_kdt = np.exp(-kappa * dt)
    one_minus_exp = -np.expm1(-kappa * dt)          # 1 - e^{-kappa dt}, stable
    nu = 4.0 * kappa * theta / (sigma**2)           # dof
    c  = (sigma**2 * one_minus_exp) / (4.0 * kappa) # scale

    V = np.empty((n_paths, n_steps + 1), dtype=np.float64)
    V[:, 0] = v0

    for i in range(n_steps):
        v_t = V[:, i]
        lam = (4.0 * kappa * exp_kdt / (sigma**2 * one_minus_exp)) * v_t
        X = rng.noncentral_chisquare(df=nu, nonc=lam, size=n_paths)
        V[:, i + 1] = c * X

    return V


In [None]:
def simulate_double_well_paths(alpha, beta, sigma, x0, T, n_steps, n_paths, seed=None):
    """
    Simulate the double-well SDE:
        dX_t = (alpha * X_t - beta * X_t^3) dt + sigma dW_t
    using Euler–Maruyama.

    Parameters
    ----------
    alpha, beta, sigma : floats
        Drift and diffusion parameters.
    x0 : float or array-like
        Initial condition (scalar for all paths, or array of length n_paths).
    T : float
        Total simulation time.
    n_steps : int
        Number of time steps.
    n_paths : int
        Number of independent paths.
    seed : int or None
        RNG seed.

    Returns
    -------
    V : ndarray, shape (n_paths, n_steps+1)
        Simulated paths.
    """
    rng = np.random.default_rng(seed)

    dt = T / n_steps
    sqrt_dt = np.sqrt(dt)

    # allocate
    V = np.empty((n_paths, n_steps + 1), dtype=np.float64)

    # set initial condition
    if np.isscalar(x0):
        V[:, 0] = x0
    else:
        x0_arr = np.asarray(x0, dtype=float)
        assert x0_arr.shape[0] == n_paths, "x0 must have length n_paths"
        V[:, 0] = x0_arr

    # simulation loop
    for t in range(n_steps):
        X = V[:, t]
        dW = rng.normal(0.0, sqrt_dt, size=n_paths)
        drift = alpha * X - beta * X**3
        V[:, t+1] = X + drift * dt + sigma * dW

    return V



In [None]:
def eu_simulator(f_t, g_t, W_t, x0, T, dt):
    """
    Generic Euler–Maruyama SDE simulator.

    dX_t = f_t(X_t) dt + g_t(X_t) dW_t

    Parameters
    ----------
    f_t : callable
        Drift function f(x).
    g_t : callable
        Diffusion function g(x).
    W_t : array-like, shape (N,)
        Wiener increments ~ N(0, dt).
    x0 : float
        Initial condition X_0.
    T : float
        Total time.
    dt : float
        Time step size.

    Returns
    -------
    X : ndarray, shape (N+1,)
        Simulated path.
    """
    N = int(T / dt)
    X = np.zeros(N + 1, dtype=float)
    X[0] = x0

    for i in range(1, N + 1):
        dW = W_t[i - 1]
        X[i] = X[i - 1] + f_t(X[i - 1]) * dt + g_t(X[i - 1]) * dW

    return X


def make_ou_drift(mu, theta):
    """Drift for OU: f(x) = theta * (mu - x)."""
    return lambda x: theta * (mu - x)


def make_ou_diffusion(sigma):
    """Diffusion for OU: g(x) = sigma (state-independent)."""
    return lambda x: sigma


def simulate_ou_paths(mu, theta, sigma, x0, T, n_steps, n_paths, seed=None):
    """
    Simulate OU process:
        dX_t = theta * (mu - X_t) dt + sigma dW_t
    using Euler–Maruyama.

    Parameters
    ----------
    mu, theta, sigma : floats
        OU parameters.
    x0 : float
        Initial value (same for all paths).
    T : float
        Total time horizon.
    n_steps : int
        Number of time steps.
    n_paths : int
        Number of independent paths.
    seed : int or None
        Random seed.

    Returns
    -------
    X_panel : ndarray, shape (n_paths, n_steps+1)
        Simulated OU paths.
    """
    dt = T / n_steps
    N = n_steps

    rng = np.random.default_rng(seed)
    X_panel = np.empty((n_paths, N + 1), dtype=float)

    f_t = make_ou_drift(mu, theta)
    g_t = make_ou_diffusion(sigma)

    for i in range(n_paths):
        W = rng.normal(0.0, np.sqrt(dt), size=N)
        X_panel[i] = eu_simulator(f_t, g_t, W, x0, T, dt)

    return X_panel

Moment Computation

In [None]:
import numpy as np

def power_mean_moments(V, K=10):
    """
    For each time t (columns of V), compute the length-K vector:

        m_1(t) = (1/N) * sum_i V[i, t]
        m_k(t) = sign(m_k_raw) * |m_k_raw|^(1/k),  k >= 2

    where m_k_raw = (1/N) * sum_i V[i, t]^k.

    This agrees with the original power-mean definition when m_k_raw >= 0,
    and extends it in a sign-preserving way when m_k_raw < 0 (important for
    OU / double-well, where the state can be negative).

    Parameters
    ----------
    V : ndarray of shape (n_paths, T_plus)
        Simulated paths (each column is a time step t).
    K : int
        Highest order.

    Returns
    -------
    M : ndarray of shape (T_plus, K)
        Row t contains [m_1(t), ..., m_K(t)].
    """
    n_paths, T_plus = V.shape
    M = np.empty((T_plus, K), dtype=np.float64)

    pow_k = V.copy()  # V^k
    for k in range(1, K + 1):
        mean_pow = pow_k.mean(axis=0)  # shape (T_plus,)

        if k == 1:
            # m_1(t) = E[X]
            M[:, k - 1] = mean_pow
        else:
            # sign-preserving root of the raw moment
            sign = np.sign(mean_pow)
            M[:, k - 1] = sign * np.abs(mean_pow)**(1.0 / k)

        pow_k *= V  # update to V^(k+1)

    return M


In [None]:
def run_many_sims_no_chunk_then_moments(
    sim_func,
    n_steps,
    paths_per_run,
    n_runs,
    K=10,
    base_seed=123,
    save_path=None,
    download=False,
):
    """
    Generic runner:
      - sim_func(n_steps, n_paths, seed) -> V (n_paths, n_steps+1)
      - repeats n_runs times,
      - stacks all paths,
      - computes power-mean moments up to order K.

    Parameters
    ----------
    sim_func : callable
        Function with signature sim_func(n_steps, n_paths, seed) -> V.
    n_steps : int
    paths_per_run : int
    n_runs : int
    K : int
        Highest moment order.
    base_seed : int or None
        Base for seeding each run (for decorrelation).
    save_path : str or None
        If not None, save M to this CSV.
    download : bool
        If True and save_path is not None, trigger Colab download.

    Returns
    -------
    M : (T_plus, K) array
    V_all : (n_runs * paths_per_run, T_plus) array
    """
    all_V = []
    for r in range(n_runs):
        seed = None if base_seed is None else (base_seed + 9973 * r)
        V = sim_func(n_steps=n_steps, n_paths=paths_per_run, seed=seed)
        all_V.append(V)

    V_all = np.vstack(all_V)
    M = power_mean_moments(V_all, K=K)

    if save_path is not None:
        np.savetxt(save_path, M, delimiter=",", fmt="%.8f")
        if download:
            files.download(save_path)

    return M, V_all



Running Simulations and Generating Datasets

In [None]:

#Wrappers
def make_cir_sim(kappa, theta, sigma, v0, T):
    """Return sim_func(n_steps, n_paths, seed) for CIR."""
    def cir_sim(n_steps, n_paths, seed=None):
        return simulate_cir_paths(kappa, theta, sigma, v0, T, n_steps, n_paths, seed)
    return cir_sim


def make_ou_sim(mu, theta, sigma, x0, T):
    """Return sim_func(n_steps, n_paths, seed) for OU."""
    def ou_sim(n_steps, n_paths, seed=None):
        return simulate_ou_paths(mu, theta, sigma, x0, T, n_steps, n_paths, seed)
    return ou_sim


def make_double_well_sim(alpha, beta, sigma, x0, T):
    """Return sim_func(n_steps, n_paths, seed) for double-well."""
    def dw_sim(n_steps, n_paths, seed=None):
        return simulate_double_well_paths(alpha, beta, sigma, x0, T, n_steps, n_paths, seed)
    return dw_sim




# CIR
kappa_cir = 2.0
theta_cir = 0.04
sigma_cir = 0.3
v0_cir    = 0.04
T_cir     = 1.0

# OU
mu_ou    = 0.0
theta_ou = 1.0
sigma_ou = 0.3
x0_ou    = 1.0
T_ou     = 10.0

# Double-well
alpha_dw = 1.0
beta_dw  = 1.0
sigma_dw = 0.25
x0_dw    = 0.0
T_dw     = 50.0


# Build sim funcs

cir_sim = make_cir_sim(kappa_cir, theta_cir, sigma_cir, v0_cir, T_cir)
ou_sim  = make_ou_sim(mu_ou, theta_ou, sigma_ou, x0_ou, T_ou)
dw_sim  = make_double_well_sim(alpha_dw, beta_dw, sigma_dw, x0_dw, T_dw)


#  Run sims + moments and save CSVs

n_steps_cir = 5000
n_steps_ou  = 5000
n_steps_dw  = 5000

paths_per_run = 20000
n_runs        = 1
K             = 10

# CIR
#M_cir, V_cir = run_many_sims_no_chunk_then_moments(
#    sim_func=cir_sim,
#    n_steps=n_steps_cir,
#    paths_per_run=paths_per_run,
#   n_runs=n_runs,
 #   K=K,
 #   base_seed=123,
 #   save_path="cir_moments.csv",
#    download=True,
#)
#print("CIR moments shape:", M_cir.shape)

# OU
M_ou, V_ou = run_many_sims_no_chunk_then_moments(
    sim_func=ou_sim,
    n_steps=n_steps_ou,
    paths_per_run=paths_per_run,
    n_runs=n_runs,
    K=K,
    base_seed=123,
    save_path="ou_moments.csv",
    download=True,
)
print("OU moments shape:", M_ou.shape)

# Double-well
M_dw, V_dw = run_many_sims_no_chunk_then_moments(
    sim_func=dw_sim,
    n_steps=n_steps_dw,
    paths_per_run=paths_per_run,
    n_runs=n_runs,
    K=K,
    base_seed=123,
    save_path="double_well_moments.csv",
    download=True,
)
print("Double-well moments shape:", M_dw.shape)


<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

OU moments shape: (5001, 10)


<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

Double-well moments shape: (5001, 10)
