## Design and Hedging of a Structured Product

### Experiment 03: Hedging

In [1]:
from copy import deepcopy
import json
import numpy as np

In [2]:
class Valuation:
    def __init__(
        self,
        S0,
        r,
        T,
        v0,
        kappa,
        theta,
        sigma,
        rho,
        lamb,
        mu_j,
        sigma_j
    ):
        """
        Description
            Initialize Bates model parameters.

        Args:
            S0 (float): Spot price.
            r (float): Risk-free rate.
            T (float): Time to maturity (in years).
            v0 (float): Initial variance.
            kappa (float): Mean reversion rate.
            theta (float): Long-run variance.
            sigma (float): Volatility of volatility.
            rho (float): Correlation between stock and variance.
            lamb (float): Jump intensity (λ).
            mu_j (float): Mean of log jump size.
            sigma_j (float): Std. dev. of log jump size.
        """
        self.S0 = S0
        self.r = r
        self.T = T
        self.v0 = v0
        self.kappa = kappa
        self.theta = theta
        self.sigma = sigma
        self.rho = rho
        self.lamb = lamb
        self.mu_j = mu_j
        self.sigma_j = sigma_j

    def bs_characteristic_function(self, u, sigma):
        """
        Black Scholes Characteristic function for Sanity Check.

        Args:
            u (np.ndarray or float): Fourier variable.
            sigma (float): Representative volatility.

        Returns:
            float: Black Scholes Characteristic function value.
        """
        i = 1j
        return np.exp(i * u * np.log(self.S0) + 
                      (i * u * (self.r - 0.5 * sigma ** 2) - 
                       0.5 * u ** 2 * sigma ** 2) * self.T)

    def bates_characteristic_function(self, u):
        """
        Description
            Characteristic function φ(u) of log(S_T) under the Bates model.

        Args:
            u (np.ndarray or float): Fourier variable.

        Returns:
            np.ndarray or float: Characteristic function value at u.
        """
        i = 1j
        x = np.log(self.S0)

        d = np.sqrt(
            (self.rho * self.sigma * i * u - self.kappa) ** 2
            + self.sigma ** 2 * (i * u + u ** 2)
        )

        g = (self.kappa - self.rho * self.sigma * i * u - d) / (
            self.kappa - self.rho * self.sigma * i * u + d
        )

        C = (
            self.kappa * self.theta / self.sigma ** 2
            * ((self.kappa - self.rho * self.sigma * i * u - d) * self.T
               - 2 * np.log((1 - g * np.exp(-d * self.T)) / (1 - g)))
        )

        D = (
            (self.kappa - self.rho * self.sigma * i * u - d)
            / self.sigma ** 2
            * ((1 - np.exp(-d * self.T)) / (1 - g * np.exp(-d * self.T)))
        )

        jump_term = self.lamb * self.T * (
            np.exp(i * u * self.mu_j - 0.5 * self.sigma_j ** 2 * u ** 2) - 1
        )

        return np.exp(i * u * x + C + D * self.v0 + jump_term)

    def carr_madan_price(self, alpha=1.5, N=4096, eta=0.25):
        """
        Description
            Compute option prices using Carr-Madan method with FFT.

        Args:
            alpha (float): Dampening factor.
            N (int): Number of FFT points (must be power of 2).
            eta (float): Integration grid spacing.

        Returns:
            tuple: (strikes array, call prices array)
        """
        i = 1j
        lamb = 2 * np.pi / (N * eta)
        b = N * lamb / 2

        u = np.arange(N) * eta
        ku = -b + lamb * np.arange(N)

        phi = self.bates_characteristic_function(u - (alpha + 1) * i)
        # Black Scholes for Sanity Check.
        #phi = self.bs_characteristic_function(u - (alpha + 1) * i, sigma=0.25)
        psi = (
            np.exp(-self.r * self.T)
            * phi
            / (alpha ** 2 + alpha - u ** 2 + i * (2 * alpha + 1) * u)
        )

        # Construct Simpson's weights
        simpson_weights = np.ones(N)
        simpson_weights[1:N-1:2] = 4
        simpson_weights[2:N-1:2] = 2
        simpson_weights *= eta / 3

        # Apply Simpson's weights to the input
        fft_input = np.exp(i * b * u) * psi * simpson_weights
        
        fft_output = fft(fft_input).real

        call_prices = (np.exp(-alpha * ku) / np.pi) * fft_output
        strikes = np.exp(ku)

        return strikes, call_prices

    def interpolate_call_price(self, K_target):
        """
        Description
            Interpolate the call price at a specific strike using Carr-Madan FFT.

        Args:
            K_target (float): Strike price.

        Returns:
            float: Interpolated call price.
        """
        strikes, prices = self.carr_madan_price()
        try:
            interpolator = interp1d(
                strikes,
                prices,
                kind="cubic",
                bounds_error=True  # prevent silent extrapolation
            )
            return interpolator(K_target)

        except ValueError as e:
            print(f"[Warning] Strike {K_target} is outside interpolation range.")
            raise e

In [3]:
class Calibration(Valuation):
    def __init__(self, *args, **kwargs):
        """
        Inherits all attributes from Valuation.
        """
        super().__init__(*args, **kwargs)

    def calibrate_to_market(self, clean_calls_df, initial_guess, bounds):
        """
        Description
            Calibrate Bates model parameters to market call prices.

        Args:
            clean_calls_df (pd.DataFrame): DataFrame with 'strike' and 'mid' columns.
            initial_guess (list): Initial parameter guess in order:
                [v0, kappa, theta, sigma, rho, lamb, mu_j, sigma_j]
            bounds (list of tuples): Bounds for each parameter.

        Returns:
            dict: Optimized parameters, loss value, and success flag.
        """
        strikes = clean_calls_df["strike"].values
        market_prices = clean_calls_df["mid"].values

        def objective(params):
            self.v0, self.kappa, self.theta, self.sigma, self.rho, self.lamb, self.mu_j, self.sigma_j = params
            model_prices = []
            for K in strikes:
                try:
                    model_price = self.interpolate_call_price(K)
                except Exception:
                    model_price = np.nan
                model_prices.append(model_price)

            model_prices = np.array(model_prices)
            mask = ~np.isnan(model_prices)
            error = model_prices[mask] - market_prices[mask]
            return np.sum(error ** 2)

        result = minimize(
            objective,
            x0=initial_guess,
            bounds=bounds,
            method='L-BFGS-B'
        )

        # Update instance if successful
        if result.success:
            self.v0, self.kappa, self.theta, self.sigma, self.rho, self.lamb, self.mu_j, self.sigma_j = result.x

        return {
            "optimized_params": result.x,
            "loss": result.fun,
            "success": result.success,
            "message": result.message
        }
    
    def plot_calibrated_vs_market(self, clean_calls_df, full_grid=False, num_strikes=100):
        """
        Plot model-calibrated call prices vs market prices.
    
        Args:
            clean_calls_df (pd.DataFrame): Market data with 'strike' and 'mid' columns.
            full_grid (bool): If True, plot model over a full strike range [0.5*S0, 1.5*S0].
            num_strikes (int): Number of strike points in the full grid.
        """
        market_strikes = clean_calls_df["strike"].values
        market_prices = clean_calls_df["mid"].values

        # Plotting base: market points
        plt.figure(figsize=(10, 6))
        plt.plot(market_strikes, market_prices, 'o', label="Market", color="steelblue", alpha=0.7)

        if full_grid:
            # Generate a dense grid of strikes
            strike_grid = np.linspace(0.5 * self.S0, 1.5 * self.S0, num_strikes)
            model_prices = []
            for K in strike_grid:
                try:
                    price = self.interpolate_call_price(K)
                except Exception:
                    price = np.nan
                model_prices.append(price)
            plt.plot(strike_grid, model_prices, 'r-', label="Bates (Calibrated)", lw=2)
        else:
            # Just interpolate at the available strike points
            model_prices = []
            for K in market_strikes:
                try:
                    price = self.interpolate_call_price(K)
                except Exception:
                    price = np.nan
                model_prices.append(price)
            plt.plot(market_strikes, model_prices, 'r*', label="Bates (Calibrated)", markersize=6)

        plt.xlabel("Strike Price (K)")
        plt.ylabel("Call Price")
        plt.title("Bates Model Calibration: Market vs Calibrated Prices")
        plt.legend()
        plt.grid(True)
        plt.tight_layout()
        plt.show()
    
    def plot_residuals(self, clean_calls_df):
        """
        Plot residuals between model-calibrated call prices and market prices.

        Args:
            clean_calls_df (pd.DataFrame): DataFrame with 'strike' and 'mid' columns.
        """
        import matplotlib.pyplot as plt

        strikes = clean_calls_df["strike"].values
        market_prices = clean_calls_df["mid"].values
        model_prices = []

        for K in strikes:
            try:
                model_price = self.interpolate_call_price(K)
            except Exception:
                model_price = np.nan
            model_prices.append(model_price)

        model_prices = np.array(model_prices)
        residuals = model_prices - market_prices

        plt.figure(figsize=(10, 4))
        plt.axhline(0, color='black', linestyle='--', linewidth=1)
        plt.scatter(strikes, residuals, color='crimson', label='Residuals')
        plt.xlabel("Strike Price (K)")
        plt.ylabel("Residual (Model - Market)")
        plt.title("Bates Model Calibration Residuals")
        plt.grid(True, linestyle='--', alpha=0.4)
        plt.legend()
        plt.tight_layout()
        plt.show()

In [4]:
class MonteCarloExoticPricer(Calibration):
    def __init__(self, *args, **kwargs):
        """
        Inherits parameters from Calibration (which inherits from Valuation).
        """
        super().__init__(*args, **kwargs)

    def simulate_paths(self, N_paths=100000, N_steps=252, seed=42, vol_truncation="max"):
        """
        Simulate asset price paths under the Bates model.

        Args:
            N_paths (int): Number of Monte Carlo paths.
            N_steps (int): Number of time steps.
            seed (int): Random seed.
            vol_truncation (str): 'max' or 'abs' method for ensuring non-negative variance.

        Returns:
            np.ndarray: Simulated stock paths of shape (N_paths, N_steps + 1).
        """

        np.random.seed(seed)
        dt = self.T / N_steps

        S_paths = np.zeros((N_paths, N_steps + 1))
        v_paths = np.zeros((N_paths, N_steps + 1))

        S_paths[:, 0] = self.S0
        v_paths[:, 0] = self.v0

        EJ = np.exp(self.mu_j + 0.5 * self.sigma_j ** 2) - 1

        for t in range(1, N_steps + 1):
            v_prev = v_paths[:, t - 1]
            S_prev = S_paths[:, t - 1]

            Z1 = np.random.normal(size=N_paths)
            Z2 = np.random.normal(size=N_paths)
            dW_v = Z1 * np.sqrt(dt)
            dW_s = (self.rho * Z1 + np.sqrt(1 - self.rho ** 2) * Z2) * np.sqrt(dt)

            # Variance update (Euler)
            v_candidate = v_prev + self.kappa * (self.theta - v_prev) * dt + \
                        self.sigma * np.sqrt(np.maximum(v_prev, 0)) * dW_v

            if vol_truncation == "abs":
                v_new = np.abs(v_candidate)
            elif vol_truncation == "max":
                v_new = np.maximum(v_candidate, 0)
            else:
                raise ValueError("vol_truncation must be 'abs' or 'max'")

            v_paths[:, t] = v_new

            # Jump component
            N_jump = np.random.poisson(self.lamb * dt, size=N_paths)
            J = np.random.lognormal(self.mu_j, self.sigma_j, size=N_paths)

            # Log return
            log_return = (
                (self.r - self.lamb * EJ - 0.5 * v_prev) * dt +
                np.sqrt(np.maximum(v_prev, 0)) * dW_s +
                N_jump * np.log(J)
            )

            S_paths[:, t] = S_prev * np.exp(log_return)

        return S_paths
    
    def plot_path_comparison(self, paths_abs, paths_max, n_paths_plot=50):
        """
        Compare simulated stock price paths under two volatility truncation schemes.

        Args:
            paths_abs (np.ndarray): Paths using abs truncation.
            paths_max (np.ndarray): Paths using max truncation.
            n_paths_plot (int): Number of paths to show for each.
        """
        T = paths_abs.shape[1] - 1
        time = np.linspace(0, 1, T + 1)

        plt.figure(figsize=(14, 5))

        # Left: abs
        plt.subplot(1, 2, 1)
        plt.plot(time, paths_abs[:n_paths_plot].T, lw=0.5, alpha=0.7)
        plt.title("Simulated Paths (abs truncation)")
        plt.xlabel("Time (T normalized)")
        plt.ylabel("Stock Price")
        plt.grid(True)

        # Right: max
        plt.subplot(1, 2, 2)
        plt.plot(time, paths_max[:n_paths_plot].T, lw=0.5, alpha=0.7)
        plt.title("Simulated Paths (max truncation)")
        plt.xlabel("Time (T normalized)")
        plt.grid(True)

        plt.tight_layout()
        plt.show()


    def price_bonus_certificate(self, B, H, N_paths=100000, N_steps=252, seed=42, vol_truncation="max"):
        """
        Price the Bonus Certificate via Monte Carlo simulation.

        Args:
            B (float): Bonus level.
            H (float): Barrier level (lower barrier).
            N_paths (int): Number of simulation paths.
            N_steps (int): Number of time steps per path.
            seed (int): Random seed for reproducibility.
            vol_truncation (str): Volatility truncation method ("max" or "abs").

        Returns:
            float: Monte Carlo price (discounted expected payoff).
        """
        paths = self.simulate_paths(N_paths=N_paths, N_steps=N_steps, seed=seed, vol_truncation=vol_truncation)
        S_T = paths[:, -1]                  # Final prices
        barrier_hit = (paths <= H).any(axis=1)  # Barrier breach across time

        payoff = np.where(
            barrier_hit,
            S_T,                     # If barrier hit → get S_T
            np.where(S_T >= B, S_T, B)  # Else: if S_T >= B → S_T, else → B
        )

        return np.exp(-self.r * self.T) * payoff.mean()

    def evaluate_bonus_certificate_payoffs(self, paths, B, H):
        """
        Compute payoff per path for the Bonus Certificate.

        Args:
            paths (np.ndarray): Simulated stock price paths (N_paths x N_steps + 1).
            B (float): Bonus level.
            H (float): Barrier level.

        Returns:
            np.ndarray: Payoffs for each path.
        """
        S_T = paths[:, -1]
        barrier_hit = (paths <= H).any(axis=1)

        payoff = np.where(
            barrier_hit,
            S_T,
            np.where(S_T >= B, S_T, B)
        )

        return payoff

In [5]:
class HedgingStrategy:
    def __init__(self, pricer, B, H, T, notional):
        """
        Initialize the hedging strategy for the Bonus Certificate.

        Args:
            pricer (MonteCarloExoticPricer): Calibrated pricing model.
            B (float): Bonus level.
            H (float): Barrier level.
            T (float): Time to maturity.
            notional (float): Total investor capital in USD.
        """
        self.pricer = pricer
        self.B = B
        self.H = H
        self.T = T
        self.notional = notional
        self.S0 = pricer.S0
        self.r = pricer.r

    def compute_delta(self, eps=1.0, N_paths=100000, N_steps=252):
        """
        Compute the Delta of the Bonus Certificate at t=0 using central difference.

        Args:
            eps (float): Bump size in underlying.
            N_paths (int): Number of paths.
            N_steps (int): Time discretization steps.

        Returns:
            float: Estimated delta ∂V/∂S at t=0.
        """
        S_up = self.S0 + eps
        S_down = self.S0 - eps

        # Get model parameters from the calibrated pricer
        params = (
            self.pricer.v0, self.pricer.kappa, self.pricer.theta,
            self.pricer.sigma, self.pricer.rho, self.pricer.lamb,
            self.pricer.mu_j, self.pricer.sigma_j
        )

        # Create bumped pricers
        pricer_up = self.pricer.__class__(S_up, self.r, self.T, *params)
        pricer_down = self.pricer.__class__(S_down, self.r, self.T, *params)

        V_up = pricer_up.price_bonus_certificate(self.B, self.H, N_paths, N_steps)
        V_down = pricer_down.price_bonus_certificate(self.B, self.H, N_paths, N_steps)

        delta = (V_up - V_down) / (2 * eps)
        return delta

    def compute_hedge_position(self, delta):
        """
        Compute the number of shares required for delta hedge.

        Returns:
            float: Number of shares (positive = long, negative = short).
        """
        return delta * self.notional / self.S0

In [6]:
with open("calibrated_pricer.json", "r") as f:
    config = json.load(f)

# Extract fields
S0 = config["S0"]
r = config["r"]
T = config["T"]
params = config["params"]

# Rebuild pricer object
pricer = MonteCarloExoticPricer(
    S0, r, T,
    params["v0"], params["kappa"], params["theta"],
    params["sigma"], params["rho"], params["lamb"],
    params["mu_j"], params["sigma_j"]
)

In [7]:
hedger = HedgingStrategy(
    pricer=pricer,   # your calibrated MonteCarloExoticPricer
    B=1000,
    H=750,
    T=0.25,
    notional=1_000_000
)

delta_0 = hedger.compute_delta()
shares = hedger.compute_hedge_position(delta_0)

print(f"Delta at t=0: {delta_0:.4f}")
print(f"Hedge position: {shares:.0f} shares (buy if positive)")


Delta at t=0: 0.9059
Hedge position: 949 shares (buy if positive)
