In [7]:
import pandas as pd
import matplotlib.pyplot as plt

# Load the data
df = pd.read_csv("4M.csv")

# Compute moneyness measures
df['MoneynessRatio'] = df['S'] / df['cv']
df['MoneynessPercentage'] = (df['S'] - df['cv']) / df['cv'] * 100

# Define thresholds for moneyness categorization.
# (These thresholds are examples – adjust as necessary for your application.)
def moneyness_category(ratio):
    if ratio < 0.6:
        return "Deep OTM"
    elif ratio < 0.8:
        return "OTM"
    elif ratio <= 1.2:
        return "ATM"
    elif ratio < 1.6:
        return "ITM"
    else:
        return "Deep ITM"

# Apply the categorization to create a new column.
df['MoneynessCategory'] = df['MoneynessRatio'].apply(moneyness_category)

# Print summary counts for each category.
print("Moneyness Category Counts:")
print(df['MoneynessCategory'].value_counts())

Moneyness Category Counts:
MoneynessCategory
Deep OTM    2083788
OTM          571763
ATM          565988
Deep ITM     501696
ITM          276765
Name: count, dtype: int64


In [13]:
import math
def moneyness_category(ratio):
    if ratio < 0.6:
        return "Deep OTM"
    elif ratio < 0.8:
        return "OTM"
    elif ratio <= 1.2:
        return "ATM"
    elif ratio < 1.6:
        return "ITM"
    else:
        return "Deep ITM"

# apply it
df["MoneynessCategory"] = df["MoneynessRatio"].apply(moneyness_category)

# prepare dict for bond_data if still needed
bond_data_by_category = {}
for cat in ["Deep OTM", "OTM", "ATM", "ITM", "Deep ITM"]:
    subset = df[df["MoneynessCategory"] == cat]
    if subset.empty:
        print(f"No rows for {cat!r}")
        continue
    row = subset.iloc[0]
    bond_data_by_category[cat] = dict(
        Pr   = float(row["Pr"]),
        T    = float(row["T"]),
        sigma= float(row["IVOL"]) / 100.0,
        r    = float(row["r"]),
        d    = float(row["d"]),
        rc   = float(row["CDS"]) / 1e4,
        cv   = float(row["cv"]),
        cp   = int(row["cp"]),
        cfq  = int(row["cfq"]),
        tfc  = float(row["tfc"]),
        S    = float(row["S"])
    )

# slice out each bucket into both a pandas DF and GPU arrays
df_deep_otm = df[df["MoneynessCategory"] == "Deep OTM"].copy()
df_otm      = df[df["MoneynessCategory"] == "OTM"].copy()
df_atm      = df[df["MoneynessCategory"] == "ATM"].copy()
df_itm      = df[df["MoneynessCategory"] == "ITM"].copy()
df_deep_itm = df[df["MoneynessCategory"] == "Deep ITM"].copy()

# move S and cv columns onto GPU for each bucket
cupy_S_deep,   cupy_cv_deep   = cup.asarray(df_deep_otm["S"].values),   cup.asarray(df_deep_otm["cv"].values)
cupy_S_otm,    cupy_cv_otm    = cup.asarray(df_otm["S"].values),        cup.asarray(df_otm["cv"].values)
cupy_S_atm,    cupy_cv_atm    = cup.asarray(df_atm["S"].values),        cup.asarray(df_atm["cv"].values)
cupy_S_itm,    cupy_cv_itm    = cup.asarray(df_itm["S"].values),        cup.asarray(df_itm["cv"].values)
cupy_S_deepIT, cupy_cv_deepIT = cup.asarray(df_deep_itm["S"].values),   cup.asarray(df_deep_itm["cv"].values)

# optional: quick sanity check
print("Rows per bucket:")
print("Deep OTM:",  len(df_deep_otm))
print("OTM:",       len(df_otm))
print("ATM:",       len(df_atm))
print("ITM:",       len(df_itm))
print("Deep ITM:",  len(df_deep_itm))

Rows per bucket:
Deep OTM: 2083788
OTM: 571763
ATM: 565988
ITM: 276765
Deep ITM: 501696


In [79]:
import math
import torch

def price_torch(bond, dx, dt, device="cuda"):
    """
    Price a TF convertible‐bond via Crank–Nicolson on the GPU with PyTorch.
    
    bond : dict with keys
      Pr, T, sigma, r, d, rc, cv, cp, cfq, tfc, S
    dx, dt : grid steps
    device  : "cuda" or "cpu"
    """
    # Unpack
    Pr, T, sigma, r, d, rc = (
        bond["Pr"], bond["T"], bond["sigma"],
        bond["r"], bond["d"], bond["rc"]
    )
    cv, cp_amt, cfq, tfc, S_target = (
        bond["cv"], bond["cp"], bond["cfq"],
        bond["tfc"], bond["S"]
    )
    cp_amt = Pr * cp_amt / cfq  # coupon per step

    # Grid
    S_max = 2 * cv
    M = max(1, int(math.log(S_max) / dx))
    N = max(1, int(T / dt))
    dx_ = math.log(S_max) / M
    dt_ = T / N
    n_int = M - 1

    # Prepare coupon schedule
    cp_steps = []
    if cfq * T > 0:
        period = int(N / (cfq * T))
        offset = int(tfc * period)
        cp_steps = list(range(offset, N + 1, period))

    # Allocate state: U[:,0]=E, U[:,1]=B
    U = torch.zeros((M+1, 2), device=device)

    # Precompute CN half‐step matrices
    D = sigma**2 / 2
    drift = r - d - sigma**2 / 2
    hh = dt_ / 2
    dx2 = dx_**2
    ac = D / dx2 - drift / (2 * dx_)
    cc = D / dx2 + drift / (2 * dx_)
    bu = -2 * D / dx2 - r
    bv = -2 * D / dx2 - (r + rc)

    db   = torch.tensor([[1-hh*bu, -hh*r],
                         [0,       1-hh*bv]], device=device)
    lb   = torch.tensor([[-hh*ac, 0],
                         [0,     -hh*ac]], device=device)
    ub   = torch.tensor([[-hh*cc, 0],
                         [0,     -hh*cc]], device=device)
    db_r = torch.tensor([[1+hh*bu, hh*r],
                         [0,       1+hh*bv]], device=device)
    lb_r = torch.tensor([[hh*ac, 0],
                         [0,     hh*ac]], device=device)
    ub_r = torch.tensor([[hh*cc, 0],
                         [0,     hh*cc]], device=device)

    # Terminal payoff at t = T
    i = torch.arange(M+1, device=device)
    s = torch.exp(i * dx_)
    # Equity leg E
    U[:,0] = torch.where(i==0,   0.0,
                 torch.where(i==M, Pr * s[M]/cv + cp_amt,
                   torch.where(s>=cv, Pr*s/cv + cp_amt, 0.0)))
    # Debt leg B
    U[:,1] = torch.where(i==0,   Pr,
                 torch.where(i==M, 0.0,
                   torch.where(s>=cv, 0.0, Pr)))

    # Time-stepping backwards
    for n in range(N-1, -1, -1):
        # Build RHS Bb for interior nodes 1..M-1
        Cc = U @ db_r.T
        Lc = torch.cat([U[0:1], U[:-1]], dim=0) @ lb_r.T
        Rc = torch.cat([U[1:], U[-1:]], dim=0) @ ub_r.T
        Bb = Cc[1:-1] + Lc[1:-1] + Rc[1:-1]

        # Add coupon to debt leg
        if n in cp_steps:
            Bb[:,1] += cp_amt

        # Build tridiagonal blocks As, Ad, Au
        As = lb.unsqueeze(0).expand(n_int, -1, -1).clone()
        Ad = db.unsqueeze(0).expand(n_int, -1, -1).clone()
        Au = ub.unsqueeze(0).expand(n_int, -1, -1).clone()

        # Forward elimination
        for j in range(n_int-1):
            inv_Ad = torch.inverse(Ad[j])
            Mj     = As[j+1] @ inv_Ad
            Ad[j+1] -= Mj @ Au[j]
            Bb[j+1] -= (Mj @ Bb[j])

        # Backward substitution into X
        X = torch.zeros_like(Bb)
        X[-1] = torch.inverse(Ad[-1]) @ Bb[-1]
        for j in range(n_int-2, -1, -1):
            tmp = Bb[j] - (Au[j] @ X[j+1])
            X[j] = torch.inverse(Ad[j]) @ tmp

        # Copy interior back to U
        U[1:-1, 0] = X[:,0]
        U[1:-1, 1] = X[:,1]

        # Re-apply boundaries
        disc = math.exp((r + rc) * dt_)
        U[0,0], U[0,1]   = 0.0, Pr / disc
        U[M,0], U[M,1] = Pr * s[M]/cv + cp_amt, 0.0

    # Return Equity component at S_target
    idx = min(M, max(0, int(round(math.log(S_target)/dx))))
    return U[idx,0].item()



In [80]:
try:
    result = price_torch(bond, dx, dt, device="cuda")
except RuntimeError:
    print("GPU failed—falling back to CPU")
    result = price_torch(bond, dx, dt, device="cpu")



NameError: name 'dx' is not defined

In [81]:
import math
import numpy as np
from numba import cuda, float64, int64

# ──────────────── Host setup ────────────────

def setup_device(b, dx, dt):
    """
    Compute grid parameters, allocate device arrays, 
    and prepare coupon time indices.
    
    b: bond dict with keys Pr, T, sigma, r, d, rc, cv, cp, cfq, tfc, S
    dx, dt: spatial/log-step and time-step sizes
    """
    if not cuda.is_available():
        raise RuntimeError("No CUDA device available. Please run on a machine with a CUDA GPU and proper driver installed.")
    # Grid dimensions
    S_max = 2.0 * b['cv']
    M = max(1, int(math.log(S_max) / dx))
    N = max(1, int(b['T'] / dt))
    dx_ = math.log(S_max) / M
    dt_ = b['T'] / N
    n_int = M - 1

    # Allocate device arrays
    Uo_d = cuda.device_array(M+1, dtype=np.float64)
    Vo_d = cuda.device_array(M+1, dtype=np.float64)
    As_d = cuda.device_array((n_int, 2, 2), dtype=np.float64)
    Ad_d = cuda.device_array((n_int, 2, 2), dtype=np.float64)
    Au_d = cuda.device_array((n_int, 2, 2), dtype=np.float64)
    Bb_d = cuda.device_array((n_int, 2),    dtype=np.float64)
    X_d  = cuda.device_array((n_int, 2),    dtype=np.float64)

    # Build coupon time‐step indices
    cp_array = np.empty(0, dtype=np.int64)
    if b['cfq'] * b['T'] > 0:
        period = int(N / (b['cfq'] * b['T']))
        offset = int(b['tfc'] * period)
        cp_array = np.arange(offset, N+1, period, dtype=np.int64)

    # Precompute half-step CN coefficients on host
    D     = b['sigma']**2 / 2.0
    drift = b['r'] - b['d'] - b['sigma']**2/2.0
    dx2   = dx_**2
    hh    = dt_ / 2.0
    ac    = D/dx2 - drift/(2*dx_)
    cc    = D/dx2 + drift/(2*dx_)
    bu    = -2*D/dx2 - b['r']
    bv    = -2*D/dx2 - (b['r'] + b['rc'])

    # Host-size 2×2 arrays
    def make(a00,a01,a10,a11): 
        return np.array([[a00,a01],[a10,a11]], dtype=np.float64)

    db   = make(1 - hh*bu,   -hh*b['r'],  0,                  1 - hh*bv)
    lb   = make(-hh*ac,       0,          0,                 -hh*ac)
    ub   = make(-hh*cc,       0,          0,                 -hh*cc)
    db_r = make(1 + hh*bu,    hh*b['r'],  0,                  1 + hh*bv)
    lb_r = make(hh*ac,        0,          0,                  hh*ac)
    ub_r = make(hh*cc,        0,          0,                  hh*cc)

    # Copy coeffs to device
    db_d   = cuda.to_device(db)
    lb_d   = cuda.to_device(lb)
    ub_d   = cuda.to_device(ub)
    db_r_d = cuda.to_device(db_r)
    lb_r_d = cuda.to_device(lb_r)
    ub_r_d = cuda.to_device(ub_r)

    return {
        'M': M, 'N': N, 'dx_': dx_, 'dt_': dt_,
        'Uo_d': Uo_d, 'Vo_d': Vo_d,
        'As_d': As_d, 'Ad_d': Ad_d, 'Au_d': Au_d,
        'Bb_d': Bb_d, 'X_d': X_d,
        'db_d': db_d, 'lb_d': lb_d, 'ub_d': ub_d,
        'db_r_d': db_r_d, 'lb_r_d': lb_r_d, 'ub_r_d': ub_r_d,
        'cp_array': cp_array
    }

# ──────────────── CUDA kernels ────────────────

@cuda.jit
def init_payoff_kernel(Uo, Vo, M, dx_, cv, Pr, cpt):
    i = cuda.grid(1)
    if i > M: return

    si = math.exp(i * dx_)
    if i == 0:
        Uo[i] = 0.0
        Vo[i] = Pr
    elif i == M:
        Uo[i] = Pr * si / cv + cpt
        Vo[i] = 0.0
    else:
        if si >= cv:
            Uo[i] = Pr * si / cv + cpt
            Vo[i] = 0.0
        else:
            Uo[i] = 0.0
            Vo[i] = Pr

@cuda.jit
def build_rhs_kernel(Uo, Vo, Bb, db_r, lb_r, ub_r, M, cp_amount, cp_flag):
    i = cuda.grid(1)
    if 1 <= i <= M-1:
        j = i - 1
        # center
        u_c = db_r[0,0]*Uo[i] + db_r[0,1]*Vo[i]
        v_c = db_r[1,0]*Uo[i] + db_r[1,1]*Vo[i]
        # left
        u_c += lb_r[0,0]*Uo[i-1] + lb_r[0,1]*Vo[i-1]
        v_c += lb_r[1,0]*Uo[i-1] + lb_r[1,1]*Vo[i-1]
        # right
        u_c += ub_r[0,0]*Uo[i+1] + ub_r[0,1]*Vo[i+1]
        v_c += ub_r[1,0]*Uo[i+1] + ub_r[1,1]*Vo[i+1]
        # coupon into debt leg
        if cp_flag == 1:
            v_c += cp_amount
        Bb[j,0] = u_c
        Bb[j,1] = v_c

# 2×2 utilities
@cuda.jit(device=True)
def inv2x2(A, out):
    a,b = A[0,0], A[0,1]
    c,d = A[1,0], A[1,1]
    det = a*d - b*c
    out[0,0] =  d/det
    out[0,1] = -b/det
    out[1,0] = -c/det
    out[1,1] =  a/det

@cuda.jit(device=True)
def matmul2x2(A, B, out):
    out[0,0] = A[0,0]*B[0,0] + A[0,1]*B[1,0]
    out[0,1] = A[0,0]*B[0,1] + A[0,1]*B[1,1]
    out[1,0] = A[1,0]*B[0,0] + A[1,1]*B[1,0]
    out[1,1] = A[1,0]*B[0,1] + A[1,1]*B[1,1]

@cuda.jit(device=True)
def matvec2x2(A, v, out):
    out[0] = A[0,0]*v[0] + A[0,1]*v[1]
    out[1] = A[1,0]*v[0] + A[1,1]*v[1]

@cuda.jit
def solve_tridiagonal_kernel(As, Ad, Au, Bb, X, n_int):
    # Only thread 0 per block
    if cuda.threadIdx.x == 0:
        # forward elim
        tmp_mat = cuda.local.array((2,2), dtype=float64)
        inv_mat = cuda.local.array((2,2), dtype=float64)
        for i in range(n_int-1):
            inv2x2(Ad[i], inv_mat)
            matmul2x2(As[i+1], inv_mat, tmp_mat)
            # Ad[i+1] -= tmp_mat @ Au[i]
            for p in range(2):
                for q in range(2):
                    Ad[i+1,p,q] -= (tmp_mat[p,0]*Au[i,0,q] + tmp_mat[p,1]*Au[i,1,q])
            # Bb[i+1] -= tmp_mat @ Bb[i]
            vec = cuda.local.array(2, dtype=float64)
            matvec2x2(tmp_mat, Bb[i], vec)
            Bb[i+1,0] -= vec[0];  Bb[i+1,1] -= vec[1]

        # backward sub
        inv2x2(Ad[n_int-1], inv_mat)
        matvec2x2(inv_mat, Bb[n_int-1], X[n_int-1])
        for i in range(n_int-2, -1, -1):
            # tmp = Bb[i] - Au[i]@X[i+1]
            vec = cuda.local.array(2, dtype=float64)
            matvec2x2(Au[i], X[i+1], vec)
            t0 = Bb[i,0] - vec[0];  t1 = Bb[i,1] - vec[1]
            inv2x2(Ad[i], inv_mat)
            X[i,0] = inv_mat[0,0]*t0 + inv_mat[0,1]*t1
            X[i,1] = inv_mat[1,0]*t0 + inv_mat[1,1]*t1

@cuda.jit
def copy_interior_kernel(X, Uo, Vo, M):
    i = cuda.grid(1)
    if i < M-1:
        Uo[i+1] = X[i,0]
        Vo[i+1] = X[i,1]

@cuda.jit
def apply_boundaries_kernel(Uo, Vo, M, dx_, cv, Pr, cpt, r, rc, dt2):
    # single-thread
    if cuda.threadIdx.x==0 and cuda.blockIdx.x==0:
        Uo[0] = 0.0
        Vo[0] = Pr / math.exp((r+rc)*dt2)
        si = math.exp(M*dx_)
        Uo[M] = Pr * si/ cv + cpt
        Vo[M] = 0.0

# ──────────────── Host driver ────────────────

def price_cuda(b, dx=0.1, dt=0.1):
    """
    b: bond param dict
    dx, dt: initial grid steps
    """
    # Setup device arrays & grid
    if not cuda.is_available():
        # Fallback to your original CPU solver:
        return price_cpu(b, dx, dt)
    ctx = setup_device(b, dx, dt)
    M, N = ctx['M'], ctx['N']
    cpt   = b['Pr'] * b['cp'] / b['cfq']
    dt2   = ctx['dt_'] / 2.0

    # Init payoff
    threads = 128
    blocks  = (M + threads - 1) // threads
    init_payoff_kernel[blocks, threads](
        ctx['Uo_d'], ctx['Vo_d'], M, ctx['dx_'], b['cv'], b['Pr'], cpt
    )
    cuda.synchronize()

    # Time‐stepping
    for n in range(N-1, -1, -1):
        cp_flag = 1 if n in ctx['cp_array'] else 0

        # RHS build
        build_rhs_kernel[blocks, threads](
            ctx['Uo_d'], ctx['Vo_d'], ctx['Bb_d'],
            ctx['db_r_d'], ctx['lb_r_d'], ctx['ub_r_d'],
            M, cpt, cp_flag
        )
        cuda.synchronize()

        # Solve tridiagonal
        solve_tridiagonal_kernel[1, 1](
            ctx['As_d'], ctx['Ad_d'], ctx['Au_d'],
            ctx['Bb_d'], ctx['X_d'], M-1
        )
        cuda.synchronize()

        # Copy back interior
        copy_interior_kernel[(M-1+threads-1)//threads, threads](
            ctx['X_d'], ctx['Uo_d'], ctx['Vo_d'], M
        )
        cuda.synchronize()

        # Reapply boundaries
        apply_boundaries_kernel[1,1](
            ctx['Uo_d'], ctx['Vo_d'], M, ctx['dx_'],
            b['cv'], b['Pr'], cpt, b['r'], b['rc'], dt2
        )
        cuda.synchronize()

    # Extract price at S_target
    idx = min(M, max(0, int(round(math.log(b['S'])/dx))))
    return ctx['Uo_d'].copy_to_host()[idx]




In [83]:
price = price_torch(bond, dx=0.1, dt=0.05, device="cpu")
print(price)

231.37010192871094


In [84]:
# ---------------- Part 5: GPU Convergence Test for Deep OTM ----------------
import cupy as cup  # Changed alias

tol = 1e-8
dx0, dt0 = 0.1, 0.1
price_list = []

# Deep OTM bond parameters (first index)
deep_otm_bond = bond_data_by_category["Deep OTM"]

for k in range(20):
    print(f"\n=== Iteration {k} ===")
    print(f"Testing dx={dx0:.6f}, dt={dt0:.6f}")
    
    # GPU pricing call
    p_new = price_once_gpu(dx=dx0, dt=dt0, bond_params=deep_otm_bond)
    
    print(f"Computed price: {p_new:.8f}")
    price_list.append((dx0, dt0, p_new))
    
    # Convergence check
    if k > 0:
        p_prev = price_list[-2][2]
        diff = abs(p_new - p_prev)
        print(f"Δ vs previous: {diff:.4e} (tol={tol:.1e})")
        
        if diff < tol:
            print(f"\n*** CONVERGED AFTER {k+1} ITERATIONS ***")
            print(f"Final difference: {diff:.4e} < {tol:.1e}")
            break
    
    # Halve grid spacing
    dx0 /= 2.0
    dt0 /= 2.0

# Final results
P_exact = price_list[-1][2]
dx_min, dt_min = price_list[-1][0], price_list[-1][1]

print("\n=== Final Results ===")
print(f"Exact price: {P_exact:.8f}")
print(f"Optimal dx:  {dx_min:.6f}")
print(f"Optimal dt:  {dt_min:.6f}")

# Explicit GPU cleanup
cup.get_default_memory_pool().free_all_blocks()


=== Iteration 0 ===
Testing dx=0.100000, dt=0.100000


CUDARuntimeError: cudaErrorIllegalAddress: an illegal memory access was encountered

In [65]:
# ───────────────────────────────────────────────────────────────────────────
# 0.  Build the parameter dictionary from the first Deep‑OTM row
# ───────────────────────────────────────────────────────────────────────────
row = df_deep_otm.iloc[0]

params_deep_otm = {
    "Pr"   : row["Pr"],
    "T"    : row["T"],
    "sigma": row["IVOL"] / 100,      # % → decimal
    "r"    : row["r"],
    "d"    : row["d"],
    "rc"   : row["CDS"] / 10_000,    # bp → decimal
    "cv"   : row["cv"],
    "cp"   : row["cp"],
    "cfq"  : row["cfq"],
    "tfc"  : row["tfc"],
    "S"    : row["S"],
}

# wrapper that calls the EXISTING price_once_gpu already defined earlier
def price_deep_otm_gpu(dx, dt):
    return price_once_gpu(dx, dt, params_deep_otm)

# ───────────────────────────────────────────────────────────────────────────
# 1.  Controlled‑Variable Test  (dt = dx   vs   dt = dx²)
# ───────────────────────────────────────────────────────────────────────────
import numpy as np
import matplotlib.pyplot as plt

dx_test = np.logspace(np.log10(dx_min*8), np.log10(dx_min), num=7)[::-1]

err_dt_eq_dx  = []
err_dt_eq_dx2 = []

for dx in dx_test:
    err_dt_eq_dx .append( abs(price_deep_otm_gpu(dx, dx   ) - P_exact) )
    err_dt_eq_dx2.append( abs(price_deep_otm_gpu(dx, dx**2) - P_exact) )

plt.figure(figsize=(6,4))
plt.loglog(dx_test, err_dt_eq_dx,  'o-',  label=r'dt = dx')
plt.loglog(dx_test, err_dt_eq_dx2, 's--', label=r'dt = dx$^2$')
plt.xlabel(r'$\Delta x$');  plt.ylabel(r'$|\,\mathrm{error}\,|$')
plt.title('Controlled‑Variable Test')
plt.grid(True, which='both');  plt.legend();  plt.show()

slope1 = np.polyfit(np.log(dx_test), np.log(err_dt_eq_dx ), 1)[0]
slope2 = np.polyfit(np.log(dx_test), np.log(err_dt_eq_dx2), 1)[0]
print(f"Slope (dt = dx)  : {slope1:.2f}")
print(f"Slope (dt = dx²) : {slope2:.2f}")


In [66]:
# Cell 7: Convergence Test for OTM
tol = 1e-8
dx0, dt0 = 0.1, 0.1
price_list = []
for k in range(20):
    print(f"OTM Iter {k}: dx={dx0:.5g}, dt={dt0:.5g}")
    p_new = price_otm_gpu(dx0, dt0)
    print(f"  Price = {p_new:.8f}")
    price_list.append((dx0, dt0, p_new))
    if k > 0 and abs(price_list[-1][2] - price_list[-2][2]) < tol:
        print(f"Converged at iter {k}")
        break
    dx0 /= 2; dt0 /= 2

P_exact_otm  = price_list[-1][2]
dx_otm, dt_otm = price_list[-1][0], price_list[-1][1]
print(f"OTM exact price = {P_exact_otm:.8f} at dx={dx_otm}, dt={dt_otm}")


OTM Iter 0: dx=0.1, dt=0.1


NameError: name 'price_otm_gpu' is not defined

In [None]:
# Cell 8: Controlled‐Variable Test for OTM
dx_vals = np.logspace(np.log10(dx_otm*10), np.log10(dx_otm), num=10)[::-1]
err_dx  = [abs(price_otm_gpu(dx, dt_otm) - P_exact_otm) for dx in dx_vals]
err_dt1 = [abs(price_otm_gpu(dx, dx)       - P_exact_otm) for dx in dx_vals]
err_dt2 = [abs(price_otm_gpu(dx, dx*dx)    - P_exact_otm) for dx in dx_vals]

plt.figure(figsize=(6,4))
plt.loglog(dx_vals,  err_dx,  'o-',  label="Error vs dx")
plt.loglog(dx_vals,  err_dt1, 's--', label="dt = dx")
plt.loglog(dx_vals,  err_dt2, '^--', label="dt = dx²")
plt.xlabel("dx"); plt.ylabel("|Error|")
plt.title("OTM Controlled‐Variable Test")
plt.grid(True, which="both"); plt.legend()
plt.show()


In [None]:
# Cell 9: Convergence Test for ATM
tol = 1e-8
dx0, dt0 = 0.1, 0.1
price_list = []
for k in range(20):
    print(f"ATM Iter {k}: dx={dx0:.5g}, dt={dt0:.5g}")
    p_new = price_atm_gpu(dx0, dt0)
    print(f"  Price = {p_new:.8f}")
    price_list.append((dx0, dt0, p_new))
    if k > 0 and abs(price_list[-1][2] - price_list[-2][2]) < tol:
        print(f"Converged at iter {k}")
        break
    dx0 /= 2; dt0 /= 2

P_exact_atm  = price_list[-1][2]
dx_atm, dt_atm = price_list[-1][0], price_list[-1][1]
print(f"ATM exact price = {P_exact_atm:.8f} at dx={dx_atm}, dt={dt_atm}")


In [None]:
# Cell 10: Controlled‐Variable Test for ATM
dx_vals = np.logspace(np.log10(dx_atm*10), np.log10(dx_atm), num=10)[::-1]
err_dx  = [abs(price_atm_gpu(dx, dt_atm) - P_exact_atm) for dx in dx_vals]
err_dt1 = [abs(price_atm_gpu(dx, dx)      - P_exact_atm) for dx in dx_vals]
err_dt2 = [abs(price_atm_gpu(dx, dx*dx)   - P_exact_atm) for dx in dx_vals]

plt.figure(figsize=(6,4))
plt.loglog(dx_vals,  err_dx,  'o-',  label="Error vs dx")
plt.loglog(dx_vals,  err_dt1, 's--', label="dt = dx")
plt.loglog(dx_vals,  err_dt2, '^--', label="dt = dx²")
plt.xlabel("dx"); plt.ylabel("|Error|")
plt.title("ATM Controlled‐Variable Test")
plt.grid(True, which="both"); plt.legend()
plt.show()


In [None]:
# Cell 11: Convergence Test for ITM
tol = 1e-8
dx0, dt0 = 0.1, 0.1
price_list = []
for k in range(20):
    print(f"ITM Iter {k}: dx={dx0:.5g}, dt={dt0:.5g}")
    p_new = price_itm_gpu(dx0, dt0)
    print(f"  Price = {p_new:.8f}")
    price_list.append((dx0, dt0, p_new))
    if k > 0 and abs(price_list[-1][2] - price_list[-2][2]) < tol:
        print(f"Converged at iter {k}")
        break
    dx0 /= 2; dt0 /= 2

P_exact_itm  = price_list[-1][2]
dx_itm, dt_itm = price_list[-1][0], price_list[-1][1]
print(f"ITM exact price = {P_exact_itm:.8f} at dx={dx_itm}, dt={dt_itm}")


In [None]:
# Cell 12: Controlled‐Variable Test for ITM
dx_vals = np.logspace(np.log10(dx_itm*10), np.log10(dx_itm), num=10)[::-1]
err_dx  = [abs(price_itm_gpu(dx, dt_itm) - P_exact_itm) for dx in dx_vals]
err_dt1 = [abs(price_itm_gpu(dx, dx)      - P_exact_itm) for dx in dx_vals]
err_dt2 = [abs(price_itm_gpu(dx, dx*dx)   - P_exact_itm) for dx in dx_vals]

plt.figure(figsize=(6,4))
plt.loglog(dx_vals,  err_dx,  'o-',  label="Error vs dx")
plt.loglog(dx_vals,  err_dt1, 's--', label="dt = dx")
plt.loglog(dx_vals,  err_dt2, '^--', label="dt = dx²")
plt.xlabel("dx"); plt.ylabel("|Error|")
plt.title("ITM Controlled‐Variable Test")
plt.grid(True, which="both"); plt.legend()
plt.show()


In [None]:
# Cell 13: Convergence Test for Deep ITM
tol = 1e-8
dx0, dt0 = 0.1, 0.1
price_list = []
for k in range(20):
    print(f"Deep ITM Iter {k}: dx={dx0:.5g}, dt={dt0:.5g}")
    p_new = price_deepitm_gpu(dx0, dt0)
    print(f"  Price = {p_new:.8f}")
    price_list.append((dx0, dt0, p_new))
    if k > 0 and abs(price_list[-1][2] - price_list[-2][2]) < tol:
        print(f"Converged at iter {k}")
        break
    dx0 /= 2; dt0 /= 2

P_exact_deep  = price_list[-1][2]
dx_deep, dt_deep = price_list[-1][0], price_list[-1][1]
print(f"Deep ITM exact price = {P_exact_deep:.8f} at dx={dx_deep}, dt={dt_deep}")


In [None]:
# Cell 14: Controlled‐Variable Test for Deep ITM
dx_vals = np.logspace(np.log10(dx_deep*10), np.log10(dx_deep), num=10)[::-1]
err_dx  = [abs(price_deepitm_gpu(dx, dt_deep) - P_exact_deep) for dx in dx_vals]
err_dt1 = [abs(price_deepitm_gpu(dx, dx)      - P_exact_deep) for dx in dx_vals]
err_dt2 = [abs(price_deepitm_gpu(dx, dx*dx)   - P_exact_deep) for dx in dx_vals]

plt.figure(figsize=(6,4))
plt.loglog(dx_vals,  err_dx,  'o-',  label="Error vs dx")
plt.loglog(dx_vals,  err_dt1, 's--', label="dt = dx")
plt.loglog(dx_vals,  err_dt2, '^--', label="dt = dx²")
plt.xlabel("dx"); plt.ylabel("|Error|")
plt.title("Deep ITM Controlled‐Variable Test")
plt.grid(True, which="both"); plt.legend()
plt.show()
