## Model Notes

- Forward price/dividend implementation lacks carry consideration
- Theta & Rho in Finite Difference is being overstated
- We will assume all Prices & dividends from yahoo finance are split adjusted

## FYI

- All black scholes models will be priced with forwards.
- Add to config:
    - N_PRECISION
    - DX_THRESH

## Break Down

```python

option_models/
├── black_scholes/
│   ├── base_model.py         # BlackScholes
│   ├── market_model.py       # MarketBlackScholes
│   ├── vectorized.py         # black_scholes_price(), greeks()
│   └── __init__.py
├── binomial/
│   ├── base_model.py
│   ├── market_model.py
│   ├── vectorized.py
│   └── __init__.py
├── monte_carlo/
│   ├── base_model.py
│   ├── market_model.py
│   ├── vectorized.py
│   └── __init__.py
```

## Market Model Flow:

Dividends -> Forward -> BlackScholes

- Information such as bumped spot, risk free rate, spot flows from dividends to BlackScholes
- Since `Stock` object implements singleton, all uncleared bumps will cause unintended effects
    - `Stock` singleton is tracked by (ticker, end_date).
- 

## To-Do
- Add enum

In [1]:
import enum

class GreekCalcStye(enum.Enum):
    """
    Enum to represent the style of Greek calculator.
    """
    analytical = "analytical"
    numerical = "numerical" 

In [2]:
%load_ext autoreload
%autoreload 2
from module_test.raw_code.optionlib.core.helpers import *
from module_test.raw_code.optionlib.core.base_model import *
from module_test.raw_code.optionlib.core.time_utils import *
from module_test.raw_code.optionlib.core.vars import *
from module_test.raw_code.optionlib.core import config, load_config
from datetime import datetime
from trade.helpers.helper import compare_dates
import numpy as np
import math
from datetime import datetime, date
from dateutil.rrule import rrule, MONTHLY
from typing import List, Tuple, Union
from dateutil.relativedelta import relativedelta
from abc import ABC, abstractmethod
import math
from scipy.stats import norm
from copy import deepcopy
from datetime import datetime
from trade.helpers.Logging import setup_logger
from trade.assets.Stock import Stock
from trade.helpers.Context import Context
from trade.helpers.helper import (CustomCache,
                                  retrieve_timeseries,
                                  is_USholiday,
                                  change_to_last_busday,
                                  Scalar)
from pandas.tseries.offsets import BDay
from dbase.DataAPI.ThetaData import (
    list_contracts,
    retrieve_eod_ohlc,
    retrieve_chain_bulk
)
from trade.assets.rates import get_risk_free_rate_helper
from scipy.optimize import minimize_scalar
import numpy as np
from math import erf, exp, log, sqrt
from typing import Literal, List, Tuple
logger = setup_logger('models.ipynb')


Console Logging & File Logging Can be configured using STREAM_LOG_LEVEL and FILE_LOG_LEVEL in environment variables.
Propagate to root logger can be set using PROPAGATE_TO_ROOT_LOGGER in environment variables.
Example:
STREAM_LOG_LEVEL = 'DEBUG'
FILE_LOG_LEVEL = 'INFO'
PROPAGATE_TO_ROOT_LOGGER = 'False'

2025-07-22 21:45:35 trade.helpers.Logging INFO: Logging Root Directory: /Users/chiemelienwanisobi/cloned_repos/QuantTools/logs
Using Proxy URL: http://54.144.4.219:5500/thetadata


In [3]:
import os
os.environ['PROXY_URL'] = ''

In [4]:
load_config()
config
DIVIDEND_FORECAST_METHOD = config['DIVIDEND_FORECAST_METHOD']
DIVIDEND_LOOKBACK_YEARS
config
VOL_EST_LOWER_BOUND

0.01

In [5]:
pd.options.plotting.backend = "plotly"

## Build Test Data from Current Market

In [6]:
retrieve_eod_ohlc?

[0;31mSignature:[0m
[0mretrieve_eod_ohlc[0m[0;34m([0m[0;34m[0m
[0;34m[0m    [0msymbol[0m[0;34m,[0m[0;34m[0m
[0;34m[0m    [0mend_date[0m[0;34m:[0m [0mstr[0m[0;34m,[0m[0;34m[0m
[0;34m[0m    [0mexp[0m[0;34m:[0m [0mstr[0m[0;34m,[0m[0;34m[0m
[0;34m[0m    [0mright[0m[0;34m:[0m [0mstr[0m[0;34m,[0m[0;34m[0m
[0;34m[0m    [0mstart_date[0m[0;34m:[0m [0mstr[0m[0;34m,[0m[0;34m[0m
[0;34m[0m    [0mstrike[0m[0;34m:[0m [0mfloat[0m[0;34m,[0m[0;34m[0m
[0;34m[0m    [0mprint_url[0m[0;34m=[0m[0;32mFalse[0m[0;34m,[0m[0;34m[0m
[0;34m[0m    [0mrt[0m[0;34m=[0m[0;32mTrue[0m[0;34m,[0m[0;34m[0m
[0;34m[0m    [0mproxy[0m[0;34m=[0m[0;32mNone[0m[0;34m,[0m[0;34m[0m
[0;34m[0m    [0;34m**[0m[0mkwargs[0m[0;34m,[0m[0;34m[0m
[0;34m[0m[0;34m)[0m[0;34m[0m[0;34m[0m[0m
[0;31mDocstring:[0m Interval size in miliseconds. 1 minute is 6000
[0;31mFile:[0m      ~/cloned_repos/FinanceDatabase/dbase/Data

In [7]:
ticks = ['AAPL', 'MSFT', 'GOOGL', 'AMZN', 'TSLA']
test_start, test_valuation_date = '2025-07-16', '2025-07-16'
def pick_random_option(tick, date):
    contracts = list_contracts(tick, date)
    # Pick a random contract from the list
    contract = np.random.choice(contracts.index)
    return contracts.iloc[contract]

def get_option_eod_price(date, contract_series):
    """
    Retrieves the end-of-day price for a given option contract on a specific date.
    
    Args:
        date (datetime): The date for which to retrieve the price.
        contract_series (pd.Series): The series containing option contract details.
        
    Returns:
        float: The end-of-day price of the option contract.
    """
    eod_data = retrieve_eod_ohlc(symbol=contract_series['root'],
                                  end_date=date,
                                  start_date=date,
                                  exp=str(contract_series['expiration']),
                                  right=contract_series['right'],
                                  strike=contract_series['strike'],
                                  )
    return eod_data.Midpoint[0]

def get_spot(tick, date):
    return retrieve_timeseries(tick, date, date)['close'][0]

contract = pick_random_option(ticks[0], test_start)
eod = get_option_eod_price(test_start, contract)
spot = retrieve_timeseries(ticks[0], test_start, test_start)['close'][0]
rates = get_risk_free_rate_helper()['annualized'][test_start]




Scheduled Data Requests will be saved to: /Users/chiemelienwanisobi/cloned_repos/QuantTools/module_test/raw_code/DataManagers/scheduler/requests.jsonl
[get_engine] Creating engine for DB: securities_master, PID: 95947
YF.download() has changed argument auto_adjust default to True


In [8]:
def format_dates(*args):
    return [pd.to_datetime(arg) for arg in args]

def subtract_dates(date1: datetime, date2: datetime) -> int:
    """
    Subtracts two dates and returns the difference in days.
    """

    return (pd.to_datetime(date1) - pd.to_datetime(date2)).days

def get_months_between(start_date: datetime, end_date: datetime) -> int:
    """
    Returns the number of months between two dates.
    """
    return (end_date.year - start_date.year) * 12 + end_date.month - start_date.month

def validate_dates(*args):
    """
    Validates if the input date is a valid datetime object.
    """
    for dt in args:
        if not isinstance(dt, (datetime, pd.Timestamp, date)):
            raise ValueError(f"Invalid date: {dt}. Expected a datetime object.")
        
        if is_USholiday(dt):
            raise ValueError(f"Date {dt} is a US holiday. Please choose a different date.")
        
        if dt.weekday() in [5, 6]:
            raise ValueError(f"Date {dt} falls on a weekend. Please choose a weekday.")



## NJIT Function

In [9]:
from numba import njit
import numpy as np
from math import exp, log, sqrt, erf

@njit
def norm_cdf(x):
    return 0.5 * (1.0 + erf(x / sqrt(2.0)))

@njit
def black_scholes_numba(F, K, T, r, sigma, option_type_int):
    n = len(F)
    prices = np.empty(n)

    for i in range(n):
        d1 = (log(F[i] / K[i]) + 0.5 * sigma[i] ** 2 * T[i]) / (sigma[i] * sqrt(T[i]))
        d2 = d1 - sigma[i] * sqrt(T[i])
        df = exp(-r[i] * T[i])

        if option_type_int[i] == 0:  # call
            prices[i] = df * (F[i] * norm_cdf(d1) - K[i] * norm_cdf(d2))
        elif option_type_int[i] == 1:  # put
            prices[i] = df * (K[i] * norm_cdf(-d2) - F[i] * norm_cdf(-d1))
        else:
            prices[i] = np.nan  # or raise error

    return prices


## Vectorized Functions

In [10]:
import numpy as np
from scipy.stats import norm


# ----------------------
# Vectorized Black-Scholes Pricing
# ----------------------
def black_scholes_vectorized(F, K, T, r, sigma, option_type="c"):
    F = np.asarray(F)
    K = np.asarray(K)
    T = np.asarray(T)
    r = np.asarray(r)
    sigma = np.asarray(sigma)

    d1 = (np.log(F / K) + 0.5 * sigma**2 * T) / (sigma * np.sqrt(T))
    d2 = d1 - sigma * np.sqrt(T)
    df = np.exp(-r * T)
    if option_type == "c":
        price = df * (F * norm.cdf(d1) - K * norm.cdf(d2))
    elif option_type == "p":
        price = df * (K * norm.cdf(-d2) - F * norm.cdf(-d1))
    else:
        raise ValueError("option_type must be 'c' or 'p'")

    return price

# ----------------------
# Vectorized Finite Differences (First Order)
# ----------------------
def finite_diff_first_order_vec(x: str, price_func, params: dict, dx_thresh=0.001, method="forward"):
    if x == 'T':
        dx = 1 / DAILY_BASIS
    else:
        dx = (params[x]) * dx_thresh  # Ensure dx is float

    def to_float_copy(val):
        if isinstance(val, (int, float, np.integer, np.floating)):
            return float(val)
        # elif isinstance(val, np.ndarray):
        #     try:
        #         return val.astype(np.float64)
        #     except ValueError:
        #         return val
        return val

    # Convert all values to float-compatible copies
    p0 = {k: to_float_copy(v) for k, v in params.items()}
    p1 = {k: to_float_copy(v) for k, v in params.items()}
    p2 = {k: to_float_copy(v) for k, v in params.items()}

    if method == "forward":
        p1[x] = p1[x] + dx
        return (price_func(**p1) - price_func(**p0)) / dx
    elif method == "backward":
        p1[x] = p1[x] - dx
        return (price_func(**p0) - price_func(**p1)) / dx
    elif method == "central":
        p1[x] = p0[x] + dx
        p2[x] = p0[x] - dx
        return (price_func(**p1) - price_func(**p2)) / (2 * dx)

    else:
        raise ValueError("Unknown method. Expected central, forward or backward")


# ----------------------
# Vectorized Finite Differences (Second Order)
# ----------------------
def finite_diff_second_order_vec(x: str, price_func, params: dict, dx_thresh=0.001, method="central"):
    if x == 'T':
        dx = 1 / DAILY_BASIS
    else:
        # dx = float(params[x]) * dx_thresh  # Ensure dx is float
        dx = (params[x]) * dx_thresh  # Ensure dx is float

    def to_float_copy(val):
        if isinstance(val, (int, float, np.integer, np.floating)):
            return float(val)
        # elif isinstance(val, np.ndarray):
        #     try:
        #         return val.astype(np.float64)
        #     except ValueError:
        #         return val
        return val

    # Convert all values to float-compatible copies
    p0 = {k: to_float_copy(v) for k, v in params.items()}
    p1 = {k: to_float_copy(v) for k, v in params.items()}
    p2 = {k: to_float_copy(v) for k, v in params.items()}


    if method == "central":
        p1[x] = p0[x] + dx
        p2[x] = p0[x] - dx
        return (price_func(**p1) - 2 * price_func(**p0) + price_func(**p2)) / dx**2

    elif method == "forward":
        p1[x] = p1[x] + dx
        p2[x] = p2[x] + 2 * dx
        return (price_func(**p2) - 2 * price_func(**p1) + price_func(**p0)) / dx**2
    elif method == "backward":
        p1[x] = p1[x] - dx
        p2[x] = p2[x] - 2 * dx
        return (price_func(**p0) - 2 * price_func(**p1) + price_func(**p2)) / dx**2
    else:
        raise ValueError("Unknown method. Expected central, forward or backward")


# ----------------------
# ForwardModel Wrappers
# ----------------------
class ForwardWrapper:
    def __init__(self, models: list):
        self.models = models

    def get_forward_array(self):
        return np.array([model.get_forward_price() for model in self.models])



# Example usage:
# F_cont = vectorized_forward_continuous(S, r, q, T)
# F_disc = vectorized_forward_discrete(S, r, T, div_times, div_amounts)


def black_scholes_analytic_greeks_vectorized(F, K, T, r, sigma, option_type="c"):
    F = np.asarray(F)
    K = np.asarray(K)
    T = np.asarray(T)
    r = np.asarray(r)
    sigma = np.asarray(sigma)

    d1 = (np.log(F / K) + 0.5 * sigma**2 * T) / (sigma * np.sqrt(T))
    d2 = d1 - sigma * np.sqrt(T)
    nd1 = norm.pdf(d1)
    df = np.exp(-r * T)

    # Handle both scalar and vector string inputs for option_type
    option_type = np.asarray(option_type)

    # Enfores option_type
    if not np.all(np.isin(option_type, ["c", "p"])):
        raise ValueError("option_type must be 'c' or 'p'")

    is_call = option_type == "c"

    delta = np.where(is_call, norm.cdf(d1), -norm.cdf(-d1))
    gamma = nd1 / (F * sigma * np.sqrt(T))
    vega = F * nd1 * np.sqrt(T)
    volga = vega * d1 * d2 / sigma
    rho = np.where(
        is_call,
        T * K * df * norm.cdf(d2),
        -T * K * df * norm.cdf(-d2)
    )
    theta = - (F * nd1 * sigma) / (2 * np.sqrt(T)) \
            - r * K * df * np.where(is_call, norm.cdf(d2), norm.cdf(-d2))

    return {
        "delta": delta,
        "gamma": gamma,
        "vega": vega / 100,
        "volga": volga / 100**2,
        "rho": rho / 100,
        "theta": theta/DAILY_BASIS
    }



## FiniteGreekEstimator

In [11]:
import math
from copy import deepcopy
from typing import Callable, Dict, Union

class FiniteGreeksEstimator:
    def __init__(self,
                 price_func: Callable,
                 base_params: Dict[str, Union[float, int]],
                 dx_thresh: float = 0.001,
                 method: str = 'central'):
        """
        Estimate Greeks using finite difference methods.

        Parameters:
        - price_func: Callable pricing function accepting kwargs.
        - base_params: Dictionary with keys like 'S', 'K', 'T', 'r', 'sigma', 'q'.
        - dx_thresh: Relative step size.
        - method: 'forward', 'backward', or 'central'.
        """
        self.price_func = price_func
        self.params = base_params.copy()
        self.dx_thresh = dx_thresh
        self.method = method.lower()
        self._validate_params()

    def _validate_params(self):
        required = {'S', 'K', 'T', 'r', 'sigma', 'q'}
        missing = required - set(self.params.keys())
        if missing:
            raise ValueError(f"Missing required keys: {missing}")
        if self.method not in {'forward', 'backward', 'central'}:
            raise ValueError(f"Invalid method '{self.method}'. Must be 'forward', 'backward', or 'central'.")

    def _step(self, x: str) -> float:
        val = self.params[x]
        return max(self.dx_thresh * abs(val), 1e-6)

    def first_order(self, x: str) -> float:
        return finite_diff_first_order_vec(x, self.price_func, self.params, dx_thresh=self.dx_thresh, method=self.method)

    def second_order(self, x: str) -> float:
        return finite_diff_second_order_vec(x, self.price_func, self.params, dx_thresh=self.dx_thresh, method=self.method)

    def all_first_order(self) -> Dict[str, float]:
        
        return {
            'delta': self.first_order('S'),
            'vega': self.first_order('sigma') * 0.01,
            'theta': -self.first_order('T')/DAILY_BASIS,
            'rho': self.first_order('r')*0.01
        }

    def all_second_order(self) -> Dict[str, float]:
        return {
            'gamma': self.second_order('S'),
            'volga': self.second_order('sigma') * 0.0001,
        }


## DIVIDENDS

- Base Model:
    - DividendSchedule, ContinousDividends

- Market Model: 
    - MarketDividendSchedule, MarketContinuousDividends

- Vectorized:
    - Final Value: List[PvDivs], List[DiscountFactor]

In [12]:
class Dividend(ABC):
    @abstractmethod
    def get_present_value(self, *args, **kwargs) -> float:
        """
        Calculate the present value of the dividend.
        """
        pass
    
    @abstractmethod
    def get_type(self) -> str:
        """
        Get the type of the dividend.
        """
        pass
    
    def __repr__(self):
        return f"<{self.__class__.__name__}: {self.get_type()}>"

### Base Model

In [13]:


FREQ_MAP = {
    "monthly": 1,
    "quarterly": 3,
    "semiannual": 6,
    "annual": 12,
}

class DividendSchedule(Dividend):
    def __init__(
        self,
        start_date: datetime,
        end_date: datetime,
        freq: str = "quarterly",
        amount: Union[float, List[float]] = 1.0,
        valuation_date: datetime = None,
        basis: int = 365,
        **kwargs: Union[str, int, float, datetime, None]
    ):
        """
        Initialize a DividendSchedule object.
        start_date: datetime - The start date of the dividend schedule. Starts the schedule.
        end_date: datetime - The end date of the dividend schedule.
        freq: str - The frequency of dividends ('monthly', 'quarterly', 'semiannual', 'annual').
        amount: float or list - The dividend amount (can be a scalar or a list).
        valuation_date: datetime - The date for valuation purposes.
        basis: int - The day count basis (default is 365).
        """
        if freq not in FREQ_MAP:
            raise ValueError(f"Unsupported frequency '{freq}'. Use one of {list(FREQ_MAP.keys())}.")
        self.start_date = start_date
        self.end_date = end_date
        self.freq = freq
        self.valuation_date = valuation_date or start_date
        self.basis = basis
        self.input_amount = amount
        self._setup_schedule()

    def _setup_schedule(self):
        """
        """
        # Generate dividend dates
        months = FREQ_MAP[self.freq]

        
        ## Generate dates using rrule
        self.dates = list(rrule(freq=MONTHLY, interval=months, dtstart=self.start_date, until=self.end_date))
        self.dates = [dt for dt in self.dates if compare_dates.is_after(dt, self.start_date)]
        if not self.dates:
            raise ValueError("No dividend dates generated. Check your start and end dates.")

        # Handle amount (scalar or list)
        if isinstance(self.input_amount, list):
            if len(self.input_amount) < len(self.dates):
                raise ValueError("Amount list must cover all dividend dates.")
            self.amounts = self.input_amount[:len(self.dates)]
        else:
            self.amounts = [self.input_amount] * len(self.dates)

        self.schedule = list(zip(self.dates, self.amounts))

    def get_schedule(self) -> List[Tuple[datetime, float]]:
        return self.schedule
    

    def get_year_fractions(self) -> List[Tuple[float, float]]:
        return [
            (time_distance_helper(dt, self.valuation_date), amt)
            for dt, amt in self.schedule
            if dt > self.valuation_date
        ]
    
    def get_present_value(self, discount_rate: float, sum_up: bool=True, **kwargs) -> float:
        """
        Calculate the present value of the dividend schedule using a discount rate.
        discount_rate: float - The discount rate to apply.
        """
        pv = []
        for dt, amt in self.schedule:
            if compare_dates.is_after(dt, self.valuation_date):
                time_fraction = time_distance_helper(dt, self.valuation_date)
                pv_amt = amt * math.exp(-discount_rate * time_fraction)
                pv.append(pv_amt)
        return sum(pv) if sum_up else pv
    
    def get_type(self) -> str:
        """
        Get the type of the dividend schedule.
        """
        return "discrete"
    
    def __repr__(self):
        return f"<DividendSchedule: {len(self.schedule)} dividends from {self.start_date.strftime('%Y-%m-%d')} to {self.end_date.strftime('%Y-%m-%d')}>"


In [14]:
class ContinuousDividendYield(Dividend):
    def __init__(self, 
                 yield_rate: float, 
                 start_date: datetime, 
                 end_date: datetime ,
                 valuation_date: datetime = None,
                 **kwargs):
        """
        Initialize a ContinuousDividendYield object.
        yield_rate: float - The continuous dividend yield (between 0 and 1).
        start_date: datetime - The date when the yield starts.
        valuation_date: datetime - The date for valuation purposes (default is start_date).
        """
        super().__init__()
        if not (0 <= yield_rate < 1):
            raise ValueError("Dividend yield must be between 0 and 1.")
        self.yield_rate = yield_rate
        self.start_date = start_date
        self.valuation_date = valuation_date or start_date
        self.end_date = end_date 
        self.T = time_distance_helper(self.end_date, self.valuation_date, )

    def get_yield(self) -> float:
        return self.yield_rate

    def get_present_value(self, end_date: datetime = None, **kwargs) -> float:
        """
        Return the exponential discount factor from q over T:
        e^{-qT}
        """
        T = self.T if end_date is None else time_distance_helper(end_date, self.valuation_date)
        return math.exp(-self.yield_rate * T)
    
    def get_type(self) -> str:
        """
        Get the type of the dividend yield.
        """
        return "continuous"

    def __repr__(self):
        return f"<ContinuousDividendYield: q={self.yield_rate:.4f}>"


### Market Model

In [15]:
DIVIDEND_CACHE = CustomCache(
    location = '/Users/chiemelienwanisobi/cloned_repos/QuantTools/.cache',
    fname='dividend_cache',
    clear_on_exit=True,)


from openbb import obb
def get_div_schedule(ticker, filter_specials=True):
    if ticker not in DIVIDEND_CACHE:
        try:
            div_history = obb.equity.fundamental.dividends(symbol=ticker, provider='yfinance').to_df()
            div_history.set_index('ex_dividend_date', inplace = True)
            DIVIDEND_CACHE.set(ticker, div_history)

            div_history['amount'] = div_history['amount'].astype(float)
            div_history.index = pd.to_datetime(div_history.index)
        except Exception as e:
            logger.error(f"Error fetching dividend schedule for {ticker}: {e}")
            div_history = pd.DataFrame({'amount':[0]}, index = pd.bdate_range(start=OPTION_TIMESERIES_START_DATE, end=datetime.now(), freq='1Q'))
        DIVIDEND_CACHE[ticker] = div_history
    
    else:
        div_history = DIVIDEND_CACHE[ticker]
    
    # Filter out dividends >= 7.5
    if filter_specials:
        div_history = div_history[div_history['amount'] < 7.5]
        
    return div_history.sort_index()
aapl = get_div_schedule('TSLA')

2025-07-22 21:46:45 models.ipynb ERROR: Error fetching dividend schedule for TSLA: 
[Error] -> Error getting data for TSLA: No dividend data found for TSLA


In [16]:
valuation_date = '2025-01-05'
date_diffs = aapl.index.to_series().diff()
day_diffs = date_diffs.dt.days
most_common_day = day_diffs.mode()[0]

def classify_frequency(days):
    if 20 < days < 40:
        return "monthly"
    elif 50 < days < 80:
        return "bi-monthly"
    elif 80 < days < 110:
        return "quarterly"
    elif 170 < days < 200:
        return "semi-annual"
    elif 330 < days < 370:
        return "annual"
    else:
        return "irregular"
    
frequency_labels = day_diffs.apply(classify_frequency)
frequency_labels.mode()[0]


'quarterly'

In [17]:
import numpy as np
import pandas as pd
from datetime import datetime
from sklearn.linear_model import LinearRegression

def infer_frequency(div_history: pd.DataFrame):
    date_diffs = div_history.index.to_series().diff()
    day_diffs = date_diffs.dt.days
    most_common_day = day_diffs.mode()[0]
    frequency_labels = day_diffs.apply(classify_frequency)
    return frequency_labels.mode()[0]
    

def infer_dividend_growth_rate(div_df: pd.DataFrame, valuation_date: datetime, lookback_years: int = 5, method: str = 'cagr'):
    # Ensure proper datetime format and sort
    valuation_date = pd.to_datetime(valuation_date).date()
    div_df = div_df.sort_index()
    div_df = div_df.loc[div_df.index.date < valuation_date]

    if all(div_df.amount == 0.0):
        return 0.0

    # Filter by lookback period
    cutoff_date = valuation_date - pd.DateOffset(years=lookback_years)
    df_filtered = div_df.loc[div_df.index.date >= cutoff_date.date()].copy()

    if len(df_filtered) < 2:
        raise ValueError("Not enough data in the lookback window to estimate growth rate.")
    
    if method == 'constant':
        return 0.0  # No growth

    elif method == 'avg':
        # Compute year-over-year changes
        # Special cutoff to ensure we have enough data
        # Avg needs at least 2 years of data to compare years
        cutoff_date = valuation_date - pd.DateOffset(years=lookback_years+1)
        df_filtered = div_df.loc[div_df.index.date >= cutoff_date.date()].copy()
        df_filtered['year'] = df_filtered.index.year
        yearly_avg = df_filtered.groupby('year')['amount'].mean()
        diffs = yearly_avg.pct_change().dropna()
        return diffs.mean()

    elif method == 'cagr':
        first_date = df_filtered.index[0]
        last_date = df_filtered.index[-1]
        n_years = (last_date - first_date).days / DAILY_BASIS
        start = df_filtered.iloc[0]['amount']
        end = df_filtered.iloc[-1]['amount']
        if start <= 0:
            raise ValueError("Starting dividend must be positive for CAGR.")
        return (end / start) ** (1 / n_years) - 1

    elif method == 'regression':
        df_filtered['ordinal_date'] = df_filtered.index.map(datetime.toordinal) ## Convert dates to ordinal (numbers) for regression
        df_filtered['log_amount'] = np.log(df_filtered['amount']) ## Log-transform the amount for regression
        X = df_filtered[['ordinal_date']]
        y = df_filtered['log_amount']
        model = LinearRegression().fit(X, y)
        # Convert daily log return to annualized rate
        annualized_growth = model.coef_[0] * DAILY_BASIS
        return annualized_growth

    else:
        raise ValueError(f"Unknown method '{method}'. Choose from 'constant', 'avg', 'cagr', 'regression'.")
    

def get_last_dividends(div_history: pd.DataFrame, valuation_date: datetime, size = 4) -> Tuple[datetime, float]:
    """
    Get the nearest dividend date and amount after the valuation date.
    
    div_history: pd.DataFrame - Historical dividend data with 'amount' column.
    valuation_date: datetime - The date for valuation purposes.
    
    Returns a tuple of (nearest_dividend_date, nearest_dividend_amount).
    """
    valuation_date = format_dates(valuation_date)[0].date()
    future_divs = div_history[div_history.index.date <= valuation_date]
    
    if future_divs.empty:
        return 0.0
    
    # Get the last 'size' dividends before the valuation date
    last_divs = sum(future_divs.tail(size)['amount'])
    
    return last_divs

def project_dividends(
        valuation_date: datetime,
        end_date: datetime,
        div_history: pd.DataFrame,
        inferred_growth_rate: float,
):
    """
    Project future dividends based on historical data and inferred growth rate.
    
    valuation_date: datetime - The date for valuation purposes.
    expiration_date: datetime - The date when the option expires.
    div_history: pd.DataFrame - Historical dividend data with 'amount' column.
    inferred_growth_rate: float - Estimated annual growth rate of dividends.
    
    Returns a list of projected dividends with their payment dates.
    """
    end_date, valuation_date = format_dates(end_date, valuation_date)
    typical_spacing = div_history.index.to_series().diff().dt.days.mode()[0]
    expected_dividend_size = int((subtract_dates(end_date, valuation_date) // typical_spacing) + 1)
    period_inferred = classify_frequency(typical_spacing)
    past_divs = div_history.loc[div_history.index.date < valuation_date.date()]
    last_div = past_divs.iloc[-1]['amount'] if not past_divs.empty else 0.0
    last_date = past_divs.index[-1].date() if not past_divs.empty else valuation_date
    periodic_growth = inferred_growth_rate/ (12/FREQ_MAP[period_inferred])
    dividend_list = [last_div * (1 + periodic_growth) ** i 
                    for i in range(expected_dividend_size)]
    
    return dividend_list, [last_date + relativedelta(months=i * FREQ_MAP[period_inferred]) for i in range(expected_dividend_size)], last_date


infer_dividend_growth_rate(
    aapl,
    valuation_date=datetime(2025, 1, 5).date(),
    lookback_years=5,
    method='regression'
)

0.0

In [18]:
valuation_date=datetime(2025, 1, 5).date()
expiration_date = datetime(2025, 8, 31).date()
typical_spacing = aapl.index.to_series().diff().dt.days.mode()[0]
expected_dividend_size = int(((expiration_date - valuation_date).days // typical_spacing) + 1)
period_inferred = classify_frequency(typical_spacing)
past_divs = aapl.loc[aapl.index.date < valuation_date]
last_div = past_divs.iloc[-1]['amount']
last_date = past_divs.index[-1].date()
last_div, last_date, typical_spacing, period_inferred, expected_dividend_size, (expiration_date - valuation_date).days

(0, datetime.date(2024, 12, 31), 92.0, 'quarterly', 3, 238)

In [19]:

yearly_growth = infer_dividend_growth_rate(
    aapl,
    valuation_date=valuation_date,
    lookback_years=2,
    method='cagr'
)
periodic_growth = yearly_growth/ (12/FREQ_MAP[period_inferred])
dividend_list = [last_div * (1 + periodic_growth) ** i 
                 for i in range(expected_dividend_size)]
dividend_list, yearly_growth

([0.0, 0.0, 0.0], 0.0)

In [20]:
project_dividends(
    valuation_date=datetime(2025, 1, 5).date(),
    end_date = datetime(2025, 8, 31).date(),
    div_history=aapl,
    inferred_growth_rate=infer_dividend_growth_rate(
        aapl,
        valuation_date=datetime(2025, 1, 5).date(),
        lookback_years=2,
        method='cagr'
    )
)

([0.0, 0.0, 0.0],
 [datetime.date(2024, 12, 31),
  datetime.date(2025, 3, 31),
  datetime.date(2025, 6, 30)],
 datetime.date(2024, 12, 31))

In [21]:
class MarketDividendSchedule(DividendSchedule):
    """
    A dividend schedule that projects future dividends based on historical data and inferred growth rates.
    This class extends the DividendSchedule class to include methods for inferring growth rates and projecting dividends.
    """
    def __init__(self, ticker: str, 
                 start_date: datetime, 
                 end_date: datetime, 
                 valuation_date: datetime = None, 
                 lookback_years: int = DIVIDEND_LOOKBACK_YEARS, 
                 **kwargs):
        """
        Initialize a MarketDividendSchedule object.
        ticker: str - The stock ticker symbol.
        start_date: datetime - The start date for the dividend schedule.
        end_date: datetime - The end date for the dividend schedule.
        valuation_date: datetime - The date for valuation purposes (default is start_date).
        lookback: int - Number of years to look back for historical dividends (default is 8).

        ps: user can set spot price at asset level by utilizing div_object.spot_price = xxx
        """
        

        ## Validate Dates
        validate_dates(valuation_date, start_date, end_date)

        ## Format Dates
        valuation_date, start_date, end_date = format_dates(valuation_date, start_date, end_date)
        
        with Context(end_date=valuation_date.strftime("%Y-%m-%d")): ## To ensure spot being accessed is for the specific valuation date
            self.asset = Stock(ticker)
            
        div = get_div_schedule(ticker, filter_specials=True)
        div = div[div.index.date <= valuation_date.date()] # Filter to include only dividends before the valuation date
        self.ticker = ticker
        self.lookback_years = lookback_years
        self.div_history = div
        self._projected_freq = self._infer_frequency(self.div_history)
        self.growth_method = kwargs.get('growth_method', DIVIDEND_FORECAST_METHOD)
        self.amount = 0.0
        self.valuation_date = valuation_date or start_date
        self.growth_rate = self._infer_growth_rate(self.div_history, lookback=self.lookback_years, method=self.growth_method)
        self._projected_dividends, self.payment_dates = self._project_schedule(div_history=self.div_history, 
                                                           start_date=start_date, end_date=end_date, 
                                                           valuation_date=valuation_date, **kwargs)
        self.model_start_date = start_date
        
        # # Create the schedule
        super().__init__(
            start_date=self.last_div_date or start_date,
            end_date=end_date,
            freq=self._projected_freq,
            amount=self._projected_dividends,
            valuation_date=valuation_date or start_date,
            **kwargs
        )
    

    @property
    def spot_price(self):
        return self.asset.spot_price
    

    @spot_price.setter
    def spot_price(self, v):
        self.asset.spot_price = v

        
    def _infer_frequency(self, div_history):
        """
        Infer the frequency of dividends based on the historical data.
        """
        if div_history.empty:
            return "quarterly"
        return infer_frequency(div_history)
        

    def _infer_growth_rate(self, div_history, lookback=8, method='cagr'):
        """
        Infer the dividend growth rate based on historical data.
        """
        if div_history.empty:
            return 0.0
        
        return infer_dividend_growth_rate(
            div_history,
            valuation_date=self.valuation_date,
            lookback_years=lookback,
            method=method
        )
    
    def _project_schedule(self, div_history, start_date, end_date, valuation_date=None, **kwargs):
        """
        Project future dividends based on historical data and inferred growth rate.
        """
        if div_history.empty:
            return 0.0
        dividend_list, payment_dates, last_date = project_dividends(
            valuation_date=valuation_date or start_date,
            end_date=end_date,
            div_history=div_history,
            inferred_growth_rate=self.growth_rate
        )
        self.last_div_date = last_date
        return dividend_list, payment_dates

In [22]:



class MarketContinuousDividends(ContinuousDividendYield):
    """
    A continuous dividend yield model that uses historical dividend data as forward dividend yield
    """
    def __init__(self, ticker: str, 
                 start_date: datetime, 
                 end_date: datetime, 
                 valuation_date: datetime = None, 
                 spot_price: float = None,
                 **kwargs):
        """
        Initialize a MarketContinuousDividends object.
        ticker: str - The stock ticker symbol.
        start_date: datetime - The start date for the dividend schedule.
        end_date: datetime - The end date for the dividend schedule.
        valuation_date: datetime - The date for valuation purposes (default is start_date).
        lookback_years: int - Number of years to look back for historical dividends (default is 8).
        """
        validate_dates(valuation_date, start_date, end_date)
        valuation_date, start_date, end_date = format_dates(valuation_date, start_date, end_date)
        div = get_div_schedule(ticker, filter_specials=True)
        div = div[div.index.date <= valuation_date.date()] ## Filter to include only dividends before the valuation date
        self.div = div
        self._projected_freq = self._infer_frequency(div)
        self.ticker = ticker
        with Context(end_date=valuation_date.strftime("%Y-%m-%d")): ## To ensure spot being accessed is for the specific valuation date
            self.asset = Stock(ticker)
        self.__set_yield_rate()
        
        # Initialize the ContinuousDividendYield with the inferred yield rate
        super().__init__(
            yield_rate=self.current_q,
            start_date=start_date,
            end_date=end_date,
            valuation_date=valuation_date or start_date,
            **kwargs
        )

    @property
    def spot_price(self):
        return self.asset.spot_price
    

    @spot_price.setter
    def spot_price(self, v):
        self.asset.spot_price = v
        self.__set_yield_rate()

    def __set_yield_rate(self):
        self.current_q = get_last_dividends(self.div, 
                                            valuation_date=valuation_date, 
                                            size=FREQ_MAP[self._projected_freq]) / self.spot_price 
        self.yield_rate = self.current_q
        


    def _infer_frequency(self, div_history):
        """
        Infer the frequency of dividends based on the historical data.
        """
        if div_history.empty:
            return "quarterly"
        return infer_frequency(div_history)

In [23]:
Stock.list_instances()

{}

In [24]:
a=MarketContinuousDividends(
    ticker='AAPL',
    start_date=datetime(2024, 1, 2),
    end_date=datetime(2025, 1, 2),
    valuation_date=datetime(2025, 1, 6),
)
a.asset.clear_bump()
print(f"Yield Rate before 10% bump: {a.yield_rate}")
a.spot_price *= 1.10
print(f"Yield Rate after 10% bump: {a.yield_rate}")
a.asset.clear_bump()

Yield Rate before 10% bump: 0.003061224489795918
Yield Rate after 10% bump: 0.0027829313543599257


In [25]:
aapl_div_schedule = MarketDividendSchedule(
    ticker='COST',
    start_date=datetime(2025, 1, 3),
    end_date=datetime(2025, 8, 29),
    valuation_date=datetime(2025, 1, 3),
    lookback_years=1,
    growth_method='cagr'
)

aapl_div_schedule.spot_price


916.5800170898438

In [26]:
aapl_div_schedule.amounts

[1.16, 1.214237436915176, 1.2710108217296003]

### Vectorized Dividends

In [27]:
from typing import List


## Seperate Market Query and Raw Calc Function
def get_div_histories(tickers:List):
    assert_equal_length(tickers)
    unique_ticks = set(tickers)
    tick_history = {
        t: get_div_schedule(t) for t in unique_ticks
    }
    return tick_history
def assert_equal_length(*args):
    """
    Assert that all input lists have the same length.
    """
    lengths = [len(arg) for arg in args]
    if len(set(lengths)) != 1:
        raise ValueError("All input lists must have the same length.")
    return True

def convert_to_array(*args):
    """
    Convert input lists to numpy arrays.
    """
    return [np.asarray(arg)  for arg in args]

def get_vectorized_dividend_scehdule(
        tickers:list|np.ndarray,
        start_dates: List[datetime],
        end_dates: List[datetime],
        valuation_dates: List[datetime] = None,
        **kwargs
):

    schedules = []
    lookback_yrs = kwargs.get('lookback_yrs', DIVIDEND_LOOKBACK_YEARS)
    method =  kwargs.get('method', DIVIDEND_FORECAST_METHOD)
    tick_history = get_div_histories(tickers)   
    for ticker, start_date, end_date, val_date in zip(
        tickers,
        start_dates,
        end_dates,
        valuation_dates
    ):
        gr = infer_dividend_growth_rate(tick_history[ticker], 
                                        val_date, 
                                        lookback_yrs, 
                                        method)
        payments, dates, _ = project_dividends(valuation_date=val_date,
                              end_date=end_date,
                              div_history=tick_history[ticker],
                              inferred_growth_rate=gr)
        schedules.append(list(zip(payments, dates)))
    return schedules

def vector_convert_to_time_frac(
        schedules: List[list],
        valuation_dates: List[datetime],
        end_dates: List[datetime]
):
    """
    Convert a list of schedules to a list of time fractions.
    
    schedules: List[list] - List of schedules where each schedule is a list of (amount, date) tuples.
    valuation_dates: List[datetime] - List of valuation dates corresponding to each schedule.
    end_dates: List[datetime] - List of end dates corresponding to each schedule.
    
    Returns a list of lists containing time fractions and amounts.
    """
    assert_equal_length(schedules, valuation_dates, end_dates)
    time_fractions = []
    for i, sch in enumerate(schedules):
        time_fractions.append([
            (time_distance_helper(dt, valuation_dates[i]), amt) 
            for amt, dt in sch if compare_dates.is_after(dt, valuation_dates[i])
        ])
    return time_fractions

def vectorized_discrete_pv(
        schedules: List[list],
        r: List[list],
        _valuation_dates: List[datetime],
        _end_dates: List[datetime]
):
    assert_equal_length(schedules, r, _end_dates, _valuation_dates)
    pv = []
    for i,sch in enumerate(schedules):
        pv.append(sum([ ## Calculating the sum
            (x* math.exp(-r[i] * time_distance_helper(dt, _valuation_dates[i]))) ## Applying discount factor
                        for x, dt in sch if compare_dates.inbetween(dt, start=_valuation_dates[i],
                                                                    end=_end_dates[i],
                                                                    inclusive=False) ## Filtering for dt after Val
        ]))
    return pv


def get_vectorized_dividend_rate(
        tickers:str|List[str],
        spots: List[float],
        valuation_dates: List[datetime]
):
    """
    Get the vectorized dividend rate for a list of tickers based on their historical dividend data.
    
    tickers: str or List[str] - Ticker symbols of the stocks.
    spots: List[float] - Current spot prices for each ticker.
    valuation_dates: List[datetime] - Dates for which to calculate the dividend rates.
    
    Returns a numpy array of dividend rates.
    """
    assert_equal_length(tickers, spots, valuation_dates)
    tick_history = get_div_histories(tickers)
    div_rates = [
        get_last_dividends(tick_history[t], valuation_dates[i]) / spots[i]
        for i, t in enumerate(tickers)
    ] 
    return np.array(div_rates)


def get_vectorized_continuous_dividends(
        div_rates: List[float],
        _valuation_dates: List[datetime],
        _end_dates: List[datetime]
):
    assert_equal_length(div_rates, _valuation_dates, )
    discounted = [
        math.exp(-div_rate * time_distance_helper(_end_dates[i], _valuation_dates[i], ))
        for i, div_rate in enumerate(div_rates)
    ]
    return np.array(discounted)
    


In [28]:
tickers = ['TSLA', 'CVX', 'KO']
r = [0.045, 0.055, 0.005]
spots=[100,200,300]
start_dates = [datetime(2025, 1, 5), datetime(2025, 1, 5), datetime(2025, 1, 5)]
end_dates = [datetime(2025, 8, 31), datetime(2025, 8, 31), datetime(2025, 8, 31)]
valuation_dates = [datetime(2025, 1, 5), datetime(2025, 1, 5), datetime(2025, 1, 5)]
schedules = get_vectorized_dividend_scehdule(
    tickers=tickers,
    start_dates=start_dates,
    end_dates=end_dates,
    valuation_dates=valuation_dates
)
discrete_pv = vectorized_discrete_pv(schedules, 
                                     r,
                                     _valuation_dates=valuation_dates,
                                     _end_dates=end_dates)
discrete_pv

2025-07-22 21:46:54 trade.asset.Stock ERROR: Error getting previous close for COST from yfinance: 'prev_close'


[0.0, 3.313200930937935, 0.9885140505035921]

In [29]:
div_rates = get_vectorized_dividend_rate(
    tickers=tickers,
    spots=spots,
    valuation_dates=valuation_dates
)
cont_q = get_vectorized_continuous_dividends(
    div_rates=div_rates,
    _valuation_dates=valuation_dates,
    _end_dates=end_dates

)
cont_q

array([1.        , 0.97898159, 0.99579513])

### TESTS: MarketDividendSchedule MAE Test

- Testing how MAE Changes for 

In [30]:
from random import randint, seed, random
from itertools import product

## Get Tickers
tickers = [
    'AAPL',
    'BAC',
    'MA',
    'MSFT',
    'NEE',
    'O',
    'AVGO',
    'JNJ',
    'PG',
    'KO',
    'PEP',
    'XOM',
    'CVX',
    'VZ',
    'T',
    'WMT',
    'MCD',
    'HD',
    'PFE',
    'ABBV',
    'MMM',
    'JPM',
    'UNH',
    'TGT',
]

tickers = set(tickers)  # Ensure unique tickers
# Get Random Valuation Dates
valuation_dates = pd.bdate_range('2017-01-01', '2025-05-15')
_seed = np.random.seed(11)
randInts= np.random.random_integers(0, len(valuation_dates), 10)
valuation_dates = valuation_dates[randInts]

## Start Date = Val_date - 1yr
start_dates = [ x - BDay(252) for x in valuation_dates]

## End Date = Val Date + 1 Yr
end_dates = [ x + BDay(252) for x in valuation_dates]


In [31]:

aapl_div_schedule = MarketDividendSchedule(
    ticker='CVX',
    start_date=start_dates[0],
    end_date=end_dates[0],
    valuation_date=valuation_dates[0],
    lookback_years=1,
    growth_method='cagr'
)
aapl_div_schedule.div_history

Unnamed: 0_level_0,amount
ex_dividend_date,Unnamed: 1_level_1
1962-01-31,0.029762
1962-05-07,0.029762
1962-08-07,0.029762
1962-11-07,0.029762
1963-02-06,0.031250
...,...
2023-05-18,1.510000
2023-08-17,1.510000
2023-11-16,1.510000
2024-02-15,1.630000


In [32]:
def div_test(years=1, method='cagr'):
    _product = list(product(tickers, list(zip(start_dates, end_dates, valuation_dates))))

    ## Seperate Tickers and Dates
    tickers_list = [x[0] for x in _product]
    start_dates_list =[x[1][0] for x in _product]
    end_dates_list = [x[1][1] for x in _product]
    val_dates_list = [x[1][2] for x in _product]

    ## Use Vectorized to get Dividend Schedules
    schedules = get_vectorized_dividend_scehdule(
    tickers=tickers_list,
    start_dates=start_dates_list,
    end_dates=end_dates_list,
    valuation_dates=val_dates_list,
    lookback_yrs=years,
    method=method
    )
    forecasted_divs = np.asarray([sum([x[0] for x in inner_sch]) for inner_sch in schedules])

    ## Get actual Divs
    actual_divs = []
    for ticker, pack in _product:
        start_date, end_date, val_date = pack
        div_history = get_div_schedule(ticker, filter_specials=True)
        actual_divs.append(div_history[(div_history.index.date >= val_date.date()) &
                                        (div_history.index.date <= end_date.date())]['amount'].sum())
    actual_divs = np.asarray(actual_divs)

    mae = np.mean(np.abs(forecasted_divs - actual_divs))
    mse = np.mean((forecasted_divs - actual_divs) ** 2)
    return mae, mse, forecasted_divs, actual_divs, _product

In [33]:
res = div_test(1, method='avg')


In [34]:
tickers_list = [x[0] for x in res[-1]]
val_date = [x[1][2] for x in res[-1]]
start_dates_list = [x[1][0] for x in res[-1]]
end_dates_list = [x[1][1] for x in res[-1]]
data=pd.DataFrame({'tickers': tickers_list,
                     'forecasted_divs': res[2],
                     'actual_divs': res[3],
                     'valuation_date': val_date,
                     'start_date': start_dates_list,
                     'end_date': end_dates_list})
parse = data[data.forecasted_divs.isna()][[
    'tickers',  'start_date', 'end_date', 'valuation_date',
]].T.values

new_info = get_vectorized_dividend_scehdule(*parse, 
                                            lookback_yrs=1, 
                                            method='avg')
new_info

[]

#### CAGR Test

In [35]:
cagr_mae_mse = pd.DataFrame(index = range(1, 11))
for year in cagr_mae_mse.index:
    mae, mse, forecast, actual, prod = div_test(year)
    cagr_mae_mse.at[year, 'mae'] = mae
    cagr_mae_mse.at[year, 'mse'] = mse

In [36]:
cagr_mae_mse

Unnamed: 0,mae,mse
1,0.240033,0.264383
2,0.22371,0.199274
3,0.21934,0.191144
4,0.217838,0.189975
5,0.219191,0.194396
6,0.22393,0.203692
7,0.227584,0.210536
8,0.230634,0.217901
9,0.238458,0.233203
10,0.241452,0.248738


#### Regression Test

In [37]:
reg_mae_mse = pd.DataFrame(index = range(1, 11))
for year in reg_mae_mse.index:
    mae, mse, forecast, actual, prod = div_test(year, method='regression')
    reg_mae_mse.at[year, 'mae'] = mae
    reg_mae_mse.at[year, 'mse'] = mse

In [38]:
reg_mae_mse

Unnamed: 0,mae,mse
1,0.221925,0.190977
2,0.212759,0.181556
3,0.213386,0.18304
4,0.213827,0.183573
5,0.214181,0.18626
6,0.216194,0.191255
7,0.219135,0.197613
8,0.222468,0.203856
9,0.225827,0.210577
10,0.228104,0.21771


#### Avg Test

In [39]:
avg_mae_mse = pd.DataFrame(index = range(1, 11))
for year in avg_mae_mse.index:
    mae, mse, forecast, actual, prod = div_test(year, method='avg')
    avg_mae_mse.at[year, 'mae'] = mae
    avg_mae_mse.at[year, 'mse'] = mse

In [40]:
avg_mae_mse

Unnamed: 0,mae,mse
1,0.208434,0.168747
2,0.211427,0.175966
3,0.213045,0.179357
4,0.214778,0.185454
5,0.22047,0.195815
6,0.224687,0.203138
7,0.229254,0.212254
8,0.234993,0.220774
9,0.242785,0.242282
10,0.246944,0.257319


#### Combined Comparison

In [41]:
mse_combined = pd.concat([cagr_mae_mse['mse'], reg_mae_mse['mse'], avg_mae_mse['mse']], axis=1)
mse_combined.columns = ['CAGR', 'Regression', 'Average']
mse_combined.index.name = '# Lookback Years'
mse_combined.plot()

In [42]:
mae_combined = pd.concat([cagr_mae_mse['mae'], reg_mae_mse['mae'], avg_mae_mse['mae']], axis=1)
mae_combined.columns = ['CAGR', 'Regression', 'Average']
mae_combined.index.name = '# Lookback Years'
mae_combined.plot()

## FORWARDS

### Base Model 

In [43]:
dividend_factory = {
    "discrete": DividendSchedule,
    "continuous": ContinuousDividendYield
}

class ForwardModel(ABC):
    @abstractmethod
    def get_forward_price(self) -> float:
        """
        Calculate the forward price.
        """
        pass

    @abstractmethod
    def summary(self) -> dict:
        """
        Return a summary of the forward model.
        """
        pass

    @abstractmethod
    def get_end_date(self) -> datetime:
        """
        Get the end date of the forward contract.
        """
        pass

    @abstractmethod
    def get_start_date(self) -> datetime:
        """
        Get the start date of the forward contract.
        """
        pass


## Should regular Forward initialize a Market Dividend?: No, seperated to AssetMarketForward
## Technically, regular Forward is a Model. It differs from market forward because it is the actual Forward/Future Price: No reply needed
## Decided to separate the two classes to avoid confusion and maintain clarity in the API.
## Should Dividends see Stock & Stock see dividends?: Stock can't see dividends, but it can have a dividend schedule.
    ## Dividends can see stock.

class Forward(ForwardModel):
    def __init__(self,
                 start_date: datetime|str,
                 end_date: datetime|str,
                 risk_free_rate: float,
                 dividend_type: str,
                 dividend: Union[Dividend, None] = None,
                 valuation_date: datetime = None,
                 spot_price: float=None,
                 freq: str = "quarterly",
                 div_amount: Union[float, List[float]] = 1.0,
                 **kwargs):
        """
        Initialize a Forward object.
        start_date: datetime or str - The start date of the forward contract.
        end_date: datetime or str - The end date of the forward contract.
        spot_price: float - The current spot price of the underlying asset.
        risk_free_rate: float - The risk-free interest rate (annualized).
        dividend_type: str - The type of dividend ('discrete' or 'continuous').
        dividend: Dividend or None - An instance of a Dividend subclass or None.
        valuation_date: datetime - The date for valuation purposes (default is start_date). 
        freq: str - The frequency of dividends ('monthly', 'quarterly', 'semiannual', 'annual').
        div_amount: float or list - The dividend amount (can be a scalar or a list). For discrete dividends, this is the amount per period, while for continuous dividends, it is the yield rate.
        """

    
        self.spot_price = spot_price
        self.risk_free_rate = risk_free_rate
        self.dividend_type = dividend_type
        self.valuation_date = pd.to_datetime(valuation_date) if valuation_date else pd.to_datetime(start_date)  
        self.start_date = start_date if isinstance(start_date, datetime) else pd.to_datetime(start_date)
        self.end_date = end_date if isinstance(end_date, datetime) else pd.to_datetime(end_date)        
        self._initalize_dividend(dividend_type=dividend_type, 
                                 dividend=dividend, 
                                 freq=freq, 
                                 div_amount=div_amount, **kwargs)
        


    def _initalize_dividend(self, 
                            dividend_type: str, 
                            dividend:Union[None, Dividend], 
                            freq:str, 
                            div_amount:float, 
                            ticker = None,
                            **kwargs):
        """
        Initialize the dividend based on the type.
        """
        ## Ensure dividend is permitted
        if dividend is None:
            if dividend_type not in dividend_factory:
                raise ValueError(f"Unsupported dividend type '{dividend_type}'. Use one of {list(dividend_factory.keys())}.")
        
        ## Create the dividend object based on the type
        if isinstance(dividend, Dividend):
            self.dividend = dividend

        elif dividend_type == 'continuous':

            logger.info("No ticker provided for a continuous dividend yield. Will construct ContinuousDividendYield instead.")
            self.dividend = ContinuousDividendYield(
                yield_rate=div_amount,
                start_date=self.start_date,
                end_date=self.end_date,
                valuation_date=self.valuation_date
            )
        elif dividend_type == 'discrete':
            logger.info("No ticker provided for a discrete dividend schedule. Will construct DividendSchedule instead.")
            self.dividend = DividendSchedule(
                start_date=self.start_date,
                end_date=self.end_date,
                freq=freq,
                amount=div_amount,
                valuation_date=self.valuation_date
            )

        elif not isinstance(dividend, Dividend):
            raise TypeError("Dividend must be an instance of Dividend or its subclasses.")
        
    def summary(self) -> dict:
        return {
        "spot": self.spot_price,
        "forward": self.get_forward_price(),
        "type": self.dividend.get_type(),
        "valuation": self.valuation_date.date(),
        "expiry": self.end_date.date()
        }

    def get_forward_price(self) -> float:
        """
        Calculate the forward price using the formula:
        F = S * e^{(r - q)T}
        where:
        S = spot price
        r = risk-free rate
        q = dividend yield
        T = time to maturity in years
        """
        T = time_distance_helper(self.end_date, self.valuation_date)
        if T <= 0:
            raise ValueError("End date must be after valuation date.")
        
        # Get the present value of the dividend
        if self.dividend.get_type() == "discrete":
            dividend_pv = self.dividend.get_present_value(self.risk_free_rate, sum_up = True)
            return (self.spot_price - dividend_pv) * math.exp(self.risk_free_rate * T)
        elif self.dividend.get_type() == "continuous":
            dividend_factor = self.dividend.get_present_value(self.end_date, **{})
            return self.spot_price * (math.exp(self.risk_free_rate * T) * dividend_factor) ## Already discounted
        else:
            raise ValueError(f"Unsupported dividend type '{self.dividend.get_type()}'. Use 'discrete' or 'continuous'.")
        
    def get_end_date(self) -> datetime:
        return self.end_date
    def get_start_date(self) -> datetime:
        return self.start_date

    def __repr__(self):
        return f"<Forward: ({repr(self.summary())}>"


### Market Model

In [44]:
## Would it be better to have Forward access Stock? And enforce singleton?
## For spot bumps, we can have a setter, where additions are moved to bump var, and anytime spot is called, it returns the spot + bump.
## Probably would be a good idea to cache spot timeseries in Stock.


class MarketForward(ForwardModel):
    def __init__(self, forward_price: float, start_date: datetime, end_date: datetime):
        """
        Initialize a MarketForward object.
        forward_price: float - The market forward price.
        start_date: datetime - The start date of the forward contract.
        end_date: datetime - The end date of the forward contract.
        """
        self.forward_price = forward_price
        self.start_date = start_date
        self.end_date = end_date

    def get_forward_price(self) -> float:
        return self.forward_price

    def summary(self) -> dict:
        return {
            "forward": self.forward_price,
            "start_date": self.start_date.date(),
            "end_date": self.end_date.date()
        }
    
    def get_end_date(self) -> datetime:
        return self.end_date
    def get_start_date(self) -> datetime:
        return self.start_date
    def __repr__(self):
        return f"<MarketForward: {self.forward_price} from {self.start_date.date()} to {self.end_date.date()}>"
    


### Asset Model

In [45]:

class EquityForward(Forward):
    def __init__(self,
                start_date: datetime|str,
                end_date: datetime|str,
                dividend_type: str,
                dividend: Union[Dividend, None] = None,
                valuation_date: datetime = None,
                risk_free_rate: float = None,
                spot_price: float=None,
                ticker = None,
                **kwargs):
        
        
        self.dividend_type = dividend_type
        self.valuation_date = pd.to_datetime(valuation_date) if valuation_date else pd.to_datetime(start_date)  
        self.start_date = start_date if isinstance(start_date, datetime) else pd.to_datetime(start_date)
        self.end_date = end_date if isinstance(end_date, datetime) else pd.to_datetime(end_date)  
        self.ticker = ticker
        self.forward_spot_price = spot_price
        self._initalize_dividend(dividend_type=dividend_type,
                                    dividend=dividend, 
                                    ticker=ticker, **kwargs)
        self.risk_free_rate = risk_free_rate or self.dividend.asset.rf_rate

    @property
    def spot_price(self):
        logger.info(f"Accessing spot_price of {self.ticker} from {self.dividend.__class__.__name__}")
        if isinstance(self.dividend, (MarketContinuousDividends,
                                        MarketDividendSchedule)):
            return self.dividend.spot_price
        
    @spot_price.setter
    def spot_price(self, v):
        logger.info(f"Setting spot_price of {self.ticker} to {v} in {self.dividend.__class__.__name__}")
        if isinstance(self.dividend, (MarketContinuousDividends,
                                        MarketDividendSchedule)):
            self.dividend.spot_price = v
            self.forward_spot_price = v
        else:
            raise TypeError("Spot price can only be set for MarketContinuousDividends or MarketDividendSchedule.")
            


    def _initalize_dividend(self, 
                            dividend_type: str, 
                            dividend:Union[None, Dividend], 
                            ticker = None,
                            **kwargs):
        """
        Initialize the dividend based on the type.
        """
        ## Ensure dividend is permitted
        if dividend is None:
            if dividend_type not in dividend_factory:
                raise ValueError(f"Unsupported dividend type '{dividend_type}'. Use one of {list(dividend_factory.keys())}.")
        
        ## Create the dividend object based on the type
        if isinstance(dividend, Dividend):
            self.dividend = dividend

        elif dividend_type == 'continuous':
            logger.info("Ticker provided for a continuous dividend yield. Will construct MarketContinuousDividends instead.")
            self.dividend = MarketContinuousDividends(
                            ticker=ticker,
                            start_date=self.start_date,
                            end_date=self.end_date,
                            valuation_date=self.valuation_date,
                            spot_price=self.forward_spot_price
                        )
        elif dividend_type == 'discrete':
            logger.info("Ticker provided for a discrete dividend schedule. Will construct MarketDividendSchedule instead.")
            self.dividend = MarketDividendSchedule(
                ticker=ticker,
                start_date=self.start_date,
                end_date=self.end_date,
                valuation_date=self.valuation_date,
                lookback_years=kwargs.get('lookback_years', DIVIDEND_LOOKBACK_YEARS),
                growth_method=kwargs.get('growth_method', DIVIDEND_FORECAST_METHOD),
                spot_price=self.forward_spot_price
            )

        elif not isinstance(dividend, Dividend):
            raise TypeError("Dividend must be an instance of Dividend or its subclasses.")


### Vectorized Forwards

In [46]:
# ----------------------
# Vectorized Forward Models
# ----------------------

def vectorized_forward_continuous(S, r, q_factor, T):
    """
    S: spot prices (array)
    r: risk-free rates (array)
    q: Discounted Dividend Factor
    T: time to maturity (array)
    """
    assert_equal_length(S, r, q_factor, T)
    S, r, T, q_factor = convert_to_array(S, r, T, q_factor)
    return S * np.exp(r * T) * q_factor

def vectorized_forward_discrete(S, r, T, pv_divs):
    """
    S: spot prices (array)
    r: risk-free rates (array)
    T: time to maturity (array)
    pv_divs: Summation of present value of all dividends till end date
    """
    assert_equal_length(S, r, pv_divs, T)
    S, r, T, pv_divs = convert_to_array(S, r, T, pv_divs)
    forward = (S - pv_divs) * np.exp(r * T)
    return forward

In [47]:
tickers = ['TSLA', 'CVX', 'KO']
r = [0.045, 0.055, 0.005]
spots=[100,200,300]
start_dates = [datetime(2025, 1, 5), datetime(2025, 1, 5), datetime(2025, 1, 5)]
end_dates = [datetime(2025, 8, 31), datetime(2025, 8, 31), datetime(2025, 8, 31)]
valuation_dates = [datetime(2025, 1, 5), datetime(2025, 1, 5), datetime(2025, 1, 5)]
vector_f_discrete = vectorized_forward_discrete(
    S=spots,
    r=r,
    T=[time_distance_helper(end_dates[i], valuation_dates[i]) for i in range(len(end_dates))],
    pv_divs=discrete_pv
)

vector_f_continuous = vectorized_forward_continuous(
    S=spots,
    r=r,
    q_factor=cont_q,
    T=[time_distance_helper(end_dates[i], valuation_dates[i]) for i in range(len(end_dates))]
)

vector_f_continuous, vector_f_discrete

(array([102.97565159, 202.94061447, 299.71342922]),
 array([102.97565159, 203.86358679, 299.98726676]))

### TESTS

#### Plain Model (No Tickers)

In [48]:
forward = Forward(
    start_date="2023-01-01",
    end_date="2024-01-01",
    spot_price=100,
    risk_free_rate=0.05,
    dividend_type="discrete",
    freq="quarterly",
    div_amount=[0, 0, 0, 0]
)
print(forward.get_forward_price())  # Example usage
print(forward.dividend.get_present_value(forward.risk_free_rate, sum_up=True))
print(forward.dividend.get_present_value(forward.risk_free_rate, sum_up=False))
forward

105.12351191991695
0.0
[0.0, 0.0, 0.0, 0.0]


<Forward: ({'spot': 100, 'forward': 105.12351191991695, 'type': 'discrete', 'valuation': datetime.date(2023, 1, 1), 'expiry': datetime.date(2024, 1, 1)}>

In [49]:
forward_continous = Forward(
    start_date="2023-01-01",
    end_date="2024-01-01",
    spot_price=100,
    risk_free_rate=0.05,
    dividend_type="continuous",
    div_amount=0.0  # 2% continuous dividend yield
)
print(forward_continous.get_forward_price())  # Example usage

105.12351191991695


#### Model (Tickers)

- This section carries out market data injections. Instead of relying on user inputs for

In [50]:
forward = EquityForward(
    start_date="2023-01-03",
    end_date="2024-01-03",
    risk_free_rate=None,
    # risk_free_rate=0.05,
    dividend_type="discrete",
    ticker = 'AAPL'
)
print(forward.get_forward_price())  # Example usage
print(forward.dividend.get_present_value(forward.risk_free_rate, sum_up=True))
print(forward.dividend.get_present_value(forward.risk_free_rate, sum_up=False))
forward

129.54945167385944
0.9198354331887906
[0.22914318517856802, 0.22972632646061583, 0.23023038067973967, 0.23073554086986708]


<Forward: ({'spot': 125.06999969482422, 'forward': 129.54945167385944, 'type': 'discrete', 'valuation': datetime.date(2023, 1, 3), 'expiry': datetime.date(2024, 1, 3)}>

In [51]:
forward.dividend.asset.rf_rate
forward.risk_free_rate
forward.dividend.amounts

[0.23, 0.2329913294797688, 0.23602156353369644, 0.23909120814612894]

In [52]:
forward.dividend.spot_price

125.06999969482422

In [53]:
forward_continous = EquityForward(
    start_date="2023-01-03",
    end_date="2024-01-03",
    risk_free_rate=0.05,
    dividend_type="continuous",
    div_amount=0.0,  # 2% continuous dividend yield
    ticker = 'AAPL'
)
print(forward_continous.get_forward_price())  # Example usage

130.75511472843496


In [54]:
forward_continous.dividend.spot_price 
forward_continous.dividend.yield_rate  

0.005516910543564624

## BLACK SCHOLES MODEL

### BASE

In [55]:


class BlackScholes:
    __GREEK_CALCULATION_STYLE = 'analytic'  # Default to analytic Greeks, else numerical
    def __init__(self,
                 strike_price: float,
                 expiration: datetime,
                 risk_free_rate: float,
                 volatility: float,
                 start_date: datetime,
                 spot_price: float,
                 dividend_type: str = 'discrete',
                 valuation_date: datetime = None,
                 freq: str = "quarterly",
                 div_amount: Union[float, List[float]] = 1.0,
                 option_type: str = "c"):
        """
        Black-Scholes model using forward price directly.

        Parameters:
        - forward: F (e.g., from a ForwardModel)
        - strike_price: K
        - expiration: T (datetime)
        - risk_free_rate: r
        - volatility: sigma (annualized)
        - option_type: "call" or "put"
        """ 
        self.T = time_distance_helper(expiration, valuation_date)
        risk_free_rate = float(risk_free_rate) if risk_free_rate else 0  # Ensure risk-free rate is a float
        option_inputs_assert(sigma=volatility,
                             K=strike_price,
                             S0=spot_price,
                             T=self.T,
                             r=risk_free_rate,
                             market_price=0.1,
                             q=0.0,
                             flag=option_type.lower(),)
        self.forward= Forward(
            start_date=start_date,
            end_date=expiration,
            spot_price=spot_price,
            risk_free_rate=risk_free_rate,
            dividend_type=dividend_type,
            valuation_date=valuation_date,
            freq=freq,
            div_amount=div_amount
        )
        self.expiration = expiration
        self.F = self.forward.get_forward_price()
        self.K = strike_price
        
        self.r = risk_free_rate
        self.sigma = volatility
        self.option_type = option_type.lower()
        self.finite_estimator = FiniteGreeksEstimator(
            price_func=self.price,
            base_params={
                'F': self.F,
                'K': self.K,
                'T': self.T,
                'r': self.r,
                'sigma': self.sigma,
                'q': 0.0, # Assuming no continuous dividend yield for simplicity
                'S': spot_price,  # Including spot price for delta calculation
                'option_type': self.option_type
            },
            dx_thresh=0.00001,
            method='central'  # Use backward method for finite differences
        )

        # Ensure option_type is either 'call' or 'put'
        if self.option_type not in ["c", "p"]:
            raise ValueError("option_type must be 'c' or 'p'")
        
    @property
    def valuation_date(self):
        return self.forward.valuation_date

    @property
    def spot_price(self):
        return self.forward.spot_price

    
    @classmethod
    def set_greek_calculation_style(cls, style: str):
        """
        Set the style for Greek calculations.
        :param style: 'analytic' or 'numerical'
        """
        if style not in ['analytic', 'numerical']:
            raise ValueError("Style must be either 'analytic' or 'numerical'")
        cls.__GREEK_CALCULATION_STYLE = style

    @classmethod
    def get_greek_calculation_style(cls) -> str:
        """
        Get the current Greek calculation style.
        :return: 'analytic' or 'numerical'
        """
        return cls.__GREEK_CALCULATION_STYLE


    def _d1(self):
        # d1 = [ln(F/K) + 0.5*sigma^2*T] / (sigma * sqrt(T))
        numerator = math.log(self.F / self.K) + 0.5 * self.sigma ** 2 * self.T
        denominator = self.sigma * math.sqrt(self.T)
        return numerator / denominator

    def _d2(self):
        # d2 = d1 - sigma * sqrt(T)
        return self._d1() - self.sigma * math.sqrt(self.T)

    def price(self, F=None, K=None, T=None, r=None, sigma=None, option_type=None, S=None,*args, **kwargs) -> float:
        # Compute option price using forward-based Black-Scholes formula
        # Call price: e^(-rT) * (F * N(d1) - K * N(d2))
        # Put price: e^(-rT) * (K * N(-d2) - F * N(-d1))

        ## Handle Forward Price, it has to be repriced
        temp_S = self.forward.spot_price 
        temp_rf = self.forward.risk_free_rate


        ## New inputs into the forward model
        self.forward.spot_price = S if S is not None else temp_S  
        self.forward.risk_free_rate = r if r is not None else temp_rf
        if T is not None:
            # Set valuation date back so that (end_date - valuation_date) = T
            temp_val_date = self.forward.valuation_date
            new_val_date = self.expiration - timedelta(days=T * DAILY_BASIS)
            self.forward.valuation_date = new_val_date
            self.forward.dividend.valuation_date = new_val_date


        else:
            temp_val_date = self.forward.valuation_date


        ## Recalculate the forward price
        F = self.forward.get_forward_price()

        ## Reset the forward inputs
        self.forward.spot_price = temp_S  # Reset to original spot price
        self.forward.risk_free_rate = temp_rf
        self.forward.valuation_date = temp_val_date
        self.forward.dividend.valuation_date = temp_val_date


        # Ensure all parameters are numpy arrays for vectorized operations
        K = np.asarray(K) if K is not None else self.K
        T = np.asarray(T) if T is not None else self.T
        r = np.asarray(r) if r is not None else self.r
        sigma = np.asarray(sigma) if sigma is not None else self.sigma
        option_type = option_type if option_type is not None else self.option_type

        return black_scholes_vectorized(F, K, T, r, sigma, option_type=option_type)
    def summary(self) -> dict:
        # Return a dictionary summarizing the model inputs and price
        return {
            "forward": self.F,
            "strike": self.K,
            "T": self.T,
            "r": self.r,
            "vol": self.sigma,
            "type": self.option_type,
            "price": self.price()
        }
    
    def greeks(self) -> dict:
        """
        Calculate the Greeks using finite differences.
        Returns a dictionary with keys 'delta', 'gamma', 'vega', 'theta', 'rho'.
        """
        if self.__GREEK_CALCULATION_STYLE == 'analytic':
            greek = black_scholes_analytic_greeks_vectorized(
                F=self.F, 
                K=self.K, 
                T=self.T, 
                r=self.r, 
                sigma=self.sigma, 
                option_type=self.option_type
            )
            
        elif self.__GREEK_CALCULATION_STYLE == 'numerical':
            greek = self.finite_estimator.all_first_order()
            greek.update(self.finite_estimator.all_second_order())
            
        else:
            raise ValueError(f"Unknown Greek calculation style '{self.GREEK_CALCULATION_STYLE}'. Use 'analytic' or 'numerical'.")
        greek = dict(sorted(greek.items(), key=lambda item: item[0]))
        return greek
    
    def __repr__(self):
        return f"<BlackScholes: {self.option_type.capitalize()} option forward={self.F:.2f}, strike={self.K:.2f}, T={self.T:.2f}, r={self.r:.4f}, vol={self.sigma:.4f}>"


### Market

In [56]:
class MarketBlackScholes(BlackScholes):
    def __init__(self,
                 ticker: str,
                 strike_price: float,
                 expiration: datetime,
                 risk_free_rate: float,
                 volatility: float,
                 start_date: datetime,
                 dividend_type: str = 'discrete',
                 valuation_date: datetime = None,
                 option_type: str = "c"):
        """
        Market Black-Scholes model using forward price directly.
        """
        super().__init__(strike_price=strike_price,
                         expiration=expiration,
                         risk_free_rate=risk_free_rate,
                         volatility=volatility,
                         start_date=start_date,
                         spot_price=1, ## Spot price will be set later from the forward
                         dividend_type=dividend_type,
                         valuation_date=valuation_date,
                         freq='quarterly', ## Default to allow initialization
                         div_amount=0,
                         option_type=option_type)
        
        ## Override super's initialization of forward with EquityForward
        self.forward= EquityForward(
            start_date=start_date,
            end_date=expiration,
            dividend_type=dividend_type,
            dividend=None,  # Market dividend will be set later
            valuation_date=valuation_date,
            risk_free_rate=risk_free_rate,
            spot_price=None, 
            ticker=ticker
        )

        self.expiration = expiration
        self.F = self.forward.get_forward_price()
        self.r = self.forward.risk_free_rate
        self.finite_estimator = FiniteGreeksEstimator(
            price_func=self.price,
            base_params={
                'F': None,
                'K': self.K,
                'T': self.T,
                'r': self.r,
                'sigma': self.sigma,
                'q': 0.0, # Assuming no continuous dividend yield for simplicity
                'S': self.spot_price,  # Including spot price for delta calculation
                'option_type': self.option_type
            },
            dx_thresh=0.00001,
            method='central'  # Use backward method for finite differences
        )


    @property
    def spot_price(self):
        """
        Override spot_price to use the forward's spot price.
        """
        return self.forward.spot_price

    def price(self, F=None, K=None, T=None, r=None, sigma=None, option_type=None, S=None,*args, **kwargs) -> float:
        """
        Override price method to use the forward price from the EquityForward.
        """
        ## S, if user overrides Spot, need to update forward spot price
        if S is not None:
            self.forward.spot_price = S
        else:
            S = self.forward.spot_price


        ## If K is not provided, use the BSM Model's strike price
        if K is None:
            K = self.K

        ## If T is not provided, use the BSM Model's T
        if T is None:
            T = self.T

        else:
            # Set valuation date back so that (end_date - valuation_date) = T
            temp_val_date = self.forward.valuation_date
            new_val_date = self.expiration - timedelta(days=T * DAILY_BASIS)
            self.forward.valuation_date = new_val_date
            self.forward.dividend.valuation_date = new_val_date

        ## If r is not provided, use the BSM Model's risk-free rate
        if r is None:
            r = self.r
            r_old = self.forward.risk_free_rate
        else:
            # Update the forward's risk-free rate if provided
            r_old = self.forward.risk_free_rate
            self.forward.risk_free_rate = r

        ## If sigma is not provided, use the BSM Model's volatility
        if sigma is None:
            sigma = self.sigma

        ## If option_type is not provided, use the BSM Model's option type
        if option_type is None:
            option_type = self.option_type

        ## If F is not provided, use the forward's price
        if F is None:
            F = self.forward.get_forward_price()

        ## Clear all bumps to forward to avoid unintended effects
        self.forward.dividend.asset.clear_bump()
        self.forward.risk_free_rate = r_old  # Reset to original risk-free rate

        price = black_scholes_vectorized(F, K, T, r, sigma, option_type=option_type)

        

        
        return price
    

### Vectorized

#### BSM

In [57]:
def black_scholes_vectorized_base(F: np.ndarray|List[float], 
                                  K: np.ndarray|List[float],
                                  T: np.ndarray|List[float], 
                                  r: np.ndarray|List[float], 
                                  sigma: np.ndarray|List[float], 
                                  option_type: str|List[str]="c",
                                  **kwargs) -> np.ndarray:
    """
    Vectorized Black-Scholes formula for option pricing.
    F: Forward prices (array)
    K: Strike prices (array)
    T: Time to maturity (array)
    r: Risk-free rates (array)
    sigma: Volatilities (array)
    option_type: "c" for call, "p" for put (array or single string)

    Returns: Option prices (array)
    """
    # Ensure all inputs are numpy arrays for vectorized operations
    F = np.asarray(F)
    K = np.asarray(K)
    T = np.asarray(T)
    r = np.asarray(r)
    sigma = np.asarray(sigma)

    d1 = (np.log(F / K) + 0.5 * sigma**2 * T) / (sigma * np.sqrt(T))
    d2 = d1 - sigma * np.sqrt(T)
    df = np.exp(-r * T)
    # Handle both scalar and vector string inputs for option_type
    option_type = np.asarray(option_type)

    # Enfores option_type
    if not np.all(np.isin(option_type, ["c", "p"])):
        raise ValueError("option_type must be 'c' or 'p'")
    
    call_mask = option_type == "c"
    put_mask = option_type == "p"
    price = np.zeros_like(F)
    # Calculate call prices
    price[call_mask] = df[call_mask] * (F[call_mask] * norm.cdf(d1[call_mask]) - K[call_mask] * norm.cdf(d2[call_mask]))
    # Calculate put prices
    price[put_mask] = df[put_mask] * (K[put_mask] * norm.cdf(-d2[put_mask]) - F[put_mask] * norm.cdf(-d1[put_mask]))
    return price


def vectorized_market_forward_calc(ticks: List[str], 
                                   S: List[float], 
                                   valuation_dates: List[datetime], 
                                   end_dates: List[datetime], 
                                   r: List[float], 
                                   div_type='discrete',
                                   return_div=False) -> np.ndarray:
    """
    Vectorized calculation of forward prices for multiple tickers.
    ticks: List of ticker symbols
    S: List of spot prices (current prices of the underlying assets)
    valuation_dates: List of valuation dates (dates for which the option is priced)
    end_dates: List of end dates (expiration dates of the options)
    r: List of risk-free rates (annualized)
    div_type: Type of dividend ('discrete' or 'continuous')
    Returns: Forward prices (array)
    """
        
    # Get Dividends & Calculate the forward price
    if div_type == 'discrete':

        schedule = get_vectorized_dividend_scehdule(
            tickers=ticks,
            start_dates=valuation_dates,
            end_dates=end_dates,
            valuation_dates=valuation_dates,
        )
        div_amt=vectorized_discrete_pv(schedules=schedule,
                                  r=r,
                                  _valuation_dates=valuation_dates,
                                  _end_dates=end_dates,)
        F = vectorized_forward_discrete(
            S=S,
            r=r,
            T=[time_distance_helper(end_dates[i], valuation_dates[i]) for i in range(len(end_dates))],
            pv_divs=div_amt
        )

    elif div_type == 'continuous':
        div_rate = get_vectorized_dividend_rate(
            tickers=ticks,
            spots=S,
            valuation_dates=valuation_dates,
        )

        div_amt = get_vectorized_continuous_dividends(div_rates=div_rate,
                                                _valuation_dates=valuation_dates,
                                                _end_dates=end_dates,)
        F = vectorized_forward_continuous(
            S=S,
            r=r,
            q_factor=div_amt,
            T=[time_distance_helper(end_dates[i], valuation_dates[i]) for i in range(len(end_dates))]
        )
    else:
        raise ValueError(f"Unsupported dividend type '{div_type}'. Use 'discrete' or 'continuous'.")
    
    return (F, div_amt) if return_div else F
    

def black_scholes_vectorized_market(ticks: List[str],
                                    S: List[float], 
                                    K: List[float], 
                                    valuation_dates: List[datetime],
                                    end_dates: List[datetime],
                                    r: List[float], 
                                    sigma: List[float], 
                                    option_type: str|List[str] = "c",
                                    div_type='discrete'):
    """
    Vectorized Black-Scholes model for market data.
    ticks: List of ticker symbols
    S: List of spot prices (current prices of the underlying assets)
    K: List of strike prices
    valuation_dates: List of valuation dates (dates for which the option is priced)
    end_dates: List of end dates (expiration dates of the options)
    r: List of risk-free rates (annualized)
    sigma: List of volatilities (annualized)
    option_type: "c" for call, "p" for put (single string or list of strings)
    div_type: Type of dividend ('discrete' or 'continuous')
    
    Returns: Option prices (array)
    """
    F = vectorized_market_forward_calc(
        ticks=ticks,
        S=S,
        valuation_dates=valuation_dates,
        end_dates=end_dates,
        r=r,
        div_type=div_type
    )

    # Ensure option_type is a list
    if isinstance(option_type, str):
        option_type = [option_type] * len(ticks)
    elif len(option_type) != len(ticks):
        raise ValueError("option_type must be a single string or a list of strings with the same length as ticks.")
    
    # Convert valuation_dates and end_dates to Timedelta
    T = [time_distance_helper(end_dates[i], valuation_dates[i]) for i in range(len(end_dates))]

    return black_scholes_vectorized_base(
        F=F, 
        K=K, 
        T=T, 
        r=r, 
        sigma=sigma, 
        option_type=option_type
    )


### TESTING

#### Base Model

##### Discrete Dividends Pricing

In [58]:
bs_model = BlackScholes(
    strike_price=210,
    expiration=datetime(2025, 12, 19),
    risk_free_rate=0.045,
    volatility=0.2725,
    start_date=datetime(2023, 1, 1),
    spot_price=210.85,
    dividend_type='discrete',
    valuation_date=datetime(2025, 7, 9),
    freq='quarterly',
    div_amount=0.26,
    option_type='c'
)
print(bs_model.summary())
print(bs_model.forward.summary())
bs_model.forward.dividend
bs_model

{'forward': 214.8645784470177, 'strike': 210, 'T': 0.4462696783025325, 'r': 0.045, 'vol': 0.2725, 'type': 'c', 'price': 17.60284866827155}
{'spot': 210.85, 'forward': 214.8645784470177, 'type': 'discrete', 'valuation': datetime.date(2025, 7, 9), 'expiry': datetime.date(2025, 12, 19)}


<BlackScholes: C option forward=214.86, strike=210.00, T=0.45, r=0.0450, vol=0.2725>

In [59]:
bs_model.set_greek_calculation_style('analytic')
(bs_model.greeks())

{'delta': array(0.58582531),
 'gamma': 0.009962580537501141,
 'rho': 0.47200904637776536,
 'theta': -0.05978444198260355,
 'vega': 0.5593264832634446,
 'volga': 0.00015478263709686236}

In [60]:
bs_model.finite_estimator.method='central'
bs_model.set_greek_calculation_style('numerical')
bs_model.greeks()

{'delta': 0.5858253086494187,
 'gamma': 0.010164673913553489,
 'rho': 0.47235573244424556,
 'theta': -0.058873690712538014,
 'vega': 0.5482060477821096,
 'volga': 0.00015137831603029753}

In [61]:
bs_model_put = BlackScholes(
    strike_price=210,
    expiration=datetime(2025, 12, 19),
    risk_free_rate=0.045,
    volatility=0.2757,
    start_date=datetime(2023, 1, 1),
    spot_price=210.10,
    dividend_type='discrete',
    valuation_date=datetime(2025, 7, 9),
    freq='quarterly',
    div_amount=0.26,
    option_type='p'
)
print(bs_model_put.summary())
print(bs_model_put.forward.summary())
bs_model_put.forward.dividend
bs_model_put

{'forward': 214.09936459333818, 'strike': 210, 'T': 0.4462696783025325, 'r': 0.045, 'vol': 0.2757, 'type': 'p', 'price': 13.323998912169074}
{'spot': 210.1, 'forward': 214.09936459333818, 'type': 'discrete', 'valuation': datetime.date(2025, 7, 9), 'expiry': datetime.date(2025, 12, 19)}


<BlackScholes: P option forward=214.10, strike=210.00, T=0.45, r=0.0450, vol=0.2757>

In [62]:
bs_model_put.set_greek_calculation_style('analytic')
(bs_model_put.greeks())


{'delta': array(-0.42189165),
 'gamma': 0.009922637529837549,
 'rho': -0.4545474105116634,
 'theta': -0.05987609679442674,
 'vega': 0.5596184570322142,
 'volga': 5.1516364463452093e-05}

In [63]:
bs_model_put.finite_estimator.method='central'
bs_model_put.set_greek_calculation_style('numerical')
(bs_model_put.greeks())


{'delta': -0.4218916505427727,
 'gamma': 0.01012392946402011,
 'rho': -0.45479708198965024,
 'theta': -0.03382428316210273,
 'vega': 0.5484922166138241,
 'volga': 5.094636161262145e-05}

##### Continuous Dividend Pricing

In [64]:
bs_model = BlackScholes(
    strike_price=210,
    expiration=datetime(2025, 12, 19),
    risk_free_rate=0.045,
    volatility=0.2725,
    start_date=datetime(2023, 1, 1),
    spot_price=210.85,
    dividend_type='continuous',
    valuation_date=datetime(2025, 7, 9),
    freq='quarterly',
    div_amount=1.2331041024424947e-05 * 4,
    option_type='c'
)
print(bs_model.summary())
print(bs_model.forward.summary())
bs_model.forward.dividend
bs_model

{'forward': 215.1223860977551, 'strike': 210, 'T': 0.4462696783025325, 'r': 0.045, 'vol': 0.2725, 'type': 'c', 'price': 17.75120036942414}
{'spot': 210.85, 'forward': 215.1223860977551, 'type': 'continuous', 'valuation': datetime.date(2025, 7, 9), 'expiry': datetime.date(2025, 12, 19)}


<BlackScholes: C option forward=215.12, strike=210.00, T=0.45, r=0.0450, vol=0.2725>

In [65]:
bs_model.set_greek_calculation_style('analytic')
(bs_model.greeks())

{'delta': array(0.58839035),
 'gamma': 0.0099362237603873,
 'rho': 0.4744211445871562,
 'theta': -0.059839309369576855,
 'vega': 0.5591862221356935,
 'volga': 0.00018964403099594948}

In [66]:
bs_model.finite_estimator.method='central'
bs_model.set_greek_calculation_style('numerical')
bs_model.greeks()

{'delta': 0.5883773962922071,
 'gamma': 0.010137328763478634,
 'rho': 0.4744211442660647,
 'theta': -0.058893465673126144,
 'vega': 0.5480685753177087,
 'volga': 0.00018568244137597491}

In [67]:
bs_model_put = BlackScholes(
    strike_price=210,
    expiration=datetime(2025, 12, 19),
    risk_free_rate=0.045,
    volatility=0.2757,
    start_date=datetime(2023, 1, 1),
    spot_price=210.10,
    dividend_type='continuous',
    valuation_date=datetime(2025, 7, 9),
    freq='quarterly',
    div_amount=1.2331041024424947e-05 * 4,
    option_type='p'
)
print(bs_model_put.summary())
print(bs_model_put.forward.summary())
bs_model_put.forward.dividend
bs_model_put

{'forward': 214.35718908768482, 'strike': 210, 'T': 0.4462696783025325, 'r': 0.045, 'vol': 0.2757, 'type': 'p', 'price': 13.217710506842806}
{'spot': 210.1, 'forward': 214.35718908768482, 'type': 'continuous', 'valuation': datetime.date(2025, 7, 9), 'expiry': datetime.date(2025, 12, 19)}


<BlackScholes: P option forward=214.36, strike=210.00, T=0.45, r=0.0450, vol=0.2757>

In [68]:
bs_model_put.set_greek_calculation_style('analytic')
(bs_model_put.greeks())


{'delta': array(-0.41933655),
 'gamma': 0.009897738015178278,
 'rho': -0.45215321413234394,
 'theta': -0.059805006074437554,
 'vega': 0.5595594143299075,
 'volga': 8.02200697622426e-05}

In [69]:
bs_model_put.finite_estimator.method='central'
bs_model_put.set_greek_calculation_style('numerical')
(bs_model_put.greeks())


{'delta': -0.41932732401218403,
 'gamma': 0.01009806883384709,
 'rho': -0.4521532142081557,
 'theta': -0.0339107594961634,
 'vega': 0.5484343477403055,
 'volga': 7.861631213984337e-05}

#### Market Model

##### Continuous Dividends

In [70]:
mbs=MarketBlackScholes(
    ticker='AAPL',
    strike_price=210,
    expiration=datetime(2025, 12, 19),
    risk_free_rate=None, # Use 0 to let the model use the dividend's risk-free rate
    volatility=0.2678,
    start_date=datetime(2023, 1, 3),
    dividend_type='discrete',
    valuation_date=datetime(2025, 7, 14),
    option_type='c'
)
print(mbs.spot_price)
print(f"Forward Price: {mbs.forward.get_forward_price()}")
print(f"BSM Price: {mbs.price()}")

208.6199951171875
Forward Price: 211.94729151184956
BSM Price: 15.512748505148558


In [71]:
print(mbs.get_greek_calculation_style())
mbs.set_greek_calculation_style('numerical')
mbs.greeks()

numerical


{'delta': 0.5558560087961321,
 'gamma': 0.010777150478159912,
 'rho': 0.43387253917622887,
 'theta': -0.057459418105310966,
 'vega': 0.5406747178126954,
 'volga': -0.00010137965565221817}

In [72]:
mbs.set_greek_calculation_style('analytic')
mbs.greeks()

{'delta': array(0.55585601),
 'gamma': 0.010581650875179385,
 'rho': 0.4332824356194001,
 'theta': -0.058272410282780074,
 'vega': 0.5506638993040862,
 'volga': -0.00010301154988852094}

In [73]:
mbs=MarketBlackScholes(
    ticker='AAPL',
    strike_price=210,
    expiration=datetime(2025, 12, 19),
    risk_free_rate=None, # Use 0 to let the model use the dividend's risk-free rate
    volatility=0.2728,
    start_date=datetime(2023, 1, 3),
    dividend_type='discrete',
    valuation_date=datetime(2025, 7, 14),
    option_type='p'
)
print(mbs.spot_price)
print(f"Forward Price: {mbs.forward.get_forward_price()}")
print(f"BSM Price: {mbs.price()}")
print(mbs.get_greek_calculation_style())
print(mbs.forward.__class__.__name__)
print(mbs.forward.dividend.__class__.__name__)
print(mbs.forward.dividend.asset.__class__.__name__)

208.6199951171875
Forward Price: 211.94729151184956
BSM Price: 13.871105818430156
analytic
EquityForward
MarketDividendSchedule
Stock


In [74]:

mbs.set_greek_calculation_style('numerical')
print(mbs.get_greek_calculation_style())
mbs.greeks()

numerical


{'delta': -0.4438738884063381,
 'gamma': 0.010578602492142673,
 'rho': -0.46005495212701575,
 'theta': -0.034335234296367645,
 'vega': 0.5406226604282662,
 'volga': -0.00010707812300522983}

In [75]:
mbs.set_greek_calculation_style('analytic')
print(mbs.get_greek_calculation_style())
mbs.greeks()

analytic


{'delta': array(-0.44387389),
 'gamma': 0.010386705510666578,
 'rho': -0.45958372978984224,
 'theta': -0.05984361197862239,
 'vega': 0.5506108802057498,
 'volga': -0.00010902696444173094}

In [76]:
mbs.forward.risk_free_rate

0.0423199987411499

#### Vectorized Pricing

In [77]:
print("Vectorized Black-Scholes Market Model Example (Discrete Dividend):",
      black_scholes_vectorized_market(
    ticks=['AAPL', 'MSFT', 'GOOGL'],
    S=[150, 250, 2800],
    K=[150, 250, 2800],
    valuation_dates=[datetime(2023, 1, 1), datetime(2023, 1, 1), datetime(2023, 1, 1)],
    end_dates=[datetime(2024, 1, 1), datetime(2024, 1, 1), datetime(2024, 1, 1)],
    r=[0.05, 0.04, 0.03],
    sigma=[0.2, 0.25, 0.3],
    option_type=['c', 'p', 'c'],
    div_type='continuous'
))

Vectorized Black-Scholes Market Model Example (Discrete Dividend): [ 15.09713366  20.78417622 371.79467198]


In [78]:
print("Vectorized Black-Scholes Market Model Example (Continuous Dividend):",
      black_scholes_vectorized_market(
    ticks=['AAPL', 'MSFT', 'GOOGL'],
    S=[150, 250, 2800],
    K=[150, 250, 2800],
    valuation_dates=[datetime(2023, 1, 1), datetime(2023, 1, 1), datetime(2023, 1, 1)],
    end_dates=[datetime(2024, 1, 1), datetime(2024, 1, 1), datetime(2024, 1, 1)],
    r=[0.05, 0.04, 0.03],
    sigma=[0.2, 0.25, 0.3],
    option_type=['c', 'p', 'c'],  # Mixed option types
    div_type='discrete'
))

Vectorized Black-Scholes Market Model Example (Continuous Dividend): [ 15.08353295  20.91310523 371.79467198]


## GREEKS

##### Numerical

In [79]:
## For numerical Greeks, we can use the patched BSM model to calculate Greeks
## This is because scenario bumps are applied to T & S which also affect the forward price.
def _ptched_bsm_for_numerical(
    K: float,
    T: List[float],
    r: float,
    sigma: float,
    S: float,
    dividend_type: str = 'discrete',
    div_amount: Union[float, List[float]] = 1.0,
    option_type: str = "c",
    **kwargs
):
    
    """
    Patched Black-Scholes model for numerical Greeks.
    This model allows for scenario bumps on T & S which affect the forward price.
    """
    if dividend_type == 'continuous' :
        F = vectorized_forward_continuous(
            S=S,
            r=r,
            q_factor=div_amount,  # Assuming div_amount is the continuous yield rate
            T=T
        )
    elif dividend_type == 'discrete':
        F = vectorized_forward_discrete(
            S=S,
            r=r,
            T=T,
            pv_divs=div_amount  # Assuming div_amount is the present value of discrete dividends
        )
    else:
        raise ValueError(f"Unsupported dividend type '{dividend_type}'. Use 'discrete' or 'continuous'.")
    return black_scholes_vectorized_base(
        F=F, 
        K=K, 
        T=T,
        r=r, 
        sigma=sigma, 
        option_type=option_type
    )

In [80]:

def vectorized_market_greeks_numerical(
        ticks: List[str],
        S: List[float],
        K: List[float],
        valuation_dates: List[datetime],
        end_dates: List[datetime],
        r: List[float],
        sigma: List[float],
        option_type: str|List[str] = "c",
        div_type='discrete',
        div_amount=None
) -> List[dict]:
    """
    Vectorized calculation of Greeks for market options using analytical method.
    """

    ## For analytical greeks, bumps are applied and recalculation is needed.
    ## Therefore, we need to ensure that either div_amount or ticks are provided.
    
    if div_amount is None and ticks is None:
        raise ValueError("div_amount must be provided if ticks are not provided.")

    F, div_amount = vectorized_market_forward_calc(
        ticks=ticks,
        S=S,
        valuation_dates=valuation_dates,
        end_dates=end_dates,
        r=r,
        div_type=div_type,
        return_div=True
    )
    
    T = [time_distance_helper(end_dates[i], valuation_dates[i]) for i in range(len(end_dates))]
    finite_estimator = FiniteGreeksEstimator(
        price_func=_ptched_bsm_for_numerical,
        base_params={
            'F': np.asarray(F),
            'K': np.asarray(K),
            'T': np.asarray(T),
            'r': np.asarray(r),
            'sigma': np.asarray(sigma),
            'q': 0.0,  # Assuming no continuous dividend yield for simplicity
            'S': np.asarray(S),  # Including spot price for delta calculation
            'option_type': option_type,
            'div_type': div_type,
            'div_amount': div_amount  # Placeholder, will be ignored in the patched function
        },
        dx_thresh=0.00001,
        method='central'  # Use backward method for finite differences
    )
    greeks = finite_estimator.all_first_order()
    greeks.update(finite_estimator.all_second_order())
    greeks = dict(sorted(greeks.items(), key=lambda item: item[0]))
    return greeks


##### Analytical

In [81]:
## To seperate the F calculation from Market Greeks Funciton
def _ptched_bsm_for_analytical(
    ticks: List[str],
    S: List[float],
    K: List[float],
    valuation_dates: List[datetime],
    end_dates: List[datetime],
    r: List[float],
    sigma: List[float],
    option_type: str|List[str] = "c",
    div_type='discrete',
    **kwargs
):
    
    F = vectorized_market_forward_calc(
        ticks=ticks,
        S=S,
        valuation_dates=valuation_dates,
        end_dates=end_dates,
        r=r,
        div_type=div_type
    )
    T = [time_distance_helper(end_dates[i], valuation_dates[i]) for i in range(len(end_dates))]
    greeks = black_scholes_analytic_greeks_vectorized(
        F=F, 
        K=K, 
        T=T, 
        r=r, 
        sigma=sigma, 
        option_type=option_type
    )
    greeks = dict(sorted(greeks.items(), key=lambda item: item[0]))
    return greeks

##### Combined

In [82]:
def vectorized_market_greeks(
        ticks: List[str],
        S: List[float],
        K: List[float],
        valuation_dates: List[datetime],
        end_dates: List[datetime],
        r: List[float],
        sigma: List[float],
        option_type: str|List[str] = "c",
        div_type='discrete',
        greek_style: str = 'analytic'
) -> List[dict]:
    """
    Vectorized calculation of Greeks for market options.
    ticks: List of ticker symbols
    S: List of spot prices (current prices of the underlying assets)
    K: List of strike prices
    valuation_dates: List of valuation dates (dates for which the option is priced)
    end_dates: List of end dates (expiration dates of the options)
    r: List of risk-free rates (annualized)
    sigma: List of volatilities (annualized)
    option_type: "c" for call, "p" for put (single string or list of strings)
    div_type: Type of dividend ('discrete' or 'continuous')
    greek_style: 'analytic' or 'numerical' for Greek calculation style
    
    Returns: Dictionary of Greeks for each option
    """
    # Ensure option_type is a list
    if isinstance(option_type, str):
        option_type = [option_type] * len(ticks)
    elif len(option_type) != len(ticks):
        raise ValueError("option_type must be a single string or a list of strings with the same length as ticks.")
    
    # Convert valuation_dates and end_dates to Timedelta
    T = [time_distance_helper(end_dates[i], valuation_dates[i]) for i in range(len(end_dates))]

    # Calculate the Greeks using the specified style
    if greek_style == 'analytic':
        greeks = _ptched_bsm_for_analytical(
            ticks=ticks,
            S=S,
            K=K,
            valuation_dates=valuation_dates,
            end_dates=end_dates,
            r=r,
            sigma=sigma,
            option_type=option_type,
            div_type=div_type
        )
    elif greek_style == 'numerical':
        greeks=vectorized_market_greeks_numerical(
            ticks=ticks,
            S=S,
            K=K,
            valuation_dates=valuation_dates,
            end_dates=end_dates,
            r=r,
            sigma=sigma,
            option_type=option_type,
            div_type=div_type
        )
    else:
        raise ValueError(f"Unknown Greek calculation style '{greek_style}'. Use 'analytic' or 'numerical'.")
    greeks = dict(sorted(greeks.items(), key=lambda item: item[0]))
    return greeks



##### TESTS

In [83]:
vectorized_market_greeks(
    ticks=['AAPL', 'MSFT', 'GOOGL'],
    S=[211.12, 250, 2800],
    K=[215, 250, 2800],
    valuation_dates=[datetime(2025, 7, 18), datetime(2023, 1, 1), datetime(2023, 1, 1)],
    end_dates=[datetime(2025, 12, 19), datetime(2024, 1, 1), datetime(2024, 1, 1)],
    r=[0.05, 0.04, 0.03],
    sigma=[0.2611, 0.25, 0.3],
    option_type=['c', 'p', 'c'], 
    div_type='discrete',
    greek_style='numerical'
)

{'delta': array([ 0.53760409, -0.40550645,  0.59867324]),
 'gamma': array([0.01110992, 0.00627671, 0.00046049]),
 'rho': array([ 0.41721394, -1.21053483, 13.03597517]),
 'theta': array([-0.05964481, -0.01953839, -0.55193583]),
 'vega': array([ 0.54379121,  0.95789124, 10.8232329 ]),
 'volga': array([-1.47766544e-04, -9.82822712e-05, -4.50641993e-03])}

In [84]:
vectorized_market_greeks(
    ticks=['AAPL', 'MSFT', 'GOOGL'],
    S=[211.12, 250, 2800],
    K=[215, 250, 2800],
    valuation_dates=[datetime(2025, 7, 18), datetime(2023, 1, 1), datetime(2023, 1, 1)],
    end_dates=[datetime(2025, 12, 19), datetime(2024, 1, 1), datetime(2024, 1, 1)],
    r=[0.05, 0.04, 0.03],
    sigma=[0.2, 0.25, 0.3],
    option_type=['c', 'p', 'c'], 
    div_type='discrete',
    greek_style='analytic'
)

{'delta': array([ 0.53088881, -0.40550645,  0.59867324]),
 'gamma': array([0.01422207, 0.00603076, 0.00044689]),
 'rho': array([ 0.42526334, -1.21053483, 13.03597517]),
 'theta': array([-0.04992309, -0.04740847, -0.56547187]),
 'vega': array([ 0.5561839 ,  0.99695623, 11.1526204 ]),
 'volga': array([-0.00011286, -0.00010294, -0.00464374])}

## VOL ESTIMATION

- Market Angle will be able to pass single, or vectorized.


### Base

In [85]:
np.set_printoptions(suppress=True)

In [86]:
from scipy.optimize import minimize

def vol_est_minimization(
        F: float,
        K: float,
        T: float,
        r: float,
        market_price: float,
        option_type: str = 'c',
):
    """
    Objective function for volatility estimation using minimization.
    This function calculates the difference between the market price and the Black-Scholes price.
    
    Parameters:
    - F: Forward price
    - K: Strike price
    - T: Time to maturity
    - r: Risk-free rate
    - market_price: Market price of the option
    - option_type: 'c' for call, 'p' for put
    
    Returns:
    - Difference between market price and Black-Scholes price
    """
    
    def objective_function(sigma):
        bs_price = black_scholes_vectorized(
            F=F, 
            K=K, 
            T=T, 
            r=r, 
            sigma=sigma, 
            option_type=option_type
        )
        return (bs_price - market_price) ** 2

    # Initial guess for volatility
    initial_guess = 0.2

    # Minimize the objective function to find the implied volatility
    result = minimize(objective_function, initial_guess, bounds=[(0.01, None)])
    
    if result.success:
        return result.x[0]  # Return the estimated volatility
    else:
        raise ValueError("Volatility estimation failed.")
    
def vol_est_brute_force(
        F: float,
        K: float,
        T: float,
        r: float,
        market_price: float,
        option_type: str = 'c',
):
    """

    Brute force method to estimate implied volatility by minimizing the difference
    between the market price and the Black-Scholes price.
    Parameters:
    - F: Forward price
    - K: Strike price
    - T: Time to maturity
    - r: Risk-free rate
    - market_price: Market price of the option
    - option_type: 'c' for call, 'p' for put
    Returns:
    - Estimated volatility
    """
    
    sigmas = np.linspace(0.001, 5, 40000)  # Range of volatilities to test

    prices = black_scholes_vectorized(
        F=F, 
        K=K, 
        T=T, 
        r=r, 
        sigma=sigmas, 
        option_type=option_type
    )

    # Calculate the absolute differences between market price and calculated prices
    differences = np.abs(prices - market_price)
    # Find the index of the minimum difference
    min_index = np.argmin(differences)

    # Return the corresponding volatility
    return sigmas[min_index]  # Return the estimated volatility and corresponding price

#### TESTS

In [87]:
mkt_forward = EquityForward(
    start_date=test_start,
    end_date=datetime(2025, 12, 19),
    ticker='AAPL',
    valuation_date=test_valuation_date,
    risk_free_rate=rates,
    dividend_type='discrete',
    dividend=None,  # Market dividend will be set later

)
mkt_forward.get_forward_price(), mkt_forward.risk_free_rate

(213.46615757988374, 0.0423199987411499)

In [88]:
vol_est_minimization(
    F=mkt_forward.get_forward_price(),  # Forward price
    K=220,     # Strike price
    T=time_distance_helper('2025-12-19', test_valuation_date),     # Time to maturity in years
    r=mkt_forward.risk_free_rate,   # Risk-free rate
    market_price=11.85,  # Market price of the option
    option_type='c'     # Option type: 'c' for call
)

0.2677398648854257

In [89]:
vol_est_brute_force(
    F=mkt_forward.get_forward_price(),  # Forward price
    K=220,     # Strike price
    T=time_distance_helper('2025-12-19', test_start),     # Time to maturity in years
    r=mkt_forward.risk_free_rate,   # Risk-free rate
    market_price=11.85,  # Market price of the option
    option_type='c'     # Option type: 'c' for call
)

0.2677033175829395

### Vectorized

In [90]:
import numpy as np
from scipy.optimize import minimize
from typing import Callable, Optional
from functools import partial

## Brute force is faster than minimization for volatility estimation
## Standardize this before using it in production
class ImpliedVolEstimator:
    def __init__(self, 
                 loss_func: Callable,
                 fallback_bounds: tuple = None,
                 method: str = "L-BFGS-B",
                 default_vol: float = 0.2,
                 dx: float = 1e-4,
                 fall_back_grid_search: Optional[Callable] = None):
        """
        Wraps implied volatility estimation with robust handling.

        Parameters:
        - loss_func: Callable taking (sigma, *args) and returning squared error.
        - fallback_bounds: Tuple of lower and upper bounds for fallback grid search.
        - method: Optimization method to use.
        - default_vol: Default volatility returned if all else fails.
        - dx: Finite grid step size for fallback.
        """
        self.loss_func = loss_func
        self.bounds = fallback_bounds if fallback_bounds is not None else (VOL_EST_LOWER_BOUND, VOL_EST_UPPER_BOUND)
        self.method = method
        self.default_vol = default_vol
        self.dx = dx
        self.fallback_grid_search = fall_back_grid_search

    def _fallback_grid_search(self, *args, **kwargs) -> float:
        """
        Fallback grid search method to estimate implied volatility.
        Uses a brute force method if no custom fallback is provided.
        """
        if self.fallback_grid_search:
            logger.info("Using custom fallback grid search function.")
            return self.fallback_grid_search(*args, **kwargs)
        else:
            logger.warning("No custom fallback grid search function provided. Using brute force method.")
            loss_func = partial(self.loss_func, *args, **kwargs)
            grid = np.linspace(self.bounds[0], self.bounds[1], int((self.bounds[1] - self.bounds[0]) / self.dx))
            losses = np.array([loss_func(sigma, *args, **kwargs) for sigma in grid])
            if np.isfinite(losses).all():
                return grid[np.argmin(losses)]
            return self.default_vol

    def estimate_single(self, x0: float, *args, **kwargs) -> float:
        """
        Estimate implied volatility for a single input using minimization.
        x0: Initial guess for the volatility.
        args: Additional positional arguments passed to loss_func.
        kwargs: Additional keyword arguments passed to loss_func.
        """
        try:
                res = minimize(
                    lambda sigma: self.loss_func(sigma, *args, **kwargs),
                    x0=np.array([x0]),
                    bounds=[self.bounds],
                    method=self.method
                )
                minimized_value = self.loss_func(res.x[0], *args, **kwargs)
                if res.success and np.isfinite(res.fun) and np.isfinite(res.x).all() and minimized_value < 1e-6:
                    return float(res.x[0])
                if minimized_value >= 1e-6:
                    logger.warning(f"Minimized value {minimized_value} is above threshold. Falling back to grid search.")
                return self._fallback_grid_search(*args, **kwargs)
        except Exception as e:
            return self._fallback_grid_search(*args, **kwargs)

    def estimate_batch(self, x0: float, param_list: list) -> np.ndarray:
        """
        Estimate implied vol for a batch of inputs.
        param_list: List of *args passed to loss_func (excluding sigma).
        """
        return np.array([self.estimate_single(x0, *params) for params in param_list])


In [91]:
def objective_function_bsm(sigma: float, F: float, K: float, T: float, r: float, market_price: float, option_type: str) -> float:
    """
    Objective function for volatility estimation.
    """
    intrinsic_value = max(F - K if option_type == 'c' else K - F, 0)

    ##TODO: Take this out of objective function to avoid repeated logging during minimization
    if intrinsic_value < market_price:
        logger.warning("Market price exceeds intrinsic value, returning NaN.")
        logger.info(f"Intrinsic Value: {intrinsic_value}, Market Price: {market_price}. Option Details: F={F}, K={K}, T={T}, r={r}, sigma={sigma}, option_type={option_type}")
    bs_price = black_scholes_vectorized(F=F, K=K, T=T, r=r, sigma=sigma, option_type=option_type)
    
    return (bs_price - market_price) ** 2

def objective_function_brute(sigma: float, F: float, K: float, T: float, r: float, market_price: float, option_type: str) -> float:
    """
    Objective function for brute force volatility estimation.
    """
    intrinsic_value = max(F - K if option_type == 'c' else K - F, 0)

    if intrinsic_value < market_price:
        logger.warning("Market price exceeds intrinsic value, returning NaN.")
        logger.info(f"Intrinsic Value: {intrinsic_value}, Market Price: {market_price}. Option Details: F={F}, K={K}, T={T}, r={r}, sigma={sigma}, option_type={option_type}")
    
    bs_price = black_scholes_vectorized(F=F, K=K, T=T, r=r, sigma=sigma, option_type=option_type)
    
    return (bs_price - market_price) ** 2   

In [92]:
# 2025-07-18	AAPL	2026-06-18	5.0	C	70	205.25	44	206.50	20250718	205.875	205.732456	1.000000
# 2025-07-18	AAPL	2026-06-18	250.0	C	323	9.15	294	9.30	20250718	9.225	9.221475	0.211616
# 2025-07-18	AAPL	2026-06-18	255.0	C	6	7.95	386	8.05	20250718	8.000	8.048469	0.211616
# end, K, mid = '2026-06-18', 255.0, 8
end, K, mid  = '2026-06-18', 250.0, 0.1
mkt_forward = EquityForward(
    start_date=test_start,
    end_date=end,
    ticker='AAPL',
    valuation_date=test_valuation_date,
    risk_free_rate=0.04235,
    dividend_type='discrete',
    dividend=None,  # Market dividend will be set later

)
mkt_forward.get_forward_price(), mkt_forward.risk_free_rate
iv_estimator = ImpliedVolEstimator(
    loss_func=vol_est_brute_force,
    method="L-BFGS-B",
    default_vol=0.2,
    fallback_bounds = (0.01,1.0),
    dx=0.0001,
    fall_back_grid_search=vol_est_brute_force
)
iv_estimator.estimate_single(
    x0=0.2,
    F=mkt_forward.get_forward_price(),  # Forward price
    K=K,     # Strike price
    T=time_distance_helper(end, test_valuation_date),     # Time to maturity in years
    r=mkt_forward.risk_free_rate,   # Risk-free rate
    market_price=mid,  # Market price of the option
    option_type='c'     # Option type: 'c' for call
)


0.069488012200305

#### Test Vol Surface Fit

In [93]:
aapl_chain=retrieve_chain_bulk(
    'AAPL',
    0,
    change_to_last_busday(test_valuation_date),
    change_to_last_busday(test_valuation_date),
    '16:00'

)
S = get_spot('AAPL', (test_valuation_date))

In [94]:
aapl_chain = aapl_chain[aapl_chain['Expiration'] >= test_valuation_date]
valuation_dates = [test_valuation_date] * len(aapl_chain)
end_dates = aapl_chain['Expiration'].tolist()
r = [rates] * len(aapl_chain)
s = [S] * len(aapl_chain)
tickers = ['AAPL'] * len(aapl_chain)
F = vectorized_market_forward_calc(
    ticks=tickers,
    S=s,
    valuation_dates=valuation_dates,
    end_dates=end_dates,
    r=r,
    div_type='discrete'
)
F

array([211.06290119, 211.23415549, 211.06290119, ..., 210.72080899,
       217.71608065, 217.71608065])

In [95]:
vol_batch = iv_estimator.estimate_batch(
    x0=0.2,
    param_list=[
        (F[i], 
         aapl_chain['Strike'][i], 
         time_distance_helper(end_dates[i], valuation_dates[i]), 
         r[i],
         aapl_chain['Midpoint'][i],
         aapl_chain['Right'][i].lower())
        for i in range(len(aapl_chain))
    ]
)
vol_batch

array([0.29094925, 0.27470209, 0.27845144, ..., 0.29832296, 0.28082602,
       0.27470209])

In [96]:
len(vol_batch), len(aapl_chain)

(2428, 2428)

In [97]:
aapl_chain['iv'] = vol_batch
aapl_chain[(aapl_chain['Expiration'] == '2026-12-18') & (aapl_chain['Right'] == 'C')].sort_values('Strike').tail(60).plot(x = 'Strike', y='iv', kind='line', title='AAPL Call IV on 2026-06-18')

In [98]:
aapl_chain.sort_values('Strike')

Unnamed: 0_level_0,Root,Expiration,Strike,Right,Bid_size,CloseBid,Ask_size,CloseAsk,Date,Midpoint,Weighted_midpoint,iv
datetime,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1
2025-07-16,AAPL,2025-09-19,5.0,C,152,204.75,180,205.75,20250716,205.250,205.292169,3.398780
2025-07-16,AAPL,2025-09-19,5.0,P,0,0.00,500,0.01,20250716,0.005,0.010000,2.734522
2025-07-16,AAPL,2025-12-19,5.0,C,1,203.80,100,206.40,20250716,205.100,206.374257,2.409328
2025-07-16,AAPL,2025-12-19,5.0,P,0,0.00,1,0.02,20250716,0.010,0.020000,1.872672
2025-07-16,AAPL,2026-01-16,5.0,C,100,203.35,100,206.40,20250716,204.875,204.875000,2.267853
...,...,...,...,...,...,...,...,...,...,...,...,...
2025-07-16,AAPL,2026-12-18,450.0,C,8,0.44,18,0.48,20250716,0.460,0.467692,0.269203
2025-07-16,AAPL,2027-06-17,450.0,C,1,0.90,1,1.04,20250716,0.970,0.970000,0.253956
2025-07-16,AAPL,2027-06-17,450.0,P,23,237.90,23,241.85,20250716,239.875,239.875000,0.662884
2025-07-16,AAPL,2027-01-15,450.0,C,20,0.50,1,0.53,20250716,0.515,0.501429,0.265829


## BINOMIAL TREE

In [99]:

class BinomialBase(ABC):
    def __init__(self, 
                 K: float,
                 expiration: datetime|str,
                 sigma: float,
                 r: float,
                 N: int = 100,
                 spot_price: float = None,
                 dividend_type: str = 'discrete',
                 div_amount: float = 0.0,
                 option_type: str = 'c',
                 start_date: datetime|str = None,
                 valuation_date: datetime|str = None,
                 american: bool = False):
        super().__init__()
        """
        Base class for Binomial Tree models.
        K: Strike price
        expiration: Expiration date of the option
        sigma: Volatility of the underlying asset
        r: Risk-free interest rate
        N: Number of time steps in the binomial tree
        spot_price: Current price of the underlying asset (optional)
        dividend_type: Type of dividend ('discrete' or 'continuous')
        div_amount: Amount of dividend (if applicable)
        option_type: 'c' for call, 'p' for put
        start_date: Start date for the option pricing (optional)
        valuation_date: Date for which the option is priced (optional)
        """
        self._initialized = False
        self.K = K
        self.expiration = expiration
        self.sigma = sigma
        self.r = r
        self.N = N
        self.S0 = spot_price
        self.dividend_type = dividend_type
        self.div_yield = div_amount if dividend_type == 'continuous' else 0.0
        self.discrete_dividends = div_amount if dividend_type == 'discrete' else []
        self.option_type = option_type
        self.start_date = start_date
        self.valuation_date = valuation_date
        self.T = time_distance_helper(self.expiration, self.valuation_date or datetime.now())
        self.american = american
        self.dt = self.T / self.N
        self.priced = False
        self.tree = []
        self.option_values = []
        self.stock_tree = []
        self.init_parameters()
        self.build_tree()
        self._initialized = True

    @abstractmethod
    def build_tree(self):
        pass

    @abstractmethod
    def init_parameters(self):
        pass

    @abstractmethod
    def _apply_discrete_dividends(self):
        pass

    @abstractmethod
    def delta(self):
        pass

    @abstractmethod
    def gamma(self):
        pass

    def pricing_warning(self):
        """
        Warning message for pricing issues.
        This method can be overridden in subclasses to provide specific warnings.
        """
        if not self.priced:
            logger.warning("Option has not been priced yet. Please call the price() method first.")
            print("Option has not been priced yet. Please call the price() method first.")


    def reset_pricing_variables(self):
        """
        Reset pricing variables for a new calculation.
        """
        self.tree = []
        self.option_values = []
        self.stock_tree = []
        self.init_parameters()
        self.build_tree()

  
    def _tree_numerical(self, attr, dx_thresh=0.01):
        """
        Calculate the numerical value of a Greek (delta, gamma, etc.) using the binomial tree.
        This method is used for numerical approximation of Greeks.
        """
        self.pricing_warning()
        actual_value = getattr(self, attr)
        bump = actual_value * dx_thresh
        up_bump = actual_value + bump
        down_bump = actual_value - bump
        
        setattr(self, attr, up_bump)
        price_up = self.price()

        setattr(self, attr, down_bump)
        price_down = self.price()

        ## Reset
        setattr(self, attr, actual_value)

        return (price_up - price_down) / (2 * bump)
    
    def _tree_numerical_second_order(self, attr, dx_thresh=0.01):
        """
        Calculate the second-order numerical value of a Greek using the binomial tree.
        This method is used for numerical approximation of second-order Greeks.
        """
        self.pricing_warning()
        actual_value = getattr(self, attr)
        bump = actual_value * dx_thresh
        up_bump = actual_value + bump
        down_bump = actual_value - bump
        
        setattr(self, attr, up_bump)
        price_up = self.price()

        setattr(self, attr, actual_value)
        price_mid = self.price()

        setattr(self, attr, down_bump)
        price_down = self.price()

        ## Reset
        setattr(self, attr, actual_value)

        return (price_up - 2 * price_mid + price_down) / (bump ** 2)
    
    def theta(self, dx_thresh=0.0001):
        """
        Calculate the theta of the option using the binomial tree.
        Theta is the change in option price with respect to a change in time to expiration.
        Returns:
        Theta value as a float.
        """
        return -self._tree_numerical('T', dx_thresh)/DAILY_BASIS
    
    def vega(self, dx_thresh=0.0001):
        """
        Calculate the vega of the option using the binomial tree.
        Vega is the change in option price with respect to a change in volatility.
        Returns:
        Vega value as a float.
        """
        return self._tree_numerical('sigma', dx_thresh)/100
    
    def rho(self, dx_thresh=0.0001):
        """
        Calculate the rho of the option using the binomial tree.
        Rho is the change in option price with respect to a change in risk-free interest rate.
        Returns:
        Rho value as a float.
        """
        return self._tree_numerical('r', dx_thresh)/100


    def volga(self, dx_thresh=0.0001):
        """
        Calculate the volga of the option using the binomial tree.
        Volga is the change in vega with respect to a change in volatility.
        Returns:
        Volga value as a float.
        """
        return self._tree_numerical_second_order('sigma', dx_thresh)/100**2



    def __setattr__(self, name, value):
        protected = [
            'K', 'expiration', 'sigma', 'r', 'N', 'S0', 'dividend_type', 
            'div_yield', 'discrete_dividends', 'option_type', 
            'start_date', 'valuation_date', 'T', 'american'
        ]
        
        if not hasattr(self, '_initialized') or not self._initialized:
            # Allow setting attributes before initialization
            super().__setattr__(name, value)
            return
        
        if hasattr(self, '_initialized') and self._initialized:
            if name in protected:
                # raise AttributeError(f"'{name}' is read-only after initialization.")
                logger.warning(f"'{name}' is read-only after initialization. Resetting pricing variables.")
        super().__setattr__(name, value) ## Set
        if name in protected:
            # Reset pricing variables if a protected attribute is set
            logger.info(f"Resetting pricing variables due to change in '{name}'.")
            self.reset_pricing_variables()

    def __repr__(self):
        return f"{self.__class__.__name__}(K={self.K}, expiration={self.expiration}, dividend_type={self.dividend_type}"

    
    

In [100]:
def crr_init_parameters(
    sigma: float,
    r: float,
    T: float,
    N: int,
    div_yield: float = 0.0,
    dividend_type: str = 'discrete'
):
    """
    params:
    sigma: Volatility of the underlying asset
    r: Risk-free interest rate
    dt: Time step size
    div_yield: Dividend yield (if applicable)
    dividend_type: Type of dividend ('discrete' or 'continuous'
    """
    if N <= 0:
        raise ValueError("N must be a positive integer.")
    if T <= 0:
        raise ValueError("T must be a positive number.")
    if sigma < 0:
        raise ValueError("sigma must be a non-negative number.")
    dt = T / N
    if dividend_type == 'continuous':
        y = div_yield ## Continuous dividend yield adjustment
    else:
        y = 0.0
    u = np.exp(sigma * np.sqrt(dt))
    d = 1 / u
    p = (np.exp((r - y) * dt) - d) / (u - d)
    if p < 0 or p > 1:
        raise ValueError(f"Invalid probability p={p}. It must be between 0 and 1.")
    return u, d, p


def build_tree(
    S0: float, 
    u: float, 
    d: float, 
    N: int
):
    """
    params:
    S0: Initial stock price
    u: Up factor (multiplier for upward movement)
    d: Down factor (multiplier for downward movement)
    N: Number of time steps in the binomial tree
    Returns:
    A 2D list representing the binomial tree of stock prices.
    """
    if N < 0:
        raise ValueError("N must be a non-negative integer.")

    stock_tree = [
        [S0 * (u ** j) * (d ** (i - j)) for j in range(i + 1)]
        for i in range(N + 1)
    ]
    if len(stock_tree) != N + 1:
        raise ValueError(f"Expected {N + 1} rows in the stock tree, got {len(stock_tree)}.")
    return stock_tree


def apply_discrete_dividends(
    discrete_dividends: List[tuple],
    stock_tree: List[List[float]],
    N: int

)-> List[List[float]]: 
    """
    Apply discrete dividends to the stock tree.
    discrete_dividends: List of tuples (time_fraction, dividend_amount)
    stock_tree: The binomial tree of stock prices
    N: Number of time steps in the binomial tree
    Returns:
    A modified stock tree with dividends applied.
    """
    if not list(discrete_dividends):
        return stock_tree 
    
    for t_frac, div in discrete_dividends:
        div_step = min(int(round(t_frac * N)), N)
        for i in range(div_step, N + 1):
            stock_tree[i] = [max(s - div, 0) for s in stock_tree[i]]
    return stock_tree


def create_option_tree(
        stock_tree: List[List[float]],
        K: float,
        option_type: str,
        N: int
)-> List[List[float]]:
    """
    Create the option value tree based on the stock price tree.
    stock_tree: The binomial tree of stock prices
    K: Strike price of the option
    option_type: 'c' for call, 'p' for put
    N: Number of time steps in the binomial tree
    Returns:
    A 2D list representing the option value tree.
    """
    tree = deepcopy(stock_tree)
    terminal_prices = tree[-1]  # Get the terminal prices from the last row of the stock tree
    if option_type == 'c':
        option_values = [max(0, price - K) for price in terminal_prices]  # Call option payoff
    elif option_type == 'p':
        option_values = [max(0, K - price) for price in terminal_prices]
    
    tree[-1] = option_values  # Set the terminal option values in the last row of the tree
    return option_values

def calculate_option_values(
    stock_tree: List[List[float]],
    option_values: List[float],
    K: float,
    r: float,
    dt: float,
    N: int,
    p: float = 0.5,  # Probability of upward movement
    american: bool = False,
    option_type: str = 'c'
) -> List[List[float]]:
    """
    Calculate the option values at each node in the binomial tree.
    stock_tree: The binomial tree of stock prices
    option_values: The terminal option values
    r: Risk-free interest rate
    dt: Time step size
    N: Number of time steps in the binomial tree
    Returns:
    A 2D list representing the option value tree.
    """
    # Backward induction to calculate option values at each node
    for i in range(N - 1, -1, -1):
        option_values = [
            np.exp(-r * dt) * (p * option_values[j+1] + (1 - p) * option_values[j]) ## Ordered from down to up.
            ## Moves from all power in d, to all power in u by 1 step. Counting down on size i
            for j in range(i + 1) ## At each node, there is Node+1 size
        ]

        # If American option, check for early exercise
        if american:
            early_exercise = [
                max(val, (p - K) if option_type == 'c' else (K - p))
                for p, val in zip(stock_tree[i], option_values)
            ]
            option_values = early_exercise
        if i==1:
            V1 = option_values.copy()
        elif i==2:
            V2 = option_values.copy()
    return option_values[0], V1, V2

def crr_binomial_pricing(
    K: float,
    T: float,
    sigma: float,
    r: float,
    N: int = 100,
    spot_price: float = None,
    dividend_type: str = 'discrete',
    div_amount: float = 0.0,
    option_type: str = 'c',
    american: bool = False
) -> float:
    """
    Calculate the price of an option using the Cox-Ross-Rubinstein binomial model.
    
    Parameters:
    - K: Strike price
    - expiration: Expiration date of the option
    - sigma: Volatility of the underlying asset
    - r: Risk-free interest rate
    - N: Number of time steps in the binomial tree
    - spot_price: Current price of the underlying asset (optional)
    - dividend_type: Type of dividend ('discrete' or 'continuous')
    - div_amount: Amount of dividend (if applicable)
    - option_type: 'c' for call, 'p' for put
    - start_date: Start date for the option pricing (optional)
    - valuation_date: Date for which the option is priced (optional)
    
    Returns:
    The calculated price of the option.
    """
    if spot_price is None:
        raise ValueError("spot_price must be provided.")
    u, d, p = crr_init_parameters(sigma, r, T, N, div_yield=div_amount if dividend_type == 'continuous' else 0.0, dividend_type=dividend_type)
    stock_tree = build_tree(spot_price, u, d, N)
    if dividend_type == 'discrete':
        stock_tree = apply_discrete_dividends(div_amount, stock_tree, N)

    option_values = create_option_tree(stock_tree, K, option_type, N)
    option_price, _, _ = calculate_option_values(stock_tree, option_values, K, r, T / N, N, p, american, option_type)
    return option_price


In [113]:

class VectorBinomialBase(BinomialBase):

    @abstractmethod
    def init_parameters(self):
        """
        Initialize parameters for the binomial tree.
        This method should be called before building the tree.
        """
        pass

    def build_tree(self):
        """
        Build the binomial tree structure.
        This method should be implemented in subclasses.
        """
        self.stock_tree = build_tree(
            S0=self.S0,
            u=self.u,
            d=self.d,
            N=self.N
        )
        if self.dividend_type == 'discrete':
            self._apply_discrete_dividends()  # Apply discrete dividends at time step 0

    def _apply_discrete_dividends(self) -> float:
        """
        Apply discrete dividend adjustment to the stock price at a given time step.
        """
        if not list(self.discrete_dividends):
            return 
        self.stock_tree = apply_discrete_dividends(
            discrete_dividends=self.discrete_dividends,
            stock_tree=self.stock_tree,
            N=self.N
        )

    def __create_option_tree(self):
        self.option_values = create_option_tree(
            stock_tree=self.stock_tree,
            K=self.K,
            option_type=self.option_type,
            N=self.N
        )

    def price(self):
        self.__create_option_tree()  # Create the option tree based on terminal stock prices
        option_values = self.option_values
        price, self.V1, self.V2 = calculate_option_values(
            stock_tree=self.stock_tree,
            option_values=option_values,
            K=self.K,
            r=self.r,
            dt=self.dt,
            N=self.N,
            p=self.p,
            american=self.american,
            option_type=self.option_type
        )
        self.priced = True
        return price
    
    def delta(self):
        """
        Calculate the delta of the option using the binomial tree.
        Delta is the change in option price with respect to a change in the underlying asset price.
        Returns:
        Delta value as a float.
        """
        self.pricing_warning()
        if self.N < 1:
            raise ValueError("N must be at least 1 to calculate delta.")
        
        if not hasattr(self, 'V1'):
            self.price()
        
        stock_tree = self.stock_tree
        delta = (self.V1[1] - self.V1[0]) / (stock_tree[1][1] - stock_tree[1][0])
        return delta
    
    def gamma(self):
        """
        Calculate the gamma of the option using the binomial tree.
        Gamma is the rate of change of delta with respect to a change in the underlying asset price.
        Returns:
        Gamma value as a float.
        """
        self.pricing_warning()
        if self.N < 2:
            raise ValueError("N must be at least 2 to calculate gamma.")
        
        if not hasattr(self, 'V2'):
            self.price()
        
        V2, S2 = self.V2, self.stock_tree[2]
        delta_up = (V2[2] - V2[1]) / (S2[2] - S2[1])
        delta_down = (V2[1] - V2[0]) / (S2[1] - S2[0])
        gamma = (delta_up - delta_down) / ((S2[2] - S2[0]) / 2)  # Average change in delta over the interval
        return gamma
    



class VectorBinomialCRR(VectorBinomialBase):

    def init_parameters(self):
        """
        Initialize parameters for the binomial tree.
        This method should be called before building the tree.
        """
        q = self.div_yield if self.dividend_type == 'continuous' else 0.0
        self.u, self.d, self.p = crr_init_parameters(
            sigma=self.sigma,
            r=self.r,
            T=self.T,
            N=self.N,
            div_yield=q,
            dividend_type=self.dividend_type
        )

class VectorBinomialLR(VectorBinomialBase):  # or NodeBinomialBase
    def init_parameters(self):
        """
        Initialize Leisen-Reimer parameters: u, d, p.
        """
        q = self.div_yield if self.dividend_type == 'continuous' else 0.0
        self.dt = self.T / self.N
        v = self.sigma * np.sqrt(self.dt)

        self.u = np.exp(v)
        self.d = np.exp(-v)

        d1 = (
            np.log(self.S0 / self.K) +
            (self.r - q + 0.5 * self.sigma ** 2) * self.T
        ) / (self.sigma * np.sqrt(self.T))

        x = d1  # Can also use d2 for puts, but d1 gives better results overall

        # Peizer-Pratt inversion of CDF (used by Leisen-Reimer)
        w = np.sqrt(1 - np.exp(-2 * (x ** 2) / self.N))
        self.p = 0.5 + np.sign(x) * w / 2

In [114]:

class Node(Scalar):
    def __init__(self, stock_price, position,option_value=0.0):
        super().__init__(value=option_value)
        self.stock_price = stock_price
        self.value = option_value
        self.up = None
        self.down = None
        self.position = position  # Position in the binomial tree (e.g., index or identifier)

    @property
    def option_value(self):
        return self.value
    
    @option_value.setter
    def option_value(self, value):
        self.value = value

    def __eq__(self, value):
        if isinstance(value, Node):
            return self.stock_price == value.stock_price and self.position == value.position
        return False
    
    def __repr__(self):
        return f"Node(price={self.stock_price}, option_value={self.option_value}, pos={self.position})"




class NodeBinomialBase(BinomialBase):

    @abstractmethod
    def init_parameters(self):
        """
        Initialize parameters for the binomial tree.
        This method should be called before building the tree.
        """
        pass

    def build_tree(self):
        """
        Build the binomial tree structure.
        This method should be implemented in subclasses.
        """
        self.tree = []
        for i in range(self.N + 1):
            level = []
            for j in range(i + 1):
                S = self.S0 * (self.u ** j) * (self.d ** (i - j))
                node = Node(stock_price=S, position=(i, j))
                level.append(node)
            self.tree.append(level)

        for i in range(self.N):
            for j in range(i + 1):
                current = self.tree[i][j]
                current.down = self.tree[i+1][j]     # one down move
                current.up = self.tree[i+1][j+1]     # one up move


        if self.dividend_type == 'discrete':
            self._apply_discrete_dividends()  # Apply discrete dividends at time step 0

    def _apply_discrete_dividends(self) -> float:
        """
        Apply discrete dividend adjustment to the stock price at a given time step.
        """
        if not list(self.discrete_dividends):
            return 
        
        for t_frac, div in self.discrete_dividends:
            div_step = min(int(round(t_frac * self.N)), self.N)
            for i in range(div_step, self.N + 1):
                for node in self.tree[i]:
                    node.stock_price = max(node.stock_price - div, 0)


    def __create_option_tree(self):
        terminal_nodes = self.tree[-1]
        for node in terminal_nodes:
            node.option_value = (
                max(0, node.stock_price - self.K)
                if self.option_type == 'c'
                else max(0, self.K - node.stock_price)
            )

    def price(self):
        self.__create_option_tree()
        tree = self.tree

        for i in range(self.N - 1, -1, -1):
            for j, node in enumerate(tree[i]):
                up_val = node.up.option_value
                down_val = node.down.option_value
                expected = np.exp(-self.r * self.dt) * (
                    self.p * up_val + (1 - self.p) * down_val
                )

                if self.american:
                    intrinsic = (
                        max(node.stock_price - self.K, 0)
                        if self.option_type == 'c'
                        else max(self.K - node.stock_price, 0)
                    )
                    node.option_value = max(expected, intrinsic)
                else:
                    node.option_value = expected

        self.priced = True
        return tree[0][0].option_value
    
    
    def delta(self):
        """
        Calculate the delta of the option using the binomial tree.
        Delta is the change in option price with respect to a change in the underlying asset price.
        Returns:
        Delta value as a float.
        """
        self.pricing_warning()
        if self.N < 1:
            raise ValueError("N must be at least 1 to calculate delta.")
        
        stock_tree = self.tree
        V1 = self.tree[1]
        delta = (V1[1] - V1[0]) / (stock_tree[1][1].stock_price - stock_tree[1][0].stock_price)
        return delta
    
    def gamma(self):
        """
        Calculate the gamma of the option using the binomial tree.
        Gamma is the rate of change of delta with respect to a change in the underlying asset price.
        Returns:
        Gamma value as a float.
        """
        self.pricing_warning()
        if self.N < 2:
            raise ValueError("N must be at least 2 to calculate gamma.")
        
        if not hasattr(self, 'V2'):
            self.price()
        
        V2, S2 = self.tree[2], self.tree[2]
        delta_up = (V2[2] - V2[1]) / (S2[2].stock_price - S2[1].stock_price)
        delta_down = (V2[1] - V2[0]) / (S2[1].stock_price - S2[0].stock_price)
        gamma = (delta_up - delta_down) / ((S2[2].stock_price - S2[0].stock_price) / 2)  # Average change in delta over the interval
        return gamma
  
        
        


class NodeBinomialCRR(NodeBinomialBase):

    def init_parameters(self):
        """
        Initialize parameters for the binomial tree.
        This method should be called before building the tree.
        """
        if self.dividend_type == 'continuous':
            y = self.div_yield ## Continuous dividend yield adjustment
        else:
            y = 0.0
        self.u = np.exp(self.sigma * np.sqrt(self.dt))
        self.d = np.exp(-(self.sigma * np.sqrt(self.dt)))
        self.p = (np.exp((self.r - y) * self.dt) - self.d) / (self.u - self.d)

        

class NodeBinomialLR(NodeBinomialBase):  # or NodeBinomialBase
    def init_parameters(self):
        """
        Initialize Leisen-Reimer parameters: u, d, p.
        """
        q = self.div_yield if self.dividend_type == 'continuous' else 0.0
        self.dt = self.T / self.N
        v = self.sigma * np.sqrt(self.dt)

        self.u = np.exp(v)
        self.d = np.exp(-v)

        d1 = (
            np.log(self.S0 / self.K) +
            (self.r - q + 0.5 * self.sigma ** 2) * self.T
        ) / (self.sigma * np.sqrt(self.T))

        x = d1  # Can also use d2 for puts, but d1 gives better results overall

        # Peizer-Pratt inversion of CDF (used by Leisen-Reimer)
        w = np.sqrt(1 - np.exp(-2 * (x ** 2) / self.N))
        self.p = 0.5 + np.sign(x) * w / 2


In [115]:
class MarketBinomial(VectorBinomialCRR):
    def __init__(self, 
                 tick: str,
                 K: float,
                 expiration: datetime|str,
                 sigma: float,
                 N: int = 100,
                 dividend_type: str = 'discrete',
                 option_type: str = 'c',
                 start_date: datetime|str = None,
                 valuation_date: datetime|str = None,
                 r: float = None,
                 american: bool = False):
        # super().__init__()
        """
        Base class for Binomial Tree models.
        K: Strike price
        expiration: Expiration date of the option
        sigma: Volatility of the underlying asset
        r: Risk-free interest rate
        N: Number of time steps in the binomial tree
        spot_price: Current price of the underlying asset (optional)
        dividend_type: Type of dividend ('discrete' or 'continuous')
        div_amount: Amount of dividend (if applicable)
        option_type: 'c' for call, 'p' for put
        start_date: Start date for the option pricing (optional)
        valuation_date: Date for which the option is priced (optional)
        """
        self._initialized = False
        self.K = K
        self.expiration = expiration
        self.sigma = sigma
        self.N = N
        self.forward = EquityForward(
            start_date=start_date or datetime.now(),
            end_date=expiration,
            ticker=tick,
            valuation_date=valuation_date or datetime.now(),
            risk_free_rate=r,
            dividend_type=dividend_type,
            dividend=None  # Market dividend will be set later
        )
        self.r = r or self.forward.risk_free_rate
        self.dividend_type = dividend_type
        self.option_type = option_type
        self.start_date = start_date
        self.valuation_date = valuation_date
        self.T = time_distance_helper(self.expiration, self.valuation_date or datetime.now())
        self.american = american
        self.dt = self.T / self.N
        self.tree = []
        self.option_values = []
        self.stock_tree = []
        self.init_parameters()
        self.build_tree()
        self._initialized = True

    @property
    def asset(self):
        """
        Property to access the underlying asset of the forward contract.
        """
        return self.forward.dividend.asset

    @property
    def S0(self):
        """
        Property to access the current spot price of the underlying asset.
        """
        return self.asset.spot_price

    @property
    def discrete_dividends(self):
        """
        Property to access the discrete dividends of the forward contract.
        """
        if isinstance(self.forward.dividend, DividendSchedule):
            return self.forward.dividend.get_year_fractions()
        else:
            return []
        
    @property
    def div_yield(self):
        """
        Property to access the continuous dividend yield of the forward contract.
        """
        if isinstance(self.forward.dividend, ContinuousDividendYield):
            return self.forward.dividend.yield_rate
        else:
            return 0.0


    

In [116]:
from trade.assets.rates import reset_rates_cache
Stock.list_instances()

{('AAPL',
  '2025-01-06 16:00:00'): Stock(Ticker: AAPL, Build Date: 2025-01-06 16:00:00),
 ('COST',
  '2025-01-03 16:00:00'): Stock(Ticker: COST, Build Date: 2025-01-03 16:00:00),
 ('CVX',
  '2024-06-17 16:00:00'): Stock(Ticker: CVX, Build Date: 2024-06-17 16:00:00),
 ('AAPL',
  '2023-01-03 16:00:00'): Stock(Ticker: AAPL, Build Date: 2023-01-03 16:00:00),
 ('AAPL',
  '2025-07-14 16:00:00'): Stock(Ticker: AAPL, Build Date: 2025-07-14 16:00:00),
 ('AAPL',
  '2025-07-16 16:00:00'): Stock(Ticker: AAPL, Build Date: 2025-07-16 16:00:00)}

In [117]:
# 2025-07-16	AAPL	2026-12-18	450.0	C	8	0.44	18	0.48	20250716	0.460	0.467692	0.269251
start, valuation_date = test_start, test_valuation_date
strike = 450.0
opt_type = 'c'
mid = 0.460
vol = 0.269251
exp_date = '2026-12-18'

mkt_forward = EquityForward(
    start_date=start,
    end_date=end,
    ticker='AAPL',
    valuation_date=valuation_date,
    risk_free_rate=rates,
    dividend_type='discrete',
    dividend=None,  # Market dividend will be set later
)

mkt_forward_cont = EquityForward(
    start_date=start,
    end_date=end,
    ticker='AAPL',
    valuation_date=valuation_date,
    risk_free_rate=rates,
    dividend_type='continuous',
    dividend=None,  # Market dividend will be set later

)
# get_spot('AAPL', start)
mkt_forward_cont.dividend.yield_rate

0.0036162922856715937

In [118]:
mkt_bt = MarketBinomial(
    tick='AAPL',
    K=strike,
    expiration=exp_date,
    sigma=vol,
    N=500,
    dividend_type='continuous',
    option_type=opt_type,
    start_date=start,
    valuation_date=valuation_date,
    american=True
)

mkt_bt.price()

0.46254616656251185

In [119]:
mkt_bt.asset.spot_price

210.16000366210938

In [120]:
crr_binomial_pricing(
    K=strike,
    sigma=vol,
    r=mkt_forward.risk_free_rate,
    N=500,
    spot_price=mkt_bt.asset.spot_price,
    dividend_type='continuous',
    option_type=opt_type,
    T= time_distance_helper(
        mkt_forward.end_date, mkt_forward.valuation_date
    ),
    american=True,
    div_amount=mkt_forward_cont.dividend.yield_rate
)

0.05664847099734474

In [122]:
bt = NodeBinomialLR(
    K=strike,
    expiration=exp_date,
    sigma=vol,
    r=rates,
    N=200,
    spot_price=S,
    dividend_type='discrete',
    div_amount=mkt_forward.dividend.get_year_fractions(),
    option_type=opt_type,
    start_date=start,
    valuation_date=valuation_date,
    american=True
)
# bt.price()
bt.tree
bt.price()

7.688747432059079e-06

In [123]:
bt = VectorBinomialLR(
    K=strike,
    expiration=exp_date,
    sigma=vol,
    r=rates,
    N=200,
    spot_price=S,
    dividend_type='discrete',
    div_amount=mkt_forward.dividend.get_year_fractions(),
    option_type='c',
    start_date=start,
    valuation_date=valuation_date,
    american=False
)
# bt.price()
bt.price(),bt.d, bt.u, bt.p

# bt.price()


(2.0549032318330954e-06,
 0.9775391951331914,
 1.0229768841787958,
 0.3999360964036095)

In [124]:
bt = VectorBinomialCRR(
    K=strike,
    expiration=exp_date,
    sigma=vol,
    r=rates,
    N=1000,
    spot_price=S,
    dividend_type='discrete',
    div_amount=mkt_forward.dividend.get_year_fractions(),
    option_type=opt_type,
    start_date=start,
    valuation_date=valuation_date,
    american=True
)
# bt.price()
bt.tree
bt.price(),bt.d, bt.u, bt.p


(0.47693312688475326,
 0.9898921290700986,
 1.0102110832413596,
 0.5004255089632036)

In [125]:
bt = NodeBinomialCRR(
    K=strike,
    expiration=exp_date,
    sigma=vol,
    r=rates,
    N=1000,
    spot_price=S,
    dividend_type='discrete',
    div_amount=mkt_forward.dividend.get_year_fractions(),
    option_type=opt_type,
    start_date=start,
    valuation_date=valuation_date,
    american=True
)
# bt.price()
# bt.tree
# bt.price()
bt.price(),bt.d, bt.p, bt.u

(0.4769331268847631,
 0.9898921290700985,
 0.5004255089632063,
 1.0102110832413596)

### Vectorized

Using: bjerksund_stensland

In [126]:

def norm_cdf(x):
    return 0.5 * (1 + erf(x / sqrt(2)))


def phi(s, t, gamma, H, I, r, b, sigma):
    lamb = -r + gamma * b + 0.5 * gamma * (gamma - 1) * sigma ** 2
    kappa = 2 * b / (sigma ** 2) + (2 * gamma - 1)
    d = -(log(s / H) + (b + (gamma - 0.5) * sigma ** 2) * t) / (sigma * sqrt(t))
    return (
        exp(lamb * t)
        * s ** gamma
        * (norm_cdf(d) - (I / s) ** kappa * norm_cdf(d - 2 * log(I / s) / (sigma * sqrt(t))))
    )


def bjerksund_stensland_2002(
    S: float,
    K: float,
    T: float,
    r: float,
    sigma: float,
    option_type: Literal["c", "p"] = "c",
    dividend_type: Literal["continuous", "discrete"] = "continuous",
    dividend: float | List[Tuple[float, float]] = 0.0,
) -> float:
    """
    Vectorized Bjerksund-Stensland (2002) American option pricer.
    
    Parameters
    ----------
    S : float
        Spot price
    K : float
        Strike price
    T : float
        Time to maturity (in years)
    r : float
        Risk-free interest rate
    sigma : float
        Volatility
    option_type : 'c' or 'p'
        Option type: call or put
    dividend_type : 'continuous' or 'discrete'
        How dividends are modeled
    dividend : float or list of (t_frac, amount)
        - If continuous, pass yield (float)
        - If discrete, pass list of (time_frac, amount)
        
    Returns
    -------
    float
        Option price
    """
    if dividend_type == "continuous":
        q = float(dividend)
        pv_divs = 0.0
        S_adj = S
    else:
        # Discrete dividend: subtract PV(divs) from spot
        pv_divs = sum([amount * exp(-r * t_frac * T) for t_frac, amount in dividend])
        S_adj = max(S - pv_divs, 1e-8)  # avoid negative

        q = 0.0  # no continuous div
    
    b = r - q
    sigma_sqrt_T = sigma * sqrt(T)
    if S_adj <= 0 or T <= 0 or sigma <= 0:
        return max(0.0, (S_adj - K) if option_type == "c" else (K - S_adj))
    


    beta = (0.5 - b / sigma ** 2) + sqrt(((b / sigma ** 2 - 0.5) ** 2) + 2 * r / sigma ** 2)
    if abs(beta - 1) < 1e-6:
        beta += 1e-4  # or skip BS pricing entirely here, fallback to European
    B_inf = beta / (beta - 1) * K
    B0 = max(K, r / (r - b) * K) if b != r else K * 1.5  # avoids division by zero
    h = -(b * T + 2 * sigma_sqrt_T) * (B0 / (B_inf - B0))
    I = B0 + (B_inf - B0) * (1 - exp(h))

    if S_adj >= I:
        call_price = S_adj - K
    else:
        alpha = (I - K) * I ** (-beta)
        phi_1 = phi(S_adj, T, beta, I, I, r, b, sigma)
        phi_2 = phi(S_adj, T, 1, I, I, r, b, sigma)
        phi_3 = phi(S_adj, T, 1, K, I, r, b, sigma)
        phi_4 = phi(S_adj, T, 0, I, I, r, b, sigma)
        phi_5 = phi(S_adj, T, 0, K, I, r, b, sigma)

        call_price = alpha * S_adj ** beta - alpha * phi_1 + phi_2 - phi_3 - K * phi_4 + K * phi_5

    if option_type == "c":
        return call_price
    else:
        # American put via put-call parity (approximate, not exact)
        european_put = K * exp(-r * T) - S_adj * exp(-q * T)
        return call_price - (S_adj * exp(-q * T) - K * exp(-r * T)) + european_put
    



bjerksund_stensland_2002(
    S=S, 
    K=strike, 
    T=time_distance_helper(exp_date, test_valuation_date), 
    r=rates, 
    sigma=vol, 
    option_type=opt_type, 
    dividend_type='continuous', 
    dividend=mkt_forward_cont.dividend.yield_rate
)

0.4645375373328875

#### BJS Pricing

In [127]:
import numpy as np
from numpy import exp, sqrt, log, maximum
from scipy.stats import norm
from typing import Literal


import numpy as np
from numpy import log, sqrt, exp
from scipy.special import erf

def norm_cdf(x):
    return 0.5 * (1 + erf(x / sqrt(2)))

def phi_vectorized(s, t, gamma, H, I, r, b, sigma):
    lamb = -r + gamma * b + 0.5 * gamma * (gamma - 1) * sigma ** 2
    kappa = 2 * b / (sigma ** 2) + (2 * gamma - 1)
    sigma_sqrt_t = sigma * np.sqrt(t)
    log_s_H = np.log(s / H)
    log_I_s = np.log(I / s)
    d = -(log_s_H + (b + (gamma - 0.5) * sigma ** 2) * t) / sigma_sqrt_t

    return (
        np.exp(lamb * t)
        * s ** gamma
        * (norm_cdf(d) - (I / s) ** kappa * norm_cdf(d - 2 * log_I_s / sigma_sqrt_t))
    )



def bjerksund_stensland_2002_vectorized(
    S: np.ndarray,
    K: np.ndarray,
    T: np.ndarray,
    r: np.ndarray,
    sigma: np.ndarray,
    option_type: Literal["c", "p"] = "c",
    dividend_type: Literal["continuous", "discrete"] = "continuous",
    dividend: float | np.ndarray = 0.0,
    **kwargs
) -> np.ndarray:
    S, K, T, r, sigma = map(np.asarray, (S, K, T, r, sigma))
    zero = np.zeros_like(S)

    # Adjust spot for dividends
    if dividend_type == "continuous":
        q = float(dividend)
        S_adj = S.copy()
    else:
        pv_divs = np.asarray(dividend)
        S_adj = np.maximum(S - pv_divs, 1e-8)
        q = 0.0
    b = r - q
    sigma_sqrt_T = sigma * np.sqrt(T)

    # Mask for invalid values
    invalid = (S_adj <= 0) | (T <= 0) | (sigma <= 0)

    beta = (0.5 - b / sigma**2) + np.sqrt(((b / sigma**2 - 0.5)**2) + 2 * r / sigma**2)
    beta = np.where(np.abs(beta - 1) < 1e-6, beta + 1e-4, beta)

    # Avoid div-by-zero or negatives in B0/B_inf
    B_inf = beta / np.maximum(beta - 1, 1e-8) * K
    B0 = np.where(np.abs(r - b) > 1e-8, r / (r - b) * K, 1.5 * K)

    h = -(b * T + 2 * sigma_sqrt_T) * (B0 / np.maximum(B_inf - B0, 1e-8))
    I = B0 + (B_inf - B0) * (1 - np.exp(h))
    I = np.clip(I, 1e-6, 1e6)

    # Call value if immediate exercise
    early_ex = S_adj >= I
    call_exercise = S_adj - K

    # BS-2002 logic
    S_adj = np.clip(S_adj, 1e-6, 1e6)
    alpha = np.where(I > 0, (I - K) * I**(-beta), 0.0)

    phi_1 = phi_vectorized(S_adj, T, beta, I, I, r, b, sigma)
    phi_2 = phi_vectorized(S_adj, T, 1, I, I, r, b, sigma)
    phi_3 = phi_vectorized(S_adj, T, 1, K, I, r, b, sigma)
    phi_4 = phi_vectorized(S_adj, T, 0, I, I, r, b, sigma)
    phi_5 = phi_vectorized(S_adj, T, 0, K, I, r, b, sigma)

    call_bs2002 = (
        alpha * S_adj**beta
        - alpha * phi_1
        + phi_2
        - phi_3
        - K * phi_4
        + K * phi_5
    )

    call_price = np.where(early_ex, call_exercise, call_bs2002)
    call_price = np.where(invalid, zero, call_price)


    if option_type == "c":
        return call_price
    else:
        # American put via parity approximation
        european_put = K * np.exp(-r * T) - S_adj * np.exp(-q * T)
        pc_parity = call_price - (S_adj * np.exp(-q * T) - K * np.exp(-r * T)) + european_put
        return np.where(invalid, zero, pc_parity)

bjerksund_stensland_2002_vectorized(
    S=S, 
    K=strike, 
    T=time_distance_helper(exp_date, test_valuation_date), 
    r=rates, 
    sigma=vol, 
    option_type=opt_type, 
    dividend_type='continuous', 
    dividend=mkt_forward_cont.dividend.yield_rate
)

array(0.46453754)

#### Vols

- Brute Force BJS
- Scalar Minimize CRR Binomial

In [128]:
def vol_est_brute_force_bjs_2002(
        S: float,
        K: float,
        T: float,
        r: float,
        market_price: float,
        q: float = 0.0,
        option_type: str = 'c',
):
    """

    Brute force method to estimate implied volatility by minimizing the difference
    between the market price and the Black-Scholes price.
    Parameters:
    - F: Forward price
    - K: Strike price
    - T: Time to maturity
    - r: Risk-free rate
    - q: Continuous dividend yield
    - market_price: Market price of the option
    - option_type: 'c' for call, 'p' for put
    Returns:
    - Estimated volatility
    """
    # 
    print("Starting Brute")
    sigmas = np.linspace(0.001, 5, 10)  # Range of volatilities to test
    S, K, T, r, q, option_type = map(np.asarray, (S, K, T, r, q, option_type))
    prices = bjerksund_stensland_2002_vectorized(
        S=S,
        K=K,
        T=T,
        r=r,
        sigma=sigmas,
        option_type=option_type,
        dividend_type='continuous',  # Assuming continuous dividends for this example
        dividend=q  # No discrete dividends in this case
    )
    non_na_mask = ~np.isnan(prices) & ~np.isinf(prices)  # Filter out NaN/Inf prices
    prices = prices[non_na_mask]  # Filter prices
    sigmas = sigmas[non_na_mask]  # Filter corresponding sigmas

    # Calculate the absolute differences between market price and calculated prices
    differences = np.abs(prices - market_price)
    # Find the index of the minimum difference
    min_index = np.argmin(differences)

    # Return the corresponding volatility
    return sigmas[min_index]  # Return the estimated volatility and corresponding price


def estimate_crr_implied_volatility(
        S: float,
        K: float,
        T: float,
        r: float,
        market_price: float,
        q: float = 0.0,
        option_type: str = 'c',
        N: int = 1000,
        dividend_type: Literal['continuous', 'discrete'] = 'continuous',
        american: bool = False
) -> float:
    """
    Estimate implied volatility using optimization.
    
    Parameters:
    - S: Spot price
    - K: Strike price
    - T: Time to maturity
    - r: Risk-free interest rate
    - market_price: Market price of the option
    - q: Continuous dividend yield (default is 0.0)
    - option_type: 'c' for call, 'p' for put
    - N: Number of time steps in the binomial tree
    
    Returns:
    - Estimated volatility
    """

    def binomial_objective_function(sigma: float) -> float:
        
        calculated_price = crr_binomial_pricing(
            K=K,
            T=T,
            sigma=sigma,  # Initial guess for sigma, will be optimized
            r=r,
            N=N,
            spot_price=S,
            dividend_type=dividend_type,
            div_amount=q,
            option_type=option_type,
            american=american
        )
        
        return (calculated_price - market_price) ** 4

    
    result = minimize_scalar(
        binomial_objective_function,
        bounds=(0.001, 5.0),  # Reasonable bounds for volatility
        method='bounded'
    )
    
    return result.x if result.success else None


def vector_brute_force(brute_callable, list_input):
    """
    Vectorized brute force method to estimate implied volatility by minimizing the difference
    between the market price and the Black-Scholes price.
    
    Parameters:
    - brute_callable: Function to call for brute force estimation
    - list_input: List of inputs for the brute force estimation
        eg: [
        (S1, K1, T1, r1, market_price1, q1, option_type1),
        ]
    
    Returns:
    - Estimated volatilities as a numpy array
    """
    estimated_vols = [brute_callable(*params) for params in list_input]

    
    return estimated_vols

In [1469]:
aapl_chain
valuation_dates = [test_valuation_date] * len(aapl_chain)
end_dates = aapl_chain['Expiration'].tolist()
r = [rates] * len(aapl_chain)
s = [S] * len(aapl_chain)
tickers = ['AAPL'] * len(aapl_chain)
T = [time_distance_helper(end_dates[i], valuation_dates[i]) for i in range(len(aapl_chain))]


q = get_vectorized_dividend_rate(
    tickers=tickers,
    spots=s,
    valuation_dates=valuation_dates,
)


discrete_q = get_vectorized_dividend_scehdule(
    tickers=['AAPL'] * len(aapl_chain),
    valuation_dates=[test_valuation_date] * len(aapl_chain),
    end_dates=aapl_chain['Expiration'].tolist(),
    start_dates=[test_valuation_date] * len(aapl_chain),
)

discrete_q_convert = vector_convert_to_time_frac(
    discrete_q, 
    valuation_dates=[test_valuation_date] * len(aapl_chain), 
    end_dates=aapl_chain['Expiration'].tolist(), 
)

In [551]:
vols = aapl_chain['iv'].tolist()

In [None]:
i = 8
bsj_est = vol_est_brute_force_bjs_2002(
        S=s[i],
        K=aapl_chain['Strike'][i],
        T=T[i],
        r=r[i],
        market_price=aapl_chain['Midpoint'][i],
        q=q[i],
        option_type=aapl_chain['Right'][i].lower()
    )


crr_continuous_est = estimate_crr_implied_volatility(
        S=s[i],
        K=aapl_chain['Strike'][i],
        T=T[i],
        r=r[i],
        market_price=aapl_chain['Midpoint'][i],
        q=q[i],
        option_type=aapl_chain['Right'][i].lower(),
        N=100,
        dividend_type='continuous',
        american=True
    )

crr_discrete_est = estimate_crr_implied_volatility(
        S=s[i],
        K=aapl_chain['Strike'][i],
        T=T[i],
        r=r[i],
        market_price=aapl_chain['Midpoint'][i],
        q=discrete_q_convert[i],
        option_type=aapl_chain['Right'][i].lower(),
        N=100,
        dividend_type='discrete',
        american=True
    )

print(f"Estimated BJS-2002 Volatility: {bsj_est:.4f}")
print(f"Estimated CRR Continuous Volatility: {crr_continuous_est:.4f}")
print(f"Estimated CRR Discrete Volatility: {crr_discrete_est:.4f}")

In [None]:

bsj_price = bjerksund_stensland_2002_vectorized(
        S=s[i],
        K=aapl_chain['Strike'][i],
        T=T[i],
        r=r[i],
        sigma=[bsj_est, crr_continuous_est, crr_discrete_est],  # Use the estimated volatility
        option_type=aapl_chain['Right'][i].lower(),
        dividend_type='continuous',
        dividend=q[i]
    )

crr_discrete_price = crr_binomial_pricing(
        K=aapl_chain['Strike'][i],
        T=T[i],
        sigma=crr_discrete_est,  # Use the estimated volatility
        r=r[i],
        N=100,
        spot_price=s[i],
        dividend_type='discrete',
        div_amount=discrete_q_convert[i],
        option_type=aapl_chain['Right'][i].lower(),
        american=True
    )

crr_continuous_price = crr_binomial_pricing(
        K=aapl_chain['Strike'][i],
        T=T[i],
        sigma=crr_continuous_est,  # Use the estimated volatility
        r=r[i],
        N=100,
        spot_price=s[i],
        dividend_type='continuous',
        div_amount=q[i],
        option_type=aapl_chain['Right'][i].lower(),
        american=True
    )

print(f"Mid Price: {aapl_chain['Midpoint'][i]:.4f}")
print(f"Estimated BJS-2002 Price: {float(bsj_price[0]):.4f}")
print(f"Estimated CRR Continuous Price: {crr_continuous_price:.4f}")
print(f"Estimated CRR Discrete Price: {crr_discrete_price:.4f}")

In [1487]:
vol_batch_bjs = np.array([
    vol_est_brute_force_bjs_2002(
        S=s[i],
        K=aapl_chain['Strike'][i],
        T=T[i],
        r=r[i],
        market_price=aapl_chain['Midpoint'][i],
        q=q[i],
        option_type=aapl_chain['Right'][i].lower()
    )
    for i in range(len(aapl_chain))
])

In [None]:
vector_brute_force(
    vol_est_brute_force_bjs_2002,
list(zip(
    s, 
    aapl_chain['Strike'], 
    T, 
    r, 
    aapl_chain['Midpoint'], 
    q, 
    aapl_chain['Right'].str.lower()
))
)

In [583]:
crr_discrete_batch = np.array([
    estimate_crr_implied_volatility(
        S=s[i],
        K=aapl_chain['Strike'][i],
        T=T[i],
        r=r[i],
        market_price=aapl_chain['Midpoint'][i],
        q=discrete_q_convert[i],
        option_type=aapl_chain['Right'][i].lower(),
        N=100,
        dividend_type='discrete',
        american=True
    )
    for i in range(len(aapl_chain))
])

In [None]:
vol_batch_bjs
aapl_chain['iv_bjs'] = vol_batch_bjs
aapl_chain['iv_crr_discrete'] = crr_discrete_batch
aapl_chain[(aapl_chain['Expiration'] == '2025-07-25') & (aapl_chain['Right'] == 'C')].sort_values('Strike').tail(60).plot(x='Strike', y=['iv_bjs', 'iv', 'iv_crr_discrete'], kind='line', title='AAPL Call IV BJS on 2026-06-18')

#### BJS 2002 Greeks

In [1256]:
np.set_printoptions(suppress=True)

In [1264]:
## Numerical
def convert_to_array(value):
    if isinstance(value, (list, np.ndarray)):
        return np.array(value)
    elif isinstance(value, (int, float, str, datetime, np.datetime64)):
        return np.array([value])
    else:
        raise ValueError(f"Unsupported type for value conversion : {type(value)}")

def to_1d_array(x):
    x = np.atleast_1d(x)
    if x.ndim > 1:
        return x.flatten()
    return x


def bjs2002_numerical_greeks(
    K: float,
    T: List[float],
    r: float,
    sigma: float,
    S: float,
    div_yield: Union[float, List[float]] = 1.0,
    option_type: str = "c",
    **kwargs
):
    
    option_type = list(map(lambda x: x.lower(), option_type)) 
    K, T, r, sigma, S, div_yield, option_type = map(
        convert_to_array, (K, T, r, sigma, S, div_yield, option_type)
    )
     # Ensure option_type is lowercase
    finite_estimator = FiniteGreeksEstimator(
        price_func = bjerksund_stensland_2002_vectorized,
        base_params = {
            'K': K,
            'T': T,
            'r': r,
            'sigma': sigma,
            'S': S,
            'div_amount': div_yield,
            'option_type': option_type,
            'q': None
        },
        dx_thresh = 0.00001,
        method = 'backward',
    )
    greeks = finite_estimator.all_first_order()
    greeks.update(finite_estimator.all_second_order())
    greeks = dict(sorted(greeks.items(), key=lambda item: item[0]))
    del finite_estimator
    return greeks


# g = bjs2002_numerical_greeks(
#     K=aapl_chain['Strike'].tolist()[:1],
#     T= [time_distance_helper(date, test_valuation_date) for date in aapl_chain['Expiration'].tolist()[:1]],
#     r=[rates] * 1,
#     sigma=aapl_chain['iv'].tolist()[:1],
#     S=s[:1],
#     dividend_type='discrete',
#     div_amount=discrete_q_convert[:1],
#     option_type=aapl_chain['Right'].tolist()[:1]
# )

# g

In [None]:
g = bjs2002_numerical_greeks(
    K=[215],
    T= [time_distance_helper('2025-12-19', datetime.now())],
    r=[0.04223],
    sigma=[0.2579],
    S=[214.67],
    dividend_type=['continuous'],
    div_amount=[0.004],
    option_type=['C'],
)
g

In [None]:
bt = VectorBinomialCRR(
    K=215,
    expiration='2025-12-19',
    sigma=0.2579,
    r=0.04223,
    N=100,
    spot_price=214.67,
    dividend_type='continuous',
    div_amount=0.004,
    option_type='c',
    start_date=datetime.now(),
    valuation_date=datetime.now(),
    american=True
)
bt.price()

In [None]:
bt.delta(), bt.gamma(), bt.vega(), bt.theta(), bt.rho(), bt.volga()

In [None]:
bt = NodeBinomialCRR(
    K=215,
    expiration='2025-12-19',
    sigma=0.2579,
    r=0.04223,
    N=100,
    spot_price=214.67,
    dividend_type='continuous',
    div_amount=0.004,
    option_type='c',
    start_date=datetime.now(),
    valuation_date=datetime.now(),
    american=True
)
bt.price()

In [None]:
bt.delta(), bt.gamma(), bt.vega(), bt.theta(), bt.rho(), bt.volga()

#### Binomial Tree Greeks

In [1407]:
def binomial_tree_greeks(
    K: float|np.ndarray,
    expiration: float|np.ndarray,
    sigma: float|np.ndarray,
    r: float|np.ndarray,
    N: float|np.ndarray,
    S: float|np.ndarray,
    dividend_type: float|np.ndarray,
    div_amount: float|np.ndarray,
    option_type: float|np.ndarray,
    start_date: float|np.ndarray,
    valuation_date: float|np.ndarray,
    american: float|np.ndarray,
):
    """
    Calculate Greeks using a binomial tree model.
    
    Parameters:
    - K: Strike price
    - expiration: Expiration date of the option
    - sigma: Volatility of the underlying asset
    - r: Risk-free interest rate
    - N: Number of time steps in the binomial tree
    - spot_price: Current price of the underlying asset (optional)
    - dividend_type: Type of dividend ('discrete' or 'continuous')
    - div_amount: Amount of dividend (if applicable)
    - option_type: 'c' for call, 'p' for put
    - start_date: Start date for the option pricing (optional)
    - valuation_date: Date for which the option is priced (optional)
    
    Returns:
    Dictionary with calculated Greeks.
    """

    K, expiration, sigma, r, N, S, dividend_type, option_type, start_date, valuation_date, american = map(
        convert_to_array, 
        (K, expiration, sigma, r, N, S, dividend_type, option_type, start_date, valuation_date, american)
    )
    if dividend_type == 'discrete':
        div_amount = np.array([div_amount])
    else:
        div_amount = convert_to_array(div_amount)
    # Ensure all inputs are numpy arrays
    models = [
        VectorBinomialCRR(
            K=k,
            expiration=exp,
            sigma=s,
            r=ri,
            N=int(n),
            spot_price=sp,
            dividend_type=dt,
            div_amount=da,
            option_type=ot,
            start_date=start_date,
            valuation_date=valuation_date,
            american=am
        )
        for k, exp, s, ri, n, sp, dt, da, ot, start_date, valuation_date, am in zip(
            K, expiration, sigma, r, N, S, dividend_type, div_amount, option_type, start_date, valuation_date, american
        )
    ]
    price = np.array([model.price() for model in models])
    

    return {
        'delta': np.array([model.delta() for model in models]),
        'gamma': np.array([model.gamma() for model in models]),
        'vega': np.array([model.vega() for model in models]),
        'theta': np.array([model.theta() for model in models]),
        'rho': np.array([model.rho() for model in models]),
        'volga': np.array([model.volga() for model in models]),
        'model': np.array([model for model in models]),
    }

In [None]:
res = binomial_tree_greeks(
    K=215,
    expiration='2025-12-19',
    sigma=0.2579,
    r=0.04223,
    N=100,
    S=214.67,
    dividend_type='discrete',
    div_amount=mkt_forward.dividend.get_year_fractions(),
    option_type='c',
    start_date=datetime.now(),
    valuation_date=datetime.now(),
    american=True
)
res

## BATCH Operations

In [1442]:
from trade.helpers.pools import runProcesses
from typing import List, Union

def vector_batch_processor(callable, OrderedInputs: List[List], num_process = None):
    """
    Process a list of inputs in parallel using multiprocessing. This processor assumes the callable works with vectorization
    
    Parameters:
    - callable: Function to call with each set of inputs
    - OrderedInputs: List of lists, where each sublist contains arguments for the callable
    - num_process: Number of processes to use (default is None, which uses all available cores)
    
    Returns:
    List of results from the callable.
    """
    if num_process is None:
        num_process = os.cpu_count() or 8


    OrderedInputs = np.array(OrderedInputs, dtype=object)

    paritioned_inputs = np.array_split(OrderedInputs, num_process, axis=1)
    


In [None]:
# vectorized_market_forward_calc(
#     ticks=tickers,
#     S=s,
#     valuation_dates=valuation_dates,
#     end_dates=end_dates,
#     r=r,
#     div_type='discrete'
# )

ordered_inputs = np.array([
    tickers[:8], 
    [s] * 8,  # Limit to first 8 for testing
    valuation_dates[:8],  # Limit to first 8 for testing
    end_dates[:8],  # Limit to first 8 for testing
    r[:8], 
    ['discrete'] * 8
])

vector_batch_processor(
    vectorized_market_forward_calc,
    OrderedInputs=ordered_inputs,
    num_process=None
)

In [1502]:
runProcesses(vector_brute_force,
             np.array_split(list(zip(
    s, 
    aapl_chain['Strike'], 
    T, 
    r, 
    aapl_chain['Midpoint'], 
    q, 
    aapl_chain['Right'].str.lower()
)), 8,axis=1))

In [None]:
np.array_split(list(zip(
    s, 
    aapl_chain['Strike'], 
    T, 
    r, 
    aapl_chain['Midpoint'], 
    q, 
    aapl_chain['Right'].str.lower()
)), 8)

In [None]:
np.array_split(ordered_inputs, 8,axis=1)

In [None]:
import numpy as np

a = np.array([7, 2, 5, 1, 8, 3])
k = 3
parted = np.partition(a, k)

print(parted)  # e.g., [2 1 3 5 8 7]
print(parted[k])  # 4th smallest element (index 3) = 5


## MISC

In [None]:
black_scholes_vectorized_base(
    F=[100, 105, 110],
    K=[100, 100, 100],
    T=[1.0, 1.0, 1.0],
    r=[0.05, 0.05, 0.05],
    sigma=[0.2, 0.2, 0.2],
    option_type=["c", "p", "c"]
)

In [None]:
ds = DividendSchedule(
    start_date=datetime(2025, 7, 4),
    end_date=datetime(2026, 7, 4),
    freq="quarterly",
    amount=1.5
)

print(ds.get_schedule())
print(ds.get_year_fractions())
ds

In [None]:
ds = DividendSchedule(
    # last_dividend_date=datetime(2025, 6, 4),
    start_date=datetime(2025, 1, 4),
    end_date=datetime(2026, 7, 4),
    valuation_date=datetime(2025, 6, 4),
    freq="quarterly",
    amount=0.75
)

print(ds.get_schedule())
print(ds.get_year_fractions())
print(ds.get_present_value(0.05, sum_up=True))  # Example discount rate of 5%
print(ds.get_present_value(0.05, sum_up=False))  # Example discount rate of 5%
ds

In [None]:
PRECISION = 5
american = True
u = 1.1
d = 1/u
s0 = [100]
k =  K  = 110
N = 5
tree = [s0]
T = 0.5
dt = T / N
opt_type = 'c'
r, y = 0.0045, 0
p = (np.exp((r - y) * dt) - d) / (u - d)
## Start level loop
for i in range(N):
    level = []
    for j in tree[i]:
        level.append(round(j * u, PRECISION)) ## append up
        level.append(round(j * d, PRECISION)) ## appemd down
    level = list(set(level))  ## Unique set
    tree.append(sorted(level))


The stock price at node \( (i, j) \) in a recombining binomial tree is:

\[
S_{i,j} = S_0 \cdot u^{2j - i}
\]

Where:
- \( i \): time step
- \( j \): number of up moves
- \( u \): up factor
- \( d = 1/u \): down factor
- \( S_0 \): initial stock price


In [None]:


tree = []
for i in range(N + 1):
    level = [s0[0] * (u ** (2 * j - i)) for j in range(i + 1)]
    tree.append(level)



In [None]:
option_tree = deepcopy(tree)

terminal_node =option_tree[-1]
for i in range(len(terminal_node)):
    terminal_node[i] = max(0, terminal_node[i] - k if opt_type == 'c' else k - terminal_node[i])

# Backward induction to calculate option values at each node
for i in range(N, -1, -1):
    current_node = option_tree[i]
    previous_node = option_tree[i-1]
    intrinsic = [x - k if opt_type == 'c' else k - x for x in previous_node]
    for j in range(1,len(option_tree[i])):
        disc_value = ((current_node[j-1] * (1-p)) + (current_node[j] * p)) * np.exp(-r * dt)
        if american:
            disc_value = max(intrinsic[j-1], disc_value)
        previous_node[j-1] = disc_value
    
option_tree[0][0]

        
option_tree


In [None]:
from copy import deepcopy
import numpy as np

option_tree = deepcopy(tree)

# Initialize terminal payoffs
terminal_node = option_tree[-1]
for i in range(len(terminal_node)):
    terminal_node[i] = max(0, terminal_node[i] - k if opt_type == 'c' else k - terminal_node[i])

# Backward induction
for i in range(N - 1, -1, -1):  # Go from N-1 down to 0
    current_node = option_tree[i + 1]
    previous_node = option_tree[i]
    intrinsic = [x - k if opt_type == 'c' else k - x for x in previous_node]

    for j in range(len(previous_node)):
        disc_value = np.exp(-r * dt) * ((1 - p) * current_node[j] + p * current_node[j + 1])
        if american:
            disc_value = max(disc_value, intrinsic[j])
        previous_node[j] = disc_value
option_tree[0][0]

        
option_tree

In [None]:
stock_tree = [
    [s0[0] * (u ** j) * (d ** (i - j)) for j in range(i + 1)]
    for i in range(N + 1)
]
# stock_tree

In [None]:

terminal_prices = stock_tree[-1]  # Get the terminal prices from the last row of the stock tree
if opt_type == 'c':
    option_values = [max(0, price - k) for price in terminal_prices]  # Call option payoff
elif opt_type == 'p':
    option_values = [max(0, k - price) for price in terminal_prices]

option_values = option_values

option_values = option_values
# Backward induction to calculate option values at each node
for i in range(N - 1, -1, -1):
    option_values = [
        np.exp(-r * dt) * (p * option_values[j+1] + (1 - p) * option_values[j]) ## Ordered from down to up.
        ## Moves from all power in d, to all power in u by 1 step. Counting down on size i
        for j in range(i + 1) ## At each node, there is Node+1 size
    ]

    # If American option, check for early exercise
    if american:
        early_exercise = [
            max(val, (p - K) if opt_type == 'c' else (K - p))
            for p, val in zip(stock_tree[i], option_values)
        ]
        option_values = early_exercise
option_values[0]
