<a href="https://colab.research.google.com/github/ashish1610dhiman/CSE8803_DLT_Project/blob/main/0_gen_simulated_brownian_data.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import torch
import matplotlib.pyplot as plt
import numpy as np
from itertools import product
from scipy.stats import norm
import pickle
import yaml
from google.colab import drive
import os
from matplotlib.animation import FuncAnimation, PillowWriter


In [None]:
#exp params
VERSION = "v0" #meta for saving
drive.mount('/content/drive')
EXP_PATH = '/content/drive/My Drive/call_prices_conditional_flow/'

In [None]:
SEED = 77 #random-seed
np.random.seed(SEED)
torch.manual_seed(SEED)

In [None]:
# Parameters for GBM paths
S0 = 25.0       # initial price
T = 1.15          # total time (in years)
dt = 1/252      # step size (in year)
n_steps = int(T/dt)

#GBM params
mu_low,mu_high = 0.05,0.15 #drift of gbm
sigma_low,sigma_high = 0.1,0.2 #volatility of GBM
n_mu,n_sigma = 10,10
N_PATHS = n_mu * n_sigma     # how many paths to simulate

# Parameters for BS Call prices
K_GRID_SIZE = 25 #grid-size of strike prices
M = 10 #number of time-to-maturities for call price
BURNIN_WINDOW = 50 #burn-in period, call prices beyond this,
RISK_FREE_RATE = 0.02
L = n_steps-BURNIN_WINDOW # time-steps of gbm path where we will make predictions
M_steps = np.arange(1, M+1)

In [None]:
params = {
    "gbm": {
        "S0": S0,
        "T": T,
        "dt": dt,
        "N_PATHS": N_PATHS,
        "n_steps": n_steps,
        "mu_low": mu_low,
        "mu_high": mu_high,
        "sigma_low": sigma_low,
        "sigma_high": sigma_high,
        "n_mu": n_mu,
        "n_sigma": n_sigma
    },
    "bs_call": {
        "K_GRID_SIZE": K_GRID_SIZE,
        "M": M,
        "BURNIN_WINDOW": BURNIN_WINDOW,
        "RISK_FREE_RATE": RISK_FREE_RATE,
        "L": L,
        "M_steps": M_steps
    }
}

# Save to YAML
with open(f"{EXP_PATH}/dataset_params_{VERSION}.yaml", "w") as f:
    yaml.dump(params, f, default_flow_style=False)

# Step 1: Get simulated stock price by GBM

**Geometric Brownian Motion**

- **SDE:**  
$$dS_t = \mu\,S_t\,dt + \sigma\,S_t\,dW_t$$

- **Euler–Maruyama discretization:**

$$
S_{t+\Delta t}
= S_t \exp\!\Bigl((\mu - \tfrac12\sigma^2)\,\Delta t
+ \sigma\,\sqrt{\Delta t}\,Z_{t+1}\Bigr),
\quad Z_{t+1}\sim{N}(0,1).
$$


In [None]:
def simulate_gbm_paths(S0, mu, sigma, dt, n_steps, n_paths, seed=None):
    """
    Generate n_paths of Geometric Brownian Motion.
    Args:
        S0 (float): Initial price. | mu (float): Drift. |sigma (float): Volatility.
        dt (float): Time‐step size | n_steps (int): Number of steps | n_paths (int): Number of paths.
        seed (int, optional): random seed.
    Returns:
        np.ndarray[n_steps, n_paths]: Simulated price paths.
    """
    paths = np.zeros((n_steps, n_paths))
    paths[0] = S0
    for t in range(1, n_steps):
        z = np.random.randn(n_paths)
        paths[t] = paths[t-1] * np.exp((mu - 0.5*sigma**2)*dt + sigma*np.sqrt(dt)*z)
    return paths


In [None]:
# Simulate for examplemu/sigma
mu = 0.1        # drift
sigma = 0.2      # volatility

gbm_paths = simulate_gbm_paths(S0, mu, sigma, dt, n_steps, N_PATHS) #np.ndarray[n_steps, n_paths]
gbm_paths.shape

In [None]:
np.random.choice(len(gbm_paths),size=5,replace=False)

In [None]:
n_path_plot = 5 #random paths to plot
time = np.linspace(0, T, n_steps)
plt.figure(figsize=(8, 4))
for i in (np.random.choice(gbm_paths.shape[1],size=n_path_plot,replace=False)):
    plt.plot(time, gbm_paths[:, i], label=f"Path {i+1}")
plt.xlabel("Time (years)")
plt.ylabel("Price")
plt.title("Simulated GBM Stock Price Paths")
plt.legend()
plt.grid(True)
plt.tight_layout()
plt.show()

## generate path across a grid of mu,sigma

$$
{(\mu,\sigma) | \mu \in [\mu_L,\mu_H], [\sigma_L,\sigma_H]}
$$

In [None]:
mu_vals    = np.round(np.linspace(mu_low, mu_high, n_mu), 2)
sigma_vals = np.round(np.linspace(sigma_low, sigma_high, n_sigma), 2)
print(sigma_vals)
mu_mesh, sigma_mesh  = np.meshgrid(mu_vals, sigma_vals)

cross_grid      = np.column_stack([mu_mesh.ravel(), sigma_mesh.ravel()])
cross_grid.shape

In [None]:
gbm_paths_grid = dict()
for mu_i,sigma_i in cross_grid:
    gbm_paths_grid[(mu_i,sigma_i)] = simulate_gbm_paths(S0, mu_i, sigma_i, dt, n_steps, N_PATHS)

In [None]:
len(gbm_paths_grid)

In [None]:
gbm_paths_grid[(0.07,0.1)].shape #n_steps,n_paths

In [None]:
rolling_window = 70
sigma_true = 0.16
gbm_paths = gbm_paths_grid[(0.15,sigma_true)]
returns_gbm_paths = np.log(gbm_paths[1:] / gbm_paths[:-1])

rolling_sd_est = np.array([returns_gbm_paths[:i].std(axis=0,ddof=1).mean() for i in range(rolling_window,n_steps)])
rolling_sd_est = rolling_sd_est * np.sqrt(1.0 / dt)
plt.plot(rolling_sd_est,label="rolling sigma estimate")
plt.axhline(sigma_true,color='r',label="true sigma")
plt.legend()
plt.show()

# Step 2: Get BS-call prices for the above simulated path
- for each path i, we will get call price for multiple strike prices and tau (K,tau) at each point (t) in the path after some intial burnin-period(B); L=n_steps-B

In [None]:
def bs_call_price(S, K, tau, r, sigma):
    """
    Vectorized Black–Scholes call price for arrays S (spots) and K (strikes) at time to maturities tau.
    S: array shape (...,) -- gbm-path
    K: array shape (M,) -- array of strike prices
    tau: scalar -- time to maturity
    returns: array shape (..., M)
    """
    S = np.atleast_1d(S)[..., None]  # shape (..., 1)
    K = np.atleast_1d(K)[None, ...]  # shape (1, M)
    sqrt_tau = np.sqrt(tau)
    d1 = (np.log(S / K) + (r + 0.5 * sigma**2) * tau) / (sigma * sqrt_tau)
    d2 = d1 - sigma * sqrt_tau
    return S * norm.cdf(d1) - K * np.exp(-r * tau) * norm.cdf(d2)

In [None]:
M_steps

In [None]:
call_prices_grid=dict()
for (mu_i,sigma_i),gbm_paths in gbm_paths_grid.items():
  #get min/max srtike
  K_min,K_max = np.floor(gbm_paths.min()),np.ceil(gbm_paths.max())
  K = np.linspace(K_min, K_max, K_GRID_SIZE)
  #gbm-path after burn-in
  gbm_paths_burnin = gbm_paths[BURNIN_WINDOW:]
  #init a nan array
  call_prices_i = np.full((L, N_PATHS, K_GRID_SIZE, M), np.nan)
  for t in range(L):
    sigma_estimate_t = gbm_paths[:i-1].std(axis=0,ddof=1).mean()
    max_m = min(M, L - 1 - t)
    for idx_m in range(max_m):
        tau = M_steps[idx_m] * dt
        call_prices_i[t, :, :, idx_m] = bs_call_price(gbm_paths_burnin[i], K, tau, RISK_FREE_RATE, sigma_estimate_t)
        call_prices_grid[(mu_i,sigma_i)] = call_prices_i

# Step 3: Combine the dict and save to google-drive

In [None]:
len(cross_grid),len(gbm_paths_grid),len(call_prices_grid)

In [None]:
combined_data_dict= {
    k:{"gbm_paths":gbm_paths_grid[k],"call_prices":call_prices_grid[k]}
    for k in gbm_paths_grid.keys()
}

# os.makedirs(f"{EXP_PATH}/dataset_{VERSION}.pkl", exist_ok=True)
with open(f"{EXP_PATH}/dataset_{VERSION}.pkl", 'wb') as f:
    pickle.dump(combined_data_dict, f)

In [None]:
cross_grid[0]

In [None]:
gbm_paths_grid[tuple(cross_grid[0])].shape

In [None]:
call_prices_grid[tuple(cross_grid[0])].shape

In [None]:
#total data points = 177*50*10*10*100
(177*50*10*10*100)/1e6

# Step 4: Plotting and experimenting

In [None]:
idx=7
print(cross_grid[idx])
gbm_paths_grid[tuple(cross_grid[idx])].shape
call_prices = call_prices_grid[tuple(cross_grid[idx])]

In [None]:
# Plot example: for path 0, strike index 4, show call price surface over (t, tau)
t_vals = np.arange(L)
tau_vals = M_steps * dt
strike_idx = 3
path_idx = 0

# Prepare data for plotting
surface = call_prices[:, path_idx, strike_idx, :]  # shape (L, M)

fig, axs = plt.subplots(1, 2, figsize=(14, 4.3))

# Left panel: stock price path
axs[0].plot(np.arange(L), gbm_paths[BURNIN_WINDOW:, path_idx], label=f'Path {path_idx}')
axs[0].set_xlabel('Time Step')
axs[0].set_ylabel('Price')
axs[0].set_title(f'Stock Path {path_idx}')
axs[0].grid(True)
axs[0].legend()

# Right panel: heatmap of call price surface
im = axs[1].imshow(
    surface.T,
    origin='lower',
    aspect='auto',
    extent=[0, L-1, tau_vals[0], tau_vals[-1]]
)
cbar = fig.colorbar(im, ax=axs[1], label='Call Price')
axs[1].set_xlabel('Time Step t')
axs[1].set_ylabel('Time to Maturity τ')
axs[1].set_title(f'Call Prices for Path {path_idx}, Strike K={K[strike_idx]:.1f}')

plt.tight_layout()
plt.show()

In [None]:
call_prices.shape

In [None]:
np.isnan(call_prices[0,:,:,:]).sum()

In [None]:
np.isnan(call_prices[:,0,:,:]).sum()

In [None]:
np.isnan(call_prices[:,0,0,:]).sum()

In [None]:
np.isnan(call_prices[:,0,0,1]).sum()

In [None]:
call_prices[:,0,0,]

In [None]:
from mpl_toolkits.mplot3d import Axes3D  # needed for projection='3d'


# Create X, Y coordinates to match Z’s indices
#   X varies along the columns (axis=1), Y along the rows (axis=0)
Z = call_prices[52, 4]
x = np.arange(Z.shape[1])
y = np.arange(Z.shape[0])
X, Y = np.meshgrid(x, y)

# Plot
fig = plt.figure(figsize=(8,6))
ax  = fig.add_subplot(111, projection='3d')
surf = ax.plot_surface(X, Y, Z, cmap='viridis', edgecolor='none')
ax.view_init(elev=30, azim=45)
ax.set_xlabel('Column index')
ax.set_ylabel('Row index')
ax.set_zlabel('Call price')
ax.set_title('Surface of call_prices[52,4]')

fig.colorbar(surf, shrink=0.5, aspect=10, label='Price')
plt.show()


In [None]:
path_idx=0
surface_data = call_prices[:, path_idx, :, :]  # (time, strikes, maturities)

# --- 4. Create animation with x-axis=Strike, y-axis=Maturity ---
fig, ax = plt.subplots(figsize=(6, 5))
extent = [K.min(), K.max(), tau_vals.min(), tau_vals.max()]
im = ax.imshow(
    surface_data[0].T,
    origin='lower',
    aspect='auto',
    extent=extent,
    vmin=np.nanmin(surface_data),
    vmax=np.nanmax(surface_data)
)
ax.set_xlabel('Strike K')
ax.set_ylabel('Time to Maturity τ')
title = ax.set_title('Call Price Surface at t=0')
cbar = fig.colorbar(im, ax=ax, label='Call Price')

def update(frame):
    im.set_data(surface_data[frame].T)
    title.set_text(f'Call Price Surface at t={frame}')
    return im, title

anim = FuncAnimation(fig, update, frames=L, blit=True)
gif_path = f'./call_price_evolution_{path_idx}.gif'
anim.save(gif_path, writer=PillowWriter(fps=25))
plt.close(fig)