In [None]:
! pip install dill
! pip install statsmodels

import pandas as pd
import numpy as np
from datetime import datetime, timedelta


from sklearn.cluster import KMeans, DBSCAN
from sklearn.decomposition import PCA
from sklearn.manifold import TSNE
from sklearn import preprocessing

from statsmodels.tsa.stattools import coint

from scipy import stats



In [None]:
!pip install hurst
import statsmodels.api as sm
from statsmodels.tsa.stattools import adfuller as ADF_test
import matplotlib.pyplot as plt



In [None]:
import lzma
import dill as pickle

def save_pickle(path,obj):
    with lzma.open(path,"wb") as fp:
        pickle.dump(obj,fp)

def load_pickle(path):
    with lzma.open(path,"rb") as fp:
        file = pickle.load(fp)
    return file


def clean_data(ticker_dfs,tickers):
    intraday_range = ticker_dfs[tickers[0]].index
    for inst in tickers:
        ticker_dfs[inst] = ticker_dfs[inst].reindex(intraday_range)
    closes = []

    for tk in tickers:
        close = ticker_dfs[tk].close
        closes.append(close)

    pricing = pd.concat(closes,axis = 1)
    pricing.columns = tickers

    return pricing




def get_pca_features(ret_df,N_PRIN_COMPONENTS =10):

    pca = PCA(n_components=N_PRIN_COMPONENTS)
    pca.fit(ret_df)

    # Extract factor loadings
    factor_loadings = pca.components_.T  # Transpose the components matrix

    # Create a DataFrame with the correct orientation
    factor_loadings_df = pd.DataFrame(factor_loadings, index=ret_df.columns, columns=[f'Factor {i+1}' for i in range(N_PRIN_COMPONENTS)])

    X = preprocessing.StandardScaler().fit_transform(pca.components_.T)

    return X

def create_clusters(X,index):
    clf = DBSCAN(eps=1, min_samples=3)

    print(clf)

    clf.fit(X)
    labels = clf.labels_
    n_clusters_ = len(set(labels)) - (1 if -1 in labels else 0)
    print("\nClusters discovered: %d" % n_clusters_)

    clustered = clf.labels_

    clustered_series = pd.Series(index=index, data=clustered.flatten())
    clustered_series = clustered_series[clustered_series != -1]

    return clustered_series


def find_cointegrated_pairs(data, significance=0.05):
    # This function is from https://www.quantopian.com/lectures/introduction-to-pairs-trading
    n = data.shape[1]
    score_matrix = np.zeros((n, n))
    pvalue_matrix = np.ones((n, n))
    keys = data.keys()
    pairs = []
    for i in range(n):
        for j in range(i+1, n):
            S1 = data[keys[i]]
            S2 = data[keys[j]]
            result = coint(S1, S2)
            score = result[0]
            pvalue = result[1]
            score_matrix[i, j] = score
            pvalue_matrix[i, j] = pvalue
            if pvalue < significance:
                pairs.append((keys[i], keys[j]))
    return score_matrix, pvalue_matrix, pairs



def get_coint_pairs(prices,clustered_series):

    valid_tickers = clustered_series.index.intersection(prices.columns)
    clustered_series = clustered_series.loc[valid_tickers]

    CLUSTER_SIZE_LIMIT = 9999
    counts = clustered_series.value_counts()
    ticker_count_reduced = counts[(counts>1) & (counts<=CLUSTER_SIZE_LIMIT)]

    cluster_dict = {}
    for i, which_clust in enumerate(ticker_count_reduced.index):
        tickers = clustered_series[clustered_series == which_clust].index
        score_matrix, pvalue_matrix, pairs = find_cointegrated_pairs(
            prices[tickers]
        )
        cluster_dict[which_clust] = {}
        cluster_dict[which_clust]['score_matrix'] = score_matrix
        cluster_dict[which_clust]['pvalue_matrix'] = pvalue_matrix
        cluster_dict[which_clust]['pairs'] = pairs

    pairs = []
    for clust in cluster_dict.keys():
        pairs.extend(cluster_dict[clust]['pairs'])

    return pairs


def get_data():
    data_path = "/content/drive/My Drive/constituents.csv"
    dfs_path = "/content/drive/My Drive/new_dfs.obj"

    ticker_dfs = load_pickle(dfs_path)
    snp_data = pd.read_csv(data_path)
    tickers = []

    for i in range(499):
      tickers.append(snp_data.Symbol[i])

    tickers.remove("BF.B")
    tickers.remove("BRK.B")
    tickers.remove("CPAY")
    tickers.remove("DAY")
    tickers.remove("GEV")
    tickers.remove("SOLV")

    return tickers,ticker_dfs

In [None]:
import statsmodels.api as sm
from hurst import compute_Hc  # Assuming you have a function to compute the Hurst exponent

def process_pair_and_store(pair, prices, h5_file_path):
    s1, s2 = pair

    datax = prices[s1]
    datay = prices[s2]

    # Find the hedge ratio using OLS on the first 500 data points
    hedge_ratio = sm.OLS(datax[:500], sm.add_constant(datay[:500])).fit().params[1]

    # Calculate the normalized spread based on the hedge ratio
    spread = datax - hedge_ratio * datay
    spread_normalized = (spread - spread.mean()) / spread.std()

    # Calculate features for the first 500 points
    rolling_volatility = spread_normalized[:500].rolling(window=10).std()
    hurst_exponent = compute_Hc(spread_normalized[:500])[0]  # Hurst exponent
    spread_normalized = spread_normalized[:500]  # Normalized spread

    features_df = pd.DataFrame({
        'spread_normalized': spread_normalized,
        'rolling_volatility': rolling_volatility[:500],
        'hurst_exponent': np.repeat(hurst_exponent, rolling_volatility.shape[0]),
        'hedge_ratio': np.repeat(hedge_ratio, rolling_volatility.shape[0])
    })

    # Perform ADF test on the first 500 points for stationarity
    adf_result_train = ADF_test(spread_normalized)
    print(pair, adf_result_train[1])
    print(features_df)
    print(features_df.shape)

    features_df.dropna(inplace = True)
    features_df = features_df.reset_index()

    print(features_df)
    print(features_df.shape)


    # Proceed only if the first 500 points spread is stationary
    if adf_result_train[1] < 0.05:
        stationarity_train = adf_result_train[1] < 0.05

        # The stationarity status of the next 124 points and the hedge ratio for those points are used as labels for training the models
        stationarity_test = ADF_test(spread[500:])[1] < 0.05
        hedge_ratio_test = hedge_ratio

        # Store the features and labels in HDF5 file for LSTM training
        # store_to_hdf5(h5_file_path, pair, features_df, stationarity_train, stationarity_test, hedge_ratio_test)
        input(spread_normalized)

        spread_normalized = spread_normalized.reset_index(drop=True)
        input(spread_normalized)
        plt.figure(figsize=(15, 7))
        plt.title(f'Z-Scores for {s1} and {s2} Across Training and Testing Periods')
        spread_normalized.plot()
        plt.show()

In [None]:
from google.colab import drive
drive.mount('/content/drive')

tickers,ticker_dfs = get_data()

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [None]:
def generate_trading_data_blocks(df, month,year,days_per_block=9, step_days=5):
    """
    Generate rolling blocks of data consisting of 9 trading days,
    advancing the window by 5 days after each block from the original dataframe.
    """

    if month == 12:
      month = 1
      year +=1

    else:
      month+=1

    month_data =df[(df.index.year == year) & (df.index.month == month)]

    # Extract unique trading days from the index
    unique_days = month_data.index.normalize().unique()

    for start_day_idx in range(0, len(unique_days) - days_per_block + 1, step_days):
        # Identify start and end day for the block
        start_day = unique_days[start_day_idx]
        end_day_idx = start_day_idx + days_per_block - 1  # inclusive end day index
        if end_day_idx >= len(unique_days):  # Ensure we don't go out of bounds
            break
        end_day = unique_days[end_day_idx]

        # Slice the original dataframe to get the block data
        block_data = month_data[start_day:end_day]
        print(f"yeilding block data for {month} {year}")
        yield block_data


def calculate_monthly_clusters(monthly_data):
    """
    Calculate clusters for the given month.

    """
    ret_df = monthly_data.pct_change().round(4).fillna(0)
    X = get_pca_features(ret_df)
    clustered_series = create_clusters(X, ret_df.columns)

    return clustered_series


def yield_monthly_data(df):
    """
    Yields each month's data from the dataframe.
    """
    for year in df.index.year.unique():
        for month in df[df.index.year == year].index.month.unique():
            monthly_data = df[(df.index.year == year) & (df.index.month == month)]
            yield year, month, monthly_data

In [None]:

import statsmodels.api as sm
from hurst import compute_Hc  # Assuming you have a function to compute the Hurst exponent
from statsmodels.tools.tools import add_constant
from scipy.stats import skew, kurtosis

def add_stationary(spread_normalized):

    rolling_volatility = spread_normalized[:500].rolling(window=10).std()
    hurst_exponent = compute_Hc(spread_normalized[:500])[0]  # Hurst exponent

    skewness = skew(spread_normalized[:500])
    kurtosis_value = kurtosis(spread_normalized[:500])


    # Ensure spread_lagged starts from the second element to align with spread_diff which starts from the second element due to differencing and shifting
    spread_lagged = spread_normalized.shift(1).dropna()
    spread_diff = spread_normalized.diff().dropna()  # First difference to get spread_diff

    # Make sure indices are aligned by reindexing spread_diff to match spread_lagged
    spread_diff = spread_diff.reindex(spread_lagged.index)
    spread_diff += 1e-8

    # Now perform your regression with aligned indices
    model = sm.OLS(np.log(np.abs(spread_diff)), add_constant(spread_lagged))
    result = model.fit()
    phi = result.params[0]


    spread_normalized = spread_normalized[:500]  # Normalized spread

    features_df = pd.DataFrame({
        'spread_normalized': spread_normalized,
        'rolling_volatility': rolling_volatility[:500],
        'hurst_exponent': np.repeat(hurst_exponent, rolling_volatility.shape[0]),
    })

    # Mean Reversion Speed calculated as the absolute value of phi
    mean_reversion_speed = np.abs(phi)

    # Calculate half-life from the decay factor
    half_life = -np.log(2) / np.log(np.abs(phi))


    features_df['half_life'] = np.repeat(half_life, len(features_df))
    features_df['mean_reversion_speed'] = np.repeat(mean_reversion_speed, len(features_df))
    features_df['skewness'] = np.repeat(skewness, len(features_df))
    features_df['kurtosis'] = np.repeat(kurtosis_value, len(features_df))

    return features_df



def process_pair(pair, prices):
    s1, s2 = pair

    datax = prices[s1]
    datay = prices[s2]

    # Find the hedge ratio using OLS on the first 500 data points
    hedge_ratio = sm.OLS(datax[:500], sm.add_constant(datay[:500])).fit().params[1]

    # Calculate the normalized spread based on the hedge ratio
    spread = datax - hedge_ratio * datay
    spread_normalized = (spread - spread.mean()) / spread.std()

    features_df = add_stationary(spread_normalized)
    features_df['hedge_ratio'] = np.repeat(hedge_ratio, len(features_df))

    # Perform ADF test on the first 500 points for stationarity
    adf_result_train = ADF_test(spread_normalized)
    print(pair, adf_result_train[1], )

    features_df.dropna(inplace = True)
    # print(features_df)



    # Proceed only if the first 500 points spread is stationary
    if adf_result_train[1] < 0.05:
        stationarity_train = adf_result_train[1] < 0.05

        # The stationarity status of the next 124 points and the hedge ratio for those points are used as labels for training the models
        stationarity_test = ADF_test(spread[500:])[1] < 0.05
        hedge_ratio_test = hedge_ratio

        # spread_normalized = spread_normalized.reset_index(drop=True)
        # plt.figure(figsize=(15, 7))
        # plt.title(f'Z-Scores for {s1} and {s2} Across Training and Testing Periods')
        # spread_normalized.plot()
        # plt.show()

        print(f"hedge_ratio_test {hedge_ratio_test}")
        print(f"stationarity_test {stationarity_test}")

        return features_df, stationarity_test, hedge_ratio_test



In [None]:
def initialize_hdf5_structure(h5_file_path):
    with pd.HDFStore(h5_file_path, mode='a') as store:
        # Create an empty DataFrame with expected structure for 'features' if it doesn't exist
        if 'features' not in store:
            store.put('features', pd.DataFrame(), format='table', data_columns=True)
        # Create empty series for 'stationarity_labels' and 'hedge_ratios' if they don't exist
        if 'stationarity_labels' not in store:
            store.put('stationarity_labels', pd.Series(dtype='int'), format='table')
        if 'hedge_ratios' not in store:
            store.put('hedge_ratios', pd.Series(dtype='float'), format='table')


import pandas as pd
import numpy as np
import os

def store_data(features_list, stationarity_labels, hedge_ratios, hdf5_file_path, current_batch_index=None):
    if not features_list:
        return  # Early exit if no features to store

    with pd.HDFStore(hdf5_file_path, mode='a') as store:
        # Determine the current batch index if not provided
        if current_batch_index is None:
            if '/features' in store.keys():
                # Compute the next batch index based on existing data
                current_batch_index = store.get_storer('features').nrows
            else:
                current_batch_index = 0

        # Create a MultiIndex DataFrame from the list
        keys = list(range(current_batch_index, current_batch_index + len(features_list)))
        features_df = pd.concat(features_list, keys=keys, names=['batch', 'row'])

        # Append the features DataFrame to the store
        if '/features' in store.keys():
            store.append('features', features_df, format='table', data_columns=True)
        else:
            store.put('features', features_df, format='table', data_columns=True)

        # Prepare and store stationarity labels
        if stationarity_labels:
            # Create a Series with a simple range index, assuming one label per batch
            labels_series = pd.Series(stationarity_labels, index=keys)
            if '/stationarity_labels' in store.keys():
                store.append('stationarity_labels', labels_series, format='table')
            else:
                store.put('stationarity_labels', labels_series, format='table')

        # Prepare and store hedge ratios
        if hedge_ratios:
            # Similarly, create a Series for hedge ratios
            ratios_series = pd.Series(hedge_ratios, index=keys)
            if '/hedge_ratios' in store.keys():
                store.append('hedge_ratios', ratios_series, format='table')
            else:
                store.put('hedge_ratios', ratios_series, format='table')




In [None]:
def main_driver_function(prices, store_file_path, track_completion_path):
    # Load or initialize the checkpoint
    completed_months = load_pickle(track_completion_path) if os.path.exists(track_completion_path) else {}

    for year, month, monthly_data in yield_monthly_data(prices):
        month_year_key = f"{month}{year}"

        # Check if the month has been processed
        if completed_months.get(month_year_key):
            print(f"Skipping already processed {month_year_key}.")
            continue

        print(f"Clustering for {month} {year}")
        cluster = calculate_monthly_clusters(monthly_data)

        for block_data in generate_trading_data_blocks(prices, month, year):
            if block_data.shape[0] == 624:
                block_data = block_data.dropna(axis=1)
                pairs = get_coint_pairs(block_data, cluster)

                features_list, stationarity_labels, hedge_ratios = [], [], []
                for pair in pairs:
                    result = process_pair(pair, block_data)
                    if result is not None:
                        features_df, stationarity_test, hedge_ratio_test = result
                        features_list.append(features_df)
                        stationarity_labels.append(stationarity_test)
                        hedge_ratios.append(hedge_ratio_test)

                # Store the processed data
                store_data(features_list, stationarity_labels, hedge_ratios, store_file_path)

        # Update the checkpoint after each month
        completed_months[month_year_key] = True
        save_pickle(track_completion_path, completed_months)

import os
# Paths
store_file_path = "/content/drive/My Drive/NN_dataset.h5"
track_completion_path = "/content/drive/My Drive/track_completion"

initialize_hdf5_structure(store_file_path)

intraday_dfs = ticker_dfs.copy()
prices = clean_data(intraday_dfs, tickers)

main_driver_function(prices[-3000:], store_file_path, track_completion_path)

[1;30;43mStreaming output truncated to the last 5000 lines.[0m
('BAC', 'USB') 0.09455290037505215
('BAC', 'UNP') 0.00314779569358727
hedge_ratio_test 0.031352957251925176
stationarity_test False
('BAC', 'UPS') 0.004310948485661122
hedge_ratio_test 0.05143313881502823
stationarity_test False
('BAC', 'UNH') 0.0028442006559216753
hedge_ratio_test -0.01909531288661083
stationarity_test False
('BAC', 'VRSK') 0.002653951337929681
hedge_ratio_test -0.014007622408985104
stationarity_test False
('BAC', 'VZ') 0.0017071814882189933
hedge_ratio_test 0.02900548399917946
stationarity_test False
('BAC', 'V') 0.011261441981316862
hedge_ratio_test 0.06713816017059285
stationarity_test False
('BAC', 'WRB') 0.0009122602197539974
hedge_ratio_test -0.0410613173900618
stationarity_test False
('BAC', 'WM') 0.0010526555375529013
hedge_ratio_test -0.08015825891542548
stationarity_test False
('BAC', 'WFC') 0.09574265736422599
('BAC', 'XYL') 0.12213121292446988
('BK', 'VZ') 0.004618989595605254
hedge_ratio_tes

In [None]:
import pandas as pd

def load_data_from_hdf5(hdf5_file_path):
    # Open the HDF5 file in read mode
    with pd.HDFStore(hdf5_file_path, mode='r') as store:
        # Access the 'features' dataset
        if '/features' in store.keys():
            features_df = store['features']
            print("Features Data Loaded")
            print(features_df.head())  # Display the first few rows
        else:
            print("Features dataset not found.")
            features_df = None

        # Access the 'stationarity_labels' dataset
        if '/stationarity_labels' in store.keys():
            labels_series = store['stationarity_labels']
            print("Stationarity Labels Loaded")
            print(labels_series.head())  # Display the first few labels
        else:
            print("Stationarity labels not found.")
            labels_series = None

        # Access the 'hedge_ratios' dataset
        if '/hedge_ratios' in store.keys():
            ratios_series = store['hedge_ratios']
            print("Hedge Ratios Loaded")
            print(ratios_series.head())  # Display the first few ratios
        else:
            print("Hedge ratios not found.")
            ratios_series = None

    return features_df, labels_series, ratios_series

# Specify the path to your HDF5 file
hdf5_file_path = "/content/drive/My Drive/NN_dataset.h5"

# Call the function and retrieve the data
features_df, labels_series, ratios_series = load_data_from_hdf5(hdf5_file_path)



In [None]:
features_df.shape

(862196, 8)