In [1]:
import pandas as pd
import numpy as np
from scipy.optimize import brentq, minimize_scalar
import time
import warnings

# Suppress warnings
warnings.filterwarnings('ignore')

# The risk-free rate is now a parameter that can be adjusted for each method
# Default value - will be overridden by parameter versions
DEFAULT_R = 0.03


# 1. Black-Scholes European approximation with parameters
def compute_iv_euro(row, r=DEFAULT_R, max_vol=3.0, min_vol=0.001, max_iter=100):
    try:
        S = row['underly_mean_mid']  # Underlying price
        K = row['strike']  # Strike price
        T = row['maturity'] / 365  # Time to expiry in years
        price = row['mean_mid']  # Option price
        is_call = row['is_call'] == 1  # Option type (1=call, 0=put)

        # Calculate Black-Scholes implied volatility
        def objective(sigma):
            return black_scholes_price(S, K, T, r, sigma, is_call) - price

        # Use brentq to find the implied volatility with customizable parameters
        iv = brentq(objective, min_vol, max_vol, maxiter=max_iter)
        return iv
    except:
        return np.nan


# Black-Scholes pricing formula
def black_scholes_price(S, K, T, r, sigma, is_call):
    d1 = (np.log(S / K) + (r + 0.5 * sigma ** 2) * T) / (sigma * np.sqrt(T))
    d2 = d1 - sigma * np.sqrt(T)

    from scipy.stats import norm
    if is_call:
        return S * norm.cdf(d1) - K * np.exp(-r * T) * norm.cdf(d2)
    else:
        return K * np.exp(-r * T) * norm.cdf(-d2) - S * norm.cdf(-d1)


# 2. Binomial Tree method with parameters
def implied_vol_binomial(row, r=DEFAULT_R, steps=100, max_vol=3.0, min_vol=0.001, max_iter=100):
    try:
        S = row['underly_mean_mid']
        K = row['strike']
        T = row['maturity'] / 365
        price = row['mean_mid']
        is_call = row['is_call'] == 1

        def objective(sigma):
            model_price = binomial_tree_price(S, K, T, r, sigma, is_call, steps=steps)
            return model_price - price

        # Find the implied volatility using brentq with customizable parameters
        iv = brentq(objective, min_vol, max_vol, maxiter=max_iter)
        return iv
    except:
        return np.nan


# Binomial Tree pricing model with parameters
def binomial_tree_price(S, K, T, r, sigma, is_call, steps=100):
    dt = T / steps
    u = np.exp(sigma * np.sqrt(dt))
    d = 1 / u
    p = (np.exp(r * dt) - d) / (u - d)

    # Initialize asset prices at maturity
    prices = np.zeros(steps + 1)
    for i in range(steps + 1):
        prices[i] = S * (u ** (steps - i)) * (d ** i)

    # Initialize option values at maturity
    if is_call:
        values = np.maximum(0, prices - K)
    else:
        values = np.maximum(0, K - prices)

    # Work backwards through the tree
    for step in range(steps - 1, -1, -1):
        for i in range(step + 1):
            # Calculate asset price at this node
            asset_price = S * (u ** (step - i)) * (d ** i)

            # Calculate option value from children nodes
            option_value = np.exp(-r * dt) * (p * values[i] + (1 - p) * values[i + 1])

            # Check for early exercise
            if is_call:
                exercise_value = max(0, asset_price - K)
            else:
                exercise_value = max(0, K - asset_price)

            # Take maximum of continuation and exercise
            values[i] = max(option_value, exercise_value)

    # Return the option price (root of the tree)
    return values[0]


# 3. Barone-Adesi and Whaley (BAW) approximation with parameters
def baw_american_iv(row, r=DEFAULT_R, q=0.0, max_vol=3.0, min_vol=0.001, max_iter=100):
    try:
        S = row['underly_mean_mid']
        K = row['strike']
        T = row['maturity'] / 365
        price = row['mean_mid']
        is_call = row['is_call'] == 1

        def objective(sigma):
            model_price = american_option_baw(S, K, T, r, sigma, is_call, q=q)
            return model_price - price

        # Find the implied volatility with customizable parameters
        iv = brentq(objective, min_vol, max_vol, maxiter=max_iter)
        return iv
    except:
        return np.nan


def american_option_baw(S, K, T, r, sigma, is_call, q=0.0):
    """Barone-Adesi and Whaley approximation for American options"""
    from scipy.stats import norm

    # First calculate European option price
    d1 = (np.log(S / K) + (r - q + 0.5 * sigma ** 2) * T) / (sigma * np.sqrt(T))
    d2 = d1 - sigma * np.sqrt(T)

    if is_call:
        euro_price = S * np.exp(-q * T) * norm.cdf(d1) - K * np.exp(-r * T) * norm.cdf(d2)
    else:
        euro_price = K * np.exp(-r * T) * norm.cdf(-d2) - S * np.exp(-q * T) * norm.cdf(-d1)

    # Early exercise premium
    if is_call:
        if r <= q:  # No early exercise benefit
            return euro_price
        else:
            # Call option approximation
            b = r - q
            M = 2 * r / (sigma ** 2)
            N = 2 * b / (sigma ** 2)
            K_inf = K * N / (N - 1)
            q2 = (-(N - 1) + np.sqrt((N - 1) ** 2 + 4 * M / K)) / 2
            a2 = (K / q2) * (1 - np.exp(-r * T) * norm.cdf(d2))

            if S >= K_inf:
                return S - K
            else:
                return euro_price + a2 * (S / K_inf) ** q2
    else:  # Put option
        if r >= q:  # Early exercise calculation
            b = r - q
            M = 2 * r / (sigma ** 2)
            N = 2 * b / (sigma ** 2)
            K_0 = K * N / (N + 1)
            q1 = (-(N - 1) - np.sqrt((N - 1) ** 2 + 4 * M / K)) / 2
            a1 = -(K / q1) * (1 - np.exp(-r * T) * norm.cdf(-d2))

            if S <= K_0:
                return K - S
            else:
                return euro_price + a1 * (S / K_0) ** q1
        else:
            return euro_price


# 4. Bjerksund-Stensland approximation with parameters
def bjerksund_iv(row, r=DEFAULT_R, q=0.0, max_vol=3.0, min_vol=0.001, max_iter=100):
    try:
        S = row['underly_mean_mid']
        K = row['strike']
        T = row['maturity'] / 365
        price = row['mean_mid']
        is_call = row['is_call'] == 1

        def objective(sigma):
            model_price = american_option_bjerksund(S, K, T, r, sigma, is_call, q=q)
            return model_price - price

        # Find the implied volatility with customizable parameters
        iv = brentq(objective, min_vol, max_vol, maxiter=max_iter)
        return iv
    except:
        return np.nan


def american_option_bjerksund(S, K, T, r, sigma, is_call, q=0.0):
    """Simplified Bjerksund-Stensland approximation for American options"""
    from scipy.stats import norm

    # For put options, use put-call transformation
    if not is_call:
        # P(S,K,T,r,σ,q) = C(K,S,T,q,σ,r)
        return american_option_bjerksund(K, S, T, q, sigma, True, r)

    # Check boundary conditions
    if T <= 0:
        return max(0, S - K)

    # Calculate European price (used as fallback)
    d1 = (np.log(S / K) + (r - q + 0.5 * sigma ** 2) * T) / (sigma * np.sqrt(T))
    d2 = d1 - sigma * np.sqrt(T)
    euro_price = S * np.exp(-q * T) * norm.cdf(d1) - K * np.exp(-r * T) * norm.cdf(d2)

    b = r - q

    # Check for early exercise
    if b >= r:  # No early exercise for this case
        return euro_price

    # Calculate parameters for the approximation
    beta = (0.5 - b / (sigma ** 2)) + np.sqrt((b / (sigma ** 2) - 0.5) ** 2 + 2 * r / (sigma ** 2))
    B_infty = beta * K / (beta - 1)
    B_0 = max(K, r * K / b)

    h1 = -(b * T + 2 * sigma * np.sqrt(T)) * K / (B_0 - K)
    h2 = -(b * T - 2 * sigma * np.sqrt(T)) * K / (B_0 - K)

    I1 = B_0 - (B_0 - K) * np.exp(h1)
    I2 = B_0 - (B_0 - K) * np.exp(h2)

    alpha = (I1 - I2) / (B_infty - B_0)

    # Exercise boundary
    B = alpha * B_infty + (1 - alpha) * B_0

    # If S ≥ B, immediate exercise
    if S >= B:
        return S - K

    # Otherwise use approximation formula
    lambda_val = -2 * b / (sigma ** 2)

    d1 = (np.log(S / B) + (b + 0.5 * sigma ** 2) * T) / (sigma * np.sqrt(T))

    part1 = (S * np.exp((b - r) * T) * norm.cdf(d1)) - (K * np.exp(-r * T) * norm.cdf(d1 - sigma * np.sqrt(T)))
    part2 = (S * np.exp((b - r) * T) * norm.cdf(-d1)) - (K * np.exp(-r * T) * norm.cdf(-d1 + sigma * np.sqrt(T)))

    return part1 - part2 * (S / B) ** lambda_val


# 5. Monte Carlo simulation with LSM with parameters
def monte_carlo_iv(row, r=DEFAULT_R, M=1000, N=50, max_vol=3.0, min_vol=0.001, max_iter=50, seed=42):
    try:
        S = row['underly_mean_mid']
        K = row['strike']
        T = row['maturity'] / 365
        price = row['mean_mid']
        is_call = row['is_call'] == 1

        # For efficiency, skip very deep ITM options
        intrinsic = max(0, S - K) if is_call else max(0, K - S)
        if intrinsic / price > 0.95:
            return np.nan

        def objective(sigma):
            model_price = lsm_american_option_price(S, K, T, r, sigma, is_call, M=M, N=N,seed=seed)
            return model_price - price

        # Find the implied volatility with customizable parameters
        iv = brentq(objective, min_vol, max_vol, maxiter=max_iter)
        return iv
    except:
        return np.nan


def simulate_paths(S, r, sigma, T, M, N, seed=42):
    """Simulate asset price paths with optional seed parameter"""
    dt = T / N
    paths = np.zeros((M, N + 1))
    paths[:, 0] = S

    # Generate random paths with optional seed for reproducibility
    np.random.seed(seed)
    Z = np.random.standard_normal((M, N))
    for t in range(1, N + 1):
        paths[:, t] = paths[:, t - 1] * np.exp((r - 0.5 * sigma ** 2) * dt + sigma * np.sqrt(dt) * Z[:, t - 1])

    return paths


def lsm_american_option_price(S, K, T, r, sigma, is_call, M=1000, N=50, seed=42, min_itm_paths=5):
    """Price American option using Least Squares Monte Carlo method with parameters"""
    # Simulate paths
    paths = simulate_paths(S, r, sigma, T, M, N, seed=seed)
    dt = T / N

    # Initialize payoffs at maturity
    if is_call:
        payoffs = np.maximum(0, paths[:, -1] - K)
    else:
        payoffs = np.maximum(0, K - paths[:, -1])

    # Working backwards through time
    for t in range(N - 1, 0, -1):
        # Identify in-the-money paths
        if is_call:
            itm = paths[:, t] > K
        else:
            itm = paths[:, t] < K

        if sum(itm) > min_itm_paths:  # Need enough ITM paths for regression (now parameterized)
            # Current stock prices for ITM options
            S_itm = paths[itm, t]
            # Future discounted cashflows for ITM options
            V_itm = payoffs[itm] * np.exp(-r * dt * (N - t))
            # Regression basis functions (polynomial)
            X = np.column_stack([np.ones(len(S_itm)), S_itm, S_itm ** 2])
            # Fit regression model
            beta, _, _, _ = np.linalg.lstsq(X, V_itm, rcond=None)
            # Expected continuation values
            C = np.dot(X, beta)
            # Immediate exercise values
            if is_call:
                exercise = np.maximum(0, S_itm - K)
            else:
                exercise = np.maximum(0, K - S_itm)

            # Exercise decision
            ex_idx = exercise > C

            # Update payoffs
            payoffs_new = np.copy(payoffs)
            ex_path_idx = np.where(itm)[0][ex_idx]
            payoffs_new[ex_path_idx] = exercise[ex_idx] * np.exp(-r * dt * t)

            # Only consider unexercised paths for future iterations
            payoffs = payoffs_new

    # Option price is the average of all discounted payoffs
    return np.mean(payoffs)



    

In [2]:
# Load the data
file_path = 'sample_data.csv'  # Use your file path
df = pd.read_csv(file_path)

# Ensure time_to_expiry column exists
if 'time_to_expiry' not in df.columns:
    df['time_to_expiry'] = df['maturity'] / 365.0

# Use a subset of the data for faster testing
sample_size = 100
df_filtered = df[:sample_size]

print(f"Processing {sample_size} option records...")

Processing 100 option records...


In [3]:
# 1. Binomial Tree method with different parameters
t0 = time.time()

# Run with default parameters
df_filtered['iv_binomial_default'] = df_filtered.apply(implied_vol_binomial, axis=1)

# Run with custom parameters (example: more steps for better accuracy)
df_filtered['iv_binomial_high_steps'] = df_filtered.apply(
    lambda row: implied_vol_binomial(row, steps=200), axis=1)

# Run with different risk-free rate
df_filtered['iv_binomial_higher_r'] = df_filtered.apply(
    lambda row: implied_vol_binomial(row, r=0.03), axis=1)

t1 = time.time()
print(f'Time spent for binomial methods: {t1 - t0:.2f} seconds')
print(f'Default valid results: {df_filtered["iv_binomial_default"].notna().sum()} out of {sample_size}')
print(f'High steps valid results: {df_filtered["iv_binomial_high_steps"].notna().sum()} out of {sample_size}')
print(f'Higher r valid results: {df_filtered["iv_binomial_higher_r"].notna().sum()} out of {sample_size}')


#is more sensitive to r than step

Time spent for binomial methods: 28.36 seconds
Default valid results: 98 out of 100
High steps valid results: 100 out of 100
Higher r valid results: 98 out of 100


In [4]:


# 2. Barone-Adesi and Whaley (BAW) method with different parameters
t1 = time.time()

# Run with default parameters
df_filtered['iv_baw_default'] = df_filtered.apply(baw_american_iv, axis=1)


# Run with higher max_iter
df_filtered['iv_baw_higher_iter'] = df_filtered.apply(
    lambda row: baw_american_iv(row, max_iter=200), axis=1)

df_filtered['iv_baw_higher_r'] = df_filtered.apply(
    lambda row: baw_american_iv(row,  r=0.03), axis=1)

# more iteration make almost no difference
# is more sensitive to r than iteration

t2 = time.time()
print(f'Time spent for BAW methods: {t2 - t1:.2f} seconds')
print(f'Default valid results: {df_filtered["iv_baw_default"].notna().sum()} out of {sample_size}')
print(f'Higher iter valid results: {df_filtered["iv_baw_higher_iter"].notna().sum()} out of {sample_size}')







Time spent for BAW methods: 0.97 seconds
Default valid results: 95 out of 100
Higher iter valid results: 95 out of 100


In [5]:

# 3. Bjerksund-Stensland method with different parameters
t2 = time.time()

# Run with default parameters
df_filtered['iv_bjerksund_default'] = df_filtered.apply(bjerksund_iv, axis=1)

df_filtered['iv_bjerksund_more_iteration'] =  df_filtered.apply(
    lambda row: bjerksund_iv(row, max_iter=500), axis=1)

# Run with different min/max volatility bounds
df_filtered['iv_bjerksund_wider_bounds'] = df_filtered.apply(
    lambda row: bjerksund_iv(row, min_vol=0.0001, max_vol=5.0), axis=1)

# Run with different min/max volatility bounds
df_filtered['iv_bjerksund_smaller_bounds'] = df_filtered.apply(
    lambda row: bjerksund_iv(row, min_vol=0.1, max_vol=0.5), axis=1)

# Run with different risk-free rate and dividend yield
df_filtered['iv_bjerksund_diff_r'] = df_filtered.apply(
    lambda row: bjerksund_iv(row, r=0.03), axis=1)

t3 = time.time()
#put 都算不出来 算不出来的还是算不出出来，wider bound 和 more iteration基本没用，r影响比其他都大


print(f'Time spent for Bjerksund-Stensland methods: {t3 - t2:.2f} seconds')
print(f'Default valid results: {df_filtered["iv_bjerksund_default"].notna().sum()} out of {sample_size}')
print(f'More Iteration valid results: {df_filtered["iv_bjerksund_more_iteration"].notna().sum()} out of {sample_size}')
print(f'Wider bounds valid results: {df_filtered["iv_bjerksund_wider_bounds"].notna().sum()} out of {sample_size}')
print(f'Smaller bounds valid results: {df_filtered["iv_bjerksund_smaller_bounds"].notna().sum()} out of {sample_size}')
print(f'Different r valid results: {df_filtered["iv_bjerksund_diff_r"].notna().sum()} out of {sample_size}')

Time spent for Bjerksund-Stensland methods: 0.27 seconds
Default valid results: 49 out of 100
More Iteration valid results: 49 out of 100
Wider bounds valid results: 49 out of 100
Smaller bounds valid results: 49 out of 100
Different r valid results: 49 out of 100


In [6]:
# 4. Black-Scholes European approximation with different parameters
t3 = time.time()

# Run with default parameters
df_filtered['iv_euro_default'] = df_filtered.apply(compute_iv_euro, axis=1)

# Run with higher risk-free rate
df_filtered['iv_euro_higher_r'] = df_filtered.apply(
    lambda row: compute_iv_euro(row, r=0.025), axis=1)

# Run with different max iterations
df_filtered['iv_euro_lower_iter'] = df_filtered.apply(
    lambda row: compute_iv_euro(row, max_iter=50), axis=1)

# Run with different max iterations
df_filtered['iv_euro_higher_iter'] = df_filtered.apply(
    lambda row: compute_iv_euro(row, max_iter=300), axis=1)

t4 = time.time()
print(f'Time spent for European approximation methods: {t4 - t3:.2f} seconds')
print(f'Default valid results: {df_filtered["iv_euro_default"].notna().sum()} out of {sample_size}')
print(f'Higher r valid results: {df_filtered["iv_euro_higher_r"].notna().sum()} out of {sample_size}')
print(f'Lower iter valid results: {df_filtered["iv_euro_lower_iter"].notna().sum()} out of {sample_size}')
print(f'Higher iter valid results: {df_filtered["iv_euro_higher_iter"].notna().sum()} out of {sample_size}')

# higher/lower iter没差别，r会影响

Time spent for European approximation methods: 0.25 seconds
Default valid results: 100 out of 100
Higher r valid results: 100 out of 100
Lower iter valid results: 100 out of 100
Higher iter valid results: 100 out of 100


In [7]:
# 5. Monte Carlo method with different parameters (more time-consuming)
t4 = time.time()

# Run a small sample with default parameters
# monte_carlo_sample = df_filtered.head(100)
df_filtered['iv_monte_carlo_default'] = df_filtered.apply(monte_carlo_iv, axis=1)

# Run with different seed for different random numbers
df_filtered['iv_monte_carlo_diff_seed'] = df_filtered.apply(
    lambda row: monte_carlo_iv(row, seed=123), axis=1)


t5 = time.time()
print(f'Time spent for Monte Carlo methods: {t5 - t4:.2f} seconds')


Time spent for Monte Carlo methods: 25.15 seconds


In [8]:
t4 = time.time()


# Run with more paths for better accuracy (but slower)
df_filtered['iv_monte_carlo_more_paths'] = df_filtered.apply(
    lambda row: monte_carlo_iv(row, M=2000, N=100), axis=1)


t5 = time.time()
print(f'Time spent for Monte Carlo methods: {t5 - t4:.2f} seconds')



print(
    f'Default valid results: {df_filtered["iv_monte_carlo_default"].notna().sum()} out of {len(df_filtered)}')
print(
    f'More paths valid results: {df_filtered["iv_monte_carlo_more_paths"].notna().sum()} out of {len(df_filtered)}')
print(
    f'Different seed valid results: {df_filtered["iv_monte_carlo_diff_seed"].notna().sum()} out of {len(df_filtered)}')

# Add monte carlo results back to main dataframe
for col in ['iv_monte_carlo_default', 'iv_monte_carlo_more_paths', 'iv_monte_carlo_diff_seed']:
    df_filtered.loc[df_filtered.index, col] = df_filtered[col]



Time spent for Monte Carlo methods: 68.81 seconds
Default valid results: 98 out of 100
More paths valid results: 100 out of 100
Different seed valid results: 98 out of 100


In [9]:
t4 = time.time()
# Run with more paths for best accuracy (but slower)
df_filtered['iv_monte_carlo_most_paths'] = df_filtered.apply(
    lambda row: monte_carlo_iv(row, M=4000, N=200), axis=1)

df_filtered['iv_monte_carlo_most_paths_other_seed'] = df_filtered.apply(
    lambda row: monte_carlo_iv(row, M=4000, N=200, seed=123), axis=1)

t5 = time.time()
print(f'Time spent for Monte Carlo methods: {t5 - t4:.2f} seconds')


Time spent for Monte Carlo methods: 2023.65 seconds
