In [1]:
import numpy as np
import pandas as pd

class InvalidTypeError(Exception):
    """enter either a call or put option for the type param"""
    pass

TODO: 
- make a portfolio with weights
- run monte carlo simulation


Step 1: Get baseline monte carlo simulation with constant volatility (we use today's volatility for all steps in the MC simulation)

In [None]:
import requests
from pathlib import Path
import matplotlib.pyplot as plt
import seaborn as sns
from secrets import ALPHAVANTAGE_API_KEY


# ALPHAVANTAGE_API_KEY = 'XJ2WQUNNHBYG6D9S'
NUM_MINS = '1'

################################################################################
#                           DOWNLOAD FUNCTION                                    
################################################################################
def download_once(url: str, local_path: str, repull=False) -> Path:
    p = Path(local_path)
    if p.exists() and not repull: # do not rewrite path
        return p
    r = requests.get(url, timeout=30) # sends http get request to url. stores it to a response object
    r.raise_for_status() # checks if response object's status is 200-299 (not 404 for example)
    p.write_bytes(r.content) 
    return p

################################################################################
#                           CALCULATE PAYOFF FUNCTION                                   
################################################################################
# helper function
def calculate_next_price(price_init, risk_free_rate, div_yield, volatility, maturity, time_steps):
    dt = maturity/time_steps
    normal_RV = np.random.normal()
    price_next = price_init*np.exp((risk_free_rate - div_yield - 0.5*volatility**2)*dt + volatility*np.sqrt(dt)*normal_RV)
    return price_next
def calculate_payoff(price_init, risk_free_rate, div_yield, volatility, strike, maturity, time_steps, type="Call"):
    data = {'Day': [0,],
            'Price': [price_init,],}
    
    next_price = None
    for step in range(time_steps):
        next_price = calculate_next_price(price_init, risk_free_rate, div_yield, volatility, maturity, time_steps)
        price_init = next_price
        data['Price'].append(next_price)
        data['Day'].append(1+step)
    df = pd.DataFrame(data)
    if type == "Call":
        return max(next_price - strike, 0), df
    elif type == "Put":
        return max(strike - next_price, 0), df
    else:
        raise InvalidTypeError
def calculate_MC_estimator(price_init: float, risk_free_rate: float, div_yield: float, volatility: float, strike: float, maturity: float, num_simulations: int, time_steps: int, ticker: str, type="Call") -> tuple[float, float, pd.DataFrame]: 
    payoff_list = []
    list_of_df = []
    for iteration_num in range(num_simulations):
        payoff, df_i = calculate_payoff(price_init, risk_free_rate, div_yield, volatility, strike, maturity, time_steps, type)
        payoff_list.append(payoff)
        df_i['Iteration Number'] = iteration_num
        list_of_df.append(df_i)
    df_MC = pd.concat(list_of_df, ignore_index=True)
    

    fig, ax = plt.subplots()
    ax.hist(payoff_list, bins=100)
    ax.set(title=f'Monte Carlo Simulation for {ticker} (European Option {type})', xlabel='Payoff (in dollars)', ylabel='Frequency')
    plt.show()
    
    
    discount_factor = np.exp(-1*risk_free_rate*maturity)
    MC_estimator = discount_factor*sum(payoff_list)/num_simulations
    standard_error = discount_factor*np.std(payoff_list, ddof=1)/np.sqrt(num_simulations)
    return MC_estimator, standard_error, df_MC



################################################################################
#                           Get risk-free rate                                     
################################################################################
_3_month_treasury_constant_maturity_yield_url = "https://fred.stlouisfed.org/graph/fredgraph.csv?id=DGS3MO"
_2_year_treasury_constant_maturity_yield_url = "https://fred.stlouisfed.org/graph/fredgraph.csv?id=DGS2"
_10_year_treasury_constant_maturity_yield_url = "https://fred.stlouisfed.org/graph/fredgraph.csv?id=DGS10"
csv_path_3m = download_once(_3_month_treasury_constant_maturity_yield_url, "treasury_data/3_month_treasury_constant_maturity_yield.csv")
print("Saved to:", csv_path_3m)
csv_path_2y = download_once(_2_year_treasury_constant_maturity_yield_url, "treasury_data/2_year_treasury_constant_maturity_yield.csv")
print("Saved to:", csv_path_2y)
csv_path_10y = download_once(_10_year_treasury_constant_maturity_yield_url, "treasury_data/10_year_treasury_constant_maturity_yield.csv")
print("Saved to:", csv_path_10y)

raw_3m = np.genfromtxt(csv_path_3m, delimiter=",", names=True, dtype=None, encoding="utf-8", filling_values=None)
df_3m = pd.DataFrame(raw_3m)
df_3m.columns = ['date', 'yield']
risk_free_rate = np.log(1 + df_3m['yield'].iloc[-1]/100) # compounded continuously

print(f"r={risk_free_rate}")


################################################################################
#                           MAIN PORTFOLIO                                     
################################################################################

# maps: tickers (str) --> weights (float)
main_portfolio = {'AAPL': 0.3,}


for ticker in main_portfolio.keys():
    ticker = 'AAPL'
    print(ticker)
    ################################################################################
    #                           Get spot price today (S0)                                     
    ################################################################################
    url_daily = f'https://www.alphavantage.co/query?function=TIME_SERIES_DAILY&symbol={ticker}&interval={NUM_MINS}min&apikey={ALPHAVANTAGE_API_KEY}&datatype=csv'
    csv_path_daily = download_once(url_daily, f"stock_data/{ticker}_alphavantage_daily.csv")
    print("Saved to:", csv_path_daily)
    raw_daily = np.genfromtxt(csv_path_daily, delimiter=",", names=True, dtype=None, encoding="utf-8")

    # columns: timestamp,open,high,low,close,volume
    df_daily = pd.DataFrame(raw_daily) 
    df_daily['timestamp'] = pd.to_datetime(df_daily['timestamp'])
    DAYS_BEFORE_TODAY = 30
    price_init = df_daily['close'].iloc[DAYS_BEFORE_TODAY]
    simulation_start_date = df_daily['timestamp'].iloc[DAYS_BEFORE_TODAY]
    print(f"S0 = {price_init}")

    ################################################################################
    #                           Get volatility (sigma [SD])                                    
    ################################################################################
    num_days = 126
    prices_list = list(df_daily['close'].iloc[DAYS_BEFORE_TODAY-num_days:])
    logret = np.diff(np.log(prices_list))
    volatility = float(logret.std(ddof=1) * np.sqrt(252)) #252 trading days is standard for delta t
    print("sigma (annualized) =", volatility)
    
    ################################################################################
    #                           Get dividend yield of the stock (d)                                    
    ################################################################################
    url_div = f'https://www.alphavantage.co/query?function=DIVIDENDS&symbol={ticker}&apikey={ALPHAVANTAGE_API_KEY}&datatype=csv'
    csv_path_div = download_once(url_div, f"stock_data/{ticker}_alphavantage_dividends.csv")
    raw_div = np.genfromtxt(csv_path_div, delimiter=",", names=True, dtype=None, encoding="utf-8")
    df_div = pd.DataFrame(raw_div)
    for col in ['declaration_date','ex_dividend_date','payment_date','payment_date','record_date']:
        df_div[col] = pd.to_datetime(df_div[col], errors='coerce')
    df_div = df_div.sort_values(by='ex_dividend_date', ascending=True)
    df_div = df_div.set_index(df_div['ex_dividend_date'])
    dividends_start_date = df_div.index.asof(simulation_start_date)
    div_amount = df_div['amount'].asof(simulation_start_date)
    div_yield = np.log(div_amount*0.01 + 1)  # type: ignore  (# make continuous)
    print(f"dividend yield = {div_yield}")


    ################################################################################
    #                           Calculate Monte Carlo Estimator                                    
    ################################################################################
    sns.set_theme()
    strike, maturity, num_simulations, time_steps = price_init, 30/252, 100, 30
    mean, SE, df_MC = calculate_MC_estimator(price_init, risk_free_rate, div_yield, volatility, strike, maturity, num_simulations, time_steps, ticker, type="Call")
    print(f"MC_estimator={mean}, SE={SE}")

    mean, SE, df_MC = calculate_MC_estimator(price_init, risk_free_rate, div_yield, volatility, strike, maturity, num_simulations, time_steps, ticker, type="Put")
    print(f"MC_estimator={mean}, SE={SE}")

    MC_untrained_grid = sns.relplot(data=df_MC, kind="line", x='Day', y='Price', hue='Iteration Number')
    MC_untrained_grid.figure.suptitle(f"Monte Carlo Simulation of {ticker}")
    print(np.mean(df_MC['Price']))






Saved to: treasury_data/3_month_treasury_constant_maturity_yield.csv
Saved to: treasury_data/2_year_treasury_constant_maturity_yield.csv
Saved to: treasury_data/10_year_treasury_constant_maturity_yield.csv
r=0.03575316970581784
AAPL
Saved to: stock_data/AAPL_alphavantage_daily.csv
S0 = 272.41
sigma (annualized) = 0.2029029991092106


KeyError: 'declaration_date'

Now we train a ML model to predict the volatility based on some parameters. But first, let's redefine our pandas dataframe into tensor for pytorch.

In [None]:
import torch
from torch import nn
from torch.utils.data import DataLoader
from torchvision import datasets
from torchvision.transforms import ToTensor



Step 2: MLP regression (maps Xt -> yt)

In [None]:
import torch
from torch import nn
from torch.utils.data import DataLoader
from torchvision import datasets
from torchvision.transforms import ToTensor


# input: L=60 past returns, 3 rolling vols, log price -> volatility 
# (input dimention=64 -> output dimension=1)



import torch
import torch.nn as nn

device = torch.accelerator.current_accelerator().type if torch.accelerator.is_available() else "cpu"
print(f"Using {device} device")

# define tensors X and y

class VolatilityMLP_NN(nn.Module):
    def __init__(self, input_dim):
        super().__init__()
        self.net = nn.Sequential(
            nn.Linear(input_dim, 64),
            nn.ReLU(),
            nn.Linear(64, 64),
            nn.ReLU(),
            nn.Linear(64, 1)   # outputs vol
        )

    def forward(self, x):
        return self.net(x)


pred_log_sigma = model(X)
pred_sigma = torch.exp(pred_log_sigma)

y = torch.log(realized_vol)

X = torch.tensor(X_np, dtype=torch.float32)
y = torch.tensor(y_np, dtype=torch.float32).view(-1, 1)

X_train, X_val, X_test = ...
y_train, y_val, y_test = ...


# training
model = VolatilityMLP_NN(input_dim=X.shape[1])

criterion = nn.MSELoss()
optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)

num_epochs = 100

for epoch in range(num_epochs):
    model.train()

    optimizer.zero_grad()

    pred = model(X_train)
    loss = criterion(pred, y_train)

    loss.backward()
    optimizer.step()

    if epoch % 10 == 0:
        model.eval()
        with torch.no_grad():
            val_loss = criterion(model(X_val), y_val)
        print(f"Epoch {epoch}: train={loss.item():.4f}, val={val_loss.item():.4f}")

# evaluate on test data
model.eval()
with torch.no_grad():
    test_pred = model(X_test)
    test_loss = criterion(test_pred, y_test)

print("Test MSE:", test_loss.item())






Step 3: LSTM (takes in a sequence. learns from patterns)

In [None]:
print("LSTM")

Step 4: Compare results

In [None]:
# import pandas as pd

# data = {
#     'Name': ['alice', 'bob', 'charlie'],
#     'Age': [25, 30, 35],
#     'City': ['New York', 'Los Angeles', 'Chicago']
# }

# # Create a DataFrame with default integer labels (0, 1, 2)
# df = pd.DataFrame(data)

# print(df)

# # Access the row with the default label 1
# # Note: 1 is treated as the *label*, not the integer position
# print("Accessing row with label 1 (Bob):")
# print(df.loc[1])

# # Set a different column as the index to create custom string labels
# df = df.set_index('Name')

# df.loc['bob', 'Age']


url_div = f'https://www.alphavantage.co/query?function=DIVIDENDS&symbol={ticker}&apikey={ALPHAVANTAGE_API_KEY}&datatype=csv'
csv_path_div = download_once(url_div, f"stock_data/{ticker}_alphavantage_dividends.csv")
raw_div = np.genfromtxt(csv_path_div, delimiter=",", names=True, dtype=None, encoding="utf-8")
df_div = pd.DataFrame(raw_div)
for col in ['declaration_date','ex_dividend_date','payment_date','payment_date','record_date']:
    df_div[col] = pd.to_datetime(df_div[col], errors='coerce')


df_div = df_div.sort_values(by='ex_dividend_date', ascending=True)
df_div = df_div.set_index(df_div['ex_dividend_date'])
dividends_start_date = df_div.index.asof(simulation_start_date)
amount = df_div['amount'].asof(simulation_start_date)




                 ex_dividend_date declaration_date record_date payment_date  \
ex_dividend_date                                                              
2012-08-09             2012-08-09              NaT         NaT          NaT   
2012-11-07             2012-11-07              NaT         NaT          NaT   
2013-02-07             2013-02-07              NaT         NaT          NaT   
2013-05-09             2013-05-09              NaT         NaT          NaT   
2013-08-08             2013-08-08              NaT         NaT          NaT   

                  amount  
ex_dividend_date          
2012-08-09          2.65  
2012-11-07          2.65  
2013-02-07          2.65  
2013-05-09          3.05  
2013-08-08          3.05  
2025-11-12 00:00:00
2025-11-10 00:00:00
0.26
