In [None]:
import yfinance as yf
import numpy as np

ticker = 'AAPL'

start_date = "2015-01-01"
df = yf.download(
    ticker, 
    interval='1d',
    start=start_date,
    # period='10y',
)
df.columns = df.columns.get_level_values(0)
df = df[['Close']] # keep only close prices
df['return'] = df['Close'].pct_change()

df.to_csv('data.csv')

In [None]:
import numpy as np
import pandas as pd

data = pd.read_csv('data.csv', index_col = 'Date')

In [None]:
initial_price = data.loc['2025-01-16', 'Close']

cutoff_ratio = np.array([np.exp(-.3), np.exp(-.03), np.exp(.03), np.exp(.3), ])
cutoff_price = initial_price * cutoff_ratio

In [None]:
cutoff_price

In [None]:
prices_to_run = [ 160, 200, 230, 260, 310 ]

## Side: HMM

In [None]:
import numpy as np
import pandas as pd

import matplotlib.pyplot as plt

from hmmlearn.hmm import GaussianHMM

In [None]:
data.dropna(inplace=True)

X = data['return'].values.reshape(-1, 1)
model = GaussianHMM(n_components=2, covariance_type='full', n_iter=1000).fit(X)

hidden_states = model.predict(X)
hidden_states

plt.figure(figsize=(15,6))
for state in range(model.n_components):
    mask = hidden_states == state
    plt.plot(data.index[mask], data['Close'][mask], label=f"State {state}")
plt.legend()
plt.tight_layout()

from matplotlib.dates import YearLocator, MonthLocator
plt.gca().xaxis.set_major_locator(YearLocator())
plt.gca().xaxis.set_minor_locator(MonthLocator())
plt.xticks(rotation=45)

plt.show() 

In [None]:
# states mean and variances 
for i in range(model.n_components):
    print(f'{i} hidden state:')
    print(f'Annualized Mean: {model.means_[i] * 252}')
    print(f'Annualized Var: {np.sqrt(np.diag(model.covars_[i]) * 252)}')
    print()
print(f'Initial state is State {max(range(model.n_components), key=lambda x: model.startprob_[x])}')

### HMM experiment ends.

In [None]:
def getReturnVolatility_1(df, state, period=50): 
    '''For 2 state trinomial lattice'''

    if state not in (0, 1):
        raise ValueError('The state does not exist.')
    
    local_df = df.copy()
    local_df['state'] = np.where(local_df['return'] >= 0, 0, 1)
    local_df['segment'] = pd.qcut(range(len(local_df)), q=period, labels=False)

    volatility_per_segment = local_df[local_df['state'] == state].groupby('segment')['return'].var()
    weight = pd.Series([2**(-(period-i)) for i in range(period)])
    return np.sqrt(volatility_per_segment.dot(weight) * 252)


def getReturnVolatility_2(df, state, period=20):
    '''For 3-state trinomial lattice'''
    if state not in (0, 1, 2):
        raise ValueError('The state does not exist.')
    
    local_df = df.copy()
    local_df['state'] = 0
    local_df['state'] = local_df['return'].rolling(3).agg(lambda x: 1 if np.all(x >= 0) else (2 if np.all(x < 0) else 0))

    local_df['segment'] = pd.qcut(range(len(local_df)), q=period, labels=False)

    volatility_per_segment = local_df[local_df['state'] == state].groupby('segment')['return'].var().reindex(range(period))
    weight = pd.Series([2**(-(period-i)) for i in range(period)])
    assert abs(sum(weight) - 1) <= 1e-6

    return np.sqrt(volatility_per_segment.dot(weight) * 252)
    

In [None]:
# vols = [ getReturnVolatility_1(data, i, 100).item() for i in range(2) ]
vols = [ getReturnVolatility_2(data, i, 20).item() for i in range(3) ]

res = [{'state': i, 'volatility': vols[i]} for i in range(3)]
pd.DataFrame(res).to_csv('param_3.csv', index=False)

### Phase 2: building matrices

In [None]:
import numpy as np
import pandas as pd
from functools import cache

In [None]:
# generating matrix A
def get_markov_transition_prob(delta_t, A, max_iter=50):
    res = np.eye(A.shape[0])
    prev = np.eye(A.shape[0])
    for i in range(max_iter):
        prev = prev @ A * delta_t / (i+1)
        res += prev
    return res

def get_risk_neu_prob(P, sigma, delta_t, risk_free_int):
    risk_neutral_prob = { i: [-1]*3 for i in range(P.shape[0]) } # [up, mid, down]

    for i in range(P.shape[0]):
        risk_neutral_prob[i][1] = 1 - (vols[i]/sigma) ** 2
        risk_neutral_prob[i][0] = (np.exp(risk_free_int[i]*delta_t)- np.exp(-sigma * np.sqrt(delta_t))- \
                                (1-(vols[i]/sigma)**2)*(1-np.exp(-sigma * np.sqrt(delta_t))))/(np.exp(sigma * np.sqrt(delta_t)) - np.exp(-sigma * np.sqrt(delta_t)))
        risk_neutral_prob[i][2] = (np.exp(sigma * np.sqrt(delta_t)) - np.exp(risk_free_int[i]*delta_t)- \
                                (1-(vols[i]/sigma)**2)*(np.exp(sigma * np.sqrt(delta_t))-1))/(np.exp(sigma * np.sqrt(delta_t)) - np.exp(-sigma * np.sqrt(delta_t)))
        
    return risk_neutral_prob

def trinomial_call_value(state, K, P, risk_neutral_prob, risk_free_int, S0, sigma, T, N, american=False):
    """
    V(t,n,j) be the value of the derivative at the nth node at time step t under the jth regime state.
    time starts from 0
    node index starts from 1 (from bottom)
    """
    if state >= P.shape[0]:
        raise Exception('Invalid state')

    delta_t = T / N
    
    @cache
    def V(t, n, i):
        if t == N:
            return max(0, S0*np.exp((n - 1 - N)*sigma*np.sqrt(delta_t))-K)
        res = 0
        for j in range(P.shape[0]): # regimes
            res += P[i, j] * (
                risk_neutral_prob[i][0] * V(t+1, n+2, j) +
                risk_neutral_prob[i][1] * V(t+1, n+1, j) +
                risk_neutral_prob[i][2] * V(t+1, n, j)
            )
        # number of node = 2t+1
        res *= np.exp(-risk_free_int[i]*delta_t)
        if american:
            intrinsic = max(0, S0*np.exp((n - 1 - t) * sigma * np.sqrt(delta_t)) - K)
            res = max(res, intrinsic)
        
        return res
    
    return V(0, 1, state)
    

def trinomial_put_value(state, K, P, risk_neutral_prob, risk_free_int, S0, sigma, T, N, american=False):
    if state >= P.shape[0]:
        raise Exception('Invalid state')

    delta_t = T / N
    
    @cache
    def V(t, n, i):
        if t == N:
            return max(0, K - S0*np.exp((n - 1 - N)*sigma*np.sqrt(delta_t)))
        
        res = 0
        for j in range(P.shape[0]):  # Iterate over regimes
            res += P[i, j] * (
                risk_neutral_prob[i][0] * V(t+1, n+2, j) +
                risk_neutral_prob[i][1] * V(t+1, n+1, j) +
                risk_neutral_prob[i][2] * V(t+1, n, j)
            )
        
        discounted_value = np.exp(-risk_free_int[i] * delta_t) * res
        
        if american:
            intrinsic_value = max(0, K - S0*np.exp((n - 1 - t) * sigma * np.sqrt(delta_t)))
            return max(discounted_value, intrinsic_value)
        
        return discounted_value
    
    return V(0, 1, state)



In [None]:
T = 1
N = 1000
S0 = data['Close'].loc['2025-01-16'] # take a close price
K = 250
risk_free_int = [.04] * 3 # adjust the state numbers
sigma = max(vols) + (np.sqrt(1.5)-1) * np.mean(vols) # suggested by the paper
# A = np.array([
#     [-0.5,  0.5],
#     [ 0.5, -0.5],
# ])
A = np.array([
    [-1,  0.5,  0.5],
    [ 0.5, -1,  0.5],
    [ 0.5,  0.5, -1]
])
P = get_markov_transition_prob(T/N, A)
risk_neutral_prob = get_risk_neu_prob(P, sigma, T/N, risk_free_int)

In [None]:
result = []

for state in range(3):
    for is_american in [True, False]:
        option_type = "American" if is_american else "European"
        for strike in prices_to_run:
            call_price = trinomial_call_value(state, strike, P, risk_neutral_prob, risk_free_int, S0, sigma, 1, 1000, is_american)
            put_price = trinomial_put_value(state, strike, P, risk_neutral_prob, risk_free_int, S0, sigma, 1, 1000, is_american)

            result.append({
                "State": state,
                "Type": option_type,
                "Strike": strike,
                "Call Price": call_price,
                "Put Price": put_price
            })

df = pd.DataFrame(result)
df['moneyness'] = np.log(df['Strike'] / S0)
df.to_csv("trinomial_result_3_regimes.csv", index=False)


In [None]:
tmp = pd.read_csv('trinomial_3_regimes.csv')
tmp['price/strike'] = initial_price / tmp['Strike']
tmp.to_csv('trinomial_3_regimes.csv', index=False)

## American Monte Carlo

In [None]:
import numpy as np

class AmericanOptionsLSMC:
    """ Class for American options pricing using Longstaff-Schwartz (2001):
    "Valuing American Options by Simulation: A Simple Least-Squares Approach."
    Review of Financial Studies, Vol. 14, 113-147.
    S0 : float : initial stock/index level
    strike : float : strike price
    T : float : time to maturity (in year fractions)
    M : int : grid or granularity for time (in number of total points)
    r : float : constant risk-free short rate
    div :    float : dividend yield
    sigma :  float : volatility factor in diffusion term 
    
    Unitest(doctest): 
    >>> AmericanPUT = AmericanOptionsLSMC('put', 36., 40., 1., 50, 0.06, 0.06, 0.2, 10000)
    >>> AmericanPUT.price
    4.4731177017712209
    """

    def __init__(self, option_type, S0, strike, T, M, r, div, sigma, simulations):
        try:
            self.option_type = option_type
            assert isinstance(option_type, str)
            self.S0 = float(S0)
            self.strike = float(strike)
            assert T > 0
            self.T = float(T)
            assert M > 0
            self.M = int(M)
            assert r >= 0
            self.r = float(r)
            assert div >= 0
            self.div = float(div)
            assert sigma > 0
            self.sigma = float(sigma)
            assert simulations > 0
            self.simulations = int(simulations)
        except ValueError:
            print('Error passing Options parameters')


        if option_type != 'call' and option_type != 'put':
            raise ValueError("Error: option type not valid. Enter 'call' or 'put'")
        if S0 < 0 or strike < 0 or T <= 0 or r < 0 or div < 0 or sigma < 0:
            raise ValueError('Error: Negative inputs not allowed')

        self.time_unit = self.T / float(self.M)
        self.discount = np.exp(-self.r * self.time_unit)

    @property
    def MCprice_matrix(self, seed = 123):
        """ Returns MC price matrix rows: time columns: price-path simulation """
        np.random.seed(seed)
        MCprice_matrix = np.zeros((self.M + 1, self.simulations), dtype=np.float64)
        MCprice_matrix[0,:] = self.S0
        for t in range(1, self.M + 1):
            brownian = np.random.standard_normal( self.simulations // 2)
            brownian = np.concatenate((brownian, -brownian))
            MCprice_matrix[t, :] = (MCprice_matrix[t - 1, :]
                                  * np.exp((self.r - self.sigma ** 2 / 2.) * self.time_unit
                                  + self.sigma * brownian * np.sqrt(self.time_unit)))
        return MCprice_matrix

    @property
    def MCpayoff(self):
        """Returns the inner-value of American Option"""
        if self.option_type == 'call':
            payoff = np.maximum(self.MCprice_matrix - self.strike,
                           np.zeros((self.M + 1, self.simulations),dtype=np.float64))
        else:
            payoff = np.maximum(self.strike - self.MCprice_matrix,
                            np.zeros((self.M + 1, self.simulations),
                            dtype=np.float64))
        return payoff

    @property
    def value_vector(self):
        value_matrix = np.zeros_like(self.MCpayoff)
        value_matrix[-1, :] = self.MCpayoff[-1, :]
        for t in range(self.M - 1, 0 , -1):
            regression = np.polyfit(self.MCprice_matrix[t, :], value_matrix[t + 1, :] * self.discount, 2)
            continuation_value = np.polyval(regression, self.MCprice_matrix[t, :])
            value_matrix[t, :] = np.where(self.MCpayoff[t, :] > continuation_value,
                                          self.MCpayoff[t, :],
                                          value_matrix[t + 1, :] * self.discount)

        return value_matrix[1,:] * self.discount


    @property
    def price(self): return np.sum(self.value_vector) / float(self.simulations)

In [None]:
N = 500
data = []
# risk free interest fixed at 4%
for state in range(3):
    for strike in range(240, 281, 10):
        call_price = AmericanOptionsLSMC('call', S0, strike, 1, N, 0.04, 0, vols[state], 10000).price
        put_price = AmericanOptionsLSMC('put', S0, strike, 1, N, 0.04, 0, vols[state], 10000).price

        # Append a dictionary with results
        data.append({
            "State": state,
            "Type": 'American',
            "Strike": strike,
            "Call Price": call_price,
            "Put Price": put_price
        })


# df.to_csv("MC_3states.csv", index=False, mode='a', header=False)
pd.DataFrame(data).to_csv('tmp.csv', index=False)

In [None]:
# Standard Monte Carlo for European Options
# https://www.kaggle.com/code/ypark4857/monte-carlo-simulation-of-european-option-pricing

import numpy as np

rng = np.random.default_rng(10000)

def mc_european_option(S, K, T, r, sigma, N, type = 'call'):
    
    # Initial Asset Pricie
    S_init = S
    
    # X follows a standard normal distribution
    X = rng.normal(0, 1, N)
    
    # The Distribution of asset prices at the Expiration of the Option
    ST = S_init * np.exp((r-0.5*sigma**2)*T + sigma*np.sqrt(T)*X)
    
    # The Discounted payoff of European call option at expiration
    if type == 'call':
        fST = np.exp(-r*T) * np.maximum(ST-K, 0)
    elif type == 'put':
        fST = np.exp(-r*T) * np.maximum(K-ST, 0)

    # The option value by taking the expected discounted payoff
    price = np.mean(fST)

    return price

In [None]:
data = []

for state in range(3):
    for strike in range(240, 281, 10):
        call_price = mc_european_option(S0, strike, 1, 0.04, vols[state], 10000, 'call')
        put_price = mc_european_option(S0, strike, 1, 0.04, vols[state], 10000, 'put')

        # Append a dictionary with results
        data.append({
            "State": state,
            "Type": 'European',
            "Strike": strike,
            "Call Price": call_price.item(),
            "Put Price": put_price.item()
        })

df = pd.DataFrame(data)
df.to_csv('MC_3states.csv', mode='a', index=False, header=False)

#### For MC, American options (priced using LSMC) are cheaper than European options due possbily to poor fitting (Therefore may not be good source).

## Result post-processing (before review)

In [None]:
price_lsmc = pd.read_csv('MC_3states.csv')
price_tri = pd.read_csv('trinomial_result_2_regimes.csv')

price_merge = pd.merge(price_lsmc, price_tri, on=['State', 'Strike', 'Type'], suffixes=('_MC', '_Trinomial'), how='right')
price_merge.round(5).to_csv('combined_result_2_new.csv', index=False)

In [None]:
# With real market data

def combine(filename: str, output: str) -> None:
    import pandas as pd
    df = pd.read_csv(filename)
    market = pd.read_csv('market_price.csv')
    res = pd.merge(df, market, how='left', on=['Type', 'Strike'])

    res.to_csv(output, index=False)

combine('combined_result_2.csv', 'res2.csv')
combine('combined_result_3.csv', 'res3.csv')

In [None]:
import pandas as pd
pd.read_csv('res3.csv')

### Result processing with new strikes

In [None]:
import pandas as pd
market = pd.read_csv('aapl_op.csv')
res2 = pd.read_csv('trinomial_2_regimes.csv')
res3 = pd.read_csv('trinomial_3_regimes.csv')

market.rename(columns={'call': 'call_market', 'put': 'put_market'}, inplace=True)
res2.rename(columns={'call': 'call_tri', 'put': 'put_tri'}, inplace=True)
res3.rename(columns={'call': 'call_tri', 'put': 'put_tri'}, inplace=True)

In [None]:
market.columns

In [None]:
pd.merge(market, res2, on='Strike', how='right').rename(columns={'State': 'init_state'}).round(4).to_csv('combined_2.csv', index=False)
pd.merge(market, res3, on='Strike', how='right').rename(columns={'State': 'init_state'}).round(4).to_csv('combined_3.csv', index=False)

In [None]:
pd.read_csv('combined_2.csv')