In [8]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import scipy.sparse as sp
from scipy.sparse.linalg import spsolve


import finite_differences.finite_diff_formulas as fd
import test_fd_methods as tfd
import test_implementation as toi
import optimizer_utils as opt

# Volatility Arbitrage Option Pricer

## Overview
This notebook implements a **finite difference PDE solver** for European call options and uses it as the core engine in a **volatility arbitrage strategy**.

The main numerical methods explored are:
- **Crank–Nicolson** (second-order accurate in time and space)
- **Forward finite differences** (for comparison and stability checks)

While the initial implementation focuses on **European vanilla options**, the end goal is to extend the solver to **American** and **Asian** options, adapting the numerical scheme for early exercise and path dependence.

The project also connects the pricer to a volatility arbitrage framework, where differences between **implied** and **realized** volatility can signal potential trades.

---

## Key Features
- **Finite difference method** with dynamic grid sizing.
- **Sparse matrix solving** (`scipy.sparse`) for computational efficiency.
- **Analytical benchmark** via the closed-form Black–Scholes formula.
- **Rigorous testing** to improve numerical stability and accuracy.
- **Parameter optimization utilities** for tuning PDE grid and timestep.
- **Volatility arbitrage logic** using option mispricing signals.

---

## Project Structure
- `finite_differences/finite_diff_formulas.py` → PDE solver implementation (Crank–Nicolson & forward differences)
- `test_fd_methods.py` → Validation of numerical methods
- `test_implementation.py` → Strategy integration tests
- `optimizer_utils.py` → Parameter optimization routines for PDE solver
- **Notebook** → Combines pricing engine with volatility arbitrage backtesting

---

## Current Scope
- European call option pricing with PDE methods.
- Comparison between:
  - Analytical Black–Scholes
  - Forward finite difference
  - Crank–Nicolson finite difference
- Backtesting volatility arbitrage on historical data (planned/partially implemented).

---

## Roadmap
1. Extend PDE solver for **American options** (free boundary condition handling).
2. Implement **Asian options** via Monte Carlo simulation.
3. Integrate live data feed for real-time volatility arbitrage.
4. Enhance backtesting module with risk metrics and trade execution simulation.

In [6]:
# --- Option Parameters ---
TRADING_DAYS = 252
K = 95
S = 100
sigma = .2
r = .05
tfinal = 21 / TRADING_DAYS  

#n is discrete time steps, m is asset price steps
n = 16
m = 16        

bs_price = fd.black_scholes_call(S, K, tfinal, r, sigma)
price_fd = fd.finite_difference_call(n, m, K, tfinal, S, sigma, r)
price_cn = fd.price_derivative_cn(n, m, K, r, tfinal, S, sigma)

print("When given", m, "pricesteps and", n, "timesteps:")
print("European Call Option Price Analytical:", bs_price)
print("European Call Option Price Finite Dif:", price_fd)
print("European Call Option Price Crank-Nico:", price_cn)

print("Finite Difference Relative Error:", np.abs(price_fd - bs_price)/bs_price)
print("Crank-Nicolson Relative Error:", np.abs(price_cn - bs_price)/bs_price)

When given 16 pricesteps and 16 timesteps:
European Call Option Price Analytical: 5.898921658718436
European Call Option Price Finite Dif: 5.876594250948936
European Call Option Price Crank-Nico: 5.296782858142622
Finite Difference Relative Error: 0.0037849981846259043
Crank-Nicolson Relative Error: 0.1020760802418642


## Convergence Analysis

After confirming the correctness of our numerical methods for a fixed grid,
we now analyze **how the solution converges** as we refine the finite difference grid.  
The primary goal here is to identify and understand sources of inaccuracy.

We use an **external reference price** (from an online source) as our ground truth,
and compute errors for varying **time step (N)** and **asset price step (M)** values.  

Our results show a clear pattern:  
while increasing *either* time steps or asset price steps reduces error,
**increasing the number of asset price steps has a significantly greater effect on convergence**.
This aligns with the PDE structure, since spatial discretization directly impacts the accuracy
of the second derivative term in the Black–Scholes PDE.

In [19]:
cn_errors, oi_errors, cn_values, neumann = tfd.test_convergence_all(K, r, tfinal, S, sigma, (5,4), fd.price_derivative_cn, toi.crank_nicholson)

convergence_df = pd.DataFrame({
    "Step_sizes (NTS, NAS)": [f"{2 + 4** m}x{2 + 4** n}" for m in range(5) for n in range(4)],
    "Crank–Nicolson Error": cn_errors.ravel(),
    "Other Impl. Error": oi_errors.ravel(),
    "Crank–Nicolson Price": cn_values.ravel()
})

convergence_df

Unnamed: 0,"Step_sizes (NTS, NAS)",Crank–Nicolson Error,Other Impl. Error,Crank–Nicolson Price
0,3x3,1.373837,3.041053,4.525084
1,3x6,0.279605,1.766296,5.619317
2,3x18,0.78672,0.385381,5.112202
3,3x66,2.200948,0.478248,3.697973
4,6x3,1.227792,3.041053,4.67113
5,6x6,0.130315,1.766294,5.768606
6,6x18,0.725213,0.385389,5.173709
7,6x66,2.439825,0.476579,3.459097
8,18x3,1.132523,3.041053,4.766399
9,18x6,0.033291,1.766293,5.865631


### Optimal Asset Price Grid Boundaries

In this section, we investigate how to choose **optimal asset price boundaries** for the finite difference grid.  
Our objective is to minimize numerical errors while maintaining stability in the **Crank–Nicolson** solver.

#### **Boundary Types Compared**
1. **Static boundaries** — fixed minimum and maximum asset prices.
2. **Dynamic boundaries** — calculated from the underlying's lognormal distribution, parameterized by the number of standard deviations `n_std` from the mean.

#### **Key Findings**
- Using approximately **3 standard deviations** for dynamic boundaries yields the most accurate results in a representative case.
- The optimal range for `n_std` is **(2, 5)** for best convergence and stability.
- Additional robustness tests with varying starting values indicate that the combination:

In [29]:
def dynamic_bounds(K, sigma, T, n_std=3):
    """
    Generate asset price bounds based on lognormal distribution percentiles.

    Parameters:
        K (float): Strike price
        sigma (float): Volatility (annualized)
        T (float): Time to maturity (in years)
        n_std (float): Number of standard deviations for the bounds

    Returns:
        (float, float): (S_min, S_max)
    """
    S_min = K * np.exp(-n_std * sigma * np.sqrt(T))
    S_max = K * np.exp(n_std * sigma * np.sqrt(T))
    return S_min, S_max

In [32]:
#Static boundaries: 
s_vals = [(K/3, K*2), (K/4, K*3)]

#Dynamic boundaries:
d_vals = np.array([dynamic_bounds(K, sigma, tfinal, i) for i in range(2,5)])

cn_errors, oi_errors, neumann = tfd.test_grid_asset_boundaries(fd.price_derivative_cn, toi.crank_nicholson, K, r, tfinal, S, sigma, (6,5), d_vals)

summary_rows = []

for bounds, error_matrix in zip(d_vals, cn_errors):
    # Mask zeros by setting them to np.nan
    masked_errors = np.where(error_matrix == 0, np.nan, error_matrix)
    
    # Find the min error and its position, ignoring zeros
    min_error = np.nanmin(masked_errors)
    min_pos = np.unravel_index(np.nanargmin(masked_errors), masked_errors.shape)
    
    summary_rows.append({
        "Lower Asset Price Grid Bound": bounds[0],
        "Upper Asset Price Grid Bound": bounds[1],
        "Min CN Error": min_error,
        "NTS": 2 + 4** min_pos[0],
        "NAS": 2 + 4** min_pos[1]
    })

# Create summary DataFrame
summary_df = pd.DataFrame(summary_rows)

summary_df

Unnamed: 0,Lower Asset Price Grid Bound,Upper Asset Price Grid Bound,Min CN Error,NTS,NAS
0,224.664482,1112.770464,8.676509,1026,18
1,150.597106,1660.058461,0.041644,66,6
2,100.948259,2476.516212,0.00044,1026,18


## Testing Various Starting Conditions

We next test the performance of the Crank–Nicolson method under different starting conditions. These include variations in volatility, time to maturity, moneyness, and strike size. 

Empirical testing indicates that the method performs best for **low volatility and short time to maturity**. Across most test cases, using **3 standard deviations** for the dynamic asset price boundaries provides consistently good results, even when some boundary configurations are less optimal.


In [33]:
# Test cases: K, σ, T, S0
test_cases = [
    # Low volatility, short maturity
    {"K": 100, "sigma": 0.1, "T": 0.1, "S0": 100, "type" : "Low volatility, short maturity"},
    # High volatility, long maturity
    {"K": 100, "sigma": 0.5, "T": 5.0, "S0": 100, "type" : "High volatility, long maturity"},
    # Extreme moneyness (Strike price is significantly higher/lower than current stock price )
    {"K": 100, "sigma": 0.3, "T": 1.0, "S0": 50, "type" : "OTM"},   # OTM call
    {"K": 100, "sigma": 0.3, "T": 1.0, "S0": 150, "type" : "ITM"},  # ITM call
    # Small/large strikes
    {"K": 10,  "sigma": 0.2, "T": 1.0, "S0": 10, "type" : "Small strike"},
    {"K": 500, "sigma": 0.4, "T": 1.0, "S0": 500, "type" : "Large strike"},
]

summary_list = []

for case in test_cases:
    K = case["K"]
    sigma = case["sigma"]
    tfinal = case["T"]
    S0 = case["S0"]
    
    d_vals = np.array([dynamic_bounds(K, sigma, tfinal, i) for i in range(2,5)])

    cn_errors, oi_errors, conds = tfd.test_grid_asset_boundaries(fd.price_derivative_cn, toi.crank_nicholson, K, r, tfinal, S0, sigma, (6,5), d_vals)

    for bounds, err_matrix in zip(d_vals, cn_errors):
        # Ignore zeros for stability
        masked_errors = np.where(err_matrix == 0, np.nan, err_matrix)
        min_error = np.nanmin(masked_errors)
        min_pos = np.unravel_index(np.nanargmin(masked_errors), err_matrix.shape)

        summary_list.append({
            "Test Case": case["type"],
            "Lower Bound": bounds[0],
            "Upper Bound": bounds[1],
            "Min CN Error": min_error,
            "NTS": 2 + 4** min_pos[0],
            "NAS": 2 + 4** min_pos[1]
        })

summary_df = pd.DataFrame(summary_list)
summary_df

Unnamed: 0,Test Case,Lower Bound,Upper Bound,Min CN Error,NTS,NAS
0,"Low volatility, short maturity",93.871294,106.528839,0.007395,1026,6
1,"Low volatility, short maturity",90.949268,109.951407,0.000228,66,6
2,"Low volatility, short maturity",88.118199,113.483936,0.010937,18,6
3,"High volatility, long maturity",10.687793,935.646902,0.221943,1026,6
4,"High volatility, long maturity",3.494073,2861.989102,13.659803,1026,18
5,"High volatility, long maturity",1.142289,8754.351246,9.854615,1026,18
6,OTM,54.881164,182.21188,0.38938,258,18
7,OTM,40.656966,245.960311,0.005866,258,18
8,OTM,30.119421,332.011692,0.049831,258,18
9,ITM,54.881164,182.21188,4.063845,1026,3


## Stability Condition Testing

Given the variation in initial conditions and the size of the asset price boundaries, it is important that **boundaries and step sizes are chosen adaptively**. 

Regardless of approach, the **Crank–Nicolson stability condition** must be satisfied:

$
\Delta t \leq \frac{(\Delta S)^2}{2\sigma^2 S_{\text{max}}^2}
$

We will test various starting conditions and check:

1. When the stability condition holds.
2. When it is violated, and by how much.

For simplicity, we will narrow the dynamic boundary selection to **3 standard deviations** for most cases, adjusting slightly to 3.5 in extreme volatility/long-maturity cases.

In [37]:
results = []

for case in test_cases:
    K = case["K"]
    sigma = case["sigma"]
    tfinal = case["T"]
    S0 = case["S0"]
    
    c = 3.5 if sigma*np.sqrt(tfinal) > 1 else 3
    d_vals = dynamic_bounds(K, sigma, tfinal, c)

    cn_errors, oi_errors, vals, conds = tfd.test_convergence_all(K, r, tfinal, S0, sigma, (6,5), fd.price_derivative_cn, toi.crank_nicholson, True, d_vals)
    
    nonzero_mask = cn_errors > 0
    if np.any(nonzero_mask):
        min_error = cn_errors[nonzero_mask].min()
        min_pos = np.argwhere(cn_errors == min_error)[0]  
        timestep_idx, price_idx = min_pos
    else:
        min_error = None
        timestep_idx, price_idx = None, None

    results.append({
        "Case": case["type"],
        "Lower Bound": d_vals[0],
        "Upper Bound": d_vals[1],
        "Min CN Error": min_error,
        "NTS": 2 + 4**timestep_idx,
        "NAS": 2 + 4**price_idx
    })

error_summary_df = pd.DataFrame(results)
error_summary_df

Unnamed: 0,Case,Lower Bound,Upper Bound,Min CN Error,NTS,NAS
0,"Low volatility, short maturity",90.949268,109.951407,0.000228,66,6
1,"High volatility, long maturity",1.997809,5005.48278,1.141021,1026,18
2,OTM,40.656966,245.960311,0.005866,258,18
3,ITM,40.656966,245.960311,0.136827,1026,3
4,Small strike,5.488116,18.221188,0.059597,1026,6
5,Large strike,150.597106,1660.058461,17.048105,1026,6


## Advection Term Verification

We check whether the advection term condition is satisfied for each test case.  

The results show that **in nearly every scenario, the condition holds**, confirming that enforcing this constraint does not limit our ability to find viable solutions.

In [39]:
advection_results = []

for case in test_cases:
    K = case["K"]
    sigma = case["sigma"]
    tfinal = case["T"]
    S0 = case["S0"]

    cn_errors = tfd.satisfies_advection_term(K, r, tfinal, S0, sigma, (6,5), fd.price_derivative_cn)
    
    total_terms = cn_errors.size
    passed_terms = np.sum(cn_errors > 0)
    fraction_passed = passed_terms / total_terms

    advection_results.append({
        "Case": case["type"],
        "Fraction > 0": fraction_passed,
        "Passed / Total": f"{passed_terms} / {total_terms}"
    })

advection_df = pd.DataFrame(advection_results)
advection_df

Unnamed: 0,Case,Fraction > 0,Passed / Total
0,"Low volatility, short maturity",1.0,30 / 30
1,"High volatility, long maturity",0.8,24 / 30
2,OTM,0.966667,29 / 30
3,ITM,0.966667,29 / 30
4,Small strike,0.966667,29 / 30
5,Large strike,0.9,27 / 30


### Next Steps: Generating Optimal NAS and NTS

There are several directions to explore from here. Since no single condition has proven sufficient to distinguish NAS and NTS, two plausible approaches emerge:

1. **Lookup Table Approach**  
   One option is to create a lookup table that, given a set of initial conditions, provides the optimal NAS and NTS.  

2. **Continuous Mapping Approach**  
   The approach taken here builds on the lookup table idea: we use the table to generate a continuous mapping from the initial conditions onto `(NAS, NTS)`. The main advantage of this continuous mapping is that it allows us to predict optimal NAS and NTS for points not previously seen in the dataset. For simplicity, we assume a constant interest rate.  

The code iterates over a carefully chosen grid of option parameters, including:

- Strike prices (`K`) focusing around 50–250  
- Time to maturity (`T`), with finer granularity for short-term options  
- Asset prices (`S`) concentrated around the strike price (0.8 × K to 1.2 × K)  
- Volatility (`σ`) rounded to two decimal places  

To make the problem computationally feasible, we apply several filters:  

- Exclude deep OTM options  
- Skip unlikely or extreme parameter combinations (e.g., very low volatility with long maturities, or very high volatility with very long maturities)  

Even with these restrictions, the original parameter grid contained around 3 million entries. After applying the filters, we reduce the dataset to approximately 12,000 cases, making it manageable while still capturing the essential dynamics.  

For each combination in this reduced parameter grid, the optimal NAS and NTS steps are computed in parallel and saved to a CSV file for later use. This dataset can then be used to generate a continuous mapping, providing near-optimal step sizes for new option parameters not included in the original grid.


In [96]:
import os

#K_vals initialized to focus on values around 50 - 150
K_vals = np.round(np.exp(np.linspace(np.log(50), np.log(250), 8)).astype(int))

#More precision for short term, we concatenate 
T_vals = np.concatenate([
    np.linspace(0.01, 0.1, 8),    # Short-term
    np.linspace(0.1, 1.0, 6),     # Medium
    [1.0, 2.0, 3.0, 5.0]      # Long-term
])

#Something to think about (How much emphasis should we place on unlikely combinations?)
S_vals = np.unique(np.concatenate([np.linspace(0.8 * K, 1.2 * K, 6) for K in K_vals]))

#Round the volatilies to 2 digits as that is what is generally used
sigmas = np.round(np.linspace(0.10, 0.40, 8), 2)

#Test iteration
#print(t.optimal_NAS_NTS(95, 21/252, 100, .2, .05))

#We are going to avoid options that are deep OTM and not consider those to reduce our cases.

#Compute optimal steps for every combination in parameter grid
param_grid = [(K, T, S, sigma) for K in K_vals for T in T_vals 
                for S in S_vals if S for sigma in sigmas 
                if (0.7 <= S/K <= 1.3)                  
                and not (sigma <= 0.1 and T >= 0.25)  
                and not (sigma >= 0.4 and T >= 2) 
                and r * T >= 0.002  # Heuristic
                ]
results = opt.run_parallel_processing(param_grid)

#Save results as csv file:
filename = "CN_NAS_NTS_optimal_step_data.csv"
header = not os.path.exists(filename) 
results.to_csv(filename, mode="a", header=header, index=False)


12528


'\n#Save results as csv file:\nfilename = "CN_NAS_NTS_optimal_step_data.csv"\nheader = not os.path.exists(filename) \nresults.to_csv(filename, mode="a", header=header, index=False)\n'

### Testing the Continuous Mapping for Unseen Points

To evaluate the effectiveness of the continuous mapping of `(NAS, NTS)`, we test the optimizer on option parameters that were **not explicitly part of the training dataset**. This checks whether the fitted mapping can provide near-optimal grid sizes for new points within the convex hull of the training data.

For each test case, we compute the Crank–Nicolson price using the predicted `(NAS, NTS)` and compare it to the Black–Scholes analytical price. We focus on the **relative error**:

$
\text{Relative Error} = \frac{|V_\text{CN} - V_\text{BS}|}{V_\text{BS}}
$

Preliminary results show that the relative error remains **under 10%**, demonstrating that the continuous mapping generalizes well to unseen points and provides accurate grid parameters for Crank–Nicolson pricing.


In [40]:

#Load in generated data
results_cn = pd.read_csv("CN_NAS_NTS_optimal_step_data.csv")  
results_cn.columns = ['K', 'T', 'S', 'sigma', 'NAS', 'NTS', 'Error']

optimizer = opt.fit_to_data(results_cn)
testNAS, testNTS = optimizer(50, *results_cn.iloc[10][1:4])
kk,tt, ss, sig = 50, *results_cn.iloc[10][1:4]
#We pass this into cn and then we pass the same conditions into bs and see what happens
bs_p = fd.black_scholes_call(ss, kk, tt, r, sig)
cn_p = fd.price_derivative_cn(testNTS, testNAS, kk, r, tt, ss, sig)
print(bs_p, cn_p)
print(np.abs(bs_p - cn_p))
print(np.abs(bs_p - cn_p)/ bs_p)

[0.005      0.         0.01510015 0.3       ]
[22.] [22.69230769]
0.00802139607001856 0.007228276839108007
0.0007931192309105538
0.09887546057911072
