In [6]:
import gc  # Garbage collection for memory management
import os  # Operating system-related functions
import time  # Time-related functions
import warnings  # Handling warnings
from itertools import combinations  # For creating combinations of elements
from warnings import simplefilter  # Simplifying warning handling   
import matplotlib.pyplot as plt

# 📦 Importing machine learning libraries
import joblib  # For saving and loading models
import lightgbm as lgb  # LightGBM gradient boosting framework
import numpy as np  # Numerical operations
import pandas as pd  # Data manipulation and analysis
from sklearn.metrics import mean_absolute_error  # Metric for evaluation
from sklearn.model_selection import KFold, TimeSeriesSplit  # Cross-validation techniques

# 🤐 Disable warnings to keep the code clean
# NOTE: this blanket filter silences every warning category for the rest of the
# kernel session, including deprecation and numerical warnings.
warnings.filterwarnings("ignore")
# Explicitly ignore pandas PerformanceWarning too (already covered by the blanket filter above).
simplefilter(action="ignore", category=pd.errors.PerformanceWarning)

# Run-mode flags and split configuration
is_offline = False  # Flag for online/offline mode
is_train = True  # Flag for training mode
is_infer = True  # Flag for inference mode
max_lookback = np.nan  # Maximum lookback; NaN means it is not specified
split_day = 435  # Split day for time series data (presumably a date_id cut-off -- TODO confirm)

In [2]:
# Load the training set, keep only the rows that have a target value, and
# renumber the index from 0 so positional and label indexing agree.
df = (
    pd.read_csv('/home/shi/Documents/Trading/Kaggle/train.csv')
    .dropna(subset=["target"])
    .reset_index(drop=True)
)
df.shape

# test data starts at day 478 

(5237892, 17)

In [3]:
# Preview the first five rows to check the loaded columns.
df.head()

Unnamed: 0,stock_id,date_id,seconds_in_bucket,imbalance_size,imbalance_buy_sell_flag,reference_price,matched_size,far_price,near_price,bid_price,bid_size,ask_price,ask_size,wap,target,time_id,row_id
0,0,0,0,3180602.69,1,0.999812,13380276.64,,,0.999812,60651.5,1.000026,8493.03,1.0,-3.029704,0,0_0_0
1,1,0,0,166603.91,-1,0.999896,1642214.25,,,0.999896,3233.04,1.00066,20605.09,1.0,-5.519986,0,0_0_1
2,2,0,0,302879.87,-1,0.999561,1819368.03,,,0.999403,37956.0,1.000298,18995.0,1.0,-8.38995,0,0_0_2
3,3,0,0,11917682.27,-1,1.000171,18389745.62,,,0.999999,2324.9,1.000214,479032.4,1.0,-4.0102,0,0_0_3
4,4,0,0,447549.96,-1,0.999532,17860614.95,,,0.999394,16485.54,1.000016,434.1,1.0,-7.349849,0,0_0_4


## Calculate rolling features

In [None]:
def calculate_rolling_features(input_df):
    """Add intraday technical-indicator, lag and derivative features to ``input_df``.

    Every diff / shift / rolling / EWM computation is done per group and never
    crosses a ``date_id`` boundary. When the frame has a ``stock_id`` column the
    groups are ``(stock_id, date_id)`` so that rows belonging to different
    stocks are never mixed; for a single-stock frame this is identical to
    grouping by ``date_id`` alone.

    Rows are assumed to already be in chronological order inside each group
    (presumably ordered by ``seconds_in_bucket`` -- verify against the caller).

    Columns added in place:

    * RSI on ``reference_price``: ``delta``, ``gains``, ``losses``,
      ``avg_gains``, ``avg_losses``, ``rs``, ``rsi``, ``rsi_ma``,
      ``rsi_rsi_ma_diff``
    * Stochastic oscillator: ``low_min``, ``high_max``, ``percentK``,
      ``percentD`` (rolling mean of ``percentK``), ``KD_diff``
    * MACD: ``short_ema``, ``long_ema``, ``MACD``, ``Signal_Line``,
      ``MACD_Signal_Diff``
    * For each price column: ``<price>_lag_by_1..3``,
      ``<price>_1st_derivative``, ``<price>_2nd_derivative``

    Parameters
    ----------
    input_df : pandas.DataFrame
        Must contain ``date_id`` and the price columns ``reference_price``,
        ``far_price``, ``near_price``, ``ask_price``, ``bid_price`` and ``wap``.
        ``stock_id`` is optional (see above).

    Returns
    -------
    pandas.DataFrame
        The same object that was passed in, with the new columns attached.
    """
    window_rsi = 14
    rsi_ma_window = 14
    window_kd = 14
    smooth_d = 3
    min_lag = 1
    # Standard MACD spans are 12/26/9; shorter ones are used because a day only
    # holds a few dozen observations.
    short_MACD_window = 6
    long_MACD_window = 13
    signal_MACD_window = 5

    # Group per stock and day when possible; falling back to the day alone keeps
    # the behaviour unchanged for frames that were already filtered to one stock.
    group_keys = ['stock_id', 'date_id'] if 'stock_id' in input_df.columns else ['date_id']
    # The groupby keeps a reference to ``input_df`` itself, so columns assigned
    # below are selectable through ``grouped[...]`` as soon as they exist.
    grouped = input_df.groupby(group_keys)

    #################################################
    # RSI
    # Price change from the previous row of the same group (NaN on the first row).
    input_df['delta'] = grouped['reference_price'].diff()

    # Separate gains (positive changes) and losses (negative changes, sign flipped).
    # A NaN delta fails both comparisons and therefore counts as 0 in both.
    input_df['gains'] = input_df['delta'].where(input_df['delta'] > 0, 0)
    input_df['losses'] = -input_df['delta'].where(input_df['delta'] < 0, 0)

    # Simple moving averages of gains and losses over the RSI window.
    input_df['avg_gains'] = grouped['gains'].transform(
        lambda s: s.rolling(window=window_rsi, min_periods=1).mean())
    input_df['avg_losses'] = grouped['losses'].transform(
        lambda s: s.rolling(window=window_rsi, min_periods=1).mean())

    # Relative strength: inf when there are gains but no losses (RSI = 100),
    # NaN when both averages are 0.
    input_df['rs'] = input_df['avg_gains'] / input_df['avg_losses']
    input_df['rsi'] = 100 - (100 / (1 + input_df['rs']))

    # Moving average of RSI and the distance of RSI from it.
    input_df['rsi_ma'] = grouped['rsi'].transform(
        lambda s: s.rolling(window=rsi_ma_window, min_periods=1).mean())
    input_df['rsi_rsi_ma_diff'] = input_df['rsi'] - input_df['rsi_ma']

    #################################################
    # Stochastic Oscillator (%K and %D)
    input_df['low_min'] = grouped['reference_price'].transform(
        lambda s: s.rolling(window=window_kd, min_periods=1).min())
    input_df['high_max'] = grouped['reference_price'].transform(
        lambda s: s.rolling(window=window_kd, min_periods=1).max())

    # %K is NaN while the window is flat (high == low gives 0/0).
    input_df['percentK'] = ((input_df['reference_price'] - input_df['low_min']) /
                            (input_df['high_max'] - input_df['low_min']) * 100)

    # %D is the moving *average* of %K (common smoothing lengths are 3 or 5).
    # A rolling max here would make KD_diff always <= 0.
    input_df['percentD'] = grouped['percentK'].transform(
        lambda s: s.rolling(window=smooth_d, min_periods=1).mean())

    input_df['KD_diff'] = input_df['percentK'] - input_df['percentD']

    #################################################
    # MACD
    input_df['short_ema'] = grouped['reference_price'].transform(
        lambda s: s.ewm(span=short_MACD_window, adjust=False, min_periods=1).mean())
    input_df['long_ema'] = grouped['reference_price'].transform(
        lambda s: s.ewm(span=long_MACD_window, adjust=False, min_periods=1).mean())

    input_df['MACD'] = input_df['short_ema'] - input_df['long_ema']
    input_df['Signal_Line'] = grouped['MACD'].transform(
        lambda s: s.ewm(span=signal_MACD_window, adjust=False, min_periods=1).mean())
    input_df['MACD_Signal_Diff'] = input_df['MACD'] - input_df['Signal_Line']

    #################################################
    # Bollinger Bands: intentionally not implemented.
    # Rolling features of the optimal order book spread: not implemented yet.

    # Intermediate columns that callers may want to drop afterwards:
    # ['delta', 'gains', 'losses', 'avg_gains', 'avg_losses', 'rs', 'short_ema', 'long_ema']

    #################################################
    # Lags and discrete derivatives of the raw price columns.
    prices = ['reference_price', 'far_price', 'near_price', 'ask_price', 'bid_price', 'wap']
    for price_col in prices:
        for lag in (min_lag, min_lag + 1, min_lag + 2):
            input_df[f'{price_col}_lag_by_{lag}'] = grouped[price_col].shift(lag)

    # NOTE(review): the original also listed the indicator columns (rsi, rsi_ma,
    # percentK, ...) as candidates for derivatives but never used that list;
    # derivatives are still computed for the price columns only.
    for price_col in prices:
        input_df[f'{price_col}_1st_derivative'] = grouped[price_col].diff()
        input_df[f'{price_col}_2nd_derivative'] = grouped[f'{price_col}_1st_derivative'].diff()

    return input_df


In [None]:
from concurrent.futures import ThreadPoolExecutor
import threading
from tqdm import tqdm

# Module-level lock; update_train_dataset holds it while writing results back
# into the shared DataFrame.
lock = threading.Lock()

def process_stock_date_pair(stock_id, input_df):
    """Compute the RSI / stochastic-oscillator columns for a single stock.

    Works on a private copy of the stock's rows, so ``input_df`` is not modified.
    NOTE(review): ``calculate_rsi_stochastic_oscillator`` is not defined in the
    visible part of the notebook; it is expected to add the four indicator
    columns to the frame it receives, in place -- confirm it exists.

    Returns
    -------
    tuple
        ``(stock_id, frame)`` where ``frame`` holds the columns
        ``rsi``, ``rsi_ma``, ``percentK`` and ``percentD`` for that stock.
    """
    indicator_columns = ['rsi', 'rsi_ma', 'percentK', 'percentD']
    belongs_to_stock = input_df['stock_id'] == stock_id
    per_stock = input_df.loc[belongs_to_stock].copy()
    calculate_rsi_stochastic_oscillator(
        per_stock,
        window_rsi=14,
        rsi_ma_window=14,
        window_kd=14,
        smooth_d=3,
    )
    return stock_id, per_stock[indicator_columns]

def update_train_dataset(stock_id, result_data, input_df):
    """Copy one stock's indicator values back into ``input_df`` in place.

    The whole write happens while holding the module-level ``lock``.
    ``result_data`` must have one row per row of ``stock_id`` in ``input_df``,
    in the same order, because values are assigned positionally via ``.values``.
    """
    target_columns = ['rsi', 'rsi_ma', 'percentK', 'percentD']
    with lock:
        rows_of_stock = input_df['stock_id'].eq(stock_id)
        input_df.loc[rows_of_stock, target_columns] = result_data.values

def parallelize_processing(unique_stock_ids, input_df):
    """Compute per-stock indicators in a thread pool, then write them into ``input_df``.

    Phase 1 runs ``process_stock_date_pair`` for every stock id on a
    ``ThreadPoolExecutor``; ``executor.map`` yields results in submission order.
    Phase 2 writes each result back with ``update_train_dataset``. That function
    holds a global lock for its whole body, so the writes are strictly serial
    anyway; they are done in a plain loop instead of being fanned out to threads.

    Parameters
    ----------
    unique_stock_ids : sequence
        Stock ids to process (must support ``len``).
    input_df : pandas.DataFrame
        Frame that is read in phase 1 and updated in place in phase 2.

    Returns
    -------
    None
    """
    with ThreadPoolExecutor() as executor:
        results = list(tqdm(
            executor.map(lambda stock_id: process_stock_date_pair(stock_id, input_df),
                         unique_stock_ids),
            total=len(unique_stock_ids),
        ))

    print("Processing completed. Starting result update.")

    # Update input_df with the results
    for stock_id, result_data in tqdm(results, total=len(results)):
        update_train_dataset(stock_id, result_data, input_df)


# Get the unique stock IDs; each stock is processed independently
unique_stock_ids = df['stock_id'].unique()

# Parallelize the processing (updates df in place)
parallelize_processing(unique_stock_ids, df)

# Intermediate columns that are not meant to be kept as features.
# NOTE(review): the original list literal was left unterminated, which made the
# whole cell a SyntaxError. It is closed here with the names that were present;
# no drop is performed yet -- TODO: complete the list and drop the columns
# if/when they exist in df.
useless_columns = ['gains', 'losses', 'avg_gains', 'avg_losses', 'rs', 'short_ema', 'long_ema']

In [None]:
## Simulate synthetic index

In [None]:
# Weights used to build the synthetic index: 200 entries.
# Presumably weights[i] belongs to stock_id i -- TODO confirm the ordering and
# the provenance of these values.
weights = [
    0.004, 0.001, 0.002, 0.006, 0.004, 0.004, 0.002, 0.006, 0.006, 0.002, 0.002, 0.008,
    0.006, 0.002, 0.008, 0.006, 0.002, 0.006, 0.004, 0.002, 0.004, 0.001, 0.006, 0.004,
    0.002, 0.002, 0.004, 0.002, 0.004, 0.004, 0.001, 0.001, 0.002, 0.002, 0.006, 0.004,
    0.004, 0.004, 0.006, 0.002, 0.002, 0.04 , 0.002, 0.002, 0.004, 0.04 , 0.002, 0.001,
    0.006, 0.004, 0.004, 0.006, 0.001, 0.004, 0.004, 0.002, 0.006, 0.004, 0.006, 0.004,
    0.006, 0.004, 0.002, 0.001, 0.002, 0.004, 0.002, 0.008, 0.004, 0.004, 0.002, 0.004,
    0.006, 0.002, 0.004, 0.004, 0.002, 0.004, 0.004, 0.004, 0.001, 0.002, 0.002, 0.008,
    0.02 , 0.004, 0.006, 0.002, 0.02 , 0.002, 0.002, 0.006, 0.004, 0.002, 0.001, 0.02,
    0.006, 0.001, 0.002, 0.004, 0.001, 0.002, 0.006, 0.006, 0.004, 0.006, 0.001, 0.002,
    0.004, 0.006, 0.006, 0.001, 0.04 , 0.006, 0.002, 0.004, 0.002, 0.002, 0.006, 0.002,
    0.002, 0.004, 0.006, 0.006, 0.002, 0.002, 0.008, 0.006, 0.004, 0.002, 0.006, 0.002,
    0.004, 0.006, 0.002, 0.004, 0.001, 0.004, 0.002, 0.004, 0.008, 0.006, 0.008, 0.002,
    0.004, 0.002, 0.001, 0.004, 0.004, 0.004, 0.006, 0.008, 0.004, 0.001, 0.001, 0.002,
    0.006, 0.004, 0.001, 0.002, 0.006, 0.004, 0.006, 0.008, 0.002, 0.002, 0.004, 0.002,
    0.04 , 0.002, 0.002, 0.004, 0.002, 0.002, 0.006, 0.02 , 0.004, 0.002, 0.006, 0.02,
    0.001, 0.002, 0.006, 0.004, 0.006, 0.004, 0.004, 0.004, 0.004, 0.002, 0.004, 0.04,
    0.002, 0.008, 0.002, 0.004, 0.001, 0.004, 0.006, 0.004,
]

## Memory reduction

In [3]:
def reduce_mem_usage(input_df, verbose=0):
    """Downcast the numeric columns of ``input_df`` in place to save memory.

    * Signed integer columns are cast to the smallest of int8/16/32/64 whose
      range contains the column's min and max (bounds inclusive).
    * Unsigned integer columns are treated the same way with uint8/16/32/64.
    * Float columns are cast to float32, unless that would overflow finite
      values to +/-inf, in which case the column is left untouched. float32
      keeps about 7 significant digits, so float64 precision is reduced.
    * Every other column (object/string, bool, datetime, categorical, pandas
      extension dtypes) is left as is.

    Parameters
    ----------
    input_df : pandas.DataFrame
        Frame to optimise; it is modified in place.
    verbose : int or bool, default 0
        When truthy, report memory usage before/after and the percentage
        saved. Messages go to a module-level ``logger`` if the notebook
        defines one, otherwise to ``print``.

    Returns
    -------
    pandas.DataFrame
        The same object that was passed in.
    """
    start_mem = input_df.memory_usage().sum() / 1024**2

    signed_ladder = (np.int8, np.int16, np.int32, np.int64)
    unsigned_ladder = (np.uint8, np.uint16, np.uint32, np.uint64)

    for col in input_df.columns:
        col_type = input_df[col].dtype

        # Only plain NumPy dtypes are handled; extension dtypes (nullable ints,
        # strings, categoricals, ...) cannot be cast safely this way.
        if not isinstance(col_type, np.dtype):
            continue

        if col_type.kind in ("i", "u"):
            c_min = input_df[col].min()
            c_max = input_df[col].max()
            ladder = signed_ladder if col_type.kind == "i" else unsigned_ladder
            # An empty column has NaN min/max, every comparison below is False
            # and the dtype is left unchanged.
            for candidate in ladder:
                bounds = np.iinfo(candidate)
                if bounds.min <= c_min and c_max <= bounds.max:
                    if np.dtype(candidate) != col_type:
                        input_df[col] = input_df[col].astype(candidate)
                    break
        elif col_type.kind == "f" and col_type != np.float32:
            # Values beyond the float32 range turn into inf when cast; detect
            # that by comparing the number of infinities before and after.
            with np.errstate(over="ignore"):
                narrowed = input_df[col].astype(np.float32)
            if np.isinf(narrowed).sum() == np.isinf(input_df[col]).sum():
                input_df[col] = narrowed

    if verbose:
        # Use the notebook's logger when one exists; fall back to print so that
        # verbose mode never fails with a NameError.
        emit = globals()["logger"].info if "logger" in globals() else print
        emit(f"Memory usage of dataframe is {start_mem:.2f} MB")
        end_mem = input_df.memory_usage().sum() / 1024**2
        emit(f"Memory usage after optimization is: {end_mem:.2f} MB")
        decrease = 100 * (start_mem - end_mem) / start_mem if start_mem else 0.0
        emit(f"Decreased by {decrease:.2f}%")

    return input_df
