In [2]:
import numpy as np
import pandas as pd

from typing import Dict, Any

# Global constants
TRADING_DAYS = 252


In [9]:
import yfinance as yf

def download_price_history(symbol: str, start="2010-01-01", end=None):
    df = yf.download(symbol, start=start, end=end)
    df = df.reset_index()  # ensures Date column exists
    df.to_csv(f"{symbol}.csv", index=False)
    print(f"Saved {symbol}.csv")
    return df

# Example use
df = download_price_history("AAPL", start="2015-01-01")
df.head()

  df = yf.download(symbol, start=start, end=end)
[*********************100%***********************]  1 of 1 completed

Saved AAPL.csv





Price,Date,Close,High,Low,Open,Volume
Ticker,Unnamed: 1_level_1,AAPL,AAPL,AAPL,AAPL,AAPL
0,2015-01-02,24.237556,24.705326,23.798606,24.694241,212818400
1,2015-01-05,23.554741,24.086801,23.368521,24.006992,257142000
2,2015-01-06,23.556961,23.81634,23.195602,23.619034,263188400
3,2015-01-07,23.887276,23.987036,23.654499,23.765345,160423600
4,2015-01-08,24.80508,24.862721,24.097883,24.215381,237458000


In [3]:
def load_and_process_data(
    path: str,
    train_ratio: float = 0.8,
    price_col: str = "Close",
    date_col: str = "Date"
) -> Dict[str, Any]:
    """
    Load historical price data and prepare:
      - full price series
      - train/test split
      - log-returns for training
      - metadata for later use

    Assumes a CSV with at least:
      - a date column (default: 'Date')
      - a price column (default: 'Close')
    """
    df = pd.read_csv(path)

    if date_col in df.columns:
        df[date_col] = pd.to_datetime(df[date_col])
        df = df.sort_values(date_col)
        df = df.reset_index(drop=True)

    if price_col not in df.columns:
        raise ValueError(f"Price column '{price_col}' not found in CSV.")

    prices = df[price_col].astype(float).values

    n = len(prices)
    if n < 3:
        raise ValueError("Not enough data points to split and compute returns.")

    split_idx = int(n * train_ratio)
    split_idx = max(2, min(split_idx, n - 1))  # keep at least 2 points in train & 1 in test

    train_prices = prices[:split_idx]
    test_prices = prices[split_idx:]

    # Log-returns for training (daily)
    train_returns = np.diff(np.log(train_prices))

    return {
        "df": df,
        "prices": prices,
        "train_prices": train_prices,
        "test_prices": test_prices,
        "train_returns": train_returns,
        "S0": prices[-1],       # most recent spot
        "train_ratio": train_ratio,
        "price_col": price_col,
        "date_col": date_col,
    }


In [4]:
def realized_vol(returns: np.ndarray) -> float:
    """
    Simple realized (historical) volatility, annualized.
    """
    if len(returns) < 2:
        raise ValueError("Need at least 2 returns to estimate volatility.")
    daily_std = np.std(returns, ddof=1)
    return float(daily_std * np.sqrt(TRADING_DAYS))


def ewma_vol(returns: np.ndarray, lam: float = 0.94) -> float:
    """
    Exponentially weighted moving average (EWMA) volatility, annualized.
    RiskMetrics-style.
    """
    if len(returns) == 0:
        raise ValueError("Need at least 1 return to estimate EWMA volatility.")

    # Start with unconditional variance as simple average of squared returns
    var = np.mean(returns ** 2)
    for r in returns[::-1]:  # newest last
        var = lam * var + (1.0 - lam) * r ** 2

    daily_vol = np.sqrt(var)
    return float(daily_vol * np.sqrt(TRADING_DAYS))


def estimate_volatility_all(returns_data) -> Dict[str, float]:
    """
    Wrapper that returns a dict of different volatility estimates.
    - returns_data: either a dict containing 'train_returns'
                    or a numpy array of returns directly.
    """
    if isinstance(returns_data, dict):
        returns = returns_data["train_returns"]
    else:
        returns = np.asarray(returns_data)

    vols = {}

    vols["realized_vol"] = realized_vol(returns)

    try:
        vols["ewma_vol"] = ewma_vol(returns)
    except Exception:
        vols["ewma_vol"] = np.nan

    # placeholder for future methods (e.g., GARCH, Parkinson, etc.)
    return vols


In [5]:
def simulate_gbm_paths(
    S0: float,
    r: float,
    sigma: float,
    T: float,
    steps: int,
    n_paths: int,
    seed: int | None = None
) -> np.ndarray:
    """
    Simulate GBM price paths under risk-neutral measure.

    Returns:
      array of shape (n_paths, steps + 1) including S0 at t=0.
    """
    if seed is not None:
        np.random.seed(seed)

    dt = T / steps
    # standard normals
    Z = np.random.normal(size=(n_paths, steps))
    # log increments
    increments = (r - 0.5 * sigma ** 2) * dt + sigma * np.sqrt(dt) * Z

    log_paths = np.cumsum(increments, axis=1)
    S_paths = S0 * np.exp(log_paths)
    S_paths = np.concatenate(
        [np.full((n_paths, 1), S0), S_paths],
        axis=1
    )
    return S_paths  # shape: (n_paths, steps+1)


def payoff_european(S_T: np.ndarray, K: float, option_type: str = "call") -> np.ndarray:
    """
    European option payoff at maturity.
    """
    option_type = option_type.lower()
    if option_type == "call":
        return np.maximum(S_T - K, 0.0)
    elif option_type == "put":
        return np.maximum(K - S_T, 0.0)
    else:
        raise ValueError("option_type must be 'call' or 'put'")


def price_option_mc(
    S0: float,
    K: float,
    r: float,
    sigma: float,
    T: float,
    steps: int = 252,
    n_paths: int = 10_000,
    option_type: str = "call",
    seed: int | None = None,
    return_ci: bool = False,
    ci_level: float = 0.95
):
    """
    Monte Carlo price for a European call/put under GBM.
    Optionally returns a confidence interval for the estimate.
    """
    S_paths = simulate_gbm_paths(S0, r, sigma, T, steps, n_paths, seed=seed)
    S_T = S_paths[:, -1]
    payoffs = payoff_european(S_T, K, option_type)

    discount_factor = np.exp(-r * T)
    discounted_payoffs = discount_factor * payoffs

    price_estimate = float(np.mean(discounted_payoffs))

    if not return_ci:
        return price_estimate

    # Normal-approximation confidence interval on the mean
    std_err = np.std(discounted_payoffs, ddof=1) / np.sqrt(len(discounted_payoffs))
    from scipy.stats import norm
    z = norm.ppf(0.5 + ci_level / 2.0)
    ci_low = price_estimate - z * std_err
    ci_high = price_estimate + z * std_err
    return price_estimate, (float(ci_low), float(ci_high))


In [6]:
def log_posterior_sigma(sigma: float, returns: np.ndarray) -> float:
    """
    Log-posterior of sigma given returns under:
      - Prior: p(sigma) ‚àù 1/sigma (improper, scale-invariant)
      - Likelihood: returns ~ N(0, sigma^2 / TRADING_DAYS)

    We work with annualized sigma.
    """
    if sigma <= 0:
        return -np.inf

    daily_var = (sigma ** 2) / TRADING_DAYS
    if daily_var <= 0:
        return -np.inf

    # Gaussian likelihood for zero-mean returns
    ll = -0.5 * np.sum(returns ** 2 / daily_var + np.log(2 * np.pi * daily_var))
    # log prior: -log(sigma)
    return float(ll - np.log(sigma))


def sample_sigma_mcmc(
    returns: np.ndarray,
    n_samples: int = 5000,
    burn_in: int = 1000,
    step_size: float = 0.05,
    init_sigma: float | None = None,
    seed: int | None = None
) -> np.ndarray:
    """
    Simple 1D random-walk Metropolis sampler for sigma (annualized).
    Proposal: log-normal step on sigma.

    Returns:
      array of sigma samples (post burn-in).
    """
    returns = np.asarray(returns)
    if len(returns) == 0:
        raise ValueError("Need non-empty returns to run MCMC.")

    if seed is not None:
        np.random.seed(seed)

    if init_sigma is None:
        # use realized vol as a starting point
        init_sigma = realized_vol(returns)

    n_total = n_samples + burn_in
    sigmas = np.zeros(n_total)
    current = float(init_sigma)
    current_logp = log_posterior_sigma(current, returns)

    for i in range(n_total):
        # log-normal proposal: sigma' = sigma * exp(step * N(0,1))
        proposal = current * np.exp(step_size * np.random.normal())
        proposal_logp = log_posterior_sigma(proposal, returns)

        alpha = np.exp(proposal_logp - current_logp)
        if np.random.rand() < min(1.0, alpha):
            current, current_logp = proposal, proposal_logp

        sigmas[i] = current

    return sigmas[burn_in:]


def price_option_mcmc(
    S0: float,
    K: float,
    r: float,
    T: float,
    option_type: str,
    returns: np.ndarray,
    steps: int = 252,
    n_paths: int = 5000,
    n_sigma_samples: int = 2000,
    burn_in: int = 1000,
    seed: int | None = None
) -> float:
    """
    Price the option by integrating over posterior uncertainty in sigma.

    Steps:
      1. Run MCMC to get posterior samples of sigma.
      2. For each sigma sample, run Monte Carlo pricing.
      3. Average the prices.

    This is expensive but conceptually clean.
    """
    sigma_samples = sample_sigma_mcmc(
        returns=returns,
        n_samples=n_sigma_samples,
        burn_in=burn_in,
        seed=seed
    )

    prices = []
    for i, sigma in enumerate(sigma_samples):
        # optional: reuse seed pattern or vary by i
        p = price_option_mc(
            S0=S0,
            K=K,
            r=r,
            sigma=float(sigma),
            T=T,
            steps=steps,
            n_paths=n_paths,
            option_type=option_type,
            seed=None,
            return_ci=False
        )
        prices.append(p)

    return float(np.mean(prices))


In [7]:
def run_backtest(
    processed_data: Dict[str, Any],
    r: float,
    option_type: str,
    K: float,
    T: float,
    steps: int = 252,
    n_paths: int = 5000,
    window_size: int = 252,
    use_mcmc: bool = False,
    n_sigma_samples: int = 1000
) -> Dict[str, float]:
    """
    Rolling backtest of the pricing method.

    Simplified version:
      - For each time t >= window_size, take the previous `window_size` prices
        as the training window.
      - Estimate sigma (either realized or via MCMC).
      - Price the option with maturity T (we keep T fixed here for simplicity).
      - "Realized payoff" is 1-step-ahead payoff at t+1 (toy setup).

    Returns summary statistics of pricing error.
    """
    prices_all = processed_data["prices"]
    n = len(prices_all)

    model_prices = []
    realized_payoffs = []

    for t in range(window_size, n - 1):
        window_prices = prices_all[t - window_size : t]
        window_returns = np.diff(np.log(window_prices))

        if use_mcmc:
            # integrate over sigma via MCMC
            S_t = prices_all[t]
            price_t = price_option_mcmc(
                S0=S_t,
                K=K,
                r=r,
                T=T,
                option_type=option_type,
                returns=window_returns,
                steps=steps,
                n_paths=n_paths,
                n_sigma_samples=n_sigma_samples,
                burn_in=max(500, n_sigma_samples // 5),
                seed=None
            )
        else:
            # simple realized-vol-based pricing
            sigma_t = realized_vol(window_returns)
            S_t = prices_all[t]
            price_t = price_option_mc(
                S0=S_t,
                K=K,
                r=r,
                sigma=sigma_t,
                T=T,
                steps=steps,
                n_paths=n_paths,
                option_type=option_type,
                seed=None,
                return_ci=False
            )

        model_prices.append(price_t)

        # very simplified "realized payoff": payoff using S at t+1
        S_realized = prices_all[t + 1]
        if option_type.lower() == "call":
            payoff = max(S_realized - K, 0.0)
        else:
            payoff = max(K - S_realized, 0.0)
        realized_payoffs.append(payoff)

    model_prices = np.array(model_prices)
    realized_payoffs = np.array(realized_payoffs)

    errors = model_prices - realized_payoffs
    abs_errors = np.abs(errors)
    # to avoid division by zero, floor the denominator
    pct_errors = abs_errors / np.maximum(np.abs(realized_payoffs), 1e-8)

    return {
        "n_points": int(len(model_prices)),
        "mean_model_price": float(np.mean(model_prices)) if len(model_prices) > 0 else np.nan,
        "mean_realized_payoff": float(np.mean(realized_payoffs)) if len(realized_payoffs) > 0 else np.nan,
        "mean_abs_error": float(np.mean(abs_errors)) if len(abs_errors) > 0 else np.nan,
        "mean_pct_error": float(np.mean(pct_errors)) if len(pct_errors) > 0 else np.nan,
        "rmse": float(np.sqrt(np.mean(errors ** 2))) if len(errors) > 0 else np.nan,
    }


In [12]:
# === Example: basic pipeline ===

data_path = "your_prices.csv"  # e.g., 'AAPL_daily.csv'
train_ratio = 0.8

processed = load_and_process_data(
    path=data_path,
    train_ratio=train_ratio,
    price_col="Close",    # adjust if needed
    date_col="Date"       # adjust if needed
)

vol_results = estimate_volatility_all(processed)
print("Volatility estimates:")
for name, val in vol_results.items():
    print(f"  {name}: {val:.4f}")

sigma = vol_results["realized_vol"]  # choose your estimator
S0 = processed["S0"]

# --- Option parameters (example) ---
option_type = "call"
K = S0  # at-the-money
T = 0.5 # 6 months
r = 0.03

# --- Monte Carlo pricing ---
mc_price, ci = price_option_mc(
    S0=S0,
    K=K,
    r=r,
    sigma=sigma,
    T=T,
    steps=252,
    n_paths=20_000,
    option_type=option_type,
    seed=42,
    return_ci=True
)

print(f"\nMonte Carlo price: {mc_price:.4f}")
print(f"95% CI: [{ci[0]:.4f}, {ci[1]:.4f}]")


Volatility estimates:
  realized_vol: 0.2935
  ewma_vol: 0.3187

Monte Carlo price: 24.3317
95% CI: [23.7936, 24.8698]


In [13]:
# === Example: MCMC-based pricing ===

returns = processed["train_returns"]

mcmc_price = price_option_mcmc(
    S0=S0,
    K=K,
    r=r,
    T=T,
    option_type=option_type,
    returns=returns,
    steps=252,
    n_paths=5000,
    n_sigma_samples=1000,
    burn_in=500,
    seed=42
)

print(f"MCMC-integrated option price: {mcmc_price:.4f}")


MCMC-integrated option price: 24.8736


In [15]:
# === Example: Backtest with realized vol ===

bt_stats_simple = run_backtest(
    processed_data=processed,
    r=r,
    option_type=option_type,
    K=K,
    T=T,
    steps=252,
    n_paths=5000,
    window_size=252,
    use_mcmc=False
)

print("Backtest (realized vol) stats:")
for k, v in bt_stats_simple.items():
    print(f"  {k}: {v}")


# === Example: Backtest with MCMC (more expensive) ===

# bt_stats_mcmc = run_backtest(
#     processed_data=processed,
#     r=r,
#     option_type=option_type,
#     K=K,
#     T=T,
#     steps=252,
#     n_paths=2000,
#     window_size=252,
#     use_mcmc=True,
#     n_sigma_samples=500
# )
#
# print("\nBacktest (MCMC) stats:")
# for k, v in bt_stats_mcmc.items():
#     print(f"  {k}: {v}")


Backtest (realized vol) stats:
  n_points: 2499
  mean_model_price: 0.9522496034909161
  mean_realized_payoff: 0.013129214087978942
  mean_abs_error: 0.9391203894029372
  mean_pct_error: 83656136.45941249
  rmse: 3.4878838544929875


In [16]:

def run_cli_app():
    print("=== Option Pricing Monte Carlo App ===")

    data_path = input("Path to historical price CSV: ").strip()
    option_type = input("Option type (call/put): ").strip().lower()
    K = float(input("Strike price K: "))
    T = float(input("Time to maturity in years (e.g. 0.5): "))
    r = float(input("Risk-free rate (e.g. 0.03 for 3%): "))

    train_ratio = float(input("Train ratio (e.g. 0.8): ") or "0.8")

    processed = load_and_process_data(data_path, train_ratio=train_ratio)

    vol_results = estimate_volatility_all(processed)

    sigma = vol_results["realized_vol"]  # or whichever method you choose
    print(f"Using volatility estimate sigma={sigma:.4f}")

    n_paths = int(input("Number of MC paths (e.g. 10000): ") or "10000")
    steps = int(input("Steps per path (e.g. 252): ") or "252")

    # Plain Monte Carlo
    mc_price = price_option_mc(
        S0=processed["S0"],
        K=K,
        r=r,
        sigma=sigma,
        T=T,
        steps=steps,
        n_paths=n_paths,
        option_type=option_type,
    )
    print(f"Monte Carlo price: {mc_price:.4f}")

    # Markov Chain Monte Carlo
    use_mcmc = input("Run MCMC-based pricing as well? (y/n): ").strip().lower()
    if use_mcmc == "y":
        mcmc_price = price_option_mcmc(
            S0=processed["S0"],
            K=K,
            r=r,
            T=T,
            option_type=option_type,
            returns=processed["train_returns"],
        )
        print(f"MCMC price (integrating over sigma): {mcmc_price:.4f}")

    # Backtest
    do_backtest = input("Run backtest? (y/n): ").strip().lower()
    if do_backtest == "y":
        stats = run_backtest(
            processed_data=processed,
            r=r,
            option_type=option_type,
            K=K,
            T=T,
            steps=steps,
            n_paths=n_paths,
        )
        print("Backtest stats:")
        for k, v in stats.items():
            print(f"  {k}: {v}")
