# get most liquid coins + get most cointegrated_pairs + output prices and actual spreads

# Import modules

In [None]:
from IPython.core.interactiveshell import InteractiveShell
InteractiveShell.ast_node_interactivity = "all"
import pandas as pd
import glob,re

pd.set_option('display.max_rows', None)
pd.set_option('display.max_columns', None)
pd.set_option('display.max_colwidth', None)

from datetime import datetime, timedelta
from sqlalchemy import create_engine
import requests
import pandas as pd
from tqdm import tqdm
import numpy as np
import random
from scipy.stats import pearsonr
from statsmodels.tsa.stattools import adfuller, coint, acf
import statsmodels.api as sm
from statsmodels.graphics.tsaplots import plot_acf

import matplotlib.pyplot as plt
import matplotlib.dates as mdates
from matplotlib.dates import DateFormatter, AutoDateLocator

import warnings
warnings.filterwarnings('ignore')


In [None]:
from google.colab import drive
import os
import shutil

# Mount Google Drive
drive.mount('/content/drive')


Mounted at /content/drive


In [1]:
cd /content/drive/MyDrive/MSC_YORK/PROJECT/

# Parameters

In [None]:
engine = create_engine('sqlite:///binance_prices.db')

In [None]:
dates_param_dic = {'start_date' : ['2023-01-01'],
'end_date' : ['2023-01-19'],
'in_sample_start_date' : ['2023-01-01'],
'in_sample_end_date' : ['2023-01-19'],
'out_of_sample_start_date': ['2023-01-19'],
'out_of_sample_end_date' : ['2023-01-23']}

In [None]:
dates_param = pd.DataFrame(dates_param_dic)
dates_param

Unnamed: 0,start_date,end_date,in_sample_start_date,in_sample_end_date,out_of_sample_start_date,out_of_sample_end_date
0,2023-01-01,2023-01-19,2023-01-01,2023-01-19,2023-01-19,2023-01-23


In [None]:
null_threshold=0.05
std_threshold= 0.05
significance_level=0.05
correlation_threshold = 0

# Dataset size

In [None]:
sql = f"""
SELECT count(*)
FROM  market_data
WHERE close_time >= '2023-01-01' AND close_time <= '2023-01-23'
ORDER BY close_time ASC
"""

# Execute the SQL query and pivot the resulting DataFrame
pd.read_sql_query(sql, con=engine)

Unnamed: 0,count(*)
0,10384500


# Get most liquid coins + most cointegrated pairs

In [None]:
def get_historical_data(start_date, end_date):
  """
  Retrieves historical market data for a list of top coins between specified start and end dates.

  Parameters:
      start_date (str): The start date for the data retrieval in 'YYYY-MM-DD' format.
      end_date (str): The end date for the data retrieval in 'YYYY-MM-DD' format.

  Returns:
      pd.DataFrame: A pivoted DataFrame with close_time as index and coin symbols as columns.
  """
  # Retrieve the list of top coins from the database
  coins_list = pd.read_sql_query("select * from top_coins", con=engine)['symbol'].values.tolist()

  # Create a comma-separated string of coin symbols
  line = ''
  for coin in coins_list:
      line = line + f"'{coin}',"
  line = line[:-1]
  print(line)

  # SQL query to retrieve market data for the selected coins and date range
  sql = f"""
  SELECT b.symbol, b.close_time, mid
  FROM top_coins a
  INNER JOIN market_data b ON a.symbol = b.symbol
  WHERE close_time >= '{start_date}' AND close_time < '{end_date}'
  and a.symbol in ({line})
  ORDER BY close_time ASC
  """

  # Execute the SQL query and pivot the resulting DataFrame
  historical_data = pd.read_sql_query(sql, con=engine, index_col='close_time', parse_dates=['close_time'])
  historical_data = pd.pivot_table(historical_data, index='close_time', columns=['symbol'], values=['mid'], aggfunc='sum')
  historical_data.columns = [col[1] for col in historical_data.columns]

  # Return the pivoted DataFrame
  return historical_data

def get_log_historical_mid(data, coin1,coin2):
    """
    Retrieves and pivots historical market data for given coins, returning the log of mid values.

    Parameters:
        data (pd.DataFrame): DataFrame containing historical data with close_time as index.
        coin1 (str): Symbol of the first coin.
        coin2 (str): Symbol of the second coin.

    Returns:
        pd.DataFrame: A pivoted DataFrame with close_time as index, coins as columns, and log of mid values as values.
    """
    # Sort the data by index (close_time) in ascending order
    log_historical_data = data.sort_index(ascending = True)
    # Select columns for the specified coins
    log_historical_data = log_historical_data.loc[:,[coin1,coin2]]
    # Drop rows with missing values
    log_historical_data = log_historical_data.dropna()
    # Apply log transformation to the data
    log_historical_data = log_historical_data.apply(np.log)

    # Return the pivoted DataFrame with log of mid values
    return log_historical_data

def adf_test(series):
    """
    Performs the Augmented Dickey-Fuller test to check for stationarity in a time series.

    Parameters:
        series (pd.Series): The time series data on which to perform the test.

    Returns:
        bool: True if the time series is non-stationary (p-value > significance_level), False otherwise.
    """
    # Perform the ADF test
    result = adfuller(series,autolag = 'BIC')
    p_value = result[1]
    # Check if the p-value is greater than the significance level
    return p_value > significance_level  # Return True if non-stationary (p-value > significance_level)

def get_lookback(series):
    """
    Determines the lookback period for a time series using the autocorrelation function.

    Parameters:
        series (pd.Series): The time series data.

    Returns:
        int: The lag at which the ACF curve falls within the confidence interval.
    """
    # Calculate the autocorrelation function (ACF)
    acf_values, confint = acf(series, alpha=0.05, nlags=50000)
    confint_lower = confint[:, 0] - acf_values
    confint_upper = confint[:, 1] - acf_values
    for lag in range(len(acf_values)):
        if acf_values[lag] > confint_lower[lag] and acf_values[lag] < confint_upper[lag]:
            print(f"The ACF curve falls within the confidence interval at lag: {lag}")
            # Return the results
            return lag


def out_of_sample_find_correlated_cointegrated_pairs(out_of_sample_data, alpha, beta, correlation_threshold=0.8, significance_level=0.05, null_threshold=0.05):
    """
    Finds pairs of coins that are both correlated and cointegrated in out-of-sample data.

    Parameters:
        out_of_sample_data (pd.DataFrame): DataFrame containing out-of-sample data for the coins.
        alpha (float): Intercept from in-sample cointegration regression.
        beta (float): Slope from in-sample cointegration regression.
        correlation_threshold (float): Threshold for the Pearson correlation coefficient.
        significance_level (float): Significance level for the cointegration test.
        null_threshold (float): Threshold for the proportion of null values.

    Returns:
        tuple: Out-of-sample correlation, correlation p-value, cointegration p-value, mean, std, halflife, and Hurst exponent.
    """
    # Calculate out-of-sample correlation and p-value
    out_of_sample_correlation, out_of_sample_correlation_p_value = pearsonr(out_of_sample_data.iloc[:, 0], out_of_sample_data.iloc[:, 1])

    # Check if the out-of-sample correlation is significant
    if out_of_sample_correlation is not None:
        # Calculate spread on out-of-sample data using coefficients from in-sample analysis
        print("Calculate spread based on parameters from in-sample.")
        predicted_log_coin1_out_of_sample = beta * out_of_sample_data.iloc[:, 1] + alpha
        spread_out_of_sample = out_of_sample_data.iloc[:, 0] - predicted_log_coin1_out_of_sample

        # Perform ADF test on spread on out-of-sample data
        print("Perform ADF test on spread on out-of-sample data:")
        adf_result_spread_on_out_of_sample = adfuller(spread_out_of_sample,autolag = 'BIC')

        # Extract ADF test results
        spread_on_out_of_sample_adf_statistic = adf_result_spread_on_out_of_sample[0]
        spread_on_out_of_sample_p_value = adf_result_spread_on_out_of_sample[1]
        spread_on_out_of_sample_critical_values = adf_result_spread_on_out_of_sample[4]

        # Print ADF test results
        print("ADF Test Results for Residuals:")
        print(f"\tADF Statistic: {spread_on_out_of_sample_adf_statistic}")
        print(f"\tresiduals P-Value: {spread_on_out_of_sample_p_value}")
        print("\tCritical Values:")
        for key, value in spread_on_out_of_sample_critical_values.items():
            print(f"\t{key}: {value}")

        # Print correlation and p-values
        print(f"""out_of_sample_correlation: {out_of_sample_correlation},
              out_of_sample_correlation_p_value: {out_of_sample_correlation_p_value},
              cointegration_p_value: {spread_on_out_of_sample_p_value}""")

        # Calculate mean and standard deviation of spread on out-of-sample data
        mean = spread_out_of_sample.mean()
        std = spread_out_of_sample.std()
        print(f"\nmean(out-of-sample): {mean} ")
        print(f"\nstd(out-of-sample): {std} ")

        # Extract Half-life
        halflife = calculate_half_life(spread_out_of_sample)

        # Extract Half-life
        hurst_exponent = hurst(spread_out_of_sample)

        # Return the results
        return out_of_sample_correlation, out_of_sample_correlation_p_value, spread_on_out_of_sample_p_value, mean, std, halflife, hurst_exponent

    return None, None, None

def in_sample_cointegration_pvalue(data):
    """
    Calculates the cointegration p-value for a given pair of assets using the Engle-Granger method.

    Parameters:
        data (pd.DataFrame): DataFrame containing historical data for the assets.

    Returns:
        tuple: alpha, beta, residuals p-value, mean, and standard deviation of residuals, half-life, and Hurst exponent.
    """
    try:
        # Perform ADF test on both series to ensure they are non-stationary
        print("We check if both assets are non-stationary.")
        asset1_non_stationary = adf_test(data.iloc[:, 0])
        asset2_non_stationary = adf_test(data.iloc[:, 1])
        print(f"asset1_non_stationary : {asset1_non_stationary}")
        print(f"asset2_non_stationary : {asset2_non_stationary}")
        if asset1_non_stationary and asset2_non_stationary:
            # Perform cointegration test
            print("We get parameters from Engle-Granger method.")
            log_coin2_prices_const = sm.add_constant(data.iloc[:, 1])
            model = sm.OLS(data.iloc[:, 0], log_coin2_prices_const)
            results = model.fit()

            # Extract coefficients
            alpha = results.params[0]  # intercept
            beta = results.params[1]   # slope

            # Extract residuals
            residuals = results.resid

            # Perform ADF test on residuals
            adf_result_residuals = adfuller(residuals,autolag = 'BIC')

            # Extract ADF test results
            residuals_adf_statistic = adf_result_residuals[0]
            residuals_p_value = adf_result_residuals[1]
            residuals_critical_values = adf_result_residuals[4]

            # Print ADF test results
            print("ADF Test Results for Residuals:")
            print(f"\tADF Statistic: {residuals_adf_statistic}")
            print(f"\tresiduals P-Value: {residuals_p_value}")
            print("\tCritical Values:")

            # Print mean and std results
            for key, value in residuals_critical_values.items():
                print(f"\t{key}: {value}")
            mean = residuals.mean()
            std = residuals.std()
            print(f"\nmean(in-sample): {mean} ")
            print(f"\nstd(out-of-sample): {std} ")

            # Extract Half-life
            halflife = calculate_half_life(residuals)

            # Extract Half-life
            hurst_exponent = hurst(residuals)

            del residuals
            # Return the results
            return alpha, beta, residuals_p_value, mean, std, halflife, hurst_exponent

        else:
            # Return a high p-value to indicate poor cointegration if any series is stationary
            return np.inf, None, None, None, None , None, None # Return a high p-value to indicate poor cointegration if any series is stationary
    except Exception as e:
        print(f"Error with pair: {e}")
        return np.inf, None, None, None, None, None, None  # Return a high p-value to indicate poor cointegration

def calculate_half_life(spread):
    """
    Calculates the half-life of mean reversion for a given spread.

    Parameters:
        spread (pd.Series): The spread of the time series.

    Returns:
        float: The half-life of mean reversion.
    """
    df_spread = spread.to_frame()
    df_spread.columns = ['spread']
    spread_lag = df_spread.spread.shift(1)
    spread_lag.iloc[0] = spread_lag.iloc[1]
    spread_ret = df_spread.spread - spread_lag
    spread_ret.iloc[0] = spread_ret.iloc[1]
    spread_lag2 = sm.add_constant(spread_lag)
    model = sm.OLS(spread_ret, spread_lag2)
    res = model.fit()
    halflife = round(-np.log(2) / res.params[1], 0)
    print(f"halflife : {halflife}")
    del df_spread
    return halflife

def hurst(spread):
    """
    Returns the Hurst Exponent of the time series vector.

    Parameters:
        spread (pd.Series): The time series data.

    Returns:
        float: The Hurst exponent of the time series.
    """
    # Create the range of lag values
    lags = range(2, 100)
    ts = spread.values

    # Calculate the array of the variances of the lagged differences
    tau = [np.sqrt(np.std(np.subtract(ts[lag:], ts[:-lag]))) for lag in lags]

    # Avoid divide by zero in log
    tau = np.array(tau)
    tau = np.where(tau == 0, np.finfo(float).eps, tau)

    # Use a linear fit to estimate the Hurst Exponent
    poly = np.polyfit(np.log(lags), np.log(tau), 1)

    # Return the Hurst exponent from the polyfit output
    hurst = poly[0] * 2.0
    print(f"Hurst Exponent: {round(hurst, 2)}")
    del spread
    return hurst

def in_sample_find_correlated_cointegrated_pairs(in_sample_data, correlation_threshold=0.8, significance_level=0.05, null_threshold=0.05):
    """
    Finds pairs of coins that are both correlated and cointegrated in in-sample data.

    Parameters:
        in_sample_data (pd.DataFrame): DataFrame containing in-sample data for the coins.
        correlation_threshold (float): Threshold for the Pearson correlation coefficient.
        significance_level (float): Significance level for the cointegration test.
        null_threshold (float): Threshold for the proportion of null values.

    Returns:
        pd.DataFrame: DataFrame containing cointegrated pairs and their statistics.
    """
    pair = in_sample_data.columns.tolist()

    in_sample_correlation, in_sample_corr_pvalue = pearsonr(in_sample_data.iloc[:, 0], in_sample_data.iloc[:, 1])
    print(f"in_sample_corr_pvalue : {in_sample_corr_pvalue}")
    print(f"in_sample_correlation : {in_sample_correlation}")
    print(f" in_sample_corr_pvalue < significance_level :{ in_sample_corr_pvalue < significance_level}")
    print(f" in_sample_correlation > correlation_threshold :{ in_sample_correlation > correlation_threshold}")

    if in_sample_correlation is not None :
        in_sample_alpha, in_sample_beta, in_sample_residuals_p_value, in_sample_mean, in_sample_std, in_sample_halflife, in_sample_hurst =\
            in_sample_cointegration_pvalue(in_sample_data)
        print(f"in_sample_residuals_p_value : {in_sample_residuals_p_value}")
        if in_sample_residuals_p_value is not None:
            if (in_sample_residuals_p_value < significance_level) :
                out_of_sample_data = get_log_historical_mid(out_of_sample_historical_data, pair[0], pair[1])
                out_of_sample_correlation, out_of_sample_correlation_p_value, spread_on_out_of_sample_p_value,\
                  out_of_sample_mean, out_of_sample_std, out_of_sample_halflife, out_of_sample_hurst =\
                    out_of_sample_find_correlated_cointegrated_pairs(out_of_sample_data, in_sample_alpha, in_sample_beta, correlation_threshold=correlation_threshold,
                                                                     significance_level=significance_level, null_threshold=null_threshold)

                # Get the lookback for each coin
                coin1_lookback = get_lookback(in_sample_data.iloc[:, 0])
                coin2_lookback = get_lookback(in_sample_data.iloc[:, 1])

                print(f"""Cointegrated Pair {pair},
                      in_sample_correlation: {in_sample_correlation},
                      in_sample_correlation_p_value: {in_sample_corr_pvalue},
                      in_sample_residuals_adf_p_value: {in_sample_residuals_p_value},
                      in_sample_alpha: {in_sample_alpha},
                      in_sample_beta: {in_sample_beta},
                      in_sample_mean: {in_sample_mean},
                      in_sample_std: {in_sample_std},
                      in_sample_half_file : {in_sample_halflife},
                      in_sample_hurst : {in_sample_hurst},
                      in_sample_coin1_lookback : {coin1_lookback},
                      in_sample_coin2_lookback : {coin2_lookback},
                      out_of_sample_correlation: {out_of_sample_correlation},
                      out_of_sample_correlation_p_value: {out_of_sample_correlation_p_value},
                      out_of_sample_residuals_adf_p_value: {spread_on_out_of_sample_p_value},
                      """)

                cointegrated_pair = pd.DataFrame({
                    'coin1': [pair[0]],
                    'coin2': [pair[1]],
                    'in_sample_correlation': [in_sample_correlation],
                    'in_sample_correlation_pvalue': [in_sample_corr_pvalue],
                    'in_sample_residuals_adf_p_value': [in_sample_residuals_p_value],
                    'in_sample_alpha': [in_sample_alpha],
                    'in_sample_beta': [in_sample_beta],
                    'in_sample_half_file' : [in_sample_halflife],
                    'in_sample_hurst' : [in_sample_hurst],
                    'in_sample_coin1_lookback' : [coin1_lookback],
                    'in_sample_coin2_lookback' : [coin2_lookback],
                    'out_of_sample_correlation': [out_of_sample_correlation],
                    'out_of_sample_correlation_p_value': [out_of_sample_correlation_p_value],
                    'out_of_sample_spread_adf_p_value': [spread_on_out_of_sample_p_value],
                    'out_of_sample_mean': [out_of_sample_mean],
                    'out_of_sample_std': [out_of_sample_std],
                    'out_of_sample_half_file' : [out_of_sample_halflife],
                    'out_of_sample_hurst' : [out_of_sample_hurst]


                })

                return cointegrated_pair
    return None



In [None]:
for index, row in dates_param.iterrows():
  print(row)
  start_date = row['start_date']
  end_date = row['end_date']
  in_sample_start_date = row['in_sample_start_date']
  in_sample_end_date = row['in_sample_end_date']
  out_of_sample_start_date = row['out_of_sample_start_date']
  out_of_sample_end_date = row['out_of_sample_end_date']

  # Get the market prices  => determination of the most liquid cryptocurrencies
  sql = f"""
  select close_time as date, symbol, monetary_volume
  from market_data
  where
  close_time >= '{start_date}'
  and close_time < '{end_date}'
  --and monetary_volume != 0
  """
  market_prices = pd.read_sql(sql, con = engine)

  # Get the most liquid assets
  market_prices['datetime'] = pd.to_datetime(market_prices['date'])
  market_prices['monetary_volume'] = market_prices['monetary_volume'].fillna(0)

  quantile_3rd = market_prices.groupby("datetime")['monetary_volume'].quantile(0.75).reset_index()
  quantile_3rd.rename(columns={'monetary_volume': '75th_percentile'}, inplace=True)
  quantile_3rd = quantile_3rd.sort_values(by = 'datetime', ascending = True)

  market_caps = market_prices.sort_values(by = ['symbol','datetime'], ascending = [True, True])
  market_caps = market_caps.merge(quantile_3rd, how = 'inner', on = ['datetime'])
  market_caps['is_top_25th_market_cap'] = market_caps.apply(lambda x : 1 if x['monetary_volume'] >= x['75th_percentile'] else 0, axis =1 )
  top_25th_market_caps = market_caps.query("is_top_25th_market_cap == 1")
  coins_traded_in_entire_in_sample = top_25th_market_caps.groupby('symbol').agg({'is_top_25th_market_cap' : 'sum'}).reset_index().rename(columns = {'is_top_25th_market_cap' : 'number_of_periods_in_top_25th_market_cap'})
  coins_traded_in_entire_in_sample = coins_traded_in_entire_in_sample.sort_values(by = 'number_of_periods_in_top_25th_market_cap', ascending = False)
  coins_traded_in_entire_in_sample['periods_coverage'] = coins_traded_in_entire_in_sample['number_of_periods_in_top_25th_market_cap']/coins_traded_in_entire_in_sample['number_of_periods_in_top_25th_market_cap'].iloc[0]
  coins_traded_in_entire_in_sample = coins_traded_in_entire_in_sample.query("periods_coverage>=0.9")
  coins_traded_in_entire_in_sample.drop(columns = ['number_of_periods_in_top_25th_market_cap','periods_coverage'], inplace = True)
  top_25th_market_caps_refined = top_25th_market_caps.merge(coins_traded_in_entire_in_sample, how = 'inner', on = ['symbol'])
  top_25th_market_caps_refined['rank'] = top_25th_market_caps_refined.groupby("date")['monetary_volume'].rank("dense", ascending = False)
  top_25th_market_caps_refined = top_25th_market_caps_refined.groupby('symbol').agg({'monetary_volume':'mean'}).reset_index().rename(columns = {'monetary_volume' : 'monetary_volume_average'}).sort_values(by = ['monetary_volume_average'], ascending = False)
  top_25th_market_caps_refined['rank'] = top_25th_market_caps_refined['monetary_volume_average'].rank(ascending = False)
  pd.set_option('display.float_format', '{:.2f}'.format)
  top_25th_market_caps_refined
  top_25th_market_caps_refined['symbol'].to_sql('top_coins', con=engine, index=False, if_exists='replace')
  top_25th_market_caps_refined.to_csv(f"MOST_LIQUID_COINS/top_25th_market_caps_from_{start_date}_to_{end_date}.csv", index = False)

  # Get the historical data of the most liquid pairs
  in_sample_historical_data = get_historical_data(in_sample_start_date, in_sample_end_date)
  out_of_sample_historical_data = get_historical_data(out_of_sample_start_date, out_of_sample_end_date)

  # Get the most cointegrated pairs
  # Loop over combinations of pairs and find cointegrated pairs
  print("Looping over the combinations of pairs...")
  coins = top_25th_market_caps_refined['symbol'].values.tolist()


  cointegrated_pairs = pd.DataFrame()
  for i in range(len(coins)):
      for j in range(i + 1, len(coins)):
          pair = (coins[i], coins[j])
          print("################################################################")
          print(f"\nPair: {pair}")
          in_sample_data = get_log_historical_mid(in_sample_historical_data, coins[i], coins[j])

          print(f"data.shape: {in_sample_data.shape}")
          print(f"data.count(): {in_sample_data.count()}")
          print(f"data.columns: {in_sample_data.columns}")
          null_proportion = in_sample_data.isnull().mean().mean()  # Proportion of null values in the pair
          print(f"null_proportion: {null_proportion}")
          if null_proportion > null_threshold:
              continue
          cointegrated_pair = in_sample_find_correlated_cointegrated_pairs(in_sample_data, correlation_threshold=correlation_threshold, significance_level=significance_level, null_threshold=null_threshold)
          if cointegrated_pair is not None:
              cointegrated_pairs = pd.concat([cointegrated_pairs, cointegrated_pair], axis=0)
              cointegrated_pairs.to_csv(f"MOST_COINTEGRATED_PAIRS/cointegrated_pairs_{in_sample_start_date}_{in_sample_end_date}_{out_of_sample_end_date}.csv", index = False)





start_date                  2023-01-01
end_date                    2023-01-19
in_sample_start_date        2023-01-01
in_sample_end_date          2023-01-19
out_of_sample_start_date    2023-01-19
out_of_sample_end_date      2023-01-23
Name: 0, dtype: object


Unnamed: 0,symbol,monetary_volume_average,rank
5,BTCUSDT,3087738.72,1.0
8,ETHUSDT,437444.05,2.0
21,SOLUSDT,119946.97,3.0
23,XRPUSDT,100256.32,4.0
11,GALAUSDT,82885.32,5.0
6,DOGEUSDT,70266.02,6.0
2,APTUSDT,63536.92,7.0
4,BNBUSDT,58567.1,8.0
20,SHIBUSDT,53277.0,9.0
15,LTCUSDT,48905.16,10.0


24

[1;30;43mStreaming output truncated to the last 5000 lines.[0m
                      in_sample_hurst : 0.45341556914326847,
                      in_sample_coin1_lookback : 2313,
                      in_sample_coin2_lookback : 2448,
                      out_of_sample_correlation: 0.9107643059409778,
                      out_of_sample_correlation_p_value: 0.0,
                      out_of_sample_residuals_adf_p_value: 0.3975824464660058,
                      
################################################################

Pair: ('LTCUSDT', 'LDOUSDT')
data.shape: (25920, 2)
data.count(): LTCUSDT    25920
LDOUSDT    25920
dtype: int64
data.columns: Index(['LTCUSDT', 'LDOUSDT'], dtype='object')
null_proportion: 0.0
in_sample_corr_pvalue : 0.0
in_sample_correlation : 0.9464304902777203
 in_sample_corr_pvalue < significance_level :True
 in_sample_correlation > correlation_threshold :True
We check if both assets are non-stationary.
asset1_non_stationary : True
asset2_non_stationary : 

# Prices and actual spreads

In [None]:
class DataProcessor:
    """
    A class used to fetch prices and actual spreads.
    Attributes:
    ----------
    training_set_start_date : str
        The start date of the training set.
    training_set_end_date : str
        The end date of the training set.
    test_set_end_date : str
        The end date of the test set.
    coin1 : str
        The symbol of the first cryptocurrency.
    coin2 : str
        The symbol of the second cryptocurrency.
    lookforward_window : int
        The lookforward window period.

    """
    def __init__(self, training_set_start_date,
                 training_set_end_date,
                 test_set_end_date,
                 coin1,
                 coin2,
                 lookforward_window,
                 ):
        """
        Initialize the DataProcessor with dates, coin symbols, and periods.

        Parameters:
        ----------
        training_set_start_date : str
            The start date of the training set.
        training_set_end_date : str
            The end date of the training set.
        test_set_end_date : str
            The end date of the test set.
        coin1 : str
            The symbol of the first cryptocurrency.
        coin2 : str
            The symbol of the second cryptocurrency.
        lookforward_window : int
            The lookforward window period.

        """
        self.training_set_start_date = training_set_start_date
        self.training_set_end_date = training_set_end_date
        self.test_set_end_date = test_set_end_date
        self.coin1 = coin1
        self.coin2 = coin2
        self.lookforward_window = lookforward_window

    def plot_mid(self,):
        """
        Plot the mid prices of the two cryptocurrencies over time.

        Parameters:
        ----------
        periods : list
            The list of periods to be used in the plot.
        """
        # Concatenate mid prices from training and test datasets
        self.coin1_mid_all_datasets = pd.concat([self.training_set_historical_data[['close_time',f'{self.coin1}_mid']], self.test_set_historical_data[['close_time',f'{self.coin1}_mid']] ], axis = 0)
        self.coin2_mid_all_datasets = pd.concat([self.training_set_historical_data[['close_time',f'{self.coin2}_mid']], self.test_set_historical_data[['close_time',f'{self.coin2}_mid']] ], axis = 0)

        # Create plot
        fig, ax = plt.subplots(figsize=(20, 10), dpi=200)
        coin1_close_times = pd.to_datetime(self.coin1_mid_all_datasets['close_time'])
        coin2_close_times = pd.to_datetime(self.coin2_mid_all_datasets['close_time'])
        ax.plot(coin1_close_times, self.coin1_mid_all_datasets[f'{self.coin1}_mid'], label=self.coin1, color='g', linestyle='--')
        ax.plot(coin2_close_times, self.coin2_mid_all_datasets[f'{self.coin2}_mid'], label=self.coin2, color='r', linestyle='--')

        # Format x-axis
        ax.xaxis.set_major_formatter(DateFormatter('%Y-%m-%d %H:%M:%S'))
        ax.xaxis.set_major_locator(mdates.AutoDateLocator())
        ax.axvline(x=pd.to_datetime(self.training_set_end_date), color='r', linestyle='--')

        # Set titles and labels
        plt.title(f'MID_PRICES_{self.coin1}_{self.coin2}')
        plt.ylabel('Mid prices')
        plt.xlabel('Time')
        plt.legend()

        plt.xticks(rotation=45)

        # Adjust x-axis ticks frequency
        locator = AutoDateLocator()
        plt.gca().xaxis.set_major_locator(locator)
        plt.gca().xaxis.set_major_formatter(DateFormatter('%Y-%m-%d %H:%M:%S'))

        plt.grid(True)
        plt.tight_layout()

        plt.savefig(f'PRICES/ACTUAL/MID_plot_{self.coin1}_{self.coin2}.png')

        plt.show()

    def plot_spread(self, ):
        """
        Plot the spread of the pairs over time.
        """
        #concat spread from test and training sets
        self.spread_all_datasets = pd.concat([self.training_set_historical_data[['close_time','spread']], self.test_set_historical_data[['close_time','spread']] ], axis = 0)

        # Create plot
        fig, ax = plt.subplots(figsize=(20, 10), dpi=200)
        close_times = pd.to_datetime(self.spread_all_datasets['close_time'])
        ax.plot(close_times, self.spread_all_datasets[f'spread'][:], label=self.coin1, color='g', linestyle='--')

        # Format x-axis
        ax.xaxis.set_major_formatter(DateFormatter('%Y-%m-%d %H:%M:%S'))
        ax.xaxis.set_major_locator(mdates.AutoDateLocator())
        ax.axvline(x=pd.to_datetime(self.training_set_end_date), color='r', linestyle='--')

        # Set titles and labels
        plt.title(f'ACTUAL SPREAD_{self.coin1}_{self.coin2}')
        plt.ylabel('Actual spread')
        plt.xlabel('Time')
        plt.xticks(rotation=45)

        # Adjust x-axis ticks frequency
        locator = AutoDateLocator()
        plt.gca().xaxis.set_major_locator(locator)
        plt.gca().xaxis.set_major_formatter(DateFormatter('%Y-%m-%d %H:%M:%S'))

        plt.grid(True)
        plt.tight_layout()

        # Save and show plot
        plt.savefig(f'SPREADS/ACTUAL/actual_spread_plot_{self.coin1}_{self.coin2}.png')
        plt.show()


    def adf_test(self, series):
        """
        Perform the Augmented Dickey-Fuller test to check for stationarity of a series.

        Parameters:
        ----------
        series : pandas.Series
            The time series data to test for stationarity.
        """
        # Handle missing and infinite values
        series = series.replace([np.inf, -np.inf], np.nan).dropna()
        result = adfuller(series,autolag = 'BIC')

        # Print ADF test results
        print('ADF Statistic:', result[0])
        print('p-value:', result[1])
        print('Critical Values:')
        for key, value in result[4].items():
            print(f'\t{key}: {value:.3f}')


    def calculate_cointegration_parameters(self, price1, price2):
        """
        Calculate the cointegration parameters between two price series.

        Parameters:
        ----------
        price1 : pandas.Series
            The first price series.
        price2 : pandas.Series
            The second price series.

        Returns:
        -------
        hedge_ratio : float
            The hedge ratio between the two price series.
        alpha : float
            The intercept from the OLS regression.
        residuals : pandas.Series
            The residuals from the OLS regression.
        """
        # Handle missing and infinite values
        price1.replace([np.inf, -np.inf], np.nan, inplace=True)
        price1.dropna(inplace=True)
        price2.replace([np.inf, -np.inf], np.nan, inplace=True)
        price2.dropna(inplace=True)

        # Fit OLS model
        model = sm.OLS(price1, sm.add_constant(price2))
        results = model.fit()
        residuals = results.resid


        # Print results
        print("Cointegration test results:")
        print(results.summary())


        # Calculate hedge ratio and alpha
        hedge_ratio = results.params[1]
        alpha = results.params[0]
        return hedge_ratio, alpha,residuals

    def calculate_log_mid(self, historical_data):
        """
        Calculate the logarithmic mid prices for the given historical data.

        Parameters:
        ----------
        historical_data : pandas.DataFrame
            The historical market data containing mid prices.

        Returns:
        -------
        historical_data : pandas.DataFrame
            The historical data with added logarithmic mid prices.
        """
        # Calculate logarithmic mid prices
        historical_data[f'log_{self.coin1}_mid'] = np.log(historical_data[f'{self.coin1}_mid']).dropna()
        historical_data[f'log_{self.coin2}_mid'] = np.log(historical_data[f'{self.coin2}_mid']).dropna()

        return historical_data

    def fetch_data(self, start_date, end_date):
        """
        Fetch historical market data between the given dates.

        Parameters:
        ----------
        start_date : str
            The start date for fetching data.
        end_date : str
            The end date for fetching data.

        Returns:
        -------
        historical_data : pandas.DataFrame
            The fetched historical market data.
        """
        sql = f"""
        SELECT symbol, close_time, mid, volume
        FROM market_data
        WHERE close_time >= '{start_date}' AND close_time < '{end_date}'
        AND symbol in ('{self.coin1}','{self.coin2}')
        ORDER BY close_time ASC
        """
        print(sql)
        # Fetch mid prices data
        historical_data = pd.read_sql_query(sql, con=engine, parse_dates=['close_time'])
        historical_data = pd.pivot_table(historical_data, index='close_time', columns=['symbol'], values=['mid'], aggfunc='sum')
        historical_data.columns = [col[1]+'_mid' for col in historical_data.columns]

        # Fetch volume data
        historical_volume_data = pd.read_sql_query(sql, con=engine, parse_dates=['close_time'])
        historical_volume_data = pd.pivot_table(historical_volume_data, index='close_time', columns=['symbol'], values=['volume'], aggfunc='sum')
        historical_volume_data.columns = [col[1]+'_vol' for col in historical_volume_data.columns]

        # Merge mid prices and volume data
        historical_data = historical_data.merge(historical_volume_data, how = 'inner', on = 'close_time')

        return historical_data


    def process_training_data(self):
        """
        Process the training set data: fetch prices, calculate log mid, hedge ratio, and spread, and check stationarity.
        """
        print("training set pre-processing ")
        print("fetch prices ")
        self.training_set_historical_data = self.fetch_data(self.training_set_start_date, self.training_set_end_date)

        print("calculate log mid.")
        self.training_set_historical_data = self.calculate_log_mid(self.training_set_historical_data)


        print("calculate hedge ratio and spread")
        self.training_set_historical_data = self.training_set_historical_data.dropna()
        self.hedge_ratio, self.alpha, residuals = self.calculate_cointegration_parameters(self.training_set_historical_data[f'log_{self.coin1}_mid'], self.training_set_historical_data[f'log_{self.coin2}_mid'])
        print(f"hedge ratio: {self.hedge_ratio}")
        print(f"alpha constant: {self.alpha}")
        self.training_set_historical_data['spread'] = residuals


        print("\nckeck stationarity of spread ")
        self.adf_test(self.training_set_historical_data['spread'])



        print(f"get the look-back period based on acf test of the mid price series")

        # Clean and reset the training data
        self.training_set_historical_data = self.training_set_historical_data.dropna()
        self.training_set_historical_data = self.training_set_historical_data.reset_index()


        if not pd.api.types.is_datetime64_any_dtype(self.training_set_historical_data['close_time']):
            self.training_set_historical_data['close_time'] = pd.to_datetime(self.training_set_historical_data['close_time'])

    def process_test_data(self):
        """
        Process the test set data: fetch prices, calculate log mid, and spread, and check stationarity.
        """

        print("Test set pre-processing ")
        print("fetch prices and volumes ")
        self.test_set_historical_data = self.fetch_data(self.training_set_end_date, self.test_set_end_date)
        print(self.test_set_historical_data.columns)

        print("calculate log mid")
        self.test_set_historical_data = self.calculate_log_mid(self.test_set_historical_data)
        print(self.test_set_historical_data.columns)

        print("use hedge ratio calculated with training set to calculate the spread in test set")
        self.test_set_historical_data = self.test_set_historical_data.dropna()
        self.test_set_historical_data['spread'] = self.test_set_historical_data[f'log_{self.coin1}_mid'] - self.hedge_ratio * self.test_set_historical_data[f'log_{self.coin2}_mid'] -self.alpha
        print(self.test_set_historical_data.columns)


        print("ckeck stationarity of spread in test set")
        self.adf_test(self.test_set_historical_data['spread'])

        # Clean and reset the test data
        self.test_set_historical_data = self.test_set_historical_data.dropna()
        self.test_set_historical_data = self.test_set_historical_data.reset_index()
        print(self.test_set_historical_data.columns)

        if not pd.api.types.is_datetime64_any_dtype(self.test_set_historical_data['close_time']):
            self.test_set_historical_data['close_time'] = pd.to_datetime(self.test_set_historical_data['close_time'])

    def merge_train_test_sets(self):
        """
        Merge the training and test datasets.
        """
        # Concatenate spreads from training and test datasets
        self.spread_all_datasets = pd.concat([self.training_set_historical_data[['close_time','spread']], self.test_set_historical_data[['close_time','spread']] ], axis = 0)



In [None]:
adf_p_value_threshold = 0.05

In [None]:
# Initialize an empty DataFrame to store correlated pairs
cointegrated_pairs = pd.DataFrame([])

# Define the pattern for the filenames
pattern = "MOST_COINTEGRATED_PAIRS/cointegrated_pairs_*.csv"

# Get the list of files matching the pattern
files = glob.glob(pattern)

# Process each file
for filename in files:
  # Extract dates from the filename
  match = re.search(r'MOST_COINTEGRATED_PAIRS/cointegrated_pairs_(\d{4}-\d{2}-\d{2})_(\d{4}-\d{2}-\d{2})_(\d{4}-\d{2}-\d{2})', filename)
  if match:
    print(filename)
    cointegrated_pairs_tmp = pd.read_csv(filename).query(f"out_of_sample_spread_adf_p_value< {adf_p_value_threshold} and in_sample_residuals_adf_p_value < {adf_p_value_threshold}")
    cointegrated_pairs_tmp['in_sample_start_date'] = match.group(1)
    cointegrated_pairs_tmp['in_sample_end_date'] = match.group(2)
    cointegrated_pairs_tmp['out_of_sample_end_date'] = match.group(3)
    cointegrated_pairs = pd.concat([cointegrated_pairs, cointegrated_pairs_tmp], axis = 0)

cointegrated_pairs

MOST_COINTEGRATED_PAIRS/cointegrated_pairs_2023-01-01_2023-01-19_2023-01-23.csv


Unnamed: 0,coin1,coin2,in_sample_correlation,in_sample_correlation_pvalue,in_sample_residuals_adf_p_value,in_sample_alpha,in_sample_beta,in_sample_half_file,in_sample_hurst,in_sample_coin1_lookback,in_sample_coin2_lookback,out_of_sample_correlation,out_of_sample_correlation_p_value,out_of_sample_spread_adf_p_value,out_of_sample_mean,out_of_sample_std,out_of_sample_half_file,out_of_sample_hurst,in_sample_start_date,in_sample_end_date,out_of_sample_end_date
2,BTCUSDT,LUNCUSDT,0.89,0.0,0.04,21.46,1.34,1411.0,0.46,2423,2152,0.91,0.0,0.02,0.1,0.02,302.0,0.49,2023-01-01,2023-01-19,2023-01-23
4,ETHUSDT,XRPUSDT,0.95,0.0,0.01,8.85,1.61,850.0,0.47,2443,2458,0.97,0.0,0.0,0.0,0.01,196.0,0.37,2023-01-01,2023-01-19,2023-01-23
6,ETHUSDT,SHIBUSDT,0.96,0.0,0.04,16.79,0.83,1309.0,0.5,2443,2262,0.98,0.0,0.0,-0.04,0.01,127.0,0.4,2023-01-01,2023-01-19,2023-01-23
7,ETHUSDT,MATICUSDT,0.99,0.0,0.0,7.35,0.97,650.0,0.45,2443,2409,0.98,0.0,0.01,0.04,0.01,222.0,0.4,2023-01-01,2023-01-19,2023-01-23
12,ETHUSDT,DOTUSDT,0.99,0.0,0.0,5.81,0.86,627.0,0.46,2443,2337,0.97,0.0,0.05,0.01,0.01,265.0,0.38,2023-01-01,2023-01-19,2023-01-23
18,SOLUSDT,SANDUSDT,0.98,0.0,0.01,3.58,1.24,1249.0,0.49,2189,2348,0.97,0.0,0.02,-0.08,0.02,264.0,0.48,2023-01-01,2023-01-19,2023-01-23
21,SOLUSDT,DOTUSDT,0.98,0.0,0.01,-1.05,2.36,1024.0,0.46,2189,2337,0.98,0.0,0.01,-0.05,0.02,203.0,0.45,2023-01-01,2023-01-19,2023-01-23
26,XRPUSDT,LTCUSDT,0.93,0.0,0.01,-4.33,0.76,887.0,0.47,2458,2313,0.96,0.0,0.0,0.04,0.01,143.0,0.41,2023-01-01,2023-01-19,2023-01-23
27,XRPUSDT,MATICUSDT,0.96,0.0,0.01,-0.94,0.56,718.0,0.46,2458,2409,0.94,0.0,0.02,0.03,0.01,305.0,0.4,2023-01-01,2023-01-19,2023-01-23
28,XRPUSDT,SANDUSDT,0.92,0.0,0.03,-0.86,0.25,1230.0,0.49,2458,2348,0.89,0.0,0.05,0.01,0.01,404.0,0.41,2023-01-01,2023-01-19,2023-01-23


In [None]:
# Iterate over each row in the cointegrated_pairs DataFrame
for index,task in cointegrated_pairs.iterrows():
  # Extract relevant data from the current row
  training_set_start_date = task[-3]
  training_set_end_date = task[-2]
  test_set_end_date = task[-1]
  coin1 = task[0]
  coin2 = task[1]
  lookforward_window = 2

  # Initialize the DataProcessor with extracted data
  processor = DataProcessor(training_set_start_date,
                          training_set_end_date,
                          test_set_end_date,
                          coin1,
                          coin2,
                          lookforward_window,
                            )

  # Process the training data
  processor.process_training_data()

  # Store the calculated hedge ratio and alpha in the DataFrame
  cointegrated_pairs.loc[index, 'in_sample_beta'] = processor.hedge_ratio
  cointegrated_pairs.loc[index, 'in_sample_alpha'] = processor.alpha

  # Save the training set mid prices to CSV files
  for coin in [processor.coin1,processor.coin2]:
      processor.training_set_historical_data[['close_time',f'{coin}_mid']].to_csv(f"PRICES/ACTUAL/training_set_{coin}_{training_set_start_date}_{training_set_end_date}.csv", index=False)

  # Process the test data
  processor.process_test_data()
  # Merge training and test datasets
  processor.merge_train_test_sets()

  # Plot the spread and mid prices
  processor.plot_spread()
  processor.plot_mid()

  # Save the test set mid prices and volumes to CSV files
  for coin in [processor.coin1,processor.coin2]:
      processor.test_set_historical_data[['close_time',f'{coin}_mid',f'{coin}_vol']].to_csv(f"PRICES/ACTUAL/test_set_{coin}_{training_set_end_date}_{test_set_end_date}.csv", index=False)

  processor.spread_all_datasets[['close_time','spread']].to_csv(f"SPREADS/ACTUAL/actual_spread_{processor.coin1}_{processor.coin2}_{training_set_start_date}_{test_set_end_date}.csv", index = False)

# Save the updated cointegrated_pairs DataFrame to a CSV file
cointegrated_pairs.to_csv(f"MOST_COINTEGRATED_PAIRS/confirmed_cointegrated_pairs.csv", index=False)

Output hidden; open in https://colab.research.google.com to view.

In [None]:
unique_coins = pd.concat([cointegrated_pairs.coin1, cointegrated_pairs.coin2]).unique()
print(unique_coins)
unique_coins.shape


['BTCUSDT' 'ETHUSDT' 'SOLUSDT' 'XRPUSDT' 'DOGEUSDT' 'SHIBUSDT' 'LTCUSDT'
 'ADAUSDT' 'NEARUSDT' 'SANDUSDT' 'LINKUSDT' 'DOTUSDT' 'LUNCUSDT'
 'MATICUSDT' 'TRXUSDT' 'ATOMUSDT' 'JASMYUSDT']


(17,)

In [None]:
cointegrated_pairs

Unnamed: 0,coin1,coin2,in_sample_correlation,in_sample_correlation_pvalue,in_sample_residuals_adf_p_value,in_sample_alpha,in_sample_beta,in_sample_half_file,in_sample_hurst,in_sample_coin1_lookback,in_sample_coin2_lookback,out_of_sample_correlation,out_of_sample_correlation_p_value,out_of_sample_spread_adf_p_value,out_of_sample_mean,out_of_sample_std,out_of_sample_half_file,out_of_sample_hurst,in_sample_start_date,in_sample_end_date,out_of_sample_end_date
2,BTCUSDT,LUNCUSDT,0.89,0.0,0.04,21.46,1.34,1411.0,0.46,2423,2152,0.91,0.0,0.02,0.1,0.02,302.0,0.49,2023-01-01,2023-01-19,2023-01-23
4,ETHUSDT,XRPUSDT,0.95,0.0,0.01,8.85,1.61,850.0,0.47,2443,2458,0.97,0.0,0.0,0.0,0.01,196.0,0.37,2023-01-01,2023-01-19,2023-01-23
6,ETHUSDT,SHIBUSDT,0.96,0.0,0.04,16.79,0.83,1309.0,0.5,2443,2262,0.98,0.0,0.0,-0.04,0.01,127.0,0.4,2023-01-01,2023-01-19,2023-01-23
7,ETHUSDT,MATICUSDT,0.99,0.0,0.0,7.35,0.97,650.0,0.45,2443,2409,0.98,0.0,0.01,0.04,0.01,222.0,0.4,2023-01-01,2023-01-19,2023-01-23
12,ETHUSDT,DOTUSDT,0.99,0.0,0.0,5.81,0.86,627.0,0.46,2443,2337,0.97,0.0,0.05,0.01,0.01,265.0,0.38,2023-01-01,2023-01-19,2023-01-23
18,SOLUSDT,SANDUSDT,0.98,0.0,0.01,3.58,1.24,1249.0,0.49,2189,2348,0.97,0.0,0.02,-0.08,0.02,264.0,0.48,2023-01-01,2023-01-19,2023-01-23
21,SOLUSDT,DOTUSDT,0.98,0.0,0.01,-1.05,2.36,1024.0,0.46,2189,2337,0.98,0.0,0.01,-0.05,0.02,203.0,0.45,2023-01-01,2023-01-19,2023-01-23
26,XRPUSDT,LTCUSDT,0.93,0.0,0.01,-4.33,0.76,887.0,0.47,2458,2313,0.96,0.0,0.0,0.04,0.01,143.0,0.41,2023-01-01,2023-01-19,2023-01-23
27,XRPUSDT,MATICUSDT,0.96,0.0,0.01,-0.94,0.56,718.0,0.46,2458,2409,0.94,0.0,0.02,0.03,0.01,305.0,0.4,2023-01-01,2023-01-19,2023-01-23
28,XRPUSDT,SANDUSDT,0.92,0.0,0.03,-0.86,0.25,1230.0,0.49,2458,2348,0.89,0.0,0.05,0.01,0.01,404.0,0.41,2023-01-01,2023-01-19,2023-01-23


In [None]:
unique_coins = pd.concat([
    cointegrated_pairs[['coin1', 'in_sample_coin1_lookback']].rename(columns={'coin1': 'coin', 'in_sample_coin1_lookback': 'lookback'}),
    cointegrated_pairs[['coin2', 'in_sample_coin2_lookback']].rename(columns={'coin2': 'coin', 'in_sample_coin2_lookback': 'lookback'})
], axis=0).drop_duplicates()

unique_coins = unique_coins.groupby('coin').agg({'lookback': 'min'}).reset_index()
unique_coins

Unnamed: 0,coin,lookback
0,ADAUSDT,2448
1,ATOMUSDT,2449
2,BTCUSDT,2423
3,DOGEUSDT,2461
4,DOTUSDT,2337
5,ETHUSDT,2443
6,JASMYUSDT,2161
7,LINKUSDT,2438
8,LTCUSDT,2313
9,LUNCUSDT,2152


In [None]:
unique_coins.shape

(17, 2)

In [None]:
cointegrated_pairs.shape

(27, 21)