Chapter 4 code:

- simiulate hidden regimes + prices
- update belief each step
- choose hedge from state
- compute loss
- optimise CVaR

In [None]:
from dataclasses import dataclass # immutable-ish container for hyperparameters; avoids ad-hoc lists
import torch
import math

@dataclass
class HedgeConfig:
    T: float = 1.0              # year. keep it at one year
    steps: int = 252            # time steps. 252 business days
    mu: float = 0.0             # drift of S
    r: float = 0.0              # risk-free rate
    sigma_L: float = 0.2        # low-vol regime
    sigma_H: float = 0.5        # high-vol regime
    S0: float = 100.0       
    seed: int = 1337
    known_regime: bool = False  # debug first
    eps: float = 0.05           # e-rectangle size
    P_bar: torch.Tensor = None  # shape (2,2), row-stochastic
    alpha: float = 0.95         # CVaR level
    device: torch.device = torch.device("cuda") if \
        torch.cuda.is_available() else (torch.device("mps") \
            if torch.backends.mps.is_built() else torch.device("cpu")) # preference for GPU, and then Macbook metal shard if using mac otherwise CPU suffices.
    dtype: torch.dtype = torch.float32 
    
    @property
    def dt(self) -> float:
        return self.T / self.steps
    
    @property
    def sqrt_dt(self) -> float:
        return math.sqrt(self.dt)
    
    def rng(self) -> torch.Generator:
        # Statefully reproducible RNG for torch ops
        g = torch.Generator(device = 'cpu') # keep random generator on CPU because does not support mps
        g.manual_seed(self.seed)
        return g


def sample_regimes(P: torch.Tensor, steps: int, start_state: int = 0, *, generator=None):
    pass
    

We know market flips between two volatility "moods" (Low, High) following a Markov chain with matrix P. You do not see the mood directly. What we do see are prices and from price moves, we infer a belief $q_t$ = [Pr(L), Pr(H)]

In [112]:
# take yesterday belief and flow it through the Markov chain
### FILTER.PY

def hmm_predict(q_prev: torch.Tensor, P:torch.Tensor) -> torch.Tensor:
    assert q_prev.shape == (2,); assert P.shape == (2,2)
    q_pred = P.T @ q_prev 
    q_pred = q_pred.clamp(0,1) # ensure we do not get out of bound values
    q_pred /= q_pred.sum() # normalise just in case
    return q_pred

def hmm_update(q_pred: torch.Tensor, x: torch.Tensor,
               mu: float, sigma_L: float, sigma_H: float, dt: float) -> torch.Tensor:
    """
    q_pred: (2,), prior after predict
    x: sxcalar log-return at this step
    returns: q_post: (2,)
    """
    mL = mu * 0.5 * (sigma_L**2)
    mH = mu - 0.5 * (sigma_H**2)
    vL = sigma_L**2
    vH = sigma_H**2
    
    # log-likelihoods for Normal(x; mean=m*dt, var=v*dt)
    # logN = -0.5*log(2*pi*v*dt) - 0.5*((x - m*dt)^2)/(v*dt)
    const = -0.5 * math.log(2*math.pi)
    loglik_L = const - 0.5*math.log(vL*dt) - 0.5*((x - mL*dt)**2)/(vL*dt)
    loglik_H = const - 0.5*math.log(vH*dt) - 0.5*((x - mH*dt)**2)/(vH*dt)

    log_qpred = torch.log(q_pred.clamp_min(1e-32))
    log_post_unnorm = torch.stack([loglik_L, loglik_H]) + log_qpred
    
    # softmax = exp - logsumexp
    maxlog = log_post_unnorm.max()
    exps = torch.exp(log_post_unnorm - maxlog)
    q_post = exps / exps.sum()

    # guardrails
    q_post = q_post.clamp(0, 1)
    s = q_post.sum()
    return q_post if s > 0 else torch.tensor([0.5, 0.5], dtype=q_pred.dtype, device=q_pred.device)

def stationary_2state(P: torch.Tensor) -> torch.Tensor:
    """
    P: (2,2) row-stochastic transition matrix
    return: pi (2,) with pi.sum()==1 and P.T @ pi == pi (within tolerance)
    """
    p_LH = P[0,1]
    p_HL = P[1,0]
    # unnormalised weights 
    wL = p_HL
    wH = p_LH
    # guardrail 
    denom = wL + wH
    if float(denom) <= 0.0:
        return torch.tensor([0.5, 0.5], dtype=P.dtype, device=P.device)
    
    w = torch.stack([wL, wH]).clamp_min(0)
    pi = w/w.sum()
    
    return pi
    

    
P = torch.tensor([[0.9, 0.1],
                  [0.2, 0.8]], dtype=torch.float64)
pi = stationary_2state(P)
print("pi =", pi)
print("sum =", pi.sum().item())
print("fixed point OK:", torch.allclose(P.T @ pi, pi, atol=1e-10))
    

pi = tensor([0.6667, 0.3333], dtype=torch.float64)
sum = 1.0
fixed point OK: True


This means in the long run, the chain spends about 2/3 of time in Low and 1/3 of time High.

In [113]:
def simulate_high_vol_returns(steps: int, mu: float, sigma_H: float, dt: float, *, generator=None) -> torch.Tensor:
    """
    Returns a length 'steps' tensor of log-returns generated under the High-vol regime
    """
    mH = mu - 0.5 * (sigma_H ** 2)
    
    # sample standard normals Z_t
    Z = torch.randn(steps, generator=generator)
    
    # Build log-returns
    x = mH * dt + sigma_H * math.sqrt(dt) * Z
    return x   

In [114]:
steps = 50_000
mu, sigma_H, dt = 0.0, 0.6, 1/252
g = torch.Generator().manual_seed(0)
xs = simulate_high_vol_returns(steps, mu, sigma_H, dt, generator=g)
print(xs.mean().item(), xs.std().item())

-0.0009849938796833158 0.03779744729399681


In [115]:
import torch

def belief_demo(
    P: torch.Tensor,
    q0: torch.Tensor,
    xs: torch.Tensor,
    mu: float,
    sigma_L: float,
    sigma_H: float,
    dt: float,
    *,
    burn_in: int = 0
) -> torch.Tensor:
    """
    Run a predict→update belief filter over a sequence of log-returns.

    Returns a tensor of the posterior High-regime beliefs q_t(H) at each step,
    optionally discarding the first `burn_in` steps from the output.
    """

    # make sure everything lives on the same device/dtype
    xs = xs.reshape(-1)  # ensure 1D
    dtype, device = xs.dtype, xs.device
    P = P.to(device=device, dtype=dtype)
    q = q0.to(device=device, dtype=dtype)

    # storage for the High-belief each step
    qH_series = []

    # Core loop: for each observed log-return x_t
    for x in xs:
        # Predict: push yesterday's belief through the Markov chain
        #      q_{t|t-1} = P^T @ q_{t-1}
        q = hmm_predict(q, P)

        # Update; reweight with the likelihood of the observed return
        #      q_t ∝ ℓ(x_t) ⊙ q_{t|t-1}  (done in log-space inside hmm_update)
        q = hmm_update(q, x, mu, sigma_L, sigma_H, dt)

        # Record the High component for plotting / inspection
        qH_series.append(q[1])

    qH_series = torch.stack(qH_series)

    # early steps can be prior-driven; trim if requested
    if burn_in > 0:
        burn_in = min(burn_in, qH_series.numel())
        qH_series = qH_series[burn_in:]

    return qH_series

P  = torch.tensor([[0.9, 0.1],
                   [0.2, 0.8]], dtype=torch.float64)
pi = stationary_2state(P)

cfg = HedgeConfig()
g   = cfg.rng()

xs  = simulate_high_vol_returns(steps=60, mu=0.0, sigma_H=0.6, dt=cfg.dt, generator=g)
qHs = belief_demo(P, pi, xs, mu=0.0, sigma_L=0.10, sigma_H=0.60, dt=cfg.dt, burn_in=0)

print(qHs[:10])        # should start near ~0.33 (the stationary H weight)
print(qHs[-1].item())  # should be well above ~0.6–0.8 on a high-vol path (noise is fine)


tensor([0.1161, 0.0407, 0.2330, 1.0000, 0.9976, 0.3976, 1.0000, 0.4085, 0.4469,
        1.0000])
0.9980629086494446
