In [1]:
%pip install optuna kneed



In [2]:
import gc
import json
import optuna
import joblib
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from google.colab import drive
from joblib import Parallel, delayed
from scipy.stats import pearsonr, zscore
from sklearn.decomposition import PCA
from sklearn.preprocessing import StandardScaler
from xgboost import XGBRegressor
from sklearn.model_selection import TimeSeriesSplit
from kneed import KneeLocator
from sklearn.metrics import silhouette_score
from scipy.cluster.hierarchy import linkage, fcluster, dendrogram
from scipy.spatial.distance import squareform
from collections import defaultdict

In [3]:
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [4]:
class DataLoader:
  def __init__(self, data_path, file_name):
    self.data_path = data_path
    self.file_name = file_name

  def load(self):
    df = pd.read_parquet(self.data_path + self.file_name)
    self.df = df.loc[:, ~((df == -np.inf).any() | (df == 0).all())]
    print('Data loaded')
    return self

  def split_X_qty_label(self):
    label = self.df['label']
    qty_vol = self.df[['bid_qty', 'ask_qty', 'buy_qty', 'sell_qty', 'volume']]
    X_ = self.df.drop(['label', 'bid_qty', 'ask_qty', 'buy_qty', 'sell_qty', 'volume'], axis=1)
    return X_, qty_vol, label

In [5]:
class FeatureGenerator:

  def __init__(self, df, epsilon=1e-6):
    self.df = df
    self.epsilon = epsilon

  def develop_features_from_qty_volume(self):
    '''
      Function to develop features from bid_qty, ask_qty, buy_qty, sell_qty
      and volume columns.
    '''
    imbalance = (self.df['bid_qty'] - self.df['ask_qty']) / (self.df['bid_qty'] + self.df['ask_qty'] + self.epsilon)
    buy_sell_ratio = np.log1p(self.df['buy_qty'] / (self.df['sell_qty'] + self.epsilon))
    volume_z = zscore(self.df['volume'])

    return pd.DataFrame({
        'imbalance': imbalance,
        'buy_sell_ratio': buy_sell_ratio,
        'volume_z': volume_z
    }, index=self.df.index)

  def standardize_columns(self, columns):
    '''Function to standardize specific columns wihtin a df.'''
    scaler = StandardScaler()
    df_scaled = scaler.fit_transform(self.df[columns])
    df_scaled = pd.DataFrame(df_scaled, columns=columns, index=self.df.index)
    return pd.concat([df_scaled, self.df['label']], axis=1)

  def standardize_df(self):
    '''Function to standardize a whole df.'''
    scaler = StandardScaler()
    df_scaled = scaler.fit_transform(self.df)
    return pd.DataFrame(df_scaled, columns=self.df.columns, index=self.df.index)

In [6]:
class CorrelationAnalyzer:
  def __init__(self, Y_column='label'):
    self.Y_column = Y_column

  def _correlation(self, df, feature):
    '''
    Calcualte the Pearson correlation and p-value
    for a single feature and Y
    '''
    r, p = pearsonr(df[feature], df[self.Y_column])
    return feature, r, p

  def compute_correlations_in_window(self, df, features, window='3D'):
    '''
    Calcualte the Pearson correlation and p-value
    for each window between each feature and Y
    '''
    grouped = df.groupby(pd.Grouper(freq=window))
    results = []
    for period_start, df_window in grouped:
      output = Parallel(n_jobs=-1)(
          delayed(self._correlation)(df_window, f) for f in features
          )
      for feature, r, p in output:
        results.append({
            'period_start': period_start,
            'feature': feature,
            'correlation': r,
            'p_value': p
            })

    return pd.DataFrame(results)

  def score_features(self, df_correlations, epsilon=1e-6):
    '''
    Score features based on their correlation with Y as a:
      base_score:
        correlation_mean / correlation_std

      adjusted_score:
        base_score * (1 / p_value_mean)
    '''
    summary = df_correlations.groupby('feature').agg({
        'correlation': ['mean', 'std'],
        'p_value': 'mean'
        })

    summary['base_score'] = (
        abs(summary[('correlation', 'mean')]) / (summary[('correlation', 'std')]
                                                 + epsilon)
    )

    summary['adjusted_score'] = (summary['base_score'] *
     (1 / summary[('p_value', 'mean')] + epsilon))

    return summary

  def select_top_features(self, summary, top_k=55, score='adjusted_score'):
    sorted_summary = summary.sort_values(score, ascending=False)
    top_k_df = sorted_summary.head(top_k)
    return top_k_df.index.tolist()

In [7]:
class LagCreator:
  def __init__(self, lag_periods = 5):
    self.lag_periods = lag_periods

  def lag_features(self, X):

    lagged_columns = []
    for col in X.columns:
        for lag in range(1, self.lag_periods + 1):
          shifted = X[col].shift(lag)
          shifted.name = f"{col}_lag{lag}"
          lagged_columns.append(shifted)

    lagged = pd.concat(lagged_columns, axis=1)
    return lagged.dropna()

  def match_index_lags(self, X_lagged, Y):
    '''Y is series with a single column'''
    y_lagged = Y.loc[X_lagged.index]

    return y_lagged


In [18]:
class CorrelationBatches:
  def __init__(self, df, lags, Y):
    self.Y_column = Y
    self.df = df
    self.lags = lags

  def _lag_column(self, column_df, feature_name, l):
    '''Create a lagged version of a given feature column.'''
    lagged_col = column_df.shift(1).dropna()
    lagged_col.name = f"{feature_name}_lag{l}"
    return lagged_col

  def _standardize_col(self, column_df):
    '''Standardize a feature column'''
    scaler = StandardScaler()
    scaled_col = scaler.fit_transform(column_df.values.reshape(-1, 1))
    scaled_col = pd.Series(scaled_col.flatten(), index=column_df.index, name=column_df.name)
    return scaled_col

  def _col_label_corr(self, period_start, column_df):
    '''Compute Pearson correlation between a feature column and the target column.'''
    y = self.Y_column.loc[column_df.index]
    corr, p = pearsonr(column_df, y)
    return column_df.name, period_start, corr, p

  def feature_correlation(self, feature):
    '''
    Compute correlations of a feature and its lagged versions with the target column.
    Each call standardizes the values, then calculates the Pearson correlation
    and p-value accross windows of 3 days.
    Then it creates lagged versions of the feature and repeats the process.
    Returns:
      list: A list of three lists containing:
        [0] Feature names (including lags),
        [1] Correlation coefficients,
        [2] Corresponding p-values.

    '''
    feature_correlations = []
    feature_df = self.df[feature]
    scaled_feature = self._standardize_col(feature_df)

    grouped = scaled_feature.groupby(pd.Grouper(freq='3D'))

    output = Parallel(n_jobs=-1)(
        delayed(self._col_label_corr)(period_start, df_g) for period_start, df_g in grouped
        )

    feature_correlations.append(output)

    ft_name = feature_df.name

    for lag in range(1, self.lags + 1):
      feature_df = self._lag_column(feature_df, ft_name, lag)
      scaled_feature = self._standardize_col(feature_df)

      grouped = scaled_feature.groupby(pd.Grouper(freq='3D'))

      output = Parallel(n_jobs=-1)(
          delayed(self._col_label_corr)(period_start, df_g) for period_start, df_g in grouped
          )

      feature_correlations.append(output)

    return feature_correlations

In [9]:
# Features
# Group 1 - X1, X2 ....
# Group 2 - _qty, vol

In [10]:
# Filepaths
dataPath = '/content/drive/MyDrive/Colab Notebooks/DRW/data/'
fileNameTrain = 'train.parquet'

In [12]:
# Load train data and remove -inf columns and columns full of 0
loader = DataLoader(dataPath, fileNameTrain)
features_group1, features_group2, label = loader.load().split_X_qty_label()
del loader
gc.collect()

Data loaded


27

In [None]:
# Calculate correlation in windows of 3 days
# between each feature, its lags and labe
results = {}
columns = features_group1.columns
count = 1
for col in columns:
  correlation_in_bacthes = CorrelationBatches(features_group1, 5, label)
  out = correlation_in_bacthes.feature_correlation(col)

  results[col] = out
  print(f'Processed {count} out of {len(columns)}')
  count += 1

Processed 1 out of 863
Processed 2 out of 863
Processed 3 out of 863
Processed 4 out of 863
Processed 5 out of 863
Processed 6 out of 863
Processed 7 out of 863
Processed 8 out of 863
Processed 9 out of 863
Processed 10 out of 863
Processed 11 out of 863
Processed 12 out of 863
Processed 13 out of 863
Processed 14 out of 863
Processed 15 out of 863
Processed 16 out of 863
Processed 17 out of 863
Processed 18 out of 863
Processed 19 out of 863
Processed 20 out of 863
Processed 21 out of 863
Processed 22 out of 863
Processed 23 out of 863
Processed 24 out of 863
Processed 25 out of 863
Processed 26 out of 863
Processed 27 out of 863
Processed 28 out of 863
Processed 29 out of 863
Processed 30 out of 863
Processed 31 out of 863
Processed 32 out of 863
Processed 33 out of 863
Processed 34 out of 863
Processed 35 out of 863
Processed 36 out of 863
Processed 37 out of 863
Processed 38 out of 863
Processed 39 out of 863
Processed 40 out of 863
Processed 41 out of 863
Processed 42 out of 863
P

In [22]:
with open(dataPath + 'results_correlation6.json', 'w') as f:
  json.dump(results, f)

[('X1_lag5', Timestamp('2023-03-01 00:00:00'), np.float64(-0.10913781937205035), np.float64(6.549692452999619e-13)), ('X1_lag5', Timestamp('2023-03-04 00:00:00'), np.float64(-0.026471443605183664), np.float64(0.08191484648624336)), ('X1_lag5', Timestamp('2023-03-07 00:00:00'), np.float64(0.03826253558900235), np.float64(0.011901090556599324)), ('X1_lag5', Timestamp('2023-03-10 00:00:00'), np.float64(-0.07067511447370398), np.float64(3.323970261134287e-06)), ('X1_lag5', Timestamp('2023-03-13 00:00:00'), np.float64(-0.08625417896052534), np.float64(1.3616185049561066e-08)), ('X1_lag5', Timestamp('2023-03-16 00:00:00'), np.float64(-0.042595578975492175), np.float64(0.005108238442889498)), ('X1_lag5', Timestamp('2023-03-19 00:00:00'), np.float64(0.04364136036407596), np.float64(0.0041185040941400155)), ('X1_lag5', Timestamp('2023-03-22 00:00:00'), np.float64(0.027924124408820986), np.float64(0.06647792105142478)), ('X1_lag5', Timestamp('2023-03-25 00:00:00'), np.float64(0.04591892154455668

In [None]:
# Lag X_ features
lagger = LagCreator()
features_group1 = lagger.lag_features(features_group1)

In [10]:
# Develop features from qty_vol
# Modified group2: imbalance, buy_sell_ratio, volume_z
features_group2 = FeatureGenerator(features_group2).develop_features_from_qty_volume()

In [11]:
# Lag modified group2
features_group2 = lagger.lag_features(features_group2)

In [12]:
# Standardize lagged group2 features
features_group2 = FeatureGenerator(features_group2).standardize_df()