In [1]:
import json
import os
import re
from datetime import datetime, timedelta,date
from datetime import date as date_type, timedelta

import numpy as np
import pandas as pd
import pandas as pd
import requests
import yfinance as yf
from polygon import RESTClient
from scipy.optimize import brentq
from scipy.stats import norm


## Historical Price of Underlying Asset

In [2]:
# Download the underlying's daily price history, derive daily returns and a
# 20-day annualized realized volatility (RV), and cache the result as CSV.
symbol = "AAPL"
r = 0.045  # flat annual rate, later handed to the implied-volatility solver
ticker = yf.Ticker(symbol)
# History starts on 2023-12-01, a month before the first 2024 trade date —
# presumably so the 20-day rolling window is already populated by then.
history = ticker.history(start="2023-12-01")
df = history.reset_index()
df['Date'] = df['Date'].dt.date  # keep only the calendar date
df['Return'] = df['Close'].pct_change()
# Rolling standard deviation of daily returns, scaled by sqrt(252) to annualize.
df['RV'] = df['Return'].rolling(window=20).std() * np.sqrt(252)
# Make sure the output folder exists so the export works on a fresh checkout.
os.makedirs("DataSet", exist_ok=True)
df.to_csv("DataSet/underlying.csv", index=False)


## Trade on the first Tuesday of every month: build the list of trade dates

In [3]:
# Backtest window; both ends are inclusive.
start_date = date(2024, 1, 1)
end_date = date(2025, 11, 30)

trade_dates = []
trade_dates_str = []

# Walk the calendar month by month; `current` is always the 1st of a month.
current = date(start_date.year, start_date.month, 1)
while current <= end_date:
    # weekday() counts Monday as 0, so Tuesday is 1. The modulo gives the
    # number of days from the 1st to the first Tuesday (0 if the 1st is one).
    days_to_tuesday = (1 - current.weekday()) % 7
    first_tuesday = current + timedelta(days=days_to_tuesday)

    if start_date <= first_tuesday <= end_date:
        trade_dates.append(first_tuesday)
        trade_dates_str.append(first_tuesday.strftime("%Y-%m-%d"))

    # Advance to the 1st of the next month; December carries into the next year.
    year_carry, month_index = divmod(current.month, 12)
    current = date(current.year + year_carry, month_index + 1, 1)

# for d, ds in zip(trade_dates, trade_dates_str):
#     print(repr(d), ds)

## Get the underlying price on each trade date

In [4]:
# Look up the underlying's closing price on every trade date, reading the
# price history back from the CSV cache.
s0_list = []
underlying_df = pd.read_csv("DataSet/underlying.csv", parse_dates=['Date']).set_index('Date')

for date_str in trade_dates_str:
    date_pd = pd.to_datetime(date_str)
    if date_pd not in underlying_df.index:
        # Happens when the date has no row in the price history (e.g. a market
        # holiday, or a date beyond the downloaded range).
        raise KeyError(f"No underlying close price available for trade date {date_str}")
    # Single .loc call with row and column labels instead of chained indexing.
    close_price = underlying_df.loc[date_pd, 'Close']
    s0_list.append(close_price)
# print(s0_list)

## Select the option contracts: always buy the at-the-money options expiring on the second Friday of the following month

In [5]:
def _second_friday_of_next_month(ref_date):
    """Return the second Friday of the calendar month that follows ``ref_date``."""
    if ref_date.month == 12:
        next_month_first = ref_date.replace(year=ref_date.year + 1, month=1, day=1)
    else:
        next_month_first = ref_date.replace(month=ref_date.month + 1, day=1)

    # weekday(): 0=Mon, 1=Tue, ..., 4=Fri
    days_to_friday = (4 - next_month_first.weekday()) % 7
    # First Friday of the month plus one more week.
    return next_month_first + timedelta(days=days_to_friday, weeks=1)


def atm_option(symbol, s0, date):
    """Pick the at-the-money call and put expiring on the second Friday of next month.

    Args:
        symbol (str): Underlying ticker, e.g. 'AAPL'.
        s0 (float): Spot price of the underlying on ``date``; the contract whose
            strike is closest to it is treated as at-the-money.
        date (datetime.date): Trade date; contracts are listed as of this day.

    Returns:
        tuple: ``(call_ticker, put_ticker)`` option tickers, or ``(None, None)``
        when no contracts are listed or none match the target expiration
        (a message is printed in that case).
    """
    # Prefer a key supplied through the POLYGON_API_KEY environment variable.
    # FIXME(security): the literal fallback is a credential committed to source;
    # rotate it and delete the fallback once the environment variable is set up.
    api_key = os.environ.get("POLYGON_API_KEY", "XN1r3nHQ1Rb3SsbqdwDI72dE35vJkCzP")
    client = RESTClient(api_key)

    date_str = date.strftime("%Y-%m-%d")
    exp_date_str = _second_friday_of_next_month(date).strftime("%Y-%m-%d")

    # Fetch every contract listed as of the trade date, then filter locally.
    contracts = list(client.list_options_contracts(underlying_ticker=symbol, as_of=date_str, limit=1000))
    if not contracts:
        print(f"No options contracts found as of {date_str}.")
        return None, None

    # Keep only the contracts with the exact target expiration, split by type.
    calls = [c for c in contracts if c.expiration_date == exp_date_str and c.contract_type == 'call']
    puts = [c for c in contracts if c.expiration_date == exp_date_str and c.contract_type == 'put']

    if not calls or not puts:
        print(f"No calls or puts found for expiration {exp_date_str} as of {date_str}.")
        return None, None

    # ATM = strike closest to the spot price.
    atm_call = min(calls, key=lambda c: abs(c.strike_price - s0))
    atm_put = min(puts, key=lambda c: abs(c.strike_price - s0))

    if atm_call.strike_price != atm_put.strike_price:
        print("Warning, Strike price of call and put are not the same")

    return atm_call.ticker, atm_put.ticker

## Download the data and calculate the implied volatility

In [6]:
# Black-Scholes formula for calls and puts
def bs_call_price(S, K, T, r, sigma):
    """Black-Scholes price of a European call (no dividend term).

    Args:
        S: Spot price of the underlying.
        K: Strike price.
        T: Time to expiry, in years.
        r: Continuously compounded risk-free rate.
        sigma: Volatility.
    """
    vol_sqrt_t = sigma * np.sqrt(T)
    d1 = (np.log(S/K) + (r + 0.5*sigma**2)*T) / vol_sqrt_t
    d2 = d1 - vol_sqrt_t
    discounted_strike = K * np.exp(-r*T)
    return S * norm.cdf(d1) - discounted_strike * norm.cdf(d2)

def bs_put_price(S, K, T, r, sigma):
    """Black-Scholes price of a European put (no dividend term).

    Args:
        S: Spot price of the underlying.
        K: Strike price.
        T: Time to expiry, in years.
        r: Continuously compounded risk-free rate.
        sigma: Volatility.
    """
    vol_sqrt_t = sigma * np.sqrt(T)
    d1 = (np.log(S/K) + (r + 0.5*sigma**2)*T) / vol_sqrt_t
    d2 = d1 - vol_sqrt_t
    discounted_strike = K * np.exp(-r*T)
    return discounted_strike * norm.cdf(-d2) - S * norm.cdf(-d1)

# Implied volatility calculation using Brent's method
def implied_vol_call(market_price, S, K, T, r):
    """Solve for the volatility at which the Black-Scholes call price equals ``market_price``.

    The root is searched with ``brentq`` on the bracket [1e-6, 5]. Returns
    ``np.nan`` when ``brentq`` raises ``ValueError`` (for example when the
    pricing error does not change sign over the bracket).
    """
    def pricing_error(vol):
        # Model price minus observed price; zero at the implied volatility.
        return bs_call_price(S, K, T, r, vol) - market_price

    try:
        solved_vol = brentq(pricing_error, 1e-6, 5)
    except ValueError:
        return np.nan
    return solved_vol

def implied_vol_put(market_price, S, K, T, r):
    """Solve for the volatility at which the Black-Scholes put price equals ``market_price``.

    The root is searched with ``brentq`` on the bracket [1e-6, 5]. Returns
    ``np.nan`` when ``brentq`` raises ``ValueError`` (for example when the
    pricing error does not change sign over the bracket).
    """
    def pricing_error(vol):
        # Model price minus observed price; zero at the implied volatility.
        return bs_put_price(S, K, T, r, vol) - market_price

    try:
        solved_vol = brentq(pricing_error, 1e-6, 5)
    except ValueError:
        return np.nan
    return solved_vol

def _parse_option_ticker(symbol):
    """Split an option ticker such as 'O:AAPL240209C00185000' into its parts.

    The part before the last ':' (if any) is ignored, so both
    'O:AAPL240209C00185000' and 'AAPL240209C00185000' are accepted.

    Returns:
        tuple: ``(expiry, option_type, strike)`` where ``expiry`` is a
        ``datetime.date``, ``option_type`` is 'C' or 'P' and ``strike`` is a float.

    Raises:
        ValueError: If the ticker does not match the expected layout.
    """
    symbol_part = symbol.split(':')[-1]
    match = re.search(r'^[A-Z]+(\d{6})([CP])(\d{8})$', symbol_part)
    if not match:
        raise ValueError(f"Could not parse option symbol: {symbol}. Expected format like 'AAPL240209C00185000'.")

    expiry_str, option_type, strike_str = match.groups()
    # The ticker carries a two-digit year; assume the 21st century.
    expiry = pd.to_datetime('20' + expiry_str, format='%Y%m%d').date()
    # The 8-digit strike field is the strike price multiplied by 1000.
    strike = int(strike_str) / 1000.0
    return expiry, option_type, strike


def download_hist_option(symbol, start_date, r, s0):
    """
    Download daily bars for an option from ``start_date`` to today, add
    time-to-maturity and implied volatility, and save the result as CSV.

    Args:
        symbol (str): Option ticker, e.g., 'O:AAPL240209C00185000'
        start_date (date): Start date for historical data.
        r (float): Risk-free rate.
        s0 (float): Underlying spot price. The same value is used for every
            row, so implied volatilities on later dates are computed against a
            stale spot; pass-through of historical prices is not implemented.

    Returns:
        pd.DataFrame: One row per day with columns timestamp, open, high, low,
        close, volume, ttm, r, k, imp_vol. An empty DataFrame is returned (and
        nothing is written) when no bars are available.

    Raises:
        ValueError: If bars were returned but ``symbol`` cannot be parsed.

    Side effects:
        Writes ``DataSet/<symbol with ':' replaced by '_'>.csv``.
    """
    # Prefer a key supplied through the POLYGON_API_KEY environment variable.
    # FIXME(security): the literal fallback is a credential committed to source;
    # rotate it and delete the fallback once the environment variable is set up.
    api_key = os.environ.get("POLYGON_API_KEY", "XN1r3nHQ1Rb3SsbqdwDI72dE35vJkCzP")
    client = RESTClient(api_key)

    # Date range: from the given start date up to today. `date_type` is the
    # file's alias for datetime.date, so this keeps working even if the plain
    # name `date` has been rebound by another cell.
    from_str = start_date.strftime("%Y-%m-%d")
    to_str = date_type.today().strftime("%Y-%m-%d")

    # Fetch daily aggregates for the option
    aggs = client.get_aggs(
        ticker=symbol,
        multiplier=1,
        timespan="day",
        from_=from_str,
        to=to_str,
        limit=50000  # High limit to cover all days
    )

    if not aggs:
        print(f"No historical data found for {symbol} from {from_str}.")
        return pd.DataFrame()

    # One record per daily bar; timestamps arrive as epoch milliseconds.
    df = pd.DataFrame([
        {
            "timestamp": pd.to_datetime(agg.timestamp, unit="ms").date(),
            "open": agg.open,
            "high": agg.high,
            "low": agg.low,
            "close": agg.close,
            "volume": agg.volume
        }
        for agg in aggs
    ])
    df = df.sort_values("timestamp").reset_index(drop=True)

    expiry, option_type, K = _parse_option_ticker(symbol)

    # Time to maturity in years; floored at a tiny positive value so the
    # pricing formula never divides by zero on or after expiry.
    df['ttm'] = df['timestamp'].apply(lambda x: max((expiry - x).days / 365.25, 1e-9))
    df['r'] = r
    df['k'] = K

    # Same row-wise solve for both option types; only the solver differs.
    implied_vol = implied_vol_call if option_type == 'C' else implied_vol_put
    df['imp_vol'] = df.apply(lambda row: implied_vol(row['close'], s0, K, row['ttm'], r), axis=1)

    # Make sure the output folder exists before writing.
    os.makedirs("DataSet", exist_ok=True)
    csv_name = f"DataSet/{symbol.replace(':', '_')}.csv"
    df.to_csv(csv_name, index=False)

    return df

## Build All Data

In [7]:
# For every trade date, pick the ATM call/put and download their history.
# call_list / put_list stay index-aligned with trade_dates.
call_list = []
put_list = []
# The loop variable is deliberately not called `date`: rebinding that name at
# module level would replace the imported datetime.date class and break
# re-running the earlier cells that call date(...).
for trade_day, s0 in zip(trade_dates, s0_list):
    atm_call, atm_put = atm_option(symbol, s0, trade_day)
    if atm_call is None or atm_put is None:
        # atm_option found no usable contracts (it prints the reason). Record a
        # placeholder so the lists keep one entry per trade date, and move on.
        call_list.append(None)
        put_list.append(None)
        continue
    # Stored names use '_' instead of ':' to match the CSV file names.
    call_list.append(atm_call.replace(':', '_'))
    put_list.append(atm_put.replace(':', '_'))
    download_hist_option(atm_call, trade_day, r, s0)
    download_hist_option(atm_put, trade_day, r, s0)

## Store Data

In [8]:
import pickle

# Persist the selected contract names and the trade-date lists to disk,
# one pickle file per object.
pickle_targets = {
    'DataSet/call_list.pkl': call_list,
    'DataSet/put_list.pkl': put_list,
    'DataSet/dates.pkl': trade_dates,
    'DataSet/date_strs.pkl': trade_dates_str,
}
for pickle_path, payload in pickle_targets.items():
    with open(pickle_path, 'wb') as f:
        pickle.dump(payload, f)