# De-Americanization for Implied Dividends
------------------
> **Idriss Afra**

This project aims to calibrate discrete cash-dividend schedules from American equity or index option prices.

## US Stock and Index Options

Listed options on U.S. stocks and indices are most often American-style, meaning they may be exercised early. With early exercise and discrete dividends, American prices do not satisfy European put–call parity, which complicates the inference of forwards and dividend expectations from observed prices.

To estimate implied forwards and dividends, we follow this workflow:
1. Model the American early-exercise feature with a binomial tree.
2. Compute the early-exercise premium as the difference between model American and model European prices.
3. Convert market American prices into European-equivalent prices by subtracting this premium (de-americanization).
4. Use European parity to infer forwards and recover the present value of dividends.
5. Bootstrap the cash dividend whose ex-dividend date is just before each relevant expiry.

This notebook implements the full workflow, including market-data handling and a synthetic validation harness.


---
## Binomial Pricing Model

The binomial model is straightforward to interpret and implement: over each small time step $\Delta t$, the underlying price $S_t$ moves up or down by fixed multiplicative factors. We define the movement factors $u$ and $d$ as
$$
u = e^{\sigma\sqrt{\Delta t}}, 
\qquad 
d = e^{-\sigma\sqrt{\Delta t}}=\frac{1}{u},
$$
where $\sigma$ is the volatility and $\Delta t$ is the time step, with maturity $T$.

### Discrete dividends and the $S \leftrightarrow X$ transformation
With discrete cash dividends, the stock price drops by the dividend amount on each ex-dividend date. A convenient way to handle this in a recombining tree is to work with a dividend-stripped (also called prepaid forward) process $X_t$, defined by
$$
X_t := S_t - I_t,
\qquad\text{so equivalently}\qquad
S_t = X_t + I_t \quad (S \leftrightarrow X),
$$
where $I_t$ is the present value at time $t$ of all expected future dividends between $t$ and $T$:
$$
I_t = \sum_{t < t_j \le T} D_j\,e^{-r(t_j-t)}.
$$
Here $D_j$ is the cash dividend paid at ex-date $t_j$, and $r$ is the continuously-compounded risk-free rate. Intuitively, $X_t$ removes the deterministic part of price changes coming from scheduled dividend drops, so the remaining uncertainty is modeled cleanly by the tree. In practice: at each node time $t$, compute $I_t$, evolve $X$ on the tree, and recover $S$ via $S=X+I$ whenever you need payoffs or early-exercise comparisons.

### Risk-neutral probability (on the $X$-tree)
Under the risk-neutral measure, the dividend-stripped process $X_t$ has expected growth $e^{r\Delta t}$ over a step (because the expected dividends have been removed into $I_t$). The one-step risk-neutral probability is therefore
$$
p=\frac{e^{r\Delta t}-d}{u-d},
\qquad 
1-p=\frac{u-e^{r\Delta t}}{u-d},
$$
so that
$$
\mathbb{E}^Q[X_{t+\Delta t}\mid X_t]=p(X_tu)+(1-p)(X_td)=X_t e^{r\Delta t}.
$$

### Backward induction pricing
Once the tree for $X$ is built, option values are computed by backward induction. At maturity, using the recovered spot $S_T = X_T + I_T$ (note $I_T=0$ by definition),
$$
V_T(S_T)=\max\big(\phi(S_T-K),0\big),
$$
where $\phi=+1$ for calls and $\phi=-1$ for puts, and $K$ is the strike.

For European options (exercise only at $T$):
$$
V_t(S_t)=e^{-r\Delta t}\Big(p\,V_{t+\Delta t}(S^{u}_{t+\Delta t})+(1-p)\,V_{t+\Delta t}(S^{d}_{t+\Delta t})\Big),
$$
with
$$
S^{u}_{t+\Delta t}=(X_tu)+I_{t+\Delta t},\qquad
S^{d}_{t+\Delta t}=(X_td)+I_{t+\Delta t}.
$$

For American options (early exercise allowed), take the maximum of immediate exercise versus continuation:
$$
V_t(S_t)=\max\!\Big(\max(\phi(S_t-K),0),\;
e^{-r\Delta t}\big(p\,V_{t+\Delta t}(S^{u}_{t+\Delta t})+(1-p)\,V_{t+\Delta t}(S^{d}_{t+\Delta t})\big)\Big),
$$
where $S_t=X_t+I_t$. Discrete dividends matter especially here because the predictable drop in $S$ around ex-dividend dates can make early exercise optimal (most notably for deep ITM calls just before a dividend).

As the number of time steps increases (smaller $\Delta t$), the binomial price typically becomes more accurate, at the cost of higher computational time.


In [1]:
import numpy as np

class binomial_model:
    """
    The Binomial model class.
    """
    def __init__(self, S0, zc_rate, n_steps=250):
        """
        S0 : Initial spot price
        zc_rate : Zero-coupon rate curve (float or function of time)
        n_steps : the binomial tree length (number of steps)
        """
        if S0 <= 0: raise ValueError("The spot price must be positive.")
        if n_steps <= 0: raise ValueError("The number of time steps must be positive.")
        self.S0 = S0
        self.zc_rate = zc_rate if callable(zc_rate) else (lambda T: float(zc_rate))
        self.n_steps = int(n_steps)
        # Cache the context per T and current dividend schedule to avoid recomputation and keep runtime reasonable.
        self._ctx_cache = {}

    def _df(self, T):
        """
        Computes discount factor up to T, using the zc_rate attribute.
        """
        if T < 0 : return 0
        else : return np.exp(-self.zc_rate(T) * T)

    def _divs_as_list(self, T, expected_divs):
        """
        Returns a list of (div_time, div_amount) for dividends strictly inside (0, T).
        """
        if not expected_divs: return []
        divs = []
        for div_data in expected_divs.values():
            div_time = div_data.get("exdiv_year_fraction")
            div_amount = div_data.get("dividend_cash")
            if div_amount is None or div_amount == 0: continue
            if div_amount < 0: raise ValueError("The dividend amount must be non-negative.")
            if div_time is None or div_time <= 0 or div_time >= T: continue
            divs.append((div_time, div_amount))
        divs.sort(key=lambda x: x[0])
        return divs

    def _div_signature(self, T, expected_divs):
        """
        Builds a stable signature for caching. Only divs strictly inside (0,T) matter.
        """
        divs = self._divs_as_list(T, expected_divs)
        # round to avoid floating noise in caching
        return tuple((round(div_time, 12), round(div_amount, 12)) for (div_time, div_amount) in divs)

    def _build_I_from_df(self, times, df_times, expected_divs):
        """
        Builds the dividend PV space.
        => PV at each node-time t_i of dividends falling after t_i: sum expected_divs * df(div_time) / df(t_i)
        """
        I = np.zeros(len(times), dtype=float)
        if not expected_divs: return I
        for (div_time, div_amount) in expected_divs:
            df_div = self._df(div_time)
            mask = times < div_time
            # Cumulative expected divs expressed at t_i
            I[mask] += div_amount * df_div / df_times[mask]

        return I

    def _sigma_floor_from_cf(self, cf, dt):
        """
        Minimal sigma to ensure CRR validity under rate curve term-structure: requires d < cf_i < u for all steps.
        => sigma > max_i |ln(cf_i)| / sqrt(dt)
        """
        sqrt_dt = np.sqrt(dt)
        floor = np.max(np.abs(np.log(cf))) / sqrt_dt
        return floor * 1.01  # small safety bump (1%)

    def _get_ctx(self, T, expected_divs):
        """
        Cached context for a given maturity T and current dividend schedule.
        Context contains term-structure + dividend PV objects that are sigma-independent.
        """
        key = (round(T, 12), self._div_signature(T, expected_divs))
        ctx = self._ctx_cache.get(key)
        if ctx is not None: return ctx
        n = self.n_steps
        dt = T / n
        times = np.linspace(0.0, T, n + 1)
        df_times = np.array([self._df(t) for t in times], dtype=float)
        cf = df_times[:-1] / df_times[1:]   # capitalisation factor per step
        divs = self._divs_as_list(T, expected_divs)
        I = self._build_I_from_df(times, df_times, divs)
        X0 = self.S0 - I[0] # clean spot price space : no dividends
        if X0 < 0: X0 = 0.0
        sigma_floor = self._sigma_floor_from_cf(cf, dt)
        # Cached information (context)
        ctx = {
            "n": n,
            "dt": dt,
            "times": times,
            "df_times": df_times,
            "cf": cf,
            "I": I,
            "X0": X0,
            "sigma_floor": sigma_floor,
        }
        self._ctx_cache[key] = ctx
        return ctx

    def _prices_call_put_amer_euro(self, T, K, sigma, expected_divs=None):
        """
        Computes the binomial price of call_amer, put_amer, call_euro, and put_euro, for a given volatility "sigma".
        """
        if K <= 0: raise ValueError("The strike price must be positive.")
        if T <= 0:
            call0 = max(self.S0 - K, 0.0)
            put0  = max(K - self.S0, 0.0)
            return call0, put0, call0, put0
        # Get the context
        ctx = self._get_ctx(T, expected_divs)
        n   = ctx["n"]
        dt  = ctx["dt"]
        cf  = ctx["cf"]
        I   = ctx["I"]
        X0  = ctx["X0"]
        sigma_floor = ctx["sigma_floor"]
        # Sigma correction
        sigma = max(sigma, sigma_floor)
        # Up & Down factors
        u = np.exp(sigma * np.sqrt(dt))
        d = 1.0 / u
        # Risk-neutral p per step
        p = (cf - d) / (u - d)
        if np.any((p < 0.0) | (p > 1.0)):
            bad_p = int(np.where((p < 0.0) | (p > 1.0))[0][0])
            raise ValueError(f"The Binomial model is not valid with the given parameters: p={p[bad_p]} at step={bad_p}")
        # Spot simulated prices at maturity
        j = np.arange(n + 1)
        X_T = X0 * (u ** (n - j)) * (d ** j) # Clean space (no dividends) : X(T, j) = X0 * u^(n-j) * d^j
        S_T = X_T + I[n] # Dirty space (with dividends) : S(T, j) = X(T, j) + ExpectedDividends(T)
        # Payoff at maturity
        call_amer = np.maximum(S_T - K, 0.0)
        put_amer  = np.maximum(K - S_T, 0.0)
        call_euro = call_amer.copy() # euro starts same at maturity
        put_euro  = put_amer.copy() # euro starts same at maturity
        # Backward induction
        for i in range(n - 1, -1, -1):
            disc = 1.0 / cf[i]
            # Continuation value
            call_amer = disc * (p[i] * call_amer[:i+1] + (1.0 - p[i]) * call_amer[1:i+2])
            put_amer  = disc * (p[i] * put_amer[:i+1]  + (1.0 - p[i]) * put_amer[1:i+2])
            call_euro = disc * (p[i] * call_euro[:i+1] + (1.0 - p[i]) * call_euro[1:i+2])
            put_euro  = disc * (p[i] * put_euro[:i+1]  + (1.0 - p[i]) * put_euro[1:i+2])
            # Early exercise option (American only)
            j = np.arange(i + 1)
            X_i = X0 * (u ** (i - j)) * (d ** j) # Clean space (no dividends) : X(i, j) = X0 * u^(i-j) * d^j
            S_i = X_i + I[i] # Dirty space (with dividends) : S(i, j) = X(i, j) + ExpectedDividends(i)
            call_amer = np.maximum(call_amer, np.maximum(S_i - K, 0.0))
            put_amer  = np.maximum(put_amer,  np.maximum(K - S_i, 0.0))
        return call_amer[0], put_amer[0], call_euro[0], put_euro[0]

    def binomial_price(self, T, K, sigma, isCall, expected_divs=None, isAmerican=True):
        """
        Returns the binomial price.
        """
        ca, pa, ce, pe = self._prices_call_put_amer_euro(T, K, sigma, expected_divs)
        if isCall: return ca if isAmerican else ce
        else: return pa if isAmerican else pe

    def implied_binomial_vol(self, T, K, isCall, market_price, expected_divs=None, isAmerican=True,
                             sigma_min=1e-4, sigma_max=2.5, n_max=60, eps_price=1e-6):
        """
        Returns the binomial implied volatility, using the bisection algorithm.
        It enforce sigma >= sigma_floor(T) (cached floor) to ensure a risk-neutral probability p within [0,1].
        """
        if K <= 0: raise ValueError("The strike price must be positive.")
        if T <= 0: raise ValueError("The expiry must be in the future.")
        target = float(market_price)
        if target < 0: raise ValueError("market_price must be non-negative.")
        # Get the context
        ctx = self._get_ctx(T, expected_divs)
        floor = ctx["sigma_floor"]
        # Low & High boundaries
        low = max(sigma_min, floor)
        high = sigma_max
        # Price difference at bracket ends
        p_low = self.binomial_price(T, K, low, isCall, expected_divs, isAmerican) - target
        p_high = self.binomial_price(T, K, high, isCall, expected_divs, isAmerican) - target
        # Expand bracket if needed
        if p_low * p_high > 0:
            for _ in range(10):
                high *= 1.5
                p_high = self.binomial_price(T, K, high, isCall, expected_divs, isAmerican) - target
                if p_low * p_high <= 0:
                    break
            if p_low * p_high > 0:
                raise ValueError(f"Vol calibration failed to bracket: f(low)={p_low}, f(high)={p_high}, low={low}, high={high}")
        # Bisection algorithm
        for _ in range(n_max):
            mid = 0.5 * (low + high)
            p_mid = self.binomial_price(T, K, mid, isCall, expected_divs, isAmerican) - target
            if abs(p_mid) < eps_price:
                return mid
            if p_low * p_mid <= 0:
                high, p_high = mid, p_mid
            else:
                low, p_low = mid, p_mid
        return 0.5 * (low + high)

---
## De-Americanization Algorithm

American options may be exercised before expiration, so their prices generally do not satisfy European put–call parity. Consequently, American call and put quotes cannot be used directly to infer forwards and dividends. The procedure below de-americanizes market quotes by removing an estimated early-exercise premium, constructs European-equivalent prices, and then implies the forward by parity. When discrete dividends are present, the algorithm bootstraps the cash dividend whose ex-dividend date is the last one strictly before the given option expiry $T$.

#### Init: 
For a given option expiry $T$:
- Set the discount factor $df_T = e^{-r(T)T}.$
- Get the expected cash dividend schedule $\{(t_j, D_j)\}_{0<t_j<T}$, where all dividends are pre-estimated and the final one (the last ex-dividend date before $T$) is provided with an initial input value.
- Compute its present value up to $T$: $PV_{\text{div}}(0,T)=\sum_{0<t_j<T} D_j\,e^{-r(t_j)t_j}.$
- Initialize the forward guess: $F_0=\frac{S_0-PV_{\text{div}}(0,T)}{df_T}.$
- Let $t_{\ell}$ be the time of the last ex-dividend date strictly before $T$ (if none exists, skip the dividend-bootstrap step below).
- Choose a liquid strike set $\{K_i\}_{i=1}^m$ with corresponding market American call and put prices $C^{Am}_{mkt}(K_i),\;P^{Am}_{mkt}(K_i).$
- Set maximum iterations $n_{\max}$ and tolerance $\varepsilon.$

#### While $n < n_{\max}:$
##### 1) Per-strike implied American volatility (OTM selection)
For each strike $K_i$, select an out-of-the-money quote using the current forward guess $F_n$:
- if $K_i > F_n$, use the call quote and denote the selected market price by $M_i=C^{Am}_{mkt}(K_i);$
- otherwise, use the put quote and denote $M_i=P^{Am}_{mkt}(K_i).$

Imply the American volatility $\sigma_i^{Am}$ from the selected American quote: $\quad V^{Am}(K_i,\sigma_i^{Am})=M_i.$
##### 2) Early-exercise premium and European-equivalent prices
Using the same implied volatility $\sigma_i^{Am}$, compute both model American and model European prices for the call and the put:
$$
C^{Am}_{mod}(K_i,\sigma_i^{Am}),\; P^{Am}_{mod}(K_i,\sigma_i^{Am}),\;
C^{Eu}_{mod}(K_i,\sigma_i^{Am}),\; P^{Eu}_{mod}(K_i,\sigma_i^{Am}).
$$
Define the early-exercise premiums:
$$
EEP_c(K_i)=C^{Am}_{mod}(K_i,\sigma_i^{Am})-C^{Eu}_{mod}(K_i,\sigma_i^{Am}),
$$
$$
EEP_p(K_i)=P^{Am}_{mod}(K_i,\sigma_i^{Am})-P^{Eu}_{mod}(K_i,\sigma_i^{Am}).
$$
Construct European-equivalent market prices (floored at 0 for numerical stability):
$$
C^{Eu}_{eq}(K_i)=\max\big(C^{Am}_{mkt}(K_i)-EEP_c(K_i),0\big),
$$
$$
P^{Eu}_{eq}(K_i)=\max\big(P^{Am}_{mkt}(K_i)-EEP_p(K_i),0\big).
$$
##### 3) Forward inference by European put–call parity (per strike)
Imply a forward from parity for each strike:
$$
F_i = K_i + \frac{C^{Eu}_{eq}(K_i)-P^{Eu}_{eq}(K_i)}{df_T}.
$$
##### 4) Aggregate the forward across strikes
Aggregate the per-strike forwards $\{F_i\}$ into a single forward estimate:
$
F_{n+1}=\operatorname{Agg}(\{F_i\}_{i=1}^m),
$
where $\operatorname{Agg}$ is either the median (default) or the mean.
##### 5) Dividend bootstrap for the last ex-dividend date before $T$ (if it exists)
If there is no ex-dividend date strictly before $T$, skip this step.

Otherwise, convert the implied forward into the total dividend present value consistent with the implied forward:
$
PV_{\text{div,total}}(0,T)=S_0 - F_{n+1}\,df_T.
$

Let $t_{\ell}$ be the last ex-dividend time before $T$, and define the PV of all earlier dividends (strictly before that last ex-date):
$
PV_{\text{prev}}=\sum_{0<t_j<T,\; t_j<t_{\ell}} D_j\,e^{-z(t_j)t_j}.
$

Bootstrap the last cash dividend amount:
$$
D_{\ell}=\frac{PV_{\text{div,total}}(0,T)-PV_{\text{prev}}}{e^{-z(t_{\ell})t_{\ell}}},
\qquad
D_{\ell}=\max(D_{\ell},0).
$$
Update the dividend schedule by replacing the cash dividend at the last ex-dividend date with $D_{\ell}$.
##### 6) Convergence
If $|F_{n+1}-F_n|<\varepsilon,$ stop and output $F_{n+1}$ and the updated last cash dividend amount. Otherwise set $n\leftarrow n+1$ and continue.

#### Outputs
- the implied forward : $F(0,T)$
- the bootstrapped cash dividend at the last ex-dividend date before $T$ : $D_{\ell}$

In this project, we use the Binomial model to imply American volatilities and price equivalent European options.

In [2]:
from datetime import date

class deAmericanization_method:
    """
    The de-Americanization algorithm class:
    - Calibrate sigmas from OTM American prices for the given liquid strikes.
    - Compute EEP (Early-Exercise Premium) via model Amer - model Euro, with model being the Binomial.
    - Build Euro-equivalent prices = Amer_mkt - EEP.
    - Imply forward values per strike via parity, then aggregate them (median or mean).
    """
    def __init__(self, S0, zc_rate, n_steps=250):
        """
        S0 : Initial spot price
        zc_rate : Zero-coupon rate curve (float or function of time)
        n_steps : the binomial tree length (number of steps)
        """
        if S0 <= 0: raise ValueError("The spot price must be positive.")
        self.S0 = S0
        self.zc_rate = zc_rate if callable(zc_rate) else (lambda T: float(zc_rate))
        self.binomial_model = binomial_model(S0=S0, zc_rate=zc_rate, n_steps=n_steps)

    def _df(self, T):
        """
        Computes discount factor up to T, using the zc_rate attribute.
        """
        if T < 0 : return 0
        else : return np.exp(-self.zc_rate(T) * T)

    def _pv_divs_upto(self, T):
        if not self.expected_divs: return 0.0
        pv = 0.0
        for div_data in self.expected_divs.values():
            div_time = div_data["exdiv_year_fraction"]
            div_amount = div_data["dividend_cash"]
            if 0.0 < div_time < T and div_amount > 0:
                pv += div_amount * self._df(div_time)
        return pv

    def _last_div_date_before(self, T):
        if not self.expected_divs: return None
        items = []
        for div_date, div_data in self.expected_divs.items():
            div_time = div_data["exdiv_year_fraction"]
            if 0.0 < div_time < T:
                items.append((div_time, div_date))
        if not items: return None
        items.sort()
        return items[-1][1]

    def _implied_forward(self, T, strikes, amer_call_prices, amer_put_prices, n_max=60, eps=1e-6, 
                         debug=False, forward_agg="median", vol_n_max=60, vol_eps_price=1e-6):
        """
        The de-Americanization algorithm.
        _implied_forward is called by _implied_forwards to imply the forward at T and to bootstrap the final 
        cash dividend whose ex-dividend date is just before T.
        """
        if T <= 0: raise ValueError("The expiry must be in the future.")
        # Market data
        strikes = np.array(list(strikes), dtype=float)
        calls = np.array(list(amer_call_prices), dtype=float)
        puts  = np.array(list(amer_put_prices), dtype=float)
        # Dividend and forward data
        df_T = self._df(T)
        pv_divs = self._pv_divs_upto(T)
        F = (self.S0 - pv_divs) / df_T
        last_div_date = self._last_div_date_before(T)
        last_div_time = None
        if last_div_date is not None:
            last_div_time = self.expected_divs[last_div_date]["exdiv_year_fraction"]
        # Forward calibration
        for _ in range(n_max):
            forwards = []
            for i, K in enumerate(strikes):
                isCall = bool(K > F)
                mkt = calls[i] if isCall else puts[i]
                # Binomial skew calibration
                sigma = self.binomial_model.implied_binomial_vol(
                    T=T, K=K, isCall=isCall, market_price=mkt,
                    expected_divs=self.expected_divs, isAmerican=True,
                    n_max=vol_n_max, eps_price=vol_eps_price
                )
                # Binomial American and European call / put prices
                call_amer_model, put_amer_model, call_euro_model, put_euro_model = \
                    self.binomial_model._prices_call_put_amer_euro(T, K, sigma, expected_divs=self.expected_divs)
                # Early-exercise premiums
                eep_call = call_amer_model - call_euro_model
                eep_put  = put_amer_model  - put_euro_model
                # Equivalent market European prices (Floored by 0 for safety against numerical noises)
                call_euro_eq = max(calls[i] - eep_call, 0.0) 
                put_euro_eq  = max(puts[i]  - eep_put,  0.0)
                # Forward by parity
                Fk = K + (call_euro_eq - put_euro_eq) / df_T
                forwards.append(Fk)
                if debug: print("Strike:", K, "Sigma:", sigma, "Forward:", Fk)
            # Aggregated forward value
            forwards = np.array(forwards, dtype=float)
            if forward_agg == "mean": F_new = np.mean(forwards)
            else: F_new = np.median(forwards)
            # Move forward with the calibration if no dividends
            if last_div_date is None:
                if abs(F_new - F) < eps:
                    return F_new
                F = F_new
                continue
            # Last div correction
            pv_divs_total = self.S0 - F_new * df_T
            pv_prev = 0.0
            for div_date, div_data in self.expected_divs.items():
                div_time = div_data["exdiv_year_fraction"]
                div_amount = div_data["dividend_cash"]
                if 0.0 < div_time < T and div_date < last_div_date and div_amount > 0:
                    pv_prev += div_amount * self._df(div_time)
            last_df = self._df(last_div_time)
            last_div = (pv_divs_total - pv_prev) / max(last_df, 1e-8)
            last_div = max(last_div, 0.0)
            self.expected_divs[last_div_date]["dividend_cash"] = last_div
            # Move forward with the calibration 
            if abs(F_new - F) < eps:
                return F_new
            F = F_new
        raise ValueError("Forward fixed-point did not converge.")

    def implied_forwards(self, options_data, ex_div_dates=None, n_max=60, eps=1e-6, debug=False, detailed_debug=False,
                         initial_div=0.0, forward_agg="median", vol_n_max=60, vol_eps_price=1e-6):
        """
        Applies the de-Americanization algorithm across market option expiries and bootstraps a schedule of expected cash dividend.
        """
        print("\nRunning the de-Americanization algorithm...\n")
        if options_data is None or len(options_data) == 0: raise ValueError("The options_data must be a non-empty dictionary.")
        # Init
        self.expected_fwds = {}
        self.expected_divs = {}
        # Sorted market option expiries
        expiries_sorted = sorted(options_data.keys())
        if ex_div_dates is None or len(ex_div_dates) == 0:
            # Dividend-free forward prices under the risk-neutral measure
            for exp in expiries_sorted:
                T = options_data[exp]["expiry_year_fraction"]
                self.expected_fwds[exp] = self.S0 / self._df(T)
            self.expected_divs = None
            return {"implied_fwds": self.expected_fwds, "implied_divs": self.expected_divs}
        # Projected ex-dividend dates & initial dividends
        div_dates_sorted = sorted(ex_div_dates.keys())
        for div_date in div_dates_sorted:
            self.expected_divs[div_date] = {"exdiv_year_fraction": ex_div_dates[div_date], "dividend_cash": initial_div}
        # Reference expiry per dividend: first expiry after that ex-div
        expiry_after_exdiv = {}
        for div_date in div_dates_sorted:
            for expiry in expiries_sorted:
                if options_data[expiry]["expiry_year_fraction"] > ex_div_dates[div_date]:
                    expiry_after_exdiv[div_date] = expiry
                    break
        # The de-Americanization algo across the selected expiries
        for expiry in expiries_sorted:
            row = options_data[expiry]
            T = row["expiry_year_fraction"]
            do_calib = False
            for div_date, ref_expiry in expiry_after_exdiv.items():
                if ref_expiry == expiry:
                    do_calib = True
                    break
            if not do_calib: continue
            F = self._implied_forward(
                T=T,
                strikes=row["strikes"],
                amer_call_prices=row["call"],
                amer_put_prices=row["put"],
                n_max=n_max,
                eps=eps,
                debug=detailed_debug,
                forward_agg=forward_agg,
                vol_n_max=vol_n_max,
                vol_eps_price=vol_eps_price
            )
            self.expected_fwds[expiry] = F
            if debug:
                print("Expiry :", expiry)
                print("Expiry Year Fraction :", round(T, 4))
                print("ZC Rate to Expiry  :", round(100 * self.zc_rate(T), 4), "%")
                print("Implied Fwd :", round(F, 4))
                last_div = self._last_div_date_before(T)
                if last_div is not None:
                    print("Ex-dividend Date :", last_div)
                    print("Implied Cash Dividend :", round(self.expected_divs[last_div]["dividend_cash"], 4))
                else:
                    print("Ex-dividend Date : None")
                print("-------------------------------------------------------")
        return {"implied_fwds": self.expected_fwds, "implied_divs": self.expected_divs}

---
## Market Data

We use a `MarketData` class to clean, validate, and standardize the inputs required by the de-americanization routine. It builds the zero-coupon rate curve from user inputs (flat, callable, or interpolated term structure), converts dates to year fractions from $t_0$, constructs the ex-dividend schedule, and validates the option dataset across expiries (structure, positivity checks, and strike-sorted call/put grids). If option quotes are not provided, it can generate synthetic option data for testing.

In [3]:
from scipy.interpolate import interp1d

class MarketData:
    """
    The market data class.
    Build de-Americanization inputs from user data (or synthetic defaults if not provided).
    """
    def __init__(self, t0, S0, zc_points, exdiv_dates, options_data=None, day_count=365.0, debug=False, zc_kind="linear"):
        # Sanity check
        if not isinstance(t0, date):
            raise ValueError("t0 must be a datetime.date")
        if S0 is None or float(S0) <= 0:
            raise ValueError("S0 must be a positive float.")
        if zc_points is None:
            raise ValueError("zc_points is required.")
        if exdiv_dates is None or len(exdiv_dates) == 0:
            raise ValueError("exdiv_dates is required (list/tuple of dates or dict {date: yf}).")
        self.t0 = t0
        self.S0 = S0
        self.day_count = day_count
        self.debug = debug
        # Build zc_rate and exdiv_dates
        self.zc_rate = self.build_zc_curve(zc_points, kind=zc_kind)
        self.exdiv_dates = self.build_exdiv_dates(exdiv_dates)
        # Optional market options_data
        self.options_data = None
        if options_data is not None:
            self.options_data = self._validate_options_data(options_data)
            if self.debug: print("\n[MarketData] Using USER-PROVIDED options_data (validated).")
        # Synthetic helpers
        self.sigma_surface = None
        self.expected_divs = None

    def yf(self, d):
        """
        Computes the year fraction given date d and the day_count method.
        """
        if isinstance(d, (int, float, np.floating)):
            return float(d)
        if not isinstance(d, date):
            raise ValueError("yf expects datetime.date or float")
        return (d - self.t0).days / self.day_count

    def build_zc_curve(self, zc_points, kind="linear"):
        """
        zc_points:
          - flat rate (float/int) => returns lambda T: flat_rate
          - callable curve => returns it
          - dict {year_fraction: rate} OR {date: rate} => interpolation
        Returns callable zc_rate(T) returning float for scalar T.
        """
        # Case 1) flat scalar
        if isinstance(zc_points, (int, float, np.floating)):
            r = float(zc_points)
            zc_rate = lambda T: r
            if self.debug:
                print("\n[MarketData] ZC curve built:")
                print("  - type: FLAT")
                print("  - r:", round(100 * r, 4), "%")
            return zc_rate
        # Case 2) already a callable
        if callable(zc_points):
            if self.debug:
                print("\n[MarketData] ZC curve provided by the user (callable).")
            return zc_points
        # Case 3) dict => interpolate
        if not isinstance(zc_points, dict) or len(zc_points) < 2:
            raise ValueError("zc_points must be a float, callable, or dict with at least 2 points.")
        xs, ys = [], []
        for k, r in zc_points.items():
            T = self.yf(k)
            if T < 0: continue
            xs.append(T)
            ys.append(r)
        if len(xs) < 2: raise ValueError("zc_points did not yield enough valid points (check dates vs t0).")
        xs = np.array(xs, dtype=float)
        ys = np.array(ys, dtype=float)
        o = np.argsort(xs)
        xs, ys = xs[o], ys[o]
        f = interp1d(xs, ys, kind=kind, fill_value="extrapolate", assume_sorted=True)
        zc_rate = lambda T : float(f(float(T)))
        # Debug
        if self.debug:
            print("\n[MarketData] ZC curve built by interpolating user-provided data:")
            print("  - type: TERM-STRUCTURE")
            print("  - kind:", kind)
            print("  - points (T -> r):")
            for T, r in zip(xs, ys):
                print("     ", round(T, 4), "->", round(100 * r, 4), "%")
        return zc_rate

    def build_exdiv_dates(self, exdiv_dates):
        """
        exdiv_dates:
          - list of projected ex-div dates
          - dict {ex-div date: year_fraction}
        Returns dict {ex-div date: year_fraction}
        """
        # Sanity check
        if exdiv_dates is None or len(exdiv_dates) == 0:
            raise ValueError("[MarketData] exdiv_dates is required and cannot be empty.")
        result = {}
        # dict input
        if isinstance(exdiv_dates, dict):
            for div_date, div_yf in exdiv_dates.items():
                if not isinstance(div_date, date):
                    raise ValueError("build_exdiv_dates: dict keys must be datetime.date.")
                try:
                    t = float(div_yf)
                except Exception:
                    raise ValueError("build_exdiv_dates: dict values must be numeric year fractions.")
                if t <= 0:
                    raise ValueError(f"build_exdiv_dates: ex-div {div_date} has non-positive year_fraction={t}. Check t0.")
                result[div_date] = t
        # list/tuple input
        elif isinstance(exdiv_dates, (list, tuple)):
            for div_date in exdiv_dates:
                if not isinstance(div_date, date):
                    raise ValueError("build_exdiv_dates: list elements must be datetime.date.")
                t = float(self.yf(div_date))
                if t <= 0:
                    raise ValueError(f"build_exdiv_dates: ex-div {div_date} is not after t0={self.t0} (yf={t}).")
                result[div_date] = t
        else:
            raise ValueError("build_exdiv_dates: exdiv_dates must be a list/tuple of dates or a dict {date: yf}.")
        # Sort by date
        result = dict(sorted(result.items(), key=lambda x: x[0]))
        if self.debug:
            print("\n[MarketData] Projected ex-div schedule:")
            for d, t in result.items():
                print("  -", d, "| yf=", round(float(t), 4))
        return result

    def _validate_options_data(self, options_data):
            """
            Validate options_data structure for de-Americanization.
            Required structure (per expiry date key):
                options_data[exp_key] = {
                    "strikes": [K1, K2, ...],
                    "call":    [C1, C2, ...],
                    "put":     [P1, P2, ...],
                    # "expiry_year_fraction": optional (if missing => computed from exp_key)
                }
            Returns validated_options_data with:
            - expiry_year_fraction always present
            - strikes/call/put sorted by strike
            """
            # Sanity check
            if options_data is None: raise ValueError("options_data is None.")
            if not isinstance(options_data, dict) or len(options_data) == 0:
                raise ValueError("options_data must be a non-empty dict keyed by expiry date")
            # Data validation
            validated_options_data = {}
            for exp, row in options_data.items():
                if not isinstance(exp, date):
                    raise ValueError(f"options_data keys must be datetime.date. Got {type(exp)}")
                if not isinstance(row, dict):
                    raise ValueError(f"options_data[{exp}] must be a dict")
                for k in ["strikes", "call", "put"]:
                    if k not in row:
                        raise ValueError(f"options_data[{exp}] missing key '{k}'")
                # expiry_year_fraction is optional
                T = row["expiry_year_fraction"] if ("expiry_year_fraction" in row) else self.yf(exp)
                if T <= 0: raise ValueError(f"options_data[{exp}]: expiry is not after t0={self.t0} (T={T})")
                # row data
                strikes = list(row["strikes"])
                calls   = list(row["call"])
                puts    = list(row["put"])
                if len(strikes) == 0: raise ValueError(f"options_data[{exp}]: empty strikes")
                if len(calls) != len(strikes) or len(puts) != len(strikes):
                    raise ValueError(f"options_data[{exp}]: strikes/call/put must have same length")
                # Validate row data
                strikes_f, calls_f, puts_f = [], [], []
                for i in range(len(strikes)):
                    K = strikes[i]
                    c = calls[i]
                    p = puts[i]
                    if K <= 0: raise ValueError(f"options_data[{exp}]: strike <= 0 at index {i}")
                    if c < 0 or p < 0: raise ValueError(f"options_data[{exp}]: negative price at index {i}")
                    strikes_f.append(K)
                    calls_f.append(c)
                    puts_f.append(p)
                # Sort by strike
                order = np.argsort(np.array(strikes_f, dtype=float))
                strikes_f = (np.array(strikes_f)[order]).tolist()
                calls_f   = (np.array(calls_f)[order]).tolist()
                puts_f    = (np.array(puts_f)[order]).tolist()
                validated_options_data[exp] = {
                    "expiry_year_fraction": T,
                    "strikes": strikes_f,
                    "call": calls_f,
                    "put": puts_f,
                }
            # Debug
            if self.debug:
                self._debug_options_data(validated_options_data)
            return validated_options_data

    def _debug_options_data(self, options_data, title="Options data"):
        """
        option_data debugger.
        """
        print(f"\n[MarketData] {title}:")
        expiries = sorted(options_data.keys())
        print("  - number of expiries:", len(expiries))
        for exp in expiries:
            row = options_data[exp]
            T = row["expiry_year_fraction"]
            strikes = row["strikes"]
            calls = row["call"]
            puts = row["put"]
            print("  -", exp, "| T=", round(T, 4), "| n_strikes=", len(strikes),
                "| K_range=", (round(strikes[0], 4), round(strikes[-1], 4)))
            idxs = [0, len(strikes) // 2, len(strikes) - 1] if len(strikes) >= 3 else list(range(len(strikes)))
            sample = []
            for i in idxs: sample.append((round(strikes[i], 4), round(calls[i], 4), round(puts[i], 4)))
            print("    sample (K, C, P):", sample)

    def _default_sigma_surface_dict(self, option_expiries, base_sigma=0.25, term_slope=-0.10, skew=-0.40, smile=0.80, moneyness_min=0.9, 
                                    moneyness_max=1.1, n_points=5, sigma_min=0.01, sigma_max=2.50):
        """
        Build a synthetic equity-like implied volatility surface as a dict, to be used in synthetic option generation.
        Volatility model (fast, stable, "equity-looking"):
          - Define log-moneyness: x = ln(K / S0)
          - Define an ATM term-structure ATM(T):
                If base_sigma is a float:
                    ATM(T) = base_sigma * (1 + term_slope * T)
                If base_sigma is a list/tuple:
                    ATM(T_i) = base_sigma[i] for each expiry i
                If base_sigma is a dict:
                    ATM(T) obtained by linear interpolation of the provided ATM points
          - Then define the full smile/skew at each expiry:
                sigma(T, K) = clip( ATM(T) * (1 + skew * x + smile * x^2), sigma_min, sigma_max )
        Interpretation:
          - skew < 0 reproduces the equity "downside skew":
              for puts K < S0 => x < 0 => skew * x > 0 => sigma increases (higher IV for OTM puts)
          - smile > 0 gives curvature (both wings rise smoothly) and prevents unrealistic purely-linear IV.
        This is not arbitrage-free: it is a deterministic surface for synthetic validation.
        """
        # Option expiries
        exp_list = [(exp, self.yf(exp)) for exp in option_expiries]
        exp_list.sort(key=lambda z: z[1])
        # Option strikes
        strikes = list(np.linspace(moneyness_min * self.S0, moneyness_max * self.S0, n_points))
        # Build ATM(T)
        def _atm_from_float(T):
            atm = base_sigma * (1.0 + term_slope * T)
            # Ensure reasonable atm values
            atm = max(min(atm, sigma_max), sigma_min)
            return atm
        atm_map = {}  # exp_date -> atm_vol
        # base_sigma is float
        if isinstance(base_sigma, (int, float)):
            for exp, T in exp_list:
                atm_map[exp] = _atm_from_float(T)
        # base_sigma is list/tuple of ATM vols per expiry
        elif isinstance(base_sigma, (list, tuple)):
            if len(base_sigma) != len(exp_list):
                raise ValueError("base_sigma list/tuple must have same length as expiries.")
            for i, (exp, T) in enumerate(exp_list):
                # Ensure reasonable atm values
                atm = max(min(base_sigma[i], sigma_max), sigma_min)
                atm_map[exp] = atm
        # base_sigma is dict => interpolate ATM vols
        elif isinstance(base_sigma, dict):
            xs, ys = [], []
            for exp, atm in base_sigma.items():
                T = self.yf(exp)
                if T <= 0: continue
                xs.append(T)
                ys.append(atm)
            if len(xs) < 2: raise ValueError("base_sigma dict must provide at least 2 valid ATM vol points.")
            xs = np.array(xs, dtype=float)
            ys = np.array(ys, dtype=float)
            o = np.argsort(xs)
            xs, ys = xs[o], ys[o]
            f_atm = interp1d(xs, ys, kind="linear", fill_value="extrapolate", assume_sorted=True)
            for exp, T in exp_list:
                atm = f_atm(T)
                # Ensure reasonable atm values
                atm = max(min(atm, sigma_max), sigma_min)
                atm_map[exp] = atm
        else:
            raise ValueError("base_sigma must be float, list/tuple (ATM vols per expiry), or dict (ATM vol points).")
        # Build sigma surface dict
        print("\n[MarketData] Synthetic equity-like implied volatility surface:")
        sigma_surface = {}
        for exp, T in exp_list:
            atm = atm_map[exp]
            # skew at exp
            skew_exp = {}
            for K in strikes:
                x = np.log(K / self.S0)
                s = atm * (1.0 + skew * x + smile * x * x)
                s = min(max(s, sigma_min), sigma_max)
                skew_exp[K] = s
            sigma_surface[exp] = skew_exp
            # Debug
            if self.debug:
                # Show some sigma values
                k_low = strikes[0]
                k_mid = strikes[len(strikes)//2]
                k_high = strikes[-1]
                print(
                    f"  - exp={exp} | T={T:.4f} | ATM={atm:.4f} "
                    f"| sigma(k_low={k_low:.4f})={skew_exp[k_low]:.4f} "
                    f"| sigma(k_mid={k_mid:.4f})={skew_exp[k_mid]:.4f} "
                    f"| sigma(k_high={k_high:.4f})={skew_exp[k_high]:.4f}"
                )
        self.sigma_surface = sigma_surface
        return sigma_surface

    def build_synthetic_options_data(self, option_expiries, expected_divs, binomial_model, base_sigma=0.25, term_slope=-0.10, 
                                     skew=-0.40, smile=0.80, moneyness_min=0.9, moneyness_max=1.1, n_points=5, 
                                     sigma_min=0.01, sigma_max=2.50):
        """
        Build synthetic American option prices.
        option_expiries:
          - list/tuple of expiry dates to consider
        expected_divs:
          - dict {ex-div date: div_amount}
        """
        # Sanity check
        if option_expiries is None or not isinstance(option_expiries, (list, tuple)) or len(option_expiries) == 0:
            raise ValueError("option_expiries is required and must be a non-empty list/tuple of datetime.date.")
        for exp in option_expiries:
            if not isinstance(exp, date):
                raise ValueError("option_expiries elements must be datetime.date")
        if not isinstance(expected_divs, dict):
            raise ValueError("expected_divs is required and must be a non-empty dict {ex-div date: div_amount}.")
        if set(self.exdiv_dates.keys()) != set(expected_divs.keys()):
            raise ValueError("expected_divs is required and must have the same dates as exdiv_dates input of the constructor.")
        print("\noptions_data not provided => Building synthetic options_data...")
        # Volatility surface
        sigma_surface = self._default_sigma_surface_dict(option_expiries=option_expiries, base_sigma=base_sigma, term_slope=term_slope, 
                                                         skew=skew, smile=smile, moneyness_min=moneyness_min, moneyness_max=moneyness_max, 
                                                         n_points=n_points, sigma_min=sigma_min, sigma_max=sigma_max)
        # Expected dividends
        divs = {}
        for div_date, div_yf in self.exdiv_dates.items():
            divs[div_date] = {"exdiv_year_fraction": div_yf, "dividend_cash": expected_divs[div_date]}
        # Build synthetic options_data by binomial pricing
        options_data = {}
        for exp in option_expiries:
            T = self.yf(exp)
            calls, puts = [], []
            # Pick sigma map for this expiry
            skew_exp = sigma_surface[exp]
            Ks = sorted(skew_exp)
            for K in Ks:
                sigma = skew_exp[K]
                call = binomial_model.binomial_price(T, K, sigma, True,  expected_divs=divs, isAmerican=True)
                put = binomial_model.binomial_price(T, K, sigma, False, expected_divs=divs, isAmerican=True)
                calls.append(call)
                puts.append(put)
            options_data[exp] = {
                "expiry_year_fraction": T,
                "strikes": Ks,
                "call": calls,
                "put": puts
            }
        # Validate & store
        options_data = self._validate_options_data(options_data)
        self.expected_divs = divs
        self.options_data = options_data
        return options_data, divs

---
## Synthetic Validation

To test and validate the implementation, we rely on a fully synthetic numerical setup. This choice is motivated by the strong sensitivity of the de-americanization procedure to market data quality: small inconsistencies or microstructure noise in American quotes can materially affect implied volatilities, early-exercise premia, and the resulting forward and dividend estimates. In addition, obtaining high-quality, internally consistent American option datasets with reliable dividend and rate inputs is often difficult without paid data sources. A controlled synthetic environment therefore provides a clean benchmark, allowing us to verify convergence, stability, and internal consistency of the pipeline before applying it to real market data.

In [4]:
# Analysis date
t0 = date(2025, 12, 12)

# Spot price
S0 = 111.80

# Options expiries
exp1 = date(2026, 3, 20)
exp2 = date(2026, 6, 18)
exp3 = date(2026, 9, 18)
option_expiries = [exp1, exp2, exp3]

# ZC curve
zc_points = {
    #t0:  0.0370,   # To force flat left extrapolation
    exp1: 0.0370,
    exp2: 0.0360,
    exp3: 0.0355,
}

# Projected ex-dividend dates
exdiv_dates = [
    date(2026, 2, 2),
    date(2026, 5, 4),
    date(2026, 8, 4),
]

# Expected dividend amounts ("True" dividends)
expected_divs = {
    date(2026, 2, 2): 0.58,
    date(2026, 5, 4): 0.62,
    date(2026, 8, 4): 0.60,
}

# Base volatility (~ATM)
base_volatility = 0.25 

# Build MarketData
md = MarketData(
    t0=t0,
    S0=S0,
    zc_points=zc_points,
    exdiv_dates=exdiv_dates,
    debug=True,
)

# The Binomial model uses MarketData’s curve
bm = binomial_model(S0=md.S0, zc_rate=md.zc_rate, n_steps=180)

# Generate synthetic market option prices inside MarketData
options_data, divs_struct = md.build_synthetic_options_data(
    option_expiries=option_expiries,
    expected_divs=expected_divs,
    binomial_model=bm,
    base_sigma=base_volatility,
)

print("\nTrue Dividends:", expected_divs)

# run de-Americanization on those synthetic prices
deAmer = deAmericanization_method(S0=md.S0, zc_rate=md.zc_rate, n_steps=180)
results = deAmer.implied_forwards(
    options_data=options_data,
    ex_div_dates=md.exdiv_dates,
    debug=True,
    forward_agg="median",
)

print("\nRecovered Dividends:", {
    d: round(results["implied_divs"][d]["dividend_cash"], 4)
    for d in results["implied_divs"]
})


[MarketData] ZC curve built by interpolating user-provided data:
  - type: TERM-STRUCTURE
  - kind: linear
  - points (T -> r):
      0.2685 -> 3.7 %
      0.5151 -> 3.6 %
      0.7671 -> 3.55 %

[MarketData] Projected ex-div schedule:
  - 2026-02-02 | yf= 0.1425
  - 2026-05-04 | yf= 0.3918
  - 2026-08-04 | yf= 0.6438

options_data not provided => Building synthetic options_data...

[MarketData] Synthetic equity-like implied volatility surface:
  - exp=2026-03-20 | T=0.2685 | ATM=0.2433 | sigma(k_low=100.6200)=0.2557 | sigma(k_mid=111.8000)=0.2433 | sigma(k_high=122.9800)=0.2358
  - exp=2026-06-18 | T=0.5151 | ATM=0.2371 | sigma(k_low=100.6200)=0.2492 | sigma(k_mid=111.8000)=0.2371 | sigma(k_high=122.9800)=0.2298
  - exp=2026-09-18 | T=0.7671 | ATM=0.2308 | sigma(k_low=100.6200)=0.2426 | sigma(k_mid=111.8000)=0.2308 | sigma(k_high=122.9800)=0.2237

[MarketData] Options data:
  - number of expiries: 3
  - 2026-03-20 | T= 0.2685 | n_strikes= 5 | K_range= (100.62, 122.98)
    sample (K, 