In [1]:
import os
import json
import requests

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import plotly.graph_objects as go

from alpaca_trade_api.rest import REST, TimeFrame
from datetime import datetime

import dask
import dask.bag as db
from dask.diagnostics import ProgressBar

import statsmodels.api as sm 
from typing import Optional , Tuple

import ast

In [2]:
key = json.loads(open("key.txt","r").read())
api_key_id = key["APCA-API-KEY-ID"]
api_secret_key = key["APCA-API-SECRET-KEY"]

headers = {
    "accept": "application/json",
    "APCA-API-KEY-ID": api_key_id,
    "APCA-API-SECRET-KEY": api_secret_key
}

api = REST(api_key_id, api_secret_key, base_url='https://data.alpaca.markets/v2')

In [3]:
# Get NYSE Open days from 2024-06-01 to 2024-08-31

url = "https://paper-api.alpaca.markets/v2/calendar?start=2024-06-01T00%3A00%3A00Z&end=2024-08-31T23%3A59%3A59Z"
response = requests.get(url, headers=headers)
df_market_hours = pd.DataFrame(response.json())

market_open_days = df_market_hours.date

print(market_open_days.head())

0    2024-06-03
1    2024-06-04
2    2024-06-05
3    2024-06-06
4    2024-06-07
Name: date, dtype: object


In [4]:
## Get trading day and the corresponding parameter formation days

lookback = 2
formation_and_trading_days = []

for i in range(lookback, len(market_open_days)):
    trade_day = datetime.strptime(market_open_days[i], '%Y-%m-%d').strftime('%Y%m%d')
    formation_days = []
    for j in range(lookback, 0 , -1):
        formation_days.append(datetime.strptime(market_open_days[(i - j)], '%Y-%m-%d').strftime('%Y%m%d'))
    formation_and_trading_days.append({"Trading Day":trade_day , "Formation Days":formation_days})

print(formation_and_trading_days[0 : 2])

[{'Trading Day': '20240605', 'Formation Days': ['20240603', '20240604']}, {'Trading Day': '20240606', 'Formation Days': ['20240604', '20240605']}]


In [5]:
# Find unique pairs among the symbols

from itertools import combinations 
#symbol_list = ["SMH", "SOXX","NVDA", "TSM", "AVGO", "TXN", "AMD", "ASML", "ADI", "AMAT", "QCOM", "KLAC","INTC"] ## drop TXN, ADI, KLAC, AVGO
symbol_list = ["SMH", "SOXX","NVDA", "TSM", "AMD", "ASML", "AMAT", "QCOM","INTC"]
pairs = list(combinations(symbol_list, 2))

In [6]:
def load_quote_data(symbol: str, date: str) -> pd.DataFrame:
    csv_name = f"{symbol}_{date}_quote.csv"
    quote_data = pd.read_csv(f"data/{csv_name}", sep=",")
    
    # Convert timestamp to datetime, handling potential errors
    try:
        quote_data['timestamp'] = pd.to_datetime(quote_data['timestamp']) 
    except ValueError as e:
        print(f"Error parsing timestamps for {symbol}: {e}")
        # Optionally use coerce to handle this
        quote_data['timestamp'] = pd.to_datetime(quote_data['timestamp'], errors='coerce')
    
    quote_data.set_index('timestamp', inplace=True)
    return quote_data

def calculate_midprice_and_downsample(quote_data : pd.DataFrame , downsample : int) -> pd.Series:
    quote_data["mid_price"] = np.where(
        (quote_data["ask_price"] > 0) & (quote_data["bid_price"] > 0),
        (quote_data["ask_price"] + quote_data["bid_price"]) * 0.5,
        np.nan
    )
    downsampled_mid_price = quote_data["mid_price"].resample(f'{downsample}S').last()
    downsampled_mid_price = downsampled_mid_price.fillna(method='ffill')
    #print(type(downsampled_mid_price))
    return downsampled_mid_price

In [7]:
def calculate_half_life(spread : np.ndarray) -> float:
    # Ensure spread is a Pandas Series for easy manipulation
    df_spread = pd.DataFrame(spread, columns=["spread"])
    
    # Calculate lagged spread and spread returns
    spread_lag = df_spread.spread.shift(1).fillna(method="bfill")  # Backfill the first NaN
    spread_ret = df_spread.spread - spread_lag

    # Add a constant to the lagged spread
    spread_lag2 = sm.add_constant(spread_lag)
    
    # Fit the OLS model
    model = sm.OLS(spread_ret, spread_lag2)
    res = model.fit()

    # Calculate the half-life
    halflife = -np.log(2) / res.params[1]
    
    return round(halflife, 0)

def cointegration_check(price_data: pd.DataFrame, asset1: str, asset2: str) -> Tuple[bool, Optional[dict]]:
    #print("--------------------")
    #print(f"Check cointegration of {asset1} and {asset2}")

    # Step 1: Perform OLS regression
    model1 = sm.OLS(price_data[asset2], sm.add_constant(price_data[asset1])).fit()
    res = model1.resid  # Note the change to model1.resid

    # Calculate the mean and std of the residual
    res_mean = np.mean(res)
    res_std = np.std(res)
    rsquared_adj = model1.rsquared_adj
    const = model1.params[0]
    hedge_ratio = model1.params[1]
    half_life = calculate_half_life(np.array(res))
    #hurst_exponent = hurst(res)

    #print(model1.summary())  # Also corrected from 'model' to 'model1'

    # Step 2: ADF test on residuals
    adf_test_result = sm.tsa.stattools.adfuller(res)
    p_value_adf_test = adf_test_result[1]

    if p_value_adf_test > 0.05:
        #print(f"{asset1} and {asset2} are not cointegrated (ADF p-value: {p_value_adf_test})")
        return False, None

    #print("Estimate Error Correction Model (ECM)")
    
    # Step 3: Error Correction Model (ECM)
    # Include lagged residuals and the difference of asset1 as regressors
    asset1_diff = sm.add_constant(pd.concat([price_data[asset1].diff(), res.shift(1)], axis=1).dropna())
    asset2_diff = price_data[asset2].diff().dropna()
    
    # Aligning asset1_diff and asset2_diff to ensure they have the same length
    #asset1_diff = asset1_diff.dropna()
    #asset2_diff = asset2_diff.dropna()
    asset1_diff = asset1_diff.loc[asset2_diff.index]
    
    model2 = sm.OLS(asset2_diff, asset1_diff).fit()

    #print(model2.summary())

    # Step 4: Check if the adjustment coefficient is negative
    adjustment_coefficient = list(model2.params)[-1]

    coint_result = {"asset 1": asset1,
                    "asset 2": asset2,
                    "res mean":res_mean ,
                    "res std":res_std,
                    "adj rsquared": rsquared_adj,
                    "constant":const,
                    "hedge ratio": hedge_ratio,
                    "half life": half_life,
                    #"hurst exponent" : hurst_exponent,
                    "p-value of adf test" : p_value_adf_test,
                    "adjustment coef":adjustment_coefficient}
    
    if adjustment_coefficient > 0:
        #print(f"{asset1} and {asset2} are not cointegrated (Adjustment coefficient: {adjustment_coefficient})")
        return False, None
    else:
        #print(f"{asset1} and {asset2} are cointegrated")
        return True, coint_result

def cointegration_check_weighted(price_data: pd.DataFrame, asset1: str, asset2: str) -> Tuple[bool, Optional[dict]]:
    #print("--------------------")
    #print(f"Check cointegration of {asset1} and {asset2}")
    
    res_mean = 0 
    res_std = 0
    rsquared_adj = 0
    const = 0
    hedge_ratio = 0 
    half_life = 0
    #hurst_exponent = 0
    p_value_adf_test = 0
    adjustment_coefficient = 0
    length = len(price_data)

    
    price_data["date"] = price_data.index.date
    unique_date = price_data["date"].unique()

    for date in unique_date:

        price_data_by_date = price_data[price_data.date == date]
        weight = len(price_data_by_date) / length
    
        # Step 1: Perform OLS regression
        model1 = sm.OLS(price_data_by_date[asset2], sm.add_constant(price_data_by_date[asset1])).fit()
        res_by_date = model1.resid  # Note the change to model1.resid

        # Calculate the mean and std of the residual
        res_mean_by_date = np.mean(res_by_date)
        res_std_by_date= np.std(res_by_date)
        rsquared_adj_by_date = model1.rsquared_adj
        const_by_date = model1.params[0]
        hedge_ratio_by_date = model1.params[1]
        half_life_by_date = calculate_half_life(np.array(res_by_date))   
        #hurst_exponent_by_date = hurst(res_by_date)

        # Step 2: ADF test on residuals
        adf_test_result_by_date = sm.tsa.stattools.adfuller(res_by_date)
        p_value_adf_test_by_date = adf_test_result_by_date[1]   

        # Step 3: Error Correction Model (ECM)
        # Include lagged residuals and the difference of asset1 as regressors
        asset1_diff = sm.add_constant(pd.concat([price_data_by_date[asset1].diff(), res_by_date.shift(1)], axis=1).dropna())
        asset2_diff = price_data_by_date[asset2].diff().dropna()

        # Aligning asset1_diff and asset2_diff to ensure they have the same length
        #asset1_diff = asset1_diff.dropna()
        #asset2_diff = asset2_diff.dropna()
        asset1_diff = asset1_diff.loc[asset2_diff.index]

        model2 = sm.OLS(asset2_diff, asset1_diff).fit()

        # Step 4: Check if the adjustment coefficient is negative
        adjustment_coefficient_by_date = list(model2.params)[-1]

        res_mean += res_mean_by_date * weight 
        res_std += res_std_by_date * weight 
        rsquared_adj += rsquared_adj_by_date * weight
        const += const_by_date * weight
        hedge_ratio += hedge_ratio_by_date * weight
        half_life += half_life_by_date * weight
        #hurst_exponent += hurst_exponent_by_date * weight
        #p_value_adf_test += p_value_adf_test_by_date * weight
        p_value_adf_test = max(p_value_adf_test_by_date, p_value_adf_test)
        adjustment_coefficient += adjustment_coefficient_by_date * weight


    coint_result = {"asset 1": asset1,
                    "asset 2": asset2,
                    "res mean":res_mean ,
                    "res std":res_std,
                    "adj rsquared": rsquared_adj,
                    "constant" : const,
                    "hedge ratio": hedge_ratio,
                    "half life": half_life,
                    #"hurst exponent": hurst_exponent,
                    "p-value of adf test" : p_value_adf_test,
                    "adjustment coef":adjustment_coefficient}
    
    if (p_value_adf_test < 0.05 ) and (adjustment_coefficient < 0):
        return True, coint_result
    else:
        return False, None

In [10]:
## use find co-integrated pairs and the corresponding parameters in historical data
## use dask for parallel computation to accelerate the calculation
from dask.distributed import Client
client = Client(n_workers=20)  # Adjust based on your CPU cores

def parallel_process(func, data, **kwargs):
    with ProgressBar():
        results = db.from_sequence(data).map(lambda x: func(x, **kwargs)).compute()
    return results

def process_day(day : dict, symbol_list : list, pairs : list , downsample : int, weighted : bool = True) -> dict:

    try:
        trading_day = day["Trading Day"]   
        df_mid_price = pd.DataFrame({})

        for formation_day in day["Formation Days"]: 
            midprice_by_formation_day = pd.DataFrame()
            for symbol in symbol_list:
                quote_data_by_symbol = load_quote_data(symbol= symbol, date=formation_day)
                downsampled_mid_price_by_symbol = calculate_midprice_and_downsample(quote_data = quote_data_by_symbol, downsample = downsample)
                midprice_by_formation_day[symbol] = downsampled_mid_price_by_symbol

            df_mid_price = pd.concat([df_mid_price, midprice_by_formation_day])
            df_mid_price = df_mid_price.fillna(method='ffill')
            df_mid_price = df_mid_price.fillna(method='bfill')

        coint_pair_and_params_by_day = []

        for pair in pairs:
            if weighted:
                coint_result = cointegration_check_weighted(df_mid_price, pair[0], pair[1]) 
            else:
                coint_result = cointegration_check(df_mid_price, pair[0], pair[1])   
            if coint_result[0]: 
                coint_pair_and_params_by_day.append(coint_result[1])

        return {"Trading Day": trading_day, "Coint Pairs and Params": coint_pair_and_params_by_day}
    
    except (IndexError, ValueError) as e:
        print(f"Error processing day {trading_day}: {e}")
        return {"Trading Day": trading_day, "Coint Pairs and Params": []}

if __name__ == "__main__":
    # Pass the necessary arguments to the parallel process function
    results = parallel_process(
        process_day, 
        formation_and_trading_days, 
        symbol_list=symbol_list, 
        pairs=pairs, 
        downsample=60, ## intercal for resampling the mid-price. Use 5s, 10s, 20s, 30s...
        weighted=True
    )

    # You can now aggregate results or process them further
    for result in results:
        print(result)

Perhaps you already have a cluster running?
Hosting the HTTP server on port 52853 instead
  next(self.gen)


{'Trading Day': '20240605', 'Coint Pairs and Params': [{'asset 1': 'SMH', 'asset 2': 'TSM', 'res mean': -1.0629719326971098e-12, 'res std': 0.3342293809181469, 'adj rsquared': 0.6743598760271154, 'constant': 17.52860457580286, 'hedge ratio': 0.5610625446460012, 'half life': 16.0, 'p-value of adf test': 0.014969716427744025, 'adjustment coef': -0.043945058583503435}, {'asset 1': 'SOXX', 'asset 2': 'TSM', 'res mean': -4.2083442144175e-12, 'res std': 0.3748594884310825, 'adj rsquared': 0.5907157241660228, 'constant': 38.73732770069715, 'hedge ratio': 0.49161936398860906, 'half life': 17.0, 'p-value of adf test': 0.012354724209240242, 'adjustment coef': -0.03806959696268971}]}
{'Trading Day': '20240606', 'Coint Pairs and Params': [{'asset 1': 'SMH', 'asset 2': 'SOXX', 'res mean': 1.0359713087382262e-12, 'res std': 0.13090212162445275, 'adj rsquared': 0.9807263069558536, 'constant': 17.769629893260444, 'hedge ratio': 0.8875352821629328, 'half life': 5.0, 'p-value of adf test': 0.00839735138

In [11]:
df_results = pd.DataFrame(results)
df_results.to_csv("parameters/cointPair_params_formation2_ds60_weighted.csv", sep = ";", index= False)
pd.read_csv("parameters/cointPair_params_formation2_ds60_weighted.csv", sep = ";")

Unnamed: 0,Trading Day,Coint Pairs and Params
0,20240605,"[{'asset 1': 'SMH', 'asset 2': 'TSM', 'res mea..."
1,20240606,"[{'asset 1': 'SMH', 'asset 2': 'SOXX', 'res me..."
2,20240607,"[{'asset 1': 'SMH', 'asset 2': 'SOXX', 'res me..."
3,20240610,[]
4,20240611,[]
...,...,...
56,20240826,"[{'asset 1': 'AMD', 'asset 2': 'QCOM', 'res me..."
57,20240827,"[{'asset 1': 'SMH', 'asset 2': 'TSM', 'res mea..."
58,20240828,"[{'asset 1': 'SOXX', 'asset 2': 'AMD', 'res me..."
59,20240829,"[{'asset 1': 'SMH', 'asset 2': 'SOXX', 'res me..."
