# Equity Risk Factor Modelling

## Setup

Assuming some input parameters

In [34]:
import numpy as np 
import polars as pl 
import matplotlib.pyplot as plt 


# Model parameters
S0: float = 100.0      # initial spot
r: float = 0.02        # risk-free rate
q: float = 0.01        # dividend yield
T: float = 1.0         # horizon (years)
n_paths: int = 10_000  # number of Monte Carlo paths
n_steps: int = 252     # number of steps in one year

# n_steps intervals require n_steps + 1 grid points (both end points included).
time_grid = np.linspace(start=0.0, stop=T, num=n_steps + 1)

# Seeded generator so that repeated runs draw the same random numbers
rng = np.random.default_rng(seed=42)

# Polars DataFrame to hold simulated results.
# The schema maps column names to Polars dtype classes; the annotation says so
# explicitly (the built-in `any` is a function, not a type, so it cannot be
# used as the value type here).
results_schema: dict[str, type[pl.DataType]] = {
    "path_id": pl.Int32,
    "time": pl.Float64,
    "S": pl.Float64,
}
simulated_paths = pl.DataFrame(schema=results_schema)

# Plot style
plt.style.use("seaborn-v0_8-whitegrid")
print("Setup complete. Polars and parameters initialized.")

Setup complete. Polars and parameters initialized.


# Basic Model 
This will be used by Heston, SABR or Local Volatility

In [35]:
from abc import ABC, abstractmethod
import polars as pl
import numpy as np

class BaseModel(ABC):
    """Common parent of the equity models; stores the shared simulation inputs.

    Concrete models inherit from this class and must implement
    ``simulate_paths``.
    """

    def __init__(
        self,
        S0: float,
        r: float,
        q: float,
        T: float,
        n_paths: int,
        n_steps: int,
        rng: np.random.Generator,
    ) -> None:
        """Store the model inputs and derive the time-step length.

        Args:
            S0: Initial spot.
            r: Risk-free rate.
            q: Dividend yield.
            T: Horizon.
            n_paths: Number of Monte Carlo paths.
            n_steps: Number of time steps over the horizon.
            rng: NumPy random generator kept on the instance.
        """
        # Market inputs and horizon
        self.S0, self.r, self.q, self.T = S0, r, q, T
        # Simulation grid: dt is the length of one of the n_steps intervals
        self.n_paths, self.n_steps = n_paths, n_steps
        self.dt = T / n_steps
        # Source of randomness
        self.rng = rng

    @abstractmethod
    def simulate_paths(self) -> pl.DataFrame:
        """Simulate asset price paths and return them as a Polars DataFrame."""
        ...

# Monte Carlo Engine
Defines the engine

In [36]:
import numpy as np
import polars as pl

class MonteCarloEngine:
    """Generator of Brownian increments and Brownian paths on a uniform time grid."""

    def __init__(self, T: float, n_paths: int, n_steps: int, rng: np.random.Generator) -> None:
        """Store the grid definition and pre-compute the step quantities.

        Args:
            T: Horizon covered by the time grid.
            n_paths: Number of simulated paths (rows of the generated arrays).
            n_steps: Number of time steps (intervals) between 0 and T.
            rng: NumPy random generator used for every draw.
        """
        step = T / n_steps
        self.T = T
        self.n_paths = n_paths
        self.n_steps = n_steps
        self.dt = step
        self.sqrt_dt = np.sqrt(step)
        self.rng = rng
        # n_steps intervals -> n_steps + 1 grid points, from 0 to T inclusive
        self.time_grid = np.linspace(0.0, T, n_steps + 1)

    def generate_brownian_increments(self) -> np.ndarray:
        """
        Draw the Brownian increments dW for every path and every step.

        Row i is one path, column j is one time step:

                     step_1   step_2   ...  step_N
            path_1    x11      x12     ...   x1N
            ...       ...      ...     ...   ...
            path_M    xM1      xM2     ...   xMN

        Each entry is a standard normal draw scaled by sqrt(dt):
            x_ij = N(0,1) * sqrt(dt)

        Returns: array of shape (n_paths, n_steps)
        """
        shape = (self.n_paths, self.n_steps)
        return self.sqrt_dt * self.rng.standard_normal(shape)

    def generate_brownian_paths(self) -> np.ndarray:
        """
        Build Brownian paths W_t as running sums of freshly drawn increments.

        A column of zeros (W_0 = 0) is placed in front of the cumulative sums,
        so column k holds the sum of the first k increments of each path:

                      t0     t1       t2      ...    tN
            path_1    0     x11    x11+x12    ...   Sum x1j
            ...      ...    ...      ...      ...    ...
            path_M    0     xM1    xM1+xM2    ...   Sum xMj

        Mathematically:
            W_t = Sum_{k=1}^{t/dt} (N(0,1) * sqrt(dt))

        Returns: array of shape (n_paths, n_steps + 1)
        """
        increments = self.generate_brownian_increments()
        origin = np.zeros((self.n_paths, 1))  # W_0 = 0 for every path
        return np.hstack((origin, increments.cumsum(axis=1)))



# Local Volatility Model

- Step 1 - 
- Step 2 - 
- Step xx

What is found below is the **backward** pricing PDE (like Black-Scholes) but with a local volatility $\sigma_{\text{loc}}(S,t)$ that varies with both the underlying and time.

It's not the **forward** Dupire PDE!

SDE --> replication/martingale -->  backward pricing PDE --> (transformation TO DO) --> forward Dupire PDE.

### Step 1 — Risk-Neutral Dynamics and Pricing Expectation

Under the risk-neutral measure $\mathbb{Q}$, an equity paying a continuous dividend yield $q$ and earning the risk-free rate $r$ follows the stochastic differential equation (SDE):

$$
dS_t = S_t \big[(r - q)\,dt + \sigma_{\text{loc}}(S_t,t)\,dW_t\big].
$$

Here:  
- $S_t$ is the spot price at time $t$,  
- $\sigma_{\text{loc}}(S_t,t)$ is the local volatility, a deterministic function of spot and time,  
- $W_t$ is a standard Brownian motion under $\mathbb{Q}$.  

The drift term $(r - q)$ ensures that the discounted process $e^{-(r - q)t}S_t$ is a martingale, enforcing the no-arbitrage condition.  

For a European payoff $f(S_T)$ maturing at $T$, the time-$t$ price is given by the discounted expectation under $\mathbb{Q}$:  

$$
V(S_t,t) = e^{-r(T-t)}\,\mathbb{E}^{\mathbb{Q}}_t[f(S_T)].
$$

This expectation can be computed either with Monte Carlo simulation or by solving the associated PDE.  

| Model | Volatility | Dimension | Notes |  
|:--|:--|:--|:--|  
| Black–Scholes | Constant $\sigma$ | 1 D in $S$ | Simple but cannot reproduce volatility smiles |  
| Local Volatility | Deterministic $\sigma_{\text{loc}}(S,t)$ | 1 D in $S$ | Fits the entire implied-volatility surface for vanilla options |  


### Step 2 — From SDE to the Local-Volatility Pricing PDE (via Martingale)

### 0️⃣ Set Up  

Start from the risk-neutral SDE  
$$
dS_t = S_t\big[(r-q)\,dt + \sigma_{\text{loc}}(S_t,t)\,dW_t\big].
$$

Let $V(S,t)$ be the price of a European claim with maturity $T$ and payoff $f(S_T)$.  

Apply Itô’s lemma to $V(S_t,t)$:
$$
dV = V_t\,dt + V_S\,dS + \tfrac12 V_{SS}\,(dS)^2 .
$$

Because $(dS)^2 = S^2 \sigma_{\text{loc}}^2(S,t)\,dt$ and $dS$ has the form above, substitute:

$$
dV = \Big[V_t + (r-q)S\,V_S + \tfrac12 \sigma_{\text{loc}}^2(S,t) S^2\,V_{SS}\Big]dt
      + \sigma_{\text{loc}}(S,t) S V_S\, dW .
$$

---

### 1️⃣ Discounted process

Now consider the discounted process $e^{-rt}V(S_t,t)$.
1) To enforce the no-arbitrage condition, the process $e^{-rt}V(S_t,t)$ must have zero drift.
2) To find the drift, we need to see how $e^{-rt}V(S_t,t)$ changes over an infinitesimal time step.
3) So we compute its differential using the product rule and Itô’s lemma:  

$$
d(e^{-rt}V) = e^{-rt}\big(-rV\,dt + dV\big).
$$

Substitute the expression for $dV$:  

$$
d(e^{-rt}V) = e^{-rt}\big[
  (V_t + (r - q)S\,V_S + \tfrac{1}{2}\sigma_{\text{loc}}^2S^2V_{SS} - rV)\,dt
  + \sigma_{\text{loc}}(S,t)S\,V_S\,dW_t
\big].
$$

---

### 2️⃣  No-arbitrage condition : drift zero
Under the risk-neutral measure, the discounted prices of all tradable assets and self-financing portfolios must be martingales.

For $e^{-rt}V(S_t,t)$ to be a martingale, its *drift term* (the part multiplying $dt$) must be zero.  

Setting that term to zero gives the local-volatility pricing PDE.

$$
\boxed{
V_t + (r - q)S\,V_S + \tfrac{1}{2}\sigma_{\text{loc}}^2(S,t)S^2V_{SS} - rV = 0.
}
$$

This PDE holds for any derivative price $V(S,t)$ under the local volatility model, with terminal condition $V(S,T) = f(S)$


The PDE says the time decay $V_t$ is balanced by drift convection $(r-q)S V_S$, curvature (gamma) weighted by local variance $\tfrac12\sigma_{\text{loc}}^2 S^2 V_{SS}$, and discounting $-rV$.

### Step 2 — From SDE to the Local-Volatility Pricing PDE (via Replication Argument)

### 0️⃣ Set Up  

Start from the risk-neutral SDE  
$$
dS_t = S_t\big[(r-q)\,dt + \sigma_{\text{loc}}(S_t,t)\,dW_t\big].
$$

Let $V(S,t)$ be the price of a European claim with maturity $T$ and payoff $f(S_T)$.  

Applying Itô’s lemma to $V(S_t,t)$ gives  
$$
dV = V_t\,dt + V_S\,dS + \tfrac12 V_{SS}\,(dS)^2 .
$$

Because $(dS)^2 = S^2 \sigma_{\text{loc}}^2(S,t)\,dt$ and $dS$ has the form above, substitute:  
$$
dV = \Big[V_t + (r-q)S\,V_S + \tfrac12 \sigma_{\text{loc}}^2(S,t) S^2\,V_{SS}\Big]dt
      + \sigma_{\text{loc}}(S,t) S V_S\, dW_t .
$$

---

### 1️⃣ Build a hedged portfolio and differentiate
Form a self-financing portfolio  
$$
\Pi_t = V(S_t,t) - \Delta_t S_t ,
$$  
where $\Delta_t$ is the number of underlyings held short to hedge the option.  
Differentiate, remembering that the underlying pays a continuous dividend yield $q$: being short $\Delta_t$ shares means paying the dividends $q\,\Delta_t S_t\,dt$ to the stock lender, so
$$
d\Pi_t = dV - \Delta_t\, dS_t - q\,\Delta_t S_t\,dt .
$$

Substitute $dV$ and $dS_t$:
$$
\begin{aligned}
d\Pi_t
&= \Big[V_t + (r-q)S V_S + \tfrac12\sigma_{\text{loc}}^2S^2V_{SS}\Big]dt
   + \sigma_{\text{loc}} S V_S\, dW_t
   - \Delta_t S\big[(r-q)\,dt + \sigma_{\text{loc}}\,dW_t\big]
   - q\,\Delta_t S\,dt \\
&= \Big[V_t + (r-q)S(V_S-\Delta_t) - q\,\Delta_t S + \tfrac12\sigma_{\text{loc}}^2S^2V_{SS}\Big]dt
   + \sigma_{\text{loc}} S (V_S-\Delta_t)\, dW_t .
\end{aligned}
$$

---

### 2️⃣ Eliminate randomness (the hedge)  
Choose $\Delta_t = V_S$ so the stochastic term vanishes:
$$
d\Pi_t = \Big[V_t - q\,S\,V_S + \tfrac12\sigma_{\text{loc}}^2S^2V_{SS}\Big]dt .
$$
The portfolio is now locally risk-free (no $dW_t$).

---

### 3️⃣ No-arbitrage condition: a riskless portfolio earns only the risk-free rate
A riskless portfolio must earn the risk-free rate $r$:
$$
d\Pi_t = r\,\Pi_t\,dt = r\,[V - S V_S]\,dt .
$$
Equating the two expressions for $d\Pi_t$:
$$
V_t - q\,S\,V_S + \tfrac12\sigma_{\text{loc}}^2S^2V_{SS} = r(V - S V_S) .
$$
Simplify:
$$
\boxed{
V_t + (r - q)S\,V_S + \tfrac12\sigma_{\text{loc}}^2(S,t)S^2V_{SS} - rV = 0 .
}
$$

---

The PDE states that for a hedged portfolio, all randomness is removed and the remaining deterministic evolution of value must earn the risk-free rate.

# 1 - Option Prices

Assumes call market prices are sourced from BBG or Reuters

In [37]:
# Call Market Prices

import polars as pl

# --- Spot and curve parameters ---
S0 = 100.0  # spot
r = 0.02    # risk-free rate
q = 0.01    # dividend yield

# --- Grid data ---
strikes = [80, 90, 100, 110, 120]
maturities = [0.25, 0.5, 1.0, 2.0]

# --- Call price grid - Assumed to be market-observed ---
# One row per strike, one column per maturity (same order as the lists above).
prices = [
    [20.23, 21.07, 22.78, 25.41],
    [11.08, 12.36, 14.43, 17.94],
    [3.81,  5.38,  7.97,  11.93],
    [0.92,  1.76,  3.41,  6.20],
    [0.18,  0.47,  1.20,  2.93],
]

# --- Build Polars DataFrame in long format ---
# Pipeline:
#   1. wide frame whose columns are the maturity labels "0.25Y", "0.5Y", ...
#   2. attach the strikes as a "Strike" column
#   3. unpivot: "Strike" stays fixed, every maturity column is stacked into
#      the pair ("Maturity" label, "CallPrice" value)
#   4. strip the "Y" from the label and cast it to float (0.25, 0.5, 1.0, 2.0)
df = (
    pl.DataFrame(
        prices,
        schema=[f"{t}Y" for t in maturities],
        orient="row",
    )
    .with_columns(pl.Series("Strike", strikes))
    .unpivot(
        index=["Strike"],
        on=[f"{t}Y" for t in maturities],
        variable_name="Maturity",
        value_name="CallPrice",
    )
    .with_columns(pl.col("Maturity").str.replace("Y", "").cast(pl.Float64))
)

# Show the long table from row 5 onwards (the first five rows are skipped)
display(df[5:, :])


Strike,Maturity,CallPrice
i64,f64,f64
80,0.5,21.07
90,0.5,12.36
100,0.5,5.38
110,0.5,1.76
120,0.5,0.47
…,…,…
80,2.0,25.41
90,2.0,17.94
100,2.0,11.93
110,2.0,6.2


In [38]:
import plotly.graph_objects as go
import numpy as np

# Axis values for the plot: sorted unique strikes / maturities found in df
strikes = df["Strike"].unique().sort().to_numpy()
maturities = df["Maturity"].unique().sort().to_numpy()

# Reshape the long table into a 2D matrix of call prices
# (rows indexed by strike, columns by maturity)
price_matrix = (
    df.to_pandas()
    .pivot(index="Strike", columns="Maturity", values="CallPrice")
    .sort_index()
    .to_numpy()
)

# Coordinate matrices with the same layout as price_matrix
T_grid, K_grid = np.meshgrid(maturities, strikes)

# --- Market quotes, drawn as red markers on top of the surface ---
x_points = df["Maturity"].to_numpy()
y_points = df["Strike"].to_numpy()
z_points = df["CallPrice"].to_numpy()

# Interactive 3D figure: surface through the gridded prices + market dots
fig = go.Figure()

# Surface drawn through the gridded prices
fig.add_trace(
    go.Surface(
        x=T_grid,
        y=K_grid,
        z=price_matrix,
        colorscale="Viridis",
        opacity=0.8,
        contours=dict(z=dict(show=True, usecolormap=True, highlightcolor="limegreen")),
        name="Interpolated Surface",
    )
)

# Actual market points
fig.add_trace(
    go.Scatter3d(
        x=x_points,
        y=y_points,
        z=z_points,
        mode="markers",
        marker={"size": 5, "color": "red", "symbol": "circle"},
        name="Market Data",
    )
)

# Layout and axis labels
fig.update_layout(
    title="Market Call Price Surface (with Observed Data Points)",
    scene={
        "xaxis_title": "Maturity (Years)",
        "yaxis_title": "Strike",
        "zaxis_title": "Call Price",
    },
    width=850,
    height=650,
    template="plotly_white",
)

fig.show()


# 2 - From Price to Volatility

In [39]:
from scipy.optimize import brentq
from scipy.stats import norm
import numpy as np

# ------------------------------------------------------------
# 1. Black–Scholes call pricing function
# ------------------------------------------------------------
def bs_call_price(s0: float, K: float, r: float, q: float, sigma: float, T: float) -> float:
    """Black–Scholes price of a European call on an asset with dividend yield.

    Args:
        s0: Spot price.
        K: Strike.
        r: Risk-free rate.
        q: Dividend yield.
        sigma: Volatility.
        T: Time to maturity.

    Returns:
        The call price. For ``T <= 0`` this is the intrinsic value
        ``max(0, s0 - K)``; when ``sigma * sqrt(T) <= 0`` it is the positive
        part of the discounted spot minus the discounted strike.
    """
    # Expired option: only the intrinsic value is left
    if T <= 0.0:
        return max(0.0, s0 - K)

    total_vol = sigma * np.sqrt(T)
    disc_spot = s0 * np.exp(-q * T)    # spot discounted at the dividend yield
    disc_strike = K * np.exp(-r * T)   # strike discounted at the risk-free rate

    # No volatility: d1/d2 would divide by zero, so return the deterministic value
    if total_vol <= 0.0:
        return max(0.0, disc_spot - disc_strike)

    d1 = (np.log(s0 / K) + (r - q + 0.5 * sigma**2) * T) / total_vol
    d2 = d1 - total_vol
    return disc_spot * norm.cdf(d1) - disc_strike * norm.cdf(d2)

# ------------------------------------------------------------
# 2. Implied volatility solver (Brent Method)
# ------------------------------------------------------------
def implied_vol_call(price: float, s0: float, K: float, r: float, q: float, T: float,
                     sigma_lo: float = 1e-6, sigma_hi: float = 5.0) -> float:
    """Back out the Black–Scholes implied volatility of a call with Brent's method.

    Searches ``[sigma_lo, sigma_hi]`` for the volatility at which
    ``bs_call_price`` reproduces ``price``.

    Args:
        price: Call price to match.
        s0: Spot price.
        K: Strike.
        r: Risk-free rate.
        q: Dividend yield.
        T: Time to maturity.
        sigma_lo: Lower end of the volatility search interval.
        sigma_hi: Upper end of the volatility search interval.

    Returns:
        The implied volatility; ``0.0`` when ``T <= 0``; ``nan`` when the
        root search raises ``ValueError`` (e.g. no root is bracketed by the
        search interval).
    """
    if T <= 0.0:
        return 0.0

    def pricing_error(vol: float) -> float:
        # Model price minus target price; its root is the implied volatility
        return bs_call_price(s0, K, r, q, vol, T) - price

    try:
        root = brentq(pricing_error, sigma_lo, sigma_hi, maxiter=200)
    except ValueError:
        return np.nan
    return root

# ------------------------------------------------------------
# 3. Apply to your existing DataFrame `df`
# ------------------------------------------------------------
# Add an "ImpliedVol" column: each (Strike, Maturity, CallPrice) row is packed
# into a struct and passed to the scalar solver `implied_vol_call`.
# `return_dtype` is stated explicitly so the column type does not depend on
# inference from the first returned value (the solver can also return NaN).
df_iv = df.with_columns(
    pl.struct(["Strike", "Maturity", "CallPrice"])
    .map_elements(
        lambda row: implied_vol_call(
            price=row["CallPrice"],
            s0=S0,
            K=row["Strike"],
            r=r,
            q=q,
            T=row["Maturity"],
        ),
        return_dtype=pl.Float64,
    )
    .alias("ImpliedVol")
)

df_iv


Strike,Maturity,CallPrice,ImpliedVol
i64,f64,f64,f64
80,0.25,20.23,0.223709
90,0.25,11.08,0.218528
100,0.25,3.81,0.185455
110,0.25,0.92,0.194339
120,0.25,0.18,0.205256
…,…,…,…
80,2.0,25.41,0.245137
90,2.0,17.94,0.216983
100,2.0,11.93,0.200333
110,2.0,6.2,0.166612


# 3 - Interpolator
Given option prices, the local volatility surface can be computed from the derivatives of the call prices with respect to strike and maturity. 
However, the quoted grid is coarse and derivatives taken directly on such sparse, discrete data are unreliable, so the prices must first be interpolated onto a smooth surface. The next task is therefore to choose the right interpolation method. Check the notebook [`Volatility Surface`](Volatility%20Surface%20|%20Interpolation%20Methods.ipynb) for more info about the interpolation.

For Dupire Surface we are going to use a Bivariate Cubic Spline

In [40]:
import numpy as np
from scipy import interpolate
import plotly.graph_objects as go

# --------------------------------------------------------------------------
# --- Inputs as numpy arrays
strikes = np.array(strikes)
maturities = np.array(maturities)
prices = np.array(prices)

# --------------------------------------------------------------------------
# --- Fine evaluation grid
# 1) 100 evenly spaced points per axis, spanning the quoted range
#    (these are the points at which the fitted spline is evaluated)
K_fine = np.linspace(strikes.min(), strikes.max(), 100)
T_fine = np.linspace(maturities.min(), maturities.max(), 100)

# 2) 2D coordinate matrices for the 3D surface plot
K_grid, T_grid = np.meshgrid(K_fine, T_fine)

# --------------------------------------------------------------------------
# --- Flattened market quotes (1D x, y, z), plotted as red dots
T_flat, K_flat = np.meshgrid(maturities, strikes)   # maturity first, strike second
x = T_flat.ravel()   # maturities (x-axis)
y = K_flat.ravel()   # strikes (y-axis)
z = prices.ravel()   # call prices (z-axis)

# --------------------------------------------------------------------------
# --- Interpolation (RectBivariateSpline)
# The spline is fitted with maturities as first axis and strikes as second
# axis, hence `prices` is transposed before being passed in.
bivar_cubic = interpolate.RectBivariateSpline(maturities, strikes, prices.T)

# Evaluate the fitted surface on the fine grid (T_fine, K_fine)
C_bivar = bivar_cubic(T_fine, K_fine)

# --------------------------------------------------------------------------
# --- Single 3D figure: interpolated surface + market data points
fig = go.Figure(
    data=[
        go.Surface(
            x=T_grid,               # maturities
            y=K_grid,               # strikes
            z=C_bivar,              # interpolated call prices
            colorscale="Viridis",
            opacity=0.85,
            name="Interpolated Surface",
        ),
        go.Scatter3d(
            x=x,
            y=y,
            z=z,
            mode="markers",
            marker={"size": 4, "color": "red"},
            name="Market data",
        ),
    ]
)

# --------------------------------------------------------------------------
# --- Layout configuration: figure size, margins, camera and axis titles
fig.update_layout(
    title_text="RectBivariateSpline Interpolated Surface",
    width=1000,   # figure width
    height=500,   # figure height
    template="plotly_white",
    margin={"l": 40, "r": 20, "b": 60, "t": 40},  # prevent clipping of labels
    scene={
        "xaxis_title": "Maturity (Years)",
        "yaxis_title": "Strike",
        "zaxis_title": "Call Price",
        "camera": {"eye": {"x": 1.6, "y": 1.8, "z": 0.9}},  # viewing angle
    },
)

# --------------------------------------------------------------------------
# --- Show figure
fig.show()
