# Notebook 2 (v2) â€” Static Replication from Calls (with diagnostics)

Replicate a target payoff using cash + forward + a strip of calls. Includes guards to avoid the 'all weights = 0' issue.

In [None]:
# %pip install numpy pandas scipy matplotlib cvxpy yfinance nbformat
import numpy as np, pandas as pd, numpy.linalg as LA, matplotlib.pyplot as plt, math, warnings
warnings.filterwarnings('ignore')
pd.set_option('display.float_format', lambda v: f'{v:,.6f}')
try:
    import yfinance as yf; HAVE_YF=True
except Exception:
    HAVE_YF=False
try:
    import cvxpy as cp; HAVE_CVXPY=True
except Exception:
    HAVE_CVXPY=False
from scipy import optimize

In [None]:
from pathlib import Path
DATA_DIR = Path('C:/Pcs/Python/new price project')
CLEANED_PATH = DATA_DIR / 'cleaned_chain_v2.csv'
PDF_PATH     = DATA_DIR / 'implied_pdf_v2.csv'
SAMPLE_CHAIN = DATA_DIR / 'sample_chain.csv'

TICKER = 'SPY'; EXPIRY = '2025-10-17'
KEEP_FWD_MULT = (0.7, 1.3); GRID_POINTS = 161
FIT_MODE = 'nnls'; L2_REG = 0.0
STATE_BAND = (0.5, 1.5); STATE_POINTS = 201

In [None]:
def fetch_chain_yf(ticker:str, expiry:str|None):
    if not HAVE_YF: return None
    try:
        tk = yf.Ticker(ticker); exps = tk.options
        if not exps: return None
        exp = expiry or exps[0]
        oc = tk.option_chain(exp)
        calls = oc.calls.copy(); puts = oc.puts.copy() if oc.puts is not None else None
        calls['call_mid']=(calls['bid'].fillna(0)+calls['ask'].fillna(0))/2
        calls = calls[['strike','call_mid','bid','ask','impliedVolatility','openInterest']].rename(
            columns={'strike':'K','impliedVolatility':'iv','openInterest':'oi','bid':'call_bid','ask':'call_ask'})
        if puts is not None and len(puts):
            puts['put_mid']=(puts['bid'].fillna(0)+puts['ask'].fillna(0))/2
            puts=puts[['strike','put_mid']].rename(columns={'strike':'K'})
            df = pd.merge(calls, puts, on='K', how='left')
        else:
            df = calls
        df['expiry']=exp
        return df.sort_values('K').reset_index(drop=True)
    except Exception:
        return None

def load_sample_chain(path: Path):
    df = pd.read_csv(path)
    keep = [c for c in ['K','call_mid','put_mid','call_bid','call_ask','expiry'] if c in df.columns]
    return df[keep].sort_values('K').reset_index(drop=True)

def infer_forward_discount(df: pd.DataFrame):
    have_puts = 'put_mid' in df.columns and df['put_mid'].notna().sum()>5
    if have_puts:
        m = df['call_mid'].notna() & df['put_mid'].notna()
        y = (df.loc[m,'call_mid'] - df.loc[m,'put_mid']).values
        X = df.loc[m,'K'].values
        A = np.vstack([np.ones_like(X), X]).T
        a, b = LA.lstsq(A, y, rcond=None)[0]
        D = max(1e-9, -b); F = a/D
    else:
        D = 1.0
        idx = np.nanargmin(np.abs(np.diff(df['call_mid'], prepend=df['call_mid'].iloc[0])))
        F = float(df['K'].iloc[int(np.clip(idx,0,len(df)-1))])
    return float(F), float(D)

def diff_mats(n:int):
    D1=np.zeros((n-1,n)); D2=np.zeros((n-2,n))
    for i in range(n-1): D1[i,i],D1[i,i+1] = -1.0, 1.0
    for i in range(n-2): D2[i,i],D2[i,i+1],D2[i,i+2] = 1.0,-2.0,1.0
    return D1,D2

def convex_monotone_fit_regularized(call_mid: np.ndarray, lambda_reg: float=1e-2):
    from scipy import optimize
    c = np.asarray(call_mid, dtype=float).copy(); n=c.size; D1,D2=diff_mats(n)
    try:
        C = cp.Variable(n)
        obj = cp.Minimize(cp.sum_squares(C-c) + lambda_reg*cp.sum_squares(D2@C))
        cons = [D1@C <= 0, D2@C >= 0]
        cp.Problem(obj, cons).solve(solver=cp.OSQP, verbose=False)
        return np.array(C.value).ravel()
    except Exception:
        def fun(x):
            r=x-c; s=D2@x; return 0.5*np.dot(r,r)+0.5*lambda_reg*np.dot(s,s)
        def jac(x):
            r=x-c; s=D2@x; return r + lambda_reg*(D2.T @ s)
        Aub=np.vstack([D1,-D2]); bub=np.zeros(Aub.shape[0])
        lin=optimize.LinearConstraint(Aub, lb=-np.inf*np.ones_like(bub), ub=bub)
        res=optimize.minimize(fun, c, jac=jac, method='trust-constr', constraints=[lin], options={'maxiter':2000})
        return res.x if res.success else c

def restrict_and_uniformize(df: pd.DataFrame, F_T: float, keep_mult=(0.7,1.3), n_points=161):
    lo,hi = keep_mult; kmin,kmax = F_T*lo, F_T*hi
    d = df[(df['K']>=kmin)&(df['K']<=kmax)].copy().dropna(subset=['call_mid'])
    if len(d) < 25: d = df.copy().dropna(subset=['call_mid'])
    K_uni = np.linspace(d['K'].min(), d['K'].max(), n_points)
    C_uni = np.interp(K_uni, d['K'].values, d['call_mid'].values)
    out = pd.DataFrame({'K':K_uni, 'call_mid':C_uni})
    if 'put_mid' in d.columns:
        out['put_mid'] = np.interp(K_uni, d['K'].values, d['put_mid'].values)
    return out

def load_cleaned_or_rebuild():
    if CLEANED_PATH.exists():
        d = pd.read_csv(CLEANED_PATH)
        if {'K','C_clean'}.issubset(d.columns):
            K = d['K'].values; C = d['C_clean'].values
            # get F_T,D from sample or fallback
            if SAMPLE_CHAIN.exists():
                raw = pd.read_csv(SAMPLE_CHAIN); F,D = infer_forward_discount(raw)
            else:
                F,D = float('nan'), 1.0
            if not np.isfinite(F) or F<=0:
                curv = np.abs(np.diff(C, n=2))
                i = (np.argmax(curv)+1) if len(curv)>0 else len(C)//2
                F = float(K[i]); D = 1.0
            return K, C, F, D, 'cleaned_csv'
    # else rebuild
    df_raw = fetch_chain_yf(TICKER, EXPIRY)
    src = 'yfinance'
    if df_raw is None or df_raw['call_mid'].fillna(0).sum()==0:
        df_raw = load_sample_chain(SAMPLE_CHAIN); src = 'sample_csv'
    F_T, D = infer_forward_discount(df_raw)
    df_uni = restrict_and_uniformize(df_raw, F_T, keep_mult=KEEP_FWD_MULT, n_points=GRID_POINTS)
    C_clean = convex_monotone_fit_regularized(df_uni['call_mid'].values, lambda_reg=1e-2)
    return df_uni['K'].values, C_clean, F_T, D, src

K, C_clean, F_T, D, source = load_cleaned_or_rebuild()
print(f"Source: {source}; points: {len(K)}; F_T={F_T:.4f}; D={D:.6f}")

In [None]:
def payoff_fn(name:str, S: np.ndarray, params: dict, F_T: float):
    if name=='digital':
        K0 = float(params.get('K0', F_T)); return (S > K0).astype(float)
    if name=='call':
        K0 = float(params.get('K0', F_T)); return np.maximum(S - K0, 0.0)
    if name=='capped_call':
        K0 = float(params.get('K0', F_T)); M=float(params.get('M',20.0)); return np.minimum(np.maximum(S-K0,0.0), M)
    if name=='power':
        K0=float(params.get('K0',F_T)); p=float(params.get('p',1.5)); M=params.get('M',None)
        x=(S/max(K0,1e-9))**p
        return np.minimum(x,float(M)) if M is not None else x
    if name=='log':
        return np.log(np.maximum(S,1e-9)/max(F_T,1e-9))
    raise ValueError('unknown payoff')

In [None]:
def payoff_matrix(S: np.ndarray, K: np.ndarray):
    return np.maximum(S.reshape(-1,1) - K.reshape(1,-1), 0.0)

def build_state_grid(F_T: float, band=(0.5,1.5), n=201):
    lo,hi=band; return np.linspace(F_T*lo, F_T*hi, n)

def ensure_overlap_state_band(S, K, F_T):
    if np.max(S) <= np.min(K) or np.min(S) >= np.max(K):
        new_lo = min(np.min(K)/max(F_T,1e-9), 0.6)
        new_hi = max(np.max(K)/max(F_T,1e-9), 1.4)
        new_lo *= 0.95; new_hi *= 1.05
        return (new_lo, new_hi), True
    return None, False

def fit_replication(S, gS, K, C, mode='nnls', l2_reg=0.0):
    Phi = payoff_matrix(S,K); X = np.column_stack([np.ones_like(S), S, Phi])
    nK=len(K)
    if mode=='ols':
        if l2_reg>0:
            aug=np.zeros((nK, X.shape[1])); aug[:,2:]=math.sqrt(l2_reg)*np.eye(nK)
            X_aug=np.vstack([X,aug]); y_aug=np.concatenate([gS, np.zeros(nK)])
            beta,*_ = LA.lstsq(X_aug, y_aug, rcond=None)
        else:
            beta,*_ = LA.lstsq(X, gS, rcond=None)
        return dict(a=beta[0], b=beta[1], w=beta[2:], Phi=Phi, X=X)
    # nnls
    if HAVE_CVXPY:
        a=cp.Variable(1); b=cp.Variable(1); w=cp.Variable(nK, nonneg=True)
        resid = a + b*S + Phi@w - gS
        obj = cp.Minimize(0.5*cp.sum_squares(resid) + 0.5*l2_reg*cp.sum_squares(w))
        cp.Problem(obj).solve(solver=cp.OSQP, verbose=False)
        return dict(a=float(a.value), b=float(b.value), w=np.array(w.value).ravel(), Phi=Phi, X=X)
    # SciPy bounds
    def obj(beta):
        a,b=beta[0],beta[1]; w=beta[2:]
        r=a + b*S + Phi.dot(w) - gS
        return 0.5*np.dot(r,r) + 0.5*l2_reg*np.dot(w,w)
    def grad(beta):
        a,b=beta[0],beta[1]; w=beta[2:]
        r=a + b*S + Phi.dot(w) - gS
        ga=np.sum(r); gb=np.dot(r,S); gw=Phi.T.dot(r) + l2_reg*w
        return np.concatenate([[ga,gb],gw])
    beta0=np.zeros(2+nK); bounds=[(None,None),(None,None)] + [(0.0,None)]*nK
    res=optimize.minimize(obj, beta0, jac=grad, method='L-BFGS-B', bounds=bounds, options={'maxiter':1000})
    return dict(a=res.x[0], b=res.x[1], w=res.x[2:], Phi=Phi, X=X)

def portfolio_price(a,b,w,K,C,F_T,D): return a*D + b*D*F_T + np.dot(w,C)
def error_stats(err): 
    import numpy as np, math
    return {'RMSE': math.sqrt(np.mean(err**2)), 'MaxAbs': np.max(np.abs(err)), 'P95Abs': np.percentile(np.abs(err),95)}

In [None]:
# Choose payoff and run
PAYOFF_NAME='digital'; PARAMS={'K0': float(F_T), 'M': 15.0, 'p': 1.5}
S = build_state_grid(F_T, band=STATE_BAND, n=STATE_POINTS)

new_band, changed = ensure_overlap_state_band(S, K, F_T)
if changed:
    print(f"[warn] Expanded STATE_BAND from {STATE_BAND} to {new_band} to overlap strikes.")
    STATE_BAND = new_band; S = build_state_grid(F_T, band=STATE_BAND, n=STATE_POINTS)

gS = payoff_fn(PAYOFF_NAME, S, PARAMS, F_T)
Phi = payoff_matrix(S, K)
is_const = np.nanstd(gS) < 1e-8
if is_const: print("[warn] Payoff is almost constant on the state grid; call weights may be ~0.")
if not np.any(Phi>0): print("[warn] (S-K)^+ is zero everywhere; expand STATE_BAND or check F_T.")

mode = FIT_MODE if not is_const else 'ols'
fit = fit_replication(S, gS, K, C_clean, mode=mode, l2_reg=L2_REG)
a,b,w = fit['a'], fit['b'], fit['w']
price = portfolio_price(a,b,w, K, C_clean, F_T, D)
ghat = a + b*S + Phi.dot(w); err = ghat - gS
stats = error_stats(err)

print(f"Mode={mode}; Price={price:.6f}")
print(pd.Series(stats))

In [None]:
import matplotlib.pyplot as plt
plt.figure(figsize=(7,4)); plt.plot(S,gS,label='Target'); plt.plot(S,ghat,label='Replicated')
plt.legend(); plt.xlabel('S_T'); plt.ylabel('Payoff'); plt.title(f'Replication: {PAYOFF_NAME}'); plt.tight_layout(); plt.show()

plt.figure(figsize=(7,3.2)); plt.plot(S, err); plt.axhline(0,color='k',lw=0.8)
plt.title('Replication error'); plt.xlabel('S_T'); plt.ylabel('Error'); plt.tight_layout(); plt.show()

plt.figure(figsize=(7,3.2)); 
if len(K)>1: width=(K[1]-K[0])
else: width=1.0
plt.bar(K, w, width=width); plt.xlabel('Strike K'); plt.ylabel('Weight w_i'); plt.title('Call weights'); plt.tight_layout(); plt.show()