In [None]:
import numpy as np
from tqdm import tqdm
from typing import Optional, List, Union

### 1D

In [None]:
def simulate_gbm(
    S0: float,
    r: float,
    dividend: float,
    sigma: float,
    T: float,
    M: int,
    I: int,
    seed: Union[int, None] = None,
) -> np.ndarray:
    """
    Generate risk-neutral GBM price paths for one asset on an equidistant grid.

    Each step multiplies the previous price by
    exp((r - dividend - sigma^2 / 2) * dt + sigma * sqrt(dt) * Z) with Z ~ N(0, 1)
    and dt = T / M.

    Parameters:
        S0 (float): Initial stock price (must be > 0)
        r (float): Risk-free rate (annualized)
        dividend (float): Continuous dividend yield (annualized)
        sigma (float): Volatility (annualized, must be >= 0)
        T (float): Time to maturity in years (must be > 0)
        M (int): Number of time steps (excluding initial time)
        I (int): Number of simulated paths
        seed (int | None): If given, re-seeds NumPy's global random generator

    Returns:
        np.ndarray: Array of shape (M+1, I); row 0 holds S0, row t the prices
        after t steps.

    Raises:
        ValueError: If S0 or T is not positive, sigma is negative, or M / I is
            not a positive Python int
    """
    # Guard clauses, checked in the same order as the documented contract
    if S0 <= 0 or T <= 0 or sigma < 0:
        raise ValueError("S0, T must be positive; sigma must be non-negative")
    for count in (M, I):
        if not (isinstance(count, int) and count > 0):
            raise ValueError("M and I must be positive integers")

    # Seeding touches the global NumPy state, so it affects later draws too
    if seed is not None:
        np.random.seed(seed)

    step = T / M
    log_drift = (r - dividend - 0.5 * sigma**2) * step
    log_vol = sigma * np.sqrt(step)

    # All normal shocks are drawn in one call; row k drives step k+1
    shocks = np.random.standard_normal((M, I))

    paths = np.empty((M + 1, I))
    paths[0] = S0
    for t, step_shocks in enumerate(shocks, start=1):
        paths[t] = paths[t - 1] * np.exp(log_drift + log_vol * step_shocks)

    return paths


def longstaff_schwartz_train(
    S_paths_train: np.ndarray,
    K: float,
    r: float,
    T: float,
    degree: int,
) -> List[Union[np.ndarray, None]]:
    """
    Fits the Longstaff-Schwartz continuation-value regressions for a call option
    that can be exercised at every simulated time step.

    Working backwards from maturity, the realised cash flow of every path is
    discounted one step at a time. At each exercise date a polynomial in the
    current price is fitted (in-the-money paths only) to those discounted cash
    flows, and in-the-money paths whose immediate payoff exceeds the fitted
    continuation value are marked as exercised.

    Parameters:
        S_paths_train (np.ndarray): Price paths array of shape (M+1, I)
        K (float): Option strike price
        r (float): Risk-free rate (annualized)
        T (float): Time to maturity in years
        degree (int): Polynomial regression degree

    Returns:
        list[np.ndarray | None]: One entry per interior time step t = 1..M-1
        (forward order, so index t-1 belongs to step t). Each entry holds
        ``np.polyfit`` coefficients (highest power first), or None when no
        training path was in the money at that step.

    Raises:
        ValueError: If the path array is not 2D, contains fewer than two time
            points, K or T is not positive, or degree is negative
    """
    # Input validation
    if S_paths_train.ndim != 2:
        raise ValueError("S_paths_train must be 2D array")
    M = S_paths_train.shape[0] - 1
    if M < 1:
        # A single row would make dt = T / 0
        raise ValueError("S_paths_train must contain at least two time points")
    if any(param <= 0 for param in [K, T]) or degree < 0:
        raise ValueError("K, T must be positive; degree must be non-negative")

    dt = T / M
    discount = np.exp(-r * dt)

    # V holds each path's realised cash flow, expressed in time-t money
    V = np.maximum(S_paths_train[-1] - K, 0)
    coefficients = []

    # Backward induction from T-1 to t=1
    for t in reversed(range(1, M)):  # t = M-1, M-2, ..., 1
        # Roll every path's cash flow back one step. This must happen even when
        # the regression is skipped, otherwise later cash flows stay
        # under-discounted.
        V = V * discount

        current_prices = S_paths_train[t]
        in_the_money = current_prices > K

        if not in_the_money.any():
            coefficients.append(None)
            continue

        # Polynomial regression of discounted cash flows on the current price
        coeff = np.polyfit(current_prices[in_the_money], V[in_the_money], degree)
        coefficients.append(coeff)

        # Exercise only where the option is in the money: the polynomial was
        # fitted on ITM prices and may extrapolate to negative values elsewhere.
        continuation = np.polyval(coeff, current_prices)
        exercise = current_prices - K
        exercise_now = in_the_money & (exercise > continuation)
        V = np.where(exercise_now, exercise, V)

    return coefficients[::-1]  # Reverse to forward time order


def longstaff_schwartz_test(
    S_paths_test: np.ndarray,
    K: float,
    r: float,
    T: float,
    coefficients:  List[Union[np.ndarray, None]],
) -> float:
    """
    Estimates the option price by applying pre-trained Longstaff-Schwartz
    continuation polynomials to a set of price paths.

    The exercise rule is evaluated backwards in time so that the earliest
    exercise along each path determines its cash flow. Cash flows are discounted
    one step per iteration, and once more at the end to bring the time-1 value
    back to time 0.

    Parameters:
        S_paths_test (np.ndarray): Price paths array of shape (M+1, I)
        K (float): Option strike price
        r (float): Risk-free rate (annualized)
        T (float): Time to maturity in years
        coefficients (list[np.ndarray | None]): Trained polynomial coefficients
            in forward order (index t-1 belongs to step t); None means "never
            exercise at this step"

    Returns:
        float: Estimated option present value

    Raises:
        ValueError: If the path array is not 2D, contains fewer than two time
            points, or the number of coefficient sets is not M-1
    """
    # Input validation
    if S_paths_test.ndim != 2:
        raise ValueError("S_paths_test must be 2D array")
    M = S_paths_test.shape[0] - 1
    if M < 1:
        raise ValueError("S_paths_test must contain at least two time points")
    if len(coefficients) != M - 1:
        raise ValueError("Coefficients length mismatch with time steps")

    dt = T / M
    discount = np.exp(-r * dt)

    # V holds each path's realised cash flow, expressed in time-t money
    V = np.maximum(S_paths_test[-1] - K, 0)

    # Walk the exercise dates backwards: t = M-1, M-2, ..., 1
    for t in reversed(range(1, M)):
        # Discount on every step, including steps without a trained regression
        V = V * discount

        coeff = coefficients[t - 1]
        if coeff is None:
            continue

        current_prices = S_paths_test[t]
        continuation = np.polyval(coeff, current_prices)
        exercise = current_prices - K
        # Only in-the-money paths may exercise; the polynomial is not reliable
        # (and can be negative) outside the region it was fitted on.
        exercise_now = (exercise > 0) & (exercise > continuation)
        V = np.where(exercise_now, exercise, V)

    # V is in time-1 money here; one more step brings it to time 0
    return float(np.mean(V) * discount)

In [None]:
# Parameters for the single-asset experiment
S0 = 100 # initial stock price
r = 0.05 # risk-free rate (annualized)
dividend = 0.1 # continuous dividend yield (annualized)
sigma = 0.2 # volatility (annualized)
K = 100 # strike price
degree = 5 # degree of the polynomial regression
T = 3 # time to maturity in years
M = 9 # number of time steps passed to simulate_gbm (dt = T / M)
d = 1 # presumably the number of underlying assets; only used to size I_train
I_train = (1500+d) * 4000 # same number of paths as in our replication
seed_train = 42 # seed for training


# Train the Longstaff-Schwartz algorithm on its own set of simulated paths
trained_s = simulate_gbm(S0, r, dividend, sigma, T, M, I_train, seed = seed_train)
coeffs = longstaff_schwartz_train(trained_s, K, r, T, degree)

# Price on a separate path set generated with a different seed, so the
# regression is not evaluated on the paths it was fitted to
seed_test = 45 # seed for testing
I_test = 100000 # number of paths for testing
tested_s = simulate_gbm(S0, r, dividend, sigma, T, M, I_test, seed = seed_test)
price = longstaff_schwartz_test(tested_s, K, r, T, coeffs)
print(f"Estimated price of the 1D Bermudan Max Call Option: {price:.2f}")

### Multi D

In [None]:
def simulate_gbm_multi(
    dimension: int,
    risk_free_rate: float,
    dividend_rates: Union[float, np.ndarray],
    sigma: np.ndarray,
    initial_prices: np.ndarray,
    time_horizon: float,
    time_step: float,
    num_paths: int,
    seed: Optional[int] = None,
) -> np.ndarray:
    """
    Simulates multi-dimensional Geometric Brownian Motion paths with dividends.

    The assets are driven by independent standard normal shocks (no correlation
    is applied). Paths are stored as float32 to limit memory use.

    Parameters:
        dimension (int): Number of underlying assets (d ≥ 1)
        risk_free_rate (float): Annual risk-free rate (r ≥ 0)
        dividend_rates (Union[float, np.ndarray]): Continuous dividend rates,
            scalar or array-like of shape (d,)
        sigma (np.ndarray): Volatility coefficients array of shape (d,)
        initial_prices (np.ndarray): Initial prices array of shape (d,)
        time_horizon (float): Total simulation time in years (T > 0)
        time_step (float): Time step size in years (dt > 0)
        num_paths (int): Number of paths to simulate (≥ 1)
        seed (int | None): Random seed for reproducibility; any integer
            (including 0) re-seeds NumPy's global generator, None leaves it
            untouched

    Returns:
        np.ndarray: Simulated paths array of shape (num_paths, dimension, n_steps+1),
        where n_steps is time_horizon / time_step (rounded down when the ratio
        is not a whole number)

    Raises:
        ValueError: For invalid input parameters
    """
    # Parameter validation
    if dimension <= 0:
        raise ValueError("Dimension must be ≥ 1")
    if sigma.shape != (dimension,) or initial_prices.shape != (dimension,):
        raise ValueError("Parameter shape mismatch")
    if time_horizon <= 0 or time_step <= 0:
        raise ValueError("Time parameters must be positive")
    if num_paths <= 0:
        raise ValueError("Number of paths must be ≥ 1")
    if risk_free_rate < 0:
        raise ValueError("Risk-free rate cannot be negative")

    # Normalise dividend rates to a (dimension,) vector; accepts any scalar
    # (int, float, NumPy scalar) or array-like.
    dividend_rates = np.asarray(dividend_rates, dtype=float)
    if dividend_rates.ndim == 0:
        dividend_rates = np.full(dimension, float(dividend_rates))
    elif dividend_rates.shape != (dimension,):
        raise ValueError("Dividend rates shape mismatch")

    # `is not None` rather than truthiness so that seed=0 is honoured
    if seed is not None:
        np.random.seed(seed)

    # Floating-point division can land just below a whole number
    # (0.3 / 0.1 == 2.9999999999999996); snap to the nearest integer when the
    # ratio is within round-off of it, otherwise keep the truncating behaviour.
    ratio = float(time_horizon / time_step)
    nearest = int(round(ratio))
    if np.isclose(ratio, nearest, rtol=1e-9, atol=0.0):
        n_steps = nearest
    else:
        n_steps = int(ratio)
    dt_sqrt = np.sqrt(time_step)

    # Per-step log-return drift (with dividends) and diffusion scale, per asset
    drift_terms = (risk_free_rate - dividend_rates - 0.5 * sigma**2) * time_step
    diffusion_terms = sigma * dt_sqrt

    # float32 storage halves the footprint of the (paths, assets, steps) cube
    paths = np.empty((num_paths, dimension, n_steps + 1), dtype=np.float32)
    paths[:, :, 0] = initial_prices

    # One vectorised update per time step across all paths and assets
    for step in range(1, n_steps + 1):
        rand_nums = np.random.randn(num_paths, dimension).astype(np.float32)
        paths[:, :, step] = paths[:, :, step-1] * np.exp(
            drift_terms + diffusion_terms * rand_nums
        )

    return paths


def bermudan_max_call_payoff(
    underlying_prices: np.ndarray,
    strike_price: float,
) -> np.ndarray:
    """
    Intrinsic value of a call written on the best-performing asset.

    For every path the largest asset price is taken along axis 1 and the payoff
    max(best - strike, 0) is returned.

    Parameters:
        underlying_prices (np.ndarray): Price array of shape (paths, assets)
        strike_price (float): Option strike price (K > 0)

    Returns:
        np.ndarray: Payoff values array of shape (paths,)
    """
    # Reduce over the asset axis first, then floor the intrinsic value at zero
    intrinsic = np.max(underlying_prices, axis=1) - strike_price
    return np.maximum(intrinsic, 0)


def longstaff_schwartz_train(
    price_paths: np.ndarray,
    strike_price: float,
    risk_free_rate: float,
    maturity: float,
    poly_degree: int,
) -> List[Optional[np.ndarray]]:
    """
    Trains Longstaff-Schwartz algorithm for American-style options pricing.

    ``price_paths`` holds one price series per path (for a max-call, the
    per-path maximum across assets), so the exercise payoff at any step is
    max(price - strike, 0). Working backwards from maturity, cash flows are
    discounted one step at a time, a polynomial in the current price is fitted
    on in-the-money paths, and in-the-money paths whose payoff exceeds the
    fitted continuation value are marked as exercised.

    NOTE: this definition shares its name with the earlier single-asset
    function in this notebook, which expects the transposed (steps+1, paths)
    layout; whichever cell ran last wins.

    Parameters:
        price_paths (np.ndarray): Training paths array of shape (num_paths, time_steps+1)
        strike_price (float): Option strike price (K > 0)
        risk_free_rate (float): Annual risk-free rate (r ≥ 0)
        maturity (float): Time to maturity in years (T > 0)
        poly_degree (int): Polynomial regression degree (≥ 0)

    Returns:
        List[Optional[np.ndarray]]: Regression coefficients for each interior
        exercise time in chronological order (index step-1 belongs to ``step``);
        None where no training path was in the money

    Raises:
        ValueError: For invalid input parameters, including a path array with
            fewer than two time points
    """
    if price_paths.ndim != 2:
        raise ValueError("Price paths must be 2D array (paths, timesteps)")
    exercise_steps = price_paths.shape[1] - 1
    if exercise_steps < 1:
        # A single column would make dt = maturity / 0
        raise ValueError("Price paths must contain at least two time points")
    if any(param <= 0 for param in [strike_price, maturity]) or poly_degree < 0:
        raise ValueError("Invalid parameter values")

    dt = maturity / exercise_steps
    discount_factor = np.exp(-risk_free_rate * dt)

    # Realised cash flow per path, expressed in money of the current step
    value_func = np.maximum(price_paths[:, -1] - strike_price, 0)
    coefficients = []

    # Backward induction from last exercise date
    for step in reversed(range(1, exercise_steps)):  # Exclude initial time
        # Discount every path by one step, even if the regression is skipped;
        # otherwise later cash flows stay under-discounted.
        value_func = value_func * discount_factor

        current_prices = price_paths[:, step]
        itm_mask = current_prices > strike_price

        if not itm_mask.any():
            coefficients.append(None)
            continue

        # Polynomial regression of discounted cash flows on the current price
        coeff = np.polyfit(current_prices[itm_mask], value_func[itm_mask], poly_degree)
        coefficients.append(coeff)

        # Exercise only in the money: the fit is unreliable (possibly negative)
        # outside the ITM region it was estimated on.
        continuation = np.polyval(coeff, current_prices)
        exercise = current_prices - strike_price
        exercise_now = itm_mask & (exercise > continuation)
        value_func = np.where(exercise_now, exercise, value_func)

    return coefficients[::-1]  # Reverse to chronological order


def longstaff_schwartz_evaluate(
    price_paths: np.ndarray,
    strike_price: float,
    risk_free_rate: float,
    maturity: float,
    coefficients: List[Optional[np.ndarray]],
) -> float:
    """
    Evaluates American option price using trained Longstaff-Schwartz model.

    ``price_paths`` holds one price series per path (for a max-call, the
    per-path maximum across assets), so the exercise payoff at any step is
    max(price - strike, 0). The exercise rule is applied backwards in time so
    the earliest exercise along each path determines its cash flow; cash flows
    are discounted one step per iteration and once more to reach time 0.

    Parameters:
        price_paths (np.ndarray): Test paths array of shape (paths, timesteps+1)
        strike_price (float): Option strike price (K > 0)
        risk_free_rate (float): Annual risk-free rate (r ≥ 0)
        maturity (float): Time to maturity in years (T > 0)
        coefficients (List[Optional[np.ndarray]]): Trained regression
            coefficients in chronological order (index step-1 belongs to
            ``step``); None means "never exercise at this step"

    Returns:
        float: Estimated option present value

    Raises:
        ValueError: For input validation errors
    """
    # Input validation
    if price_paths.ndim != 2:
        raise ValueError("Price paths must be 2D array")
    exercise_steps = price_paths.shape[1] - 1
    if exercise_steps < 1:
        raise ValueError("Price paths must contain at least two time points")
    if len(coefficients) != exercise_steps - 1:
        raise ValueError("Coefficients/exercise steps mismatch")

    dt = maturity / exercise_steps
    discount_factor = np.exp(-risk_free_rate * dt)

    # Realised cash flow per path, expressed in money of the current step
    value_func = np.maximum(price_paths[:, -1] - strike_price, 0)

    # Walk exercise opportunities backwards: exercise_steps-1, ..., 1
    for step in reversed(range(1, exercise_steps)):
        # Discount on every step, including steps without a trained regression
        value_func = value_func * discount_factor

        coeff = coefficients[step - 1]
        if coeff is None:
            continue

        current_prices = price_paths[:, step]
        continuation = np.polyval(coeff, current_prices)
        exercise = current_prices - strike_price
        # Only in-the-money paths may exercise; the polynomial can be negative
        # outside the region it was fitted on.
        exercise_now = (exercise > 0) & (exercise > continuation)
        value_func = np.where(exercise_now, exercise, value_func)

    # value_func is in time-1 money here; one more step brings it to time 0
    return float(np.mean(value_func) * discount_factor)

In [None]:
# Global parameters shared by every dimension in the sweep below
risk_free_rate = 0.05       # Annual risk-free rate
dividend_rate = 0.1         # Continuous dividend yield (same for all assets)
k = 100.0        # Option strike price
maturity = 3.0              # Time to expiration in years
num_exercise_points = 9     # Number of exercise opportunities
dt = maturity / num_exercise_points  # Time step size
num_test_paths = 100000      # Number of paths for testing
train_seed = 42              # Seed for training path generation
test_seed = 45               # Seed for testing path generation
poly_degree = 5             # Degree for polynomial regression

# Dictionary to store results {dimension: price}
price_results = {}

# Price the max-call once per asset count; each iteration trains on one path
# set and evaluates on an independently seeded one.
for dimension in tqdm([2, 3, 5, 10, 20, 30, 50, 100], desc="Pricing dimensions"):
    # Calculate training paths count based on dimension
    num_train_paths = (1500 + dimension) * 4000
    
    # 1. Model Parameters Configuration
    # --------------------------------
    # Volatility vector (constant for all assets)
    volatility = 0.2 * np.ones(dimension)
    
    # Initial prices vector (100 for all assets)
    initial_prices = 100.0 * np.ones(dimension)

    # 2. Training Phase
    # -----------------
    # Generate training paths: shape (paths, assets, time_steps+1)
    train_paths = simulate_gbm_multi(
        dimension=dimension,
        risk_free_rate=risk_free_rate,
        dividend_rates=dividend_rate,
        sigma=volatility,
        initial_prices=initial_prices,
        time_horizon=maturity,
        time_step=dt,
        num_paths=num_train_paths,
        seed=train_seed
    )
    
    # Collapse the asset axis to the per-path maximum: shape (paths, time_steps+1).
    # The regression below therefore sees only the maximum price as its state.
    max_train_paths = np.amax(train_paths, axis=1)
    
    # Train Longstaff-Schwartz model
    coefficients = longstaff_schwartz_train(
        price_paths=max_train_paths,
        strike_price=k,
        risk_free_rate=risk_free_rate,
        maturity=maturity,
        poly_degree=poly_degree
    )

    # 3. Testing Phase
    # ----------------
    # Generate test paths with a different seed than the training set
    test_paths = simulate_gbm_multi(
        dimension=dimension,
        risk_free_rate=risk_free_rate,
        dividend_rates=dividend_rate,
        sigma=volatility,
        initial_prices=initial_prices,
        time_horizon=maturity,
        time_step=dt,
        num_paths=num_test_paths,
        seed=test_seed
    )
    
    # Extract test maximums: shape (paths, time_steps+1)
    max_test_paths = np.amax(test_paths, axis=1)
    
    # Calculate option price using trained model
    option_price = longstaff_schwartz_evaluate(
        price_paths=max_test_paths,
        strike_price=k,
        risk_free_rate=risk_free_rate,
        maturity=maturity,
        coefficients=coefficients
    )
    
    price_results[dimension] = option_price

# 4. Results Presentation
# -----------------------
# NOTE: the loop variable `price` rebinds the module-level name `price`
print("\nBermudan Max Call Option Prices:")
for dim, price in price_results.items():
    print(f"Dimension {dim}: {price:.4f}")