---

## 0. requirements

In [1]:
# !pip install --target=/home/<user_name>/<venv_name>/lib/python3.10/site-packages <package_name>

## 1. config 설정

#### 1.1. init config

In [57]:
# --- run mode ---------------------------------------------------------------
MODE = "inference"  # train, inference, both
# In inference mode the models are read from /kaggle/input/<KAGGLE_DATASET_NAME>.
KAGGLE_DATASET_NAME = "model-version-52"

# --- stage flags ------------------------------------------------------------
is_train = False
is_infer = True
is_pre_test = False

# --- inference toggles ------------------------------------------------------
INFER_USE_ML = True
INFER_USE_TF = True
INFER_USE_CNN = True

N_Folds = 5

In [39]:
import gc
import os
import time
import warnings
from itertools import combinations
from warnings import simplefilter
import functools
import time
from numba import njit, prange
import numba
import pyarrow.parquet as pq
from tqdm import tqdm
import glob
import polars as pl
import datetime

import joblib
import lightgbm as lgb
import xgboost as xgb
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import optuna
from functools import partial
from sklearn.metrics import mean_absolute_error
from sklearn.model_selection import KFold, TimeSeriesSplit
from sklearn.preprocessing import LabelEncoder

from typing import Dict, List, Optional, Tuple
from sklearn.preprocessing import LabelEncoder, StandardScaler, MinMaxScaler,PowerTransformer, FunctionTransformer,minmax_scale, QuantileTransformer
from sklearn.decomposition import PCA,TruncatedSVD,LatentDirichletAllocation
from sklearn.neighbors import KNeighborsClassifier,NearestNeighbors
from sklearn.impute import KNNImputer
import traceback
from contextlib import contextmanager
import seaborn as sns
sns.set(style='darkgrid', font_scale=1.4)

import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader, TensorDataset, Subset, random_split
from torch.optim.lr_scheduler import ReduceLROnPlateau

from itertools import combinations


pd.set_option('display.max_rows', None)

warnings.filterwarnings("ignore")
simplefilter(action="ignore", category=pd.errors.PerformanceWarning)

#### 1.2. train / inference config

In [4]:
(lgb.__version__, xgb.__version__)  # library versions, shown as the cell output

('3.3.2', '2.0.2')

In [5]:
EPS = 1e-10  # small constant added to denominators to avoid division by zero

In [6]:
_KAGGLE_COMPETITION_DIR = "/kaggle/input/optiver-trading-at-the-close"

if MODE not in ("train", "inference", "both"):
    raise ValueError("Invalid mode")

print(f"You are in {MODE} mode")
# "both" trains and runs inference in the same session.
train_mode = MODE in ("train", "both")
infer_mode = MODE in ("inference", "both")

if MODE == "train":
    # Timestamped run directory; the +9h offset shifts local time (presumably UTC -> KST).
    model_directory = "./models/" + time.strftime("%Y%m%d_%H:%M:%S", time.localtime(time.time() + 9 * 60 * 60))
    data_directory = "./data"
elif MODE == "inference":
    model_directory = f'/kaggle/input/{KAGGLE_DATASET_NAME}'
    data_directory = _KAGGLE_COMPETITION_DIR
else:  # "both"
    model_directory = '/kaggle/working/'
    data_directory = _KAGGLE_COMPETITION_DIR

You are in inference mode


#### 1.3. model config

In [7]:
config = {
    ### default config
    "data_dir": data_directory,
    "model_dir": model_directory,
    "train_mode": train_mode,  # True: train, False: skip training
    "infer_mode": infer_mode,  # True: run inference, False: skip inference

    ### model config
    "model_name": ["lgb_b"],  # model name(s)
    # Whether to stack models; even a single model is stacked across its folds once the data is split.
    "stacking_mode": False,
    "stacking_algorithm": None,  # "optuna" or None (must be None when stacking_mode is False)

    "target": "target",

    ### model hyperparameter
    "optuna_random_state": 42,

    ### cv hyperparameter
    "split_method": "purged",  # time_series, rolling, blocking, holdout, purged
    "n_splits": 5,  # number of splits
    "correct": True,  # move split boundaries so a date is not cut in two
    "gap": 0.05,  # train/test gap; Splitter reads it as a fraction of the fold size (0.05 = 5%)
    "initial_fold_size_ratio": 0.4,  # initial fold size as a fraction of all rows
    "train_test_ratio": 0.95,  # train share within a fold (the rest is test)
}

In [8]:
# Reject inconsistent stacking settings before anything expensive runs.
if config["stacking_mode"] == True:
    if config["n_splits"] == 1:
        raise ValueError("stacking mode is True but n_splits is 1, cannot stacking")
    if config["stacking_algorithm"] is None:
        raise ValueError("stacking mode is True but stacking_algorithm is None, impossible")
elif config["stacking_mode"] == False and config["stacking_algorithm"] is not None:
    raise ValueError("stacking mode is False but stacking_algorithm is not None, impossible")

#### 1.4. model hyperparameter config

In [9]:
# Estimator class and constructor kwargs per model key.
models_config = {
    "lgb": {
        "model": lgb.LGBMRegressor,
        "params": {
            "objective": "mae",
            "n_estimators": 9999,  # 2040
            "num_leaves": 126,
            "subsample": 0.7628752081565437,
            "colsample_bytree": 0.6380919043232433,
            "learning_rate": 0.01795041572109495,
            "n_jobs": 4,
            "device": "gpu",
            "verbosity": -1,
            "importance_type": "gain",
        },
    },
    "lgb_b": {
        "model": lgb.LGBMRegressor,
        "params": {
            "objective": "mae",
            "n_estimators": 6000,
            "num_leaves": 256,
            "subsample": 0.6,
            "colsample_bytree": 0.8,
            "learning_rate": 0.00871,
            'max_depth': 11,
            "n_jobs": 4,
            "device": "gpu",
            "verbosity": -1,
            "importance_type": "gain",
            "reg_alpha": 0.1,
            "reg_lambda": 3.25
        },
    },
    "xgb": {
        "model": xgb.XGBRegressor,
        "params": {
            # "reg:linear" is the deprecated alias of the same squared-error objective.
            "objective": "reg:squarederror",
            "n_estimators": 9999,  # 2400
            "max_depth": 14,
            "eta": 0.0073356282482453065,
            "subsample": 0.9,
            "colsample_bytree": 0.30000000000000004,
            "colsample_bylevel": 0.9,
            "min_child_weight": 0.4824060812428942,
            "reg_lambda": 182.50819193990537,
            "reg_alpha": 0.03171419713574529,
            "gamma": 0.9162634503670075,
            # NOTE: XGBoost 2.x deprecates "gpu_hist" in favour of tree_method="hist" + device="cuda".
            "tree_method": "gpu_hist",
            "n_jobs": 4,
            "verbosity": 0,
        },
    },
}

In [10]:
if MODE == "train":
    if not os.path.exists(config["model_dir"]):
        os.makedirs(config["model_dir"])
    if not os.path.exists(config["data_dir"]):
        os.makedirs(config["data_dir"])
    !kaggle competitions download optiver-trading-at-the-close -p {config["data_dir"]} --force
    !unzip -o {config["data_dir"]} /optiver-trading-at-the-close.zip -d {config["data_dir"]}
    !rm {config["data_dir"]} /optiver-trading-at-the-close.zip

## Global Methods

In [11]:
def reduce_mem_usage(df, verbose=0):
    """
    Downcast numeric columns of ``df`` in place to reduce memory usage.

    - NumPy integer columns (signed or unsigned) become the smallest signed integer
      type (int8/int16/int32/int64) whose range contains the column's min and max;
      columns that fit none of them are left unchanged.
    - Other numeric columns (floats, bools, nullable numerics) become float32, unless
      a finite value lies outside the float32 range, in which case the column is left
      unchanged instead of overflowing to +/-inf.
    - Non-numeric columns (object, string, category, datetime, ...) are left untouched.

    Note: float64 -> float32 loses precision.

    :param df: DataFrame to downcast; modified in place.
    :param verbose: if truthy, print memory usage before and after.
    :return: the same DataFrame object.
    """
    start_mem = df.memory_usage().sum() / 1024 ** 2
    float32_max = float(np.finfo(np.float32).max)

    def _overflows_float32(value):
        # NaN / NA / +-inf survive a float32 cast unchanged, so only finite
        # out-of-range values count as an overflow.
        try:
            value = float(value)
        except (TypeError, ValueError):
            return False
        return bool(np.isfinite(value)) and abs(value) > float32_max

    for col in df.columns:
        col_type = df[col].dtype

        if isinstance(col_type, np.dtype) and col_type.kind in "iu":
            c_min = df[col].min()
            c_max = df[col].max()
            for int_type in (np.int8, np.int16, np.int32, np.int64):
                bounds = np.iinfo(int_type)
                if bounds.min <= c_min and c_max <= bounds.max:
                    df[col] = df[col].astype(int_type)
                    break
        elif pd.api.types.is_numeric_dtype(col_type):
            c_min = df[col].min()
            c_max = df[col].max()
            if not (_overflows_float32(c_min) or _overflows_float32(c_max)):
                df[col] = df[col].astype(np.float32)

    if verbose:
        end_mem = df.memory_usage().sum() / 1024 ** 2
        print(f"Memory usage: {start_mem:.2f} MB -> {end_mem:.2f} MB")

    return df

In [12]:
@njit(parallel=True)
def compute_triplet_imbalance(df_values, comb_indices):
    """
    Calculate the triplet imbalance for each row and each column triplet.

    For a triplet of columns (a, b, c) and row j, with max/mid/min being the
    largest, middle and smallest of the three values:
        imbalance = (max - mid) / (mid - min + EPS)
    The result is NaN when mid == min (the two smallest values are equal).

    :param df_values: 2-D array indexed as df_values[row, column].
    :param comb_indices: sequence of (a, b, c) column-index triplets.
    :return: float64 array of shape (num_rows, len(comb_indices)); column i holds
        the imbalance for comb_indices[i].
    """
    num_rows = df_values.shape[0]
    num_combinations = len(comb_indices)
    imbalance_features = np.empty((num_rows, num_combinations))

    # Triplets are distributed over threads (prange); each iteration writes only its own column.
    for i in prange(num_combinations):
        a, b, c = comb_indices[i]
        for j in range(num_rows):
            max_val = max(df_values[j, a], df_values[j, b], df_values[j, c])
            min_val = min(df_values[j, a], df_values[j, b], df_values[j, c])
            # Middle value = sum of the three minus the two extremes.
            mid_val = df_values[j, a] + df_values[j, b] + df_values[j, c] - min_val - max_val
            if mid_val == min_val:  # denominator would be only EPS; report NaN instead
                imbalance_features[j, i] = np.nan
            else:
                imbalance_features[j, i] = (max_val - mid_val) / (mid_val - min_val + EPS)

    return imbalance_features


def calculate_triplet_imbalance_numba(price, df):
    """
    Build triplet-imbalance features for every 3-column combination of ``price``.

    :param price: list of column names present in ``df``; its order defines the combinations.
    :param df: DataFrame holding those columns.
    :return: DataFrame with one ``{a}_{b}_{c}_imb2`` column per combination, in
        itertools.combinations order, with a fresh RangeIndex (not ``df.index``).
    """
    triplets = list(combinations(price, 3))
    # Column position of each name (first occurrence, same as list.index).
    position = {name: price.index(name) for name in price}
    comb_indices = [(position[a], position[b], position[c]) for a, b, c in triplets]

    values = df[price].values  # plain ndarray for the numba kernel
    imbalance = compute_triplet_imbalance(values, comb_indices)

    return pd.DataFrame(imbalance, columns=[f"{a}_{b}_{c}_imb2" for a, b, c in triplets])

In [13]:
def print_log(message_format):
    """
    Decorator factory that times a call and prints a framed log message.

    After the wrapped call, ``message_format`` is formatted with ``func_name`` and
    ``elapsed_time`` (seconds) and printed between two 100-character dashed rules.
    If the result is not None, ``, shape(<result.shape>)`` is appended, or
    ``, shape(No shape attribute)`` when the result has no ``shape``.

    Logging is skipped entirely when the first positional argument has an
    ``infer`` attribute that is truthy (e.g. a method of an object in inference
    mode). In that case the function is just called and its result returned.

    :param message_format: str.format template using {func_name} and {elapsed_time}.
    :return: the decorator.
    """
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            # If self.infer is truthy, run silently: no timing and no output.
            if args and hasattr(args[0], 'infer') and args[0].infer:
                return func(*args, **kwargs)

            # perf_counter is monotonic and high-resolution, unlike time.time().
            start_time = time.perf_counter()
            result = func(*args, **kwargs)
            elapsed_time = time.perf_counter() - start_time

            if result is not None:
                data_shape = getattr(result, 'shape', 'No shape attribute')
                shape_message = f", shape({data_shape})"
            else:
                shape_message = ""

            print(f"\n{'-' * 100}")
            print(message_format.format(func_name=func.__name__, elapsed_time=elapsed_time) + shape_message)
            print(f"{'-' * 100}\n")

            return result

        return wrapper

    return decorator


In [14]:
def zero_sum(prices, volumes):
    """
    Remove a volume-weighted offset from ``prices``.

    With w = sqrt(volumes), returns prices - w * (sum(prices) / sum(w)), so the
    result sums to zero (up to floating-point error).
    """
    sqrt_volumes = np.sqrt(volumes)
    scale = np.sum(prices) / np.sum(sqrt_volumes)
    return prices - sqrt_volumes * scale

In [15]:
def custom_pct_change(series, window=1, epsilon=1e-10):
    """
    Percent change over ``window`` steps, with ``epsilon`` added to the base to
    avoid division by zero. The result gets a fresh 0..n-1 index.
    """
    previous = series.shift(window)
    delta = series.diff(window)
    return (delta / (previous + epsilon)).reset_index(drop=True)

In [16]:
# Daily price history per NASDAQ ticker, limited to the competition window.
ALPHA_GLOB = "./data/alpha/*.csv" if MODE == "train" else "/kaggle/input/nasdaq-stocks-historical-data/alpha/*.csv"

all_stock_data = {}
for s in tqdm(glob.glob(ALPHA_GLOB), desc="Processing files"):
    stock_df = pd.read_csv(s, dtype={"ticker": str})
    stock_df.query("Date >= '2021-08-05' and Date <= '2023-07-06'", inplace=True)
    if len(stock_df) > 180:
        # Key = file name without its last 15 characters. Slicing a fixed-length
        # prefix off the full path only works for "./data/alpha/", not the Kaggle path.
        all_stock_data[os.path.basename(s)[:-15]] = (stock_df, len(stock_df))

# Ticker for each stock_id: list position == stock_id (see get_stock_info).
# NOTE(review): 'LNT' appears twice (positions 30 and 148) - confirm the mapping.
reversed_stock_list = [
    'MNST', 'WING', 'AXON', 'HON', 'MAR', 'OKTA', 'POOL', 'LRCX', 'YOTA', 'PFG',
    'NDAQ', 'COIN', 'AMGN', 'TER', 'ADBE', 'ABNB', 'ZBRA', 'KLAC', 'ZI', 'ALNY',
    'ULTA', 'SSNC', 'ON', 'SWKS', 'AKAM', 'ASML', 'PPBI', 'QRVO', 'FANG', 'ORLY',
    'LNT', 'AGRX', 'NTAP', 'CROX', 'REGN', 'ROST', 'DLTR', 'ADP', 'EMCG', 'CTAS',
    'CZR', 'NVDA', 'SAIA', 'JKHY', 'FOSLL', 'MSFT', 'TECH', 'TXRH', 'WDAY', 'FITB',
    'MTCH', 'ROKU', 'CINF', 'EBAY', 'SNPS', 'FAST', 'ETSY', 'IDXX', 'INTU', 'ZG',
    'CRWD', 'LYFT', 'RGEN', 'LKQ', 'MKTX', 'EXC', 'LBRDK', 'MRNA', 'PAYX', 'SOFI',
    'BYND', 'EQIX', 'ADI', 'GEN', 'ALGN', 'CDNS', 'HAS', 'VRTX', 'HOOD', 'WBD',
    'TXG', 'SGEN', 'OPEN', 'INTC', 'GOOG', 'CAR', 'UPST', 'LSCC', 'NFLX', 'ENTG',
    'FFIV', 'DOCU', 'MSTR', 'ZION', 'PCTY', 'AMD', 'MRVL', 'NBIX', 'JBLU', 'PARA',
    'MQ', 'FCNCA', 'TEAM', 'ZS', 'WBA', 'MDLZ', 'TRMB', 'PODD', 'SEDG', 'CSX',
    'TMUS', 'SPWR', 'AAPL', 'LULU', 'LPLA', 'ILMN', 'CDW', 'GDS', 'MELI', 'MASI',
    'FOXA', 'KDP', 'AAL', 'GILD', 'ASO', 'UTHR', 'MU', 'MDB', 'WDC', 'CFLT',
    'SBUX', 'INCY', 'TSCO', 'ISRG', 'VTRS', 'DKNG', 'LITE', 'TTWO', 'SMCI', 'EXPE',
    'VRTS', 'AMAT', 'AVGO', 'TLRY', 'PCAR', 'CG', 'MIDD', 'APA', 'LNT', 'VRSK',
    'PANW', 'CSCO', 'SBAC', 'HTZ', 'DBX', 'CHKEW', 'LCID', 'ADSK', 'APLS', 'STLD',
    'PEP', 'PTON', 'ENPH', 'COST', 'CPRT', 'HST', 'KHC', 'CHRW', 'AMZN', 'ANSS',
    'HOLX', 'TROW', 'APP', 'FIVE', 'AFRM', 'GOOGL', 'FTNT', 'SWAV', 'ZM', 'META',
    'GH', 'JBHT', 'UAL', 'MCHP', 'DDOG', 'ODFL', 'CTSH', 'EA', 'RUN', 'CSGP',
    'DXCM', 'TSLA', 'PTC', 'PYPL', 'PENN', 'XEL', 'XRAY', 'SPLK', 'CMCSA', 'BKR'
]

# NASDAQ screener snapshot (Symbol, Market Cap, Sector, Industry, ...).
stock_list_df = pd.read_csv(
    './data/nasdaq-screener/nasdaq_screener_1701158836955.csv' if MODE == "train"
    else '/kaggle/input/nasdaq-screener/nasdaq_screener_1701158836955.csv')

Processing files: 100%|██████████| 4235/4235 [00:58<00:00, 72.28it/s]


In [17]:
def get_stock_info(df, data, column_name, stock_info=None, tickers=None):  # column_name = "Market Cap", "Sector", "Industry"
    """
    Add a per-stock metadata column ``column_name`` to ``df``.

    Stock ``idx`` is the ticker at position ``idx`` of ``tickers``. Every row of
    ``data`` whose ``stock_id`` equals ``idx`` receives that ticker's value of
    ``column_name`` from ``stock_info`` (first row with a matching ``Symbol``).
    Rows whose ticker is not in ``stock_info`` keep -1. Values are written into
    ``df`` at the row labels taken from ``data``, so the two frames are expected
    to share an index.

    For columns other than "Market Cap", the column of ``stock_info`` is
    label-encoded in place first, so the written values are integer codes.

    :param df: DataFrame receiving the column (modified in place and returned).
    :param data: DataFrame with a ``stock_id`` column.
    :param column_name: "Market Cap", "Sector" or "Industry".
    :param stock_info: lookup table with ``Symbol`` and ``column_name`` columns;
        defaults to the global ``stock_list_df``.
    :param tickers: ticker per stock_id; defaults to the global ``reversed_stock_list``.
    :return: ``df``.
    """
    if stock_info is None:
        stock_info = stock_list_df
    if tickers is None:
        tickers = reversed_stock_list

    if column_name != "Market Cap":
        stock_info[column_name] = LabelEncoder().fit_transform(stock_info[column_name])

    # Symbol -> value, keeping the first row per symbol.
    lookup = stock_info.drop_duplicates(subset="Symbol").set_index("Symbol")[column_name]

    # Start float columns at -1.0 so writing float values does not force an
    # incompatible-dtype upcast of an int column.
    missing = -1.0 if pd.api.types.is_float_dtype(lookup.dtype) else -1
    df[column_name] = missing

    # Row labels per stock_id, computed once instead of one full scan per ticker.
    rows_by_stock = data.groupby("stock_id").groups

    for idx, ticker in enumerate(tickers):
        if ticker in lookup.index and idx in rows_by_stock:
            df.loc[rows_by_stock[idx], column_name] = lookup[ticker]

    return df

In [18]:
@numba.jit(nopython=True)
def find_exact_two_decimal_divisors(number, target):
    """
    Find the integer divisor of ``number`` whose quotient has at most two decimal
    places and lies closest to ``target``.

    Every divisor in 1..int(number) is tried, so the cost grows linearly with
    ``number``. A quotient counts as "two-decimal" when it is within 1e-10 of its
    value rounded to 2 decimals. On ties the smaller divisor wins (strict ``<``).

    :param number: value to divide (the caller passes ask_size). NOTE(review):
        presumably a non-negative count, since int(number) is the loop bound.
    :param target: value the quotient should be close to.
    :return: (divisor, quotient), or (None, None) if no divisor qualifies
        (e.g. when int(number) < 1 and the loop does not run).
    """
    closest_divisor = None
    closest_value = None
    min_diff = np.inf

    for divisor in range(1, int(number) + 1):
        division_result = number / divisor
        # keep only quotients with at most two decimal places
        if np.abs(division_result - np.round(division_result, 2)) < 1e-10:
            diff = np.abs(division_result - target)
            if diff < min_diff:
                min_diff = diff
                closest_divisor = divisor
                closest_value = division_result

    return closest_divisor, closest_value

In [19]:
def estimate_and_verify_prices(df):
    """
    Estimate a per-row price from ask data, one ``date_id`` at a time.

    For each date group:
    - ``date`` is looked up in ``date_id_dict``. That global is not defined in
      this section; NOTE(review): confirm it exists before this is called.
    - ``estimated_price`` = ask_price / tick_size_min (first row of the group) * 0.01.
      ``estimated_price_diff`` is its absolute row-to-row change * 100 (0 for the first row).
    - ``quantity`` / ``integer_validated_value`` come from
      ``find_exact_two_decimal_divisors(ask_size, estimated_price)``.
    - Walking the rows in order: if ``integer_validated_value`` differs from the
      previous row's (already corrected) value by more than int(estimated_price_diff) * 0.01,
      it is replaced by curr + diff or curr - diff, whichever is closer to the previous value.

    Any exception inside a group is printed and that date is skipped.

    :param df: DataFrame with date_id, tick_size_min, ask_price and ask_size columns.
    :return: concatenation of the processed groups (pd.concat raises if every date failed).
    """
    all_data_df_list = []

    for date_id, group in df.groupby('date_id'):
        try:
            tick_size_min = group["tick_size_min"].iloc[0]
            # work on a copy so the new columns are not written into the caller's frame
            group = group.copy()
            # group['date'] = group['date_id'].apply(lambda x: date_id_dict[x])
            group["date"] = date_id_dict.get(date_id)
            group['estimated_price'] = group['ask_price'] / tick_size_min * 0.01
            group['estimated_price_diff'] = group['estimated_price'].diff().abs().fillna(0) * 100
            results = group.apply(lambda x: find_exact_two_decimal_divisors(x['ask_size'], x['estimated_price']),
                                  axis=1)
            group['quantity'], group['integer_validated_value'] = zip(*results)

            # Sequential pass: row i is compared with row i-1 AFTER row i-1 may have been corrected.
            for i in range(1, len(group)):
                prev_val = group.iloc[i - 1]['integer_validated_value']
                curr_val = group.iloc[i]['integer_validated_value']
                estimated_diff = int(group.iloc[i]['estimated_price_diff']) * 0.01

                if abs(curr_val - prev_val) > estimated_diff:
                    if abs((curr_val + estimated_diff) - prev_val) < abs((curr_val - estimated_diff) - prev_val):
                        group.at[group.index[i], 'integer_validated_value'] = curr_val + estimated_diff
                    else:
                        group.at[group.index[i], 'integer_validated_value'] = curr_val - estimated_diff

            all_data_df_list.append(group)
        except Exception as e:
            # best-effort: report and skip this date
            print(e)
            continue

    return pd.concat(all_data_df_list)

In [20]:
# Per-stock weights keyed by list position 0..199 (presumably stock_id).
weights = dict(enumerate([
    0.004, 0.001, 0.002, 0.006, 0.004, 0.004, 0.002, 0.006, 0.006, 0.002, 0.002, 0.008,
    0.006, 0.002, 0.008, 0.006, 0.002, 0.006, 0.004, 0.002, 0.004, 0.001, 0.006, 0.004,
    0.002, 0.002, 0.004, 0.002, 0.004, 0.004, 0.001, 0.001, 0.002, 0.002, 0.006, 0.004,
    0.004, 0.004, 0.006, 0.002, 0.002, 0.04, 0.002, 0.002, 0.004, 0.04, 0.002, 0.001,
    0.006, 0.004, 0.004, 0.006, 0.001, 0.004, 0.004, 0.002, 0.006, 0.004, 0.006, 0.004,
    0.006, 0.004, 0.002, 0.001, 0.002, 0.004, 0.002, 0.008, 0.004, 0.004, 0.002, 0.004,
    0.006, 0.002, 0.004, 0.004, 0.002, 0.004, 0.004, 0.004, 0.001, 0.002, 0.002, 0.008,
    0.02, 0.004, 0.006, 0.002, 0.02, 0.002, 0.002, 0.006, 0.004, 0.002, 0.001, 0.02,
    0.006, 0.001, 0.002, 0.004, 0.001, 0.002, 0.006, 0.006, 0.004, 0.006, 0.001, 0.002,
    0.004, 0.006, 0.006, 0.001, 0.04, 0.006, 0.002, 0.004, 0.002, 0.002, 0.006, 0.002,
    0.002, 0.004, 0.006, 0.006, 0.002, 0.002, 0.008, 0.006, 0.004, 0.002, 0.006, 0.002,
    0.004, 0.006, 0.002, 0.004, 0.001, 0.004, 0.002, 0.004, 0.008, 0.006, 0.008, 0.002,
    0.004, 0.002, 0.001, 0.004, 0.004, 0.004, 0.006, 0.008, 0.004, 0.001, 0.001, 0.002,
    0.006, 0.004, 0.001, 0.002, 0.006, 0.004, 0.006, 0.008, 0.002, 0.002, 0.004, 0.002,
    0.04, 0.002, 0.002, 0.004, 0.002, 0.002, 0.006, 0.02, 0.004, 0.002, 0.006, 0.02,
    0.001, 0.002, 0.006, 0.004, 0.006, 0.004, 0.004, 0.004, 0.004, 0.002, 0.004, 0.04,
    0.002, 0.008, 0.002, 0.004, 0.001, 0.004, 0.006, 0.004,
]))

In [21]:
def weighted_average(n, equal_weight=True):
    """
    Build a list of ``n`` weights.

    equal_weight=True: every weight is 1/n.
    equal_weight=False: the weight at 1-based position k is 1 / 2**(n + 1 - max(k, 2)).
    The first two weights are therefore equal and each later one doubles, so the
    weights sum to 1.

    Returns [] when n <= 0.
    """
    if equal_weight:
        return [1 / n for _ in range(n)]
    return [1 / (2 ** (n + 1 - max(k, 2))) for k in range(1, n + 1)]


#### 각 클래스의 method는 각자 필요에 따라 추가 해서 사용하면 됩니다. 이때 class의 주석에 method를 추가하고, method의 주석에는 method의 역할을 간단하게 적어주세요.

## Pre Code

## Split Data Class

In [22]:
class Splitter:
    """
    Train/validation splitter for a time-ordered DataFrame.

    The frame given to ``split`` must contain a ``date_id_copy`` column and the
    target column; both are removed from the yielded feature frames. The
    row-position based methods ("time_series", "rolling", "blocking") assume rows
    are ordered by ``date_id_copy``.

    Attributes
    ----------
    method : str
        Split strategy: "time_series", "rolling", "blocking", "holdout" or "purged".
    n_splits : int
        Number of folds ("holdout" requires exactly 1, the others more than 1).
    correct : bool
        If True, row-position boundaries are moved so a single date is not cut in
        two ("time_series", "rolling", "blocking").
    initial_fold_size_ratio : float
        Size of the first fold ("time_series") or of every fold ("rolling") as a
        fraction of all rows. "blocking" overrides it with 1 / n_splits.
    train_test_ratio : float
        Share of a fold used for training; the remainder is the test part.
    gap : float
        Rows left out between train and test, as a fraction of the fold size
        ("rolling" / "blocking").
    train_start, train_end, valid_start, valid_end : int
        Inclusive date_id_copy ranges used by "holdout".
    target : str
        Name of the target column; defaults to ``config["target"]``.
    boundaries : list of tuple
        (train_first, train_last, test_first, test_last) date_id_copy values, one
        per fold. They are appended as the fold generator is consumed and cleared
        by every ``split`` call.

    Methods
    -------
    split(data, p_gap=None)
        Return a generator of (X_train, y_train, X_test, y_test) tuples.
    visualize_splits()
        Plot the recorded fold boundaries.
    """

    def __init__(self, method, n_splits, correct, initial_fold_size_ratio=0.6, train_test_ratio=0.8, gap=0,
                 origin_data=None,
                 overlap=True, train_start=0,
                 train_end=390, valid_start=391, valid_end=480, target=None):
        self.method = method
        self.n_splits = n_splits
        self.correct = correct
        self.initial_fold_size_ratio = initial_fold_size_ratio
        self.train_test_ratio = train_test_ratio

        self.gap = gap
        self.origin_data = origin_data  # stored; not used by the split methods below
        self.overlap = overlap  # stored; not used by the split methods below

        # only for holdout method
        self.train_start = train_start
        self.train_end = train_end
        self.valid_start = valid_start
        self.valid_end = valid_end

        # Fall back to the notebook-level config when no explicit target is given.
        self.target = config["target"] if target is None else target

        self.boundaries = []

    def split(self, data, p_gap=None):
        """
        Dispatch to the configured split method.

        :param data: DataFrame with features, the target column and ``date_id_copy``.
            NOTE: it is downcast in place by ``reduce_mem_usage``.
        :param p_gap: extra purged dates after each test-fold start; required by "purged".
        :return: generator of (X_train, y_train, X_test, y_test).
        :raises ValueError: on an unknown method or an n_splits value the method cannot use.
        """
        # Boundaries describe only the folds of the most recent split() call.
        self.boundaries = []
        self.data = reduce_mem_usage(data)
        self.all_dates = self.data['date_id_copy'].unique()
        if self.method == "time_series":
            if self.n_splits <= 1:
                raise ValueError("Time series split method only works with n_splits > 1")
            return self._time_series_split(data)
        elif self.method == "rolling":
            if self.n_splits <= 1:
                raise ValueError("Rolling split method only works with n_splits > 1")
            return self._rolling_split(data)
        elif self.method == "blocking":
            if self.n_splits <= 1:
                raise ValueError("Blocking split method only works with n_splits > 1")
            self.initial_fold_size_ratio = 1.0 / self.n_splits
            return self._rolling_split(data)
        elif self.method == "holdout":
            if self.n_splits != 1:
                raise ValueError("Holdout method only works with n_splits=1")
            return self._holdout_split(data)

        elif self.method == "purged":
            if self.n_splits <= 1:
                raise ValueError("Purged split method only works with n_splits > 1")
            return self._purged_split(data, p_gap)
        else:
            raise ValueError("Invalid method")

    def _correct_boundary(self, data, idx, direction="forward"):
        """
        Move a row position onto a date boundary.

        "forward" advances ``idx`` past every row sharing the date of row ``idx``.
        "backward" returns the first row position of that date. Positions at or
        before 0 and at or after len(data) - 1 are returned unchanged.
        """
        if idx <= 0 or idx >= len(data) - 1:
            return idx
        date = data.iloc[idx]['date_id_copy']
        if direction == "forward":
            while idx < len(data) and data.iloc[idx]['date_id_copy'] == date:
                idx += 1
        elif direction == "backward":
            # Step back while the previous row has the same date (row 0 included).
            while idx > 0 and data.iloc[idx - 1]['date_id_copy'] == date:
                idx -= 1
        return idx

    def _time_series_split(self, data):
        """Expanding-window folds: train always starts at row 0; the test part has a fixed size."""
        n = len(data)
        initial_fold_size = int(n * self.initial_fold_size_ratio)
        initial_test_size = int(initial_fold_size * (1 - self.train_test_ratio))
        increment = (1.0 - self.initial_fold_size_ratio) / (self.n_splits - 1)

        for i in range(self.n_splits):
            fold_size = int(n * (self.initial_fold_size_ratio + i * increment))
            train_size = fold_size - initial_test_size

            if self.correct:
                train_size = self._correct_boundary(data, train_size, "forward")
                end_of_test = self._correct_boundary(data, train_size + initial_test_size, "forward")
            else:
                end_of_test = train_size + initial_test_size

            train_slice = data.iloc[:train_size]
            test_slice = data.iloc[train_size:end_of_test]
            if test_slice.shape[0] == 0:
                raise ValueError("Try setting correct=False or Try reducing the train_test_ratio")

            X_train = train_slice.drop(columns=[self.target, 'date_id_copy'])
            y_train = train_slice[self.target]
            X_test = test_slice.drop(columns=[self.target, 'date_id_copy'])
            y_test = test_slice[self.target]

            # Four values, matching the other split methods and visualize_splits().
            self.boundaries.append((
                train_slice['date_id_copy'].iloc[0],
                train_slice['date_id_copy'].iloc[-1],
                test_slice['date_id_copy'].iloc[0],
                test_slice['date_id_copy'].iloc[-1]
            ))
            yield X_train, y_train, X_test, y_test

    def _rolling_split(self, data):
        """
        Fixed-size folds rolling backwards from the end of the data.

        The first fold's test part ends at the last row. Each following fold is
        shifted earlier by (n - fold_size) // (n_splits - 1) rows, and the train
        start is clamped at row 0.
        """
        n = len(data)
        total_fold_size = int(n * self.initial_fold_size_ratio)
        test_size = int(total_fold_size * (1 - self.train_test_ratio))
        gap_size = int(total_fold_size * self.gap)
        train_size = total_fold_size - test_size
        rolling_increment = (n - total_fold_size) // (self.n_splits - 1)

        # Slice ends are exclusive: ending at n keeps the last row in the first test part.
        end_of_test = n
        start_of_test = end_of_test - test_size
        end_of_train = start_of_test - gap_size
        start_of_train = max(end_of_train - train_size, 0)

        for _ in range(self.n_splits):
            if self.correct:
                start_of_train = self._correct_boundary(data, start_of_train, direction="forward")
                end_of_train = self._correct_boundary(data, end_of_train, direction="backward")
                start_of_test = self._correct_boundary(data, start_of_test, direction="forward")
                end_of_test = self._correct_boundary(data, end_of_test, direction="forward")

            # iloc: always positional, whatever the index type.
            train_slice = data.iloc[start_of_train:end_of_train]
            test_slice = data.iloc[start_of_test:end_of_test]
            if test_slice.shape[0] == 0:
                raise ValueError("Try setting correct=False or Try reducing the train_test_ratio")

            X_train = train_slice.drop(columns=[self.target, 'date_id_copy'])
            y_train = train_slice[self.target]
            X_test = test_slice.drop(columns=[self.target, 'date_id_copy'])
            y_test = test_slice[self.target]

            self.boundaries.append((
                train_slice['date_id_copy'].iloc[0],
                train_slice['date_id_copy'].iloc[-1],
                test_slice['date_id_copy'].iloc[0],
                test_slice['date_id_copy'].iloc[-1]
            ))
            yield X_train, y_train, X_test, y_test
            start_of_train = max(start_of_train - rolling_increment, 0)
            end_of_train -= rolling_increment
            start_of_test -= rolling_increment
            end_of_test -= rolling_increment

    def _holdout_split(self, data):
        """Single fold: train on [train_start, train_end], validate on [valid_start, valid_end] (date_id_copy, inclusive)."""
        train_mask = (data['date_id_copy'] >= self.train_start) & (data['date_id_copy'] <= self.train_end)
        valid_mask = (data['date_id_copy'] >= self.valid_start) & (data['date_id_copy'] <= self.valid_end)

        train_slice = data[train_mask]
        valid_slice = data[valid_mask]

        X_train = train_slice.drop(columns=[self.target, 'date_id_copy'])
        y_train = train_slice[self.target]
        X_valid = valid_slice.drop(columns=[self.target, 'date_id_copy'])
        y_valid = valid_slice[self.target]

        self.boundaries.append((
            train_slice['date_id_copy'].iloc[0],
            train_slice['date_id_copy'].iloc[-1],
            valid_slice['date_id_copy'].iloc[0],
            valid_slice['date_id_copy'].iloc[-1]
        ))
        yield X_train, y_train, X_valid, y_valid

    def _purged_split(self, data, p_gap):
        """
        Walk-forward folds over date_id_copy with a purge window around each test start.

        fold_size = 480 // n_splits dates. For fold i, the test part is the dates
        [end, end + fold_size) with end = (i + 1) * fold_size. For every fold but
        the last, train = dates [i * fold_size, end - 2) plus every date after
        end + p_gap + 2. The last fold trains on [i * fold_size, end) only.

        NOTE(review): because train also takes every date after end + p_gap + 2,
        it overlaps the test dates unless p_gap >= fold_size - 3. Confirm the
        caller's p_gap.
        NOTE(review): 480 is hard-coded (assumes date_id_copy runs up to 480). A
        test fold with no rows raises IndexError when its boundaries are recorded.

        :param p_gap: int, extra purged dates after each test start; must not be None.
        """
        fold_size = 480 // self.n_splits
        date_ids = data['date_id_copy'].values

        # Drop label/date columns once instead of copying the whole frame twice per fold.
        features = data.drop(columns=[self.target, 'date_id_copy'])
        targets = data[self.target]
        dates = data['date_id_copy']

        for i in range(self.n_splits):
            start = i * fold_size
            end = start + fold_size
            if i < self.n_splits - 1:  # No need to purge after the last fold
                purged_start = end - 2
                purged_end = end + p_gap + 2
                train_indices = ((date_ids >= start) & (date_ids < purged_start)) | (date_ids > purged_end)
            else:
                train_indices = (date_ids >= start) & (date_ids < end)

            test_indices = (date_ids >= end) & (date_ids < end + fold_size)

            gc.collect()

            df_fold_train = features.iloc[train_indices]
            df_fold_train_target = targets.iloc[train_indices]
            df_fold_valid = features.iloc[test_indices]
            df_fold_valid_target = targets.iloc[test_indices]

            train_dates = dates.iloc[train_indices]
            test_dates = dates.iloc[test_indices]
            self.boundaries.append((
                train_dates.iloc[0],
                train_dates.iloc[-1],
                test_dates.iloc[0],
                test_dates.iloc[-1]
            ))
            yield df_fold_train, df_fold_train_target, df_fold_valid, df_fold_valid_target

    def visualize_splits(self):
        """Draw one horizontal bar pair (train blue, test red) per recorded fold."""
        print("Visualizing Train/Test Split Boundaries")

        plt.figure(figsize=(15, 6))

        for idx, (train_start, train_end, test_start, test_end) in enumerate(self.boundaries):
            train_width = train_end - train_start + 1
            plt.barh(y=idx, width=train_width, left=train_start, color='blue', edgecolor='black')
            plt.text(train_start + train_width / 2, idx - 0.15, f'{train_start}-{train_end}', ha='center', va='center',
                     color='black', fontsize=8)

            test_width = test_end - test_start + 1
            plt.barh(y=idx, width=test_width, left=test_start, color='red', edgecolor='black')
            if test_width > 0:
                plt.text(test_start + test_width / 2, idx + 0.15, f'{test_start}-{test_end}', ha='center', va='center',
                         color='black', fontsize=8)

        plt.yticks(range(len(self.boundaries)), [f"split {i + 1}" for i in range(len(self.boundaries))])
        # At least 1 so fewer than 10 dates does not produce a zero slice step.
        tick_step = max(1, len(self.all_dates) // 10)
        plt.xticks(self.all_dates[::tick_step])
        plt.xlabel("date_id_copy")
        plt.title("Train/Test Split Boundaries")
        plt.grid(axis='x')

        plt.tight_layout()
        plt.show()

## Data Preprocessing Class

In [23]:
class DataPreprocessor:
    """
    Data preprocessing pipeline.

    Attributes
    ----------
    data : pandas.DataFrame
        Data being preprocessed; each step reassigns it.
    infer : bool
        Inference-mode flag. When truthy, rows are never dropped, and the
        print_log decorator runs the steps without printing timing logs.

    Methods
    -------
    handle_missing_data()
        Outside inference mode, drop rows whose "target" is missing and reset the index.
    handle_outliers()
        Placeholder; returns the data unchanged.
    normalize()
        Placeholder; returns the data unchanged.
    custom_preprocessing()
        Placeholder; returns the data unchanged.
    transform()
        Run the enabled steps (currently only handle_missing_data) and return the data.
    """

    def __init__(self, data, infer=False):
        # reduce_mem_usage is deliberately not applied here: it lowers float precision.
        self.data = data
        self.infer = infer

    @print_log("Executed {func_name}, Elapsed time: {elapsed_time:.2f} seconds")
    def handle_missing_data(self):
        # Only training data is filtered; inference keeps every row.
        if self.infer == False:
            self.data = self.data.dropna(subset=["target"]).reset_index(drop=True)
        return self.data

    @print_log("Executed {func_name}, Elapsed time: {elapsed_time:.2f} seconds")
    def handle_outliers(self):
        # Outlier handling: not implemented yet.
        return self.data

    @print_log("Executed {func_name}, Elapsed time: {elapsed_time:.2f} seconds")
    def normalize(self):
        # Normalization: not implemented yet.
        return self.data

    @print_log("Executed {func_name}, Elapsed time: {elapsed_time:.2f} seconds")
    def custom_preprocessing(self):
        # User-defined preprocessing: not implemented yet.
        return self.data

    @print_log("Executed {func_name}, Elapsed time: {elapsed_time:.2f} seconds")
    def transform(self):
        # Outlier handling, normalization and custom steps are currently disabled.
        self.handle_missing_data()
        return self.data

## Feature Engineering Class

In [24]:
global_features = dict()  # module-level dict, empty at definition time

In [25]:
class FeatureEngineer:
    """
    Builds model features from the raw order-book frame passed as ``data``.

    Each ``feature_*`` method computes one "feature version". It returns a new DataFrame
    indexed like ``self.data`` that holds only the columns it creates.
    ``execute_feature_versions`` runs the methods named in ``feature_versions`` in order.
    It passes the results of the versions listed in ``dependencies[version]`` as
    positional arguments (``*args``). ``transform`` concatenates ``self.data`` with all
    results.

    Using ``args``: if ``feature_version_b`` needs the output of ``feature_version_a``,
    declare ``dependencies={"feature_version_b": ["feature_version_a"]}``. The method then
    receives that DataFrame as ``args[0]``; a second dependency arrives as ``args[1]``,
    and so on.

    Notes
    -----
    * Feature methods return new frames; most of them work on a copy of ``self.data``.
    * ``transform`` DOES add a ``date_id_copy`` column to ``self.data`` (training mode
      only), so the caller's DataFrame is modified in place.
    * Several methods read notebook-level names defined elsewhere: ``global_features``
      (filled by ``generate_global_features``), ``EPS``, ``weights``, ``MODE``,
      ``calculate_triplet_imbalance_numba``, ``get_stock_info`` and
      ``find_exact_two_decimal_divisors``.

    Parameters
    ----------
    data : pandas.DataFrame
        Input frame (stock_id, date_id, seconds_in_bucket, price and size columns, ...).
    infer : bool
        True at inference time; controls whether ``transform`` adds ``date_id_copy``.
    feature_versions : list of str, optional
        Names of the feature methods to run, in order.
    dependencies : dict, optional
        Maps a feature method name to the list of method names whose results it needs.
    base_directory : str
        Directory used to cache each version's output as ``<version>.parquet``.
    """

    def __init__(self, data, infer=False, feature_versions=None, dependencies=None,
                 base_directory="./data/fe_versions"):
        self.data = data
        self.infer = infer
        self.feature_versions = feature_versions or []
        # Maps a feature method name to the method names whose results it takes as *args.
        self.dependencies = dependencies or {}
        self.base_directory = base_directory
        if not os.path.exists(self.base_directory):
            os.makedirs(self.base_directory)

    def _save_to_parquet(self, df, version_name):
        """Cache one feature version's frame as ``<base_directory>/<version_name>.parquet``."""
        file_path = os.path.join(self.base_directory, f"{version_name}.parquet")
        df.to_parquet(file_path)
        print(f"Saved {version_name} to {file_path}")

    def _load_from_parquet(self, version_name):
        """Load a cached feature version; raises FileNotFoundError if it was never saved."""
        file_path = os.path.join(self.base_directory, f"{version_name}.parquet")
        if os.path.exists(file_path):
            return pq.read_table(file_path).to_pandas()
        else:
            raise FileNotFoundError(f"File {file_path} not found.")

    # Template for a new feature version (documentation only, not executed):
    #
    #     def feature_version_n(self, *args):
    #         # 1. start from an empty frame sharing self.data's index
    #         df = pd.DataFrame(index=self.data.index)
    #         # 2. a feature computed from the raw data
    #         df["new_feature"] = self.data["feature"] + 1
    #         # 3. a feature that depends on another version's output (args[0])
    #         df["new_feature_2"] = self.data["feature_2"] + args[0]["feature"]
    #         # 4. fill missing values
    #         df = df.fillna(-999)
    #         return df

    @staticmethod
    @print_log("Executed {func_name}, Elapsed time: {elapsed_time:.2f} seconds")
    def generate_global_features(data):
        """Compute per-stock statistics from ``data`` and store them in ``global_features``.

        ``global_features["version_0"]`` maps a statistic name to a Series indexed by
        stock_id. ``feature_version_time`` turns every entry into a ``global_<name>``
        column. The size extrema used by ``feature_version_imbalance_11`` are stored under
        a separate key, so ``feature_version_time`` keeps emitting the same columns.
        """
        by_stock = data.groupby("stock_id")
        global_features["version_0"] = {
            "median_size": by_stock["bid_size"].median() + by_stock["ask_size"].median(),
            "std_size": by_stock["bid_size"].std() + by_stock["ask_size"].std(),
            "ptp_size": by_stock["bid_size"].max() - by_stock["bid_size"].min(),
            "median_price": by_stock["bid_price"].median() + by_stock["ask_price"].median(),
            "std_price": by_stock["bid_price"].std() + by_stock["ask_price"].std(),
            # NOTE(review): mixes bid max with ask min (ptp_size uses bid for both). Kept
            # as-is because changing it would alter values models may have been trained on.
            "ptp_price": by_stock["bid_price"].max() - by_stock["ask_price"].min(),
        }
        global_features["version_0_size_extrema"] = {
            "min_size": by_stock["bid_size"].min() + by_stock["ask_size"].min(),
            "max_size": by_stock["bid_size"].max() + by_stock["ask_size"].max(),
        }

    @print_log("Executed {func_name}, Elapsed time: {elapsed_time:.2f} seconds")
    def feature_selection(self, data, exclude_columns):
        """Return ``data`` without the columns listed in ``exclude_columns``."""
        selected_columns = [c for c in data.columns if c not in exclude_columns]
        data = data[selected_columns]
        return data

    @print_log("Executed {func_name}, Elapsed time: {elapsed_time:.2f} seconds")
    def feature_version_time(self, *args, version_name="feature_version_time"):
        """Time-of-auction features plus one ``global_*`` column per per-stock statistic."""
        df = pd.DataFrame(index=self.data.index)

        data = self.data.copy()

        # "dow" presumably assumes date_id counts trading days, 5 per week - TODO confirm.
        df["dow"] = data["date_id"] % 5
        df["seconds"] = data["seconds_in_bucket"] % 60
        df["minute"] = data["seconds_in_bucket"] // 60
        df['time_to_market_close'] = 540 - data['seconds_in_bucket']

        for key, value in global_features["version_0"].items():
            df[f"global_{key}"] = data["stock_id"].map(value.to_dict())

        del data
        gc.collect()

        return df

    @print_log("Executed {func_name}, Elapsed time: {elapsed_time:.2f} seconds")
    def feature_version_imbalance_1(self, *args, version_name="feature_version_alvin_1"):
        """Volume, mid price, size imbalances, pairwise price imbalances and triplet imbalances."""
        df = pd.DataFrame(index=self.data.index)

        data = self.data.copy()

        prices = ["reference_price", "far_price", "near_price", "ask_price", "bid_price", "wap"]
        sizes = ["matched_size", "bid_size", "ask_size", "imbalance_size"]
        df["volume"] = data.eval("ask_size + bid_size")
        df["mid_price"] = data.eval("(ask_price + bid_price) / 2")
        df["liquidity_imbalance"] = data.eval(f"(bid_size-ask_size)/(bid_size+ask_size+{EPS})")
        df["matched_imbalance"] = data.eval(f"(imbalance_size-matched_size)/(matched_size+imbalance_size+{EPS})")
        # NOTE(review): by operator precedence this is (bid_size / ask_size) + EPS, so EPS does
        # not guard against ask_size == 0. Left unchanged to keep parity with trained models.
        df["size_imbalance"] = data.eval(f"bid_size / ask_size+{EPS}")

        for first, second in combinations(prices, 2):
            df[f"{first}_{second}_imb"] = data.eval(f"({first} - {second})/({first} + {second} + {EPS})")

        # calculate_triplet_imbalance_numba is defined elsewhere in the notebook; its result
        # is assumed to be row-aligned with `data` (it is assigned via .values) - TODO confirm.
        for c in [['ask_price', 'bid_price', 'wap', 'reference_price'], sizes]:
            triplet_feature = calculate_triplet_imbalance_numba(c, data)
            df[triplet_feature.columns] = triplet_feature.values

        del data
        gc.collect()

        return df

    @print_log("Executed {func_name}, Elapsed time: {elapsed_time:.2f} seconds")
    def feature_version_imbalance_2_0(self, *args, version_name="feature_version_alvin_2_0"):
        """Per-stock imbalance momentum, bid/ask spread and its per-stock change."""
        df = pd.DataFrame(index=self.data.index)

        data = self.data.copy()

        df["imbalance_momentum"] = data.groupby(['stock_id'])['imbalance_size'].diff(periods=1) / (
            data['matched_size'] + EPS)
        df["price_spread"] = data["ask_price"] - data["bid_price"]
        # Temporary concat so the freshly computed price_spread can be grouped by stock_id.
        temp_df = pd.concat([data, df], axis=1)
        df["spread_intensity"] = temp_df.groupby(['stock_id'])['price_spread'].diff()

        del data
        gc.collect()

        return df

    # dependency: "feature_version_imbalance_1" (args[0]), "feature_version_imbalance_2_0" (args[1])
    @print_log("Executed {func_name}, Elapsed time: {elapsed_time:.2f} seconds")
    def feature_version_imbalance_2_1(self, *args, version_name="feature_version_alvin_2_1"):
        """Pressure/urgency interactions plus row-wise moments across prices and sizes."""
        df = pd.DataFrame(index=self.data.index)

        prices = ["reference_price", "far_price", "near_price", "ask_price", "bid_price", "wap"]
        sizes = ["matched_size", "bid_size", "ask_size", "imbalance_size"]

        df['price_pressure'] = self.data['imbalance_size'] * (self.data['ask_price'] - self.data['bid_price'])
        df['market_urgency'] = args[1]['price_spread'] * args[0]['liquidity_imbalance']
        df['depth_pressure'] = (self.data['ask_size'] - self.data['bid_size']) * (
                self.data['far_price'] - self.data['near_price'])

        for func in ["mean", "std", "skew", "kurt"]:
            df[f"all_prices_{func}"] = self.data[prices].agg(func, axis=1)
            df[f"all_sizes_{func}"] = self.data[sizes].agg(func, axis=1)

        return df

    @print_log("Executed {func_name}, Elapsed time: {elapsed_time:.2f} seconds")
    def feature_version_imbalance_3(self, *args, version_name="feature_version_alvin_3"):
        """Per-stock lags, percentage changes and differences over 1/3/5/10 steps."""
        df = pd.DataFrame(index=self.data.index)

        data = self.data.copy()
        for col in ['matched_size', 'imbalance_size', 'reference_price', 'imbalance_buy_sell_flag']:
            for window in [1, 3, 5, 10]:
                df[f"{col}_shift_{window}"] = data.groupby('stock_id')[col].shift(window)
                df[f"{col}_ret_{window}"] = data.groupby('stock_id')[col].pct_change(window)

        for col in ['ask_price', 'bid_price', 'ask_size', 'bid_size']:
            for window in [1, 3, 5, 10]:
                df[f"{col}_diff_{window}"] = data.groupby("stock_id")[col].diff(window)

        del data
        gc.collect()

        return df

    def _rolling_window_features(self, columns, windows=(1, 2, 3, 10)):
        """Shared body of feature_version_imbalance_4_0/4_1/4_2.

        Builds per-stock rolling SMA/EMA/WMA/volatility/range features for ``columns``.
        Columns are emitted in the order col -> window -> (sma, ema, wma, volatility, range).
        """
        df = pd.DataFrame(index=self.data.index)

        data = self.data.copy()
        for col in columns:
            by_stock = data.groupby('stock_id')[col]
            for window in windows:
                # NOTE(review): `.values` assigns the group-ordered result (sorted by stock_id)
                # positionally, so sma/ema/wma are only row-aligned when `data` is already
                # sorted by stock_id; volatility/range below are aligned by index instead.
                # Left unchanged because trained models consume these exact values.
                # Simple moving average
                df[f"{col}_sma_{window}"] = by_stock.rolling(window=window).mean().values
                # Exponential moving average
                df[f"{col}_ema_{window}"] = by_stock.ewm(span=window).mean().values
                # Linearly weighted moving average (most recent value gets the largest weight)
                wma_weights = np.arange(1, window + 1)
                df[f"{col}_wma_{window}"] = by_stock.rolling(window=window).apply(
                    lambda x: np.dot(x, wma_weights) / wma_weights.sum(), raw=True).values
                # Rolling standard deviation (volatility)
                df[f"{col}_volatility_{window}"] = by_stock.rolling(window=window).std().reset_index(
                    level=0, drop=True)
                # Rolling range (max - min)
                rolling_max = by_stock.rolling(window=window).max().reset_index(level=0, drop=True)
                rolling_min = by_stock.rolling(window=window).min().reset_index(level=0, drop=True)
                df[f"{col}_range_{window}"] = rolling_max - rolling_min

        del data
        gc.collect()

        return df

    @print_log("Executed {func_name}, Elapsed time: {elapsed_time:.2f} seconds")
    def feature_version_imbalance_4_0(self, *args, version_name="feature_version_alvin_4_0"):
        """Rolling-window features for matched/imbalance size, reference price and side flag."""
        return self._rolling_window_features(
            ['matched_size', 'imbalance_size', 'reference_price', 'imbalance_buy_sell_flag'])

    @print_log("Executed {func_name}, Elapsed time: {elapsed_time:.2f} seconds")
    def feature_version_imbalance_4_1(self, *args, version_name="feature_version_alvin_4_1"):
        """Rolling-window features for bid/ask prices and sizes."""
        return self._rolling_window_features(['ask_price', 'bid_price', 'ask_size', 'bid_size'])

    @print_log("Executed {func_name}, Elapsed time: {elapsed_time:.2f} seconds")
    def feature_version_imbalance_4_2(self, *args, version_name="feature_version_alvin_4_2"):
        """Rolling-window features for far/near price and wap."""
        return self._rolling_window_features(['far_price', 'near_price', 'wap'])

    @print_log("Executed {func_name}, Elapsed time: {elapsed_time:.2f} seconds")
    def feature_version_imbalance_5(self, *args, version_name="feature_version_alvin_5"):
        """Per-stock RSI of wap over 1/2/3/10-step windows."""
        df = pd.DataFrame(index=self.data.index)

        data = self.data.copy()
        # The wap step does not depend on the window, so compute it once.
        delta = data.groupby('stock_id')['wap'].diff()
        for window in [1, 2, 3, 10]:
            gain = (delta.where(delta > 0, 0))
            gain = gain.groupby(data['stock_id']).rolling(window=window).mean().reset_index(level=0, drop=True)

            loss = (-delta.where(delta < 0, 0))
            loss = loss.groupby(data['stock_id']).rolling(window=window).mean().reset_index(level=0, drop=True)

            rs = gain / loss
            df[f'RSI_{window}'] = 100 - (100 / (1 + rs))

        del data
        gc.collect()

        return df

    @print_log("Executed {func_name}, Elapsed time: {elapsed_time:.2f} seconds")
    def feature_version_imbalance_6_0(self, *args, version_name="feature_version_imbalance_6"):
        """Per-stock weight looked up from the notebook-level ``weights`` mapping."""
        df = pd.DataFrame(index=self.data.index)

        df["stock_weights"] = self.data["stock_id"].map(weights)

        return df

    # dependency: feature_version_imbalance_6_0 : df['stock_weights']
    # (the keyword is named `version` here, unlike `version_name` elsewhere; kept for compatibility)
    @print_log("Executed {func_name}, Elapsed time: {elapsed_time:.2f} seconds")
    def feature_version_imbalance_6_1(self, *args, version="feature_version_imbalance_6_1"):
        """Weight-scaled wap and its per-stock 6-step percentage change."""
        df = pd.DataFrame(index=self.data.index)

        df["weighted_wap"] = args[0]["stock_weights"] * self.data["wap"]
        df['wap_momentum'] = df.groupby(self.data['stock_id'])['weighted_wap'].pct_change(periods=6)

        return df

    # dependency: feature_version_imbalance_1: df['mid_price']
    @print_log("Executed {func_name}, Elapsed time: {elapsed_time:.2f} seconds")
    def feature_version_imbalance_7(self, *args, version_name="feature_version_imbalance_7"):
        """Spread/depth ratio, mid-price direction, micro price and relative spread."""
        df = pd.DataFrame(index=self.data.index)

        df['spread_depth_ratio'] = (self.data['ask_price'] - self.data['bid_price']) / (
                self.data['bid_size'] + self.data['ask_size'])
        # Sign of the 5-step mid-price change (NaN maps to 0).
        # NOTE(review): diff() is not grouped by stock_id; kept for parity with trained models.
        df['mid_price_movement'] = args[0]['mid_price'].diff(periods=5).apply(
            lambda x: 1 if x > 0 else (-1 if x < 0 else 0))
        df['micro_price'] = ((self.data['bid_price'] * self.data['ask_size']) + (
                self.data['ask_price'] * self.data['bid_size'])) / (self.data['bid_size'] + self.data['ask_size'])
        df['relative_spread'] = (self.data['ask_price'] - self.data['bid_price']) / self.data['wap']

        return df

    # dependency: feature_version_imbalance_3 : df['ask_price_diff_*'] and df['bid_price_diff_*'] and df['ask_size_diff_*'] and df['bid_size_diff_*']
    @print_log("Executed {func_name}, Elapsed time: {elapsed_time:.2f} seconds")
    def feature_version_imbalance_8(self, *args, version_name="feature_version_imbalance_8"):
        """Bid-minus-ask differences of the price and size changes over 3/5/10 steps."""
        df = pd.DataFrame(index=self.data.index)

        for window in [3, 5, 10]:
            bid_price_diff = args[0][f'bid_price_diff_{window}']
            ask_price_diff = args[0][f'ask_price_diff_{window}']
            bid_size_diff = args[0][f'bid_size_diff_{window}']
            ask_size_diff = args[0][f'ask_size_diff_{window}']

            df[f'price_change_diff_{window}'] = bid_price_diff - ask_price_diff
            df[f'size_change_diff_{window}'] = bid_size_diff - ask_size_diff

        return df

    # dependency: feature_version_imbalance_3 : df['ask_price_diff_*'] and df['bid_price_diff_*'] and df['ask_size_diff_*'] and df['bid_size_diff_*']
    @print_log("Executed {func_name}, Elapsed time: {elapsed_time:.2f} seconds")
    def feature_version_imbalance_9(self, *args, version_name="feature_version_imbalance_9"):
        """Per-stock rolling mean/std (via Polars) of the *_diff_{3,5,10} columns.

        Returns a frame indexed like ``self.data``.
        """
        # Attach the dependency columns, then hand over to Polars.
        data = self.data.copy()
        for col in args[0].columns:
            data[col] = args[0][col]
        pl_df = pl.from_pandas(data)

        windows = [3, 5, 10]
        columns = ['ask_price', 'bid_price', 'ask_size', 'bid_size']

        group = ["stock_id"]
        expressions = []

        # One rolling-mean and one rolling-std expression per (window, column), evaluated per stock.
        for window in windows:
            for col in columns:
                rolling_mean_expr = (
                    pl.col(f"{col}_diff_{window}")
                    .rolling_mean(window)
                    .over(group)
                    .alias(f'rolling_diff_{col}_{window}')
                )

                rolling_std_expr = (
                    pl.col(f"{col}_diff_{window}")
                    .rolling_std(window)
                    .over(group)
                    .alias(f'rolling_std_diff_{col}_{window}')
                )

                expressions.append(rolling_mean_expr)
                expressions.append(rolling_std_expr)

        pl_df = pl_df.lazy().with_columns(expressions).collect()

        pl_to_df = pl_df.to_pandas()
        gc.collect()

        # Keep only the rolling_diff_* and rolling_std_diff_* columns.
        rolling_diff_columns = [col for col in pl_to_df.columns if 'rolling_diff_' in col]
        rolling_std_diff_columns = [col for col in pl_to_df.columns if 'rolling_std_diff_' in col]
        # Polars drops the pandas index but keeps row order. Restore self.data's index so that
        # transform() concatenates these columns onto the right rows.
        df = pl_to_df[rolling_diff_columns + rolling_std_diff_columns].set_axis(self.data.index, axis=0)

        del data
        gc.collect()

        return df

    # dependency: feature_version_imbalance_1:  df['volume'] ,feature_version_imbalance_7 : df['mid_price_movement']
    @print_log("Executed {func_name}, Elapsed time: {elapsed_time:.2f} seconds")
    def feature_version_imbalance_10(self, *args, version_name="feature_version_imbalance_10"):
        """Mid-price direction times volume, and the harmonic mean of bid/ask sizes."""
        df = pd.DataFrame(index=self.data.index)

        df['mid_price*volume'] = args[1]['mid_price_movement'] * args[0]['volume']
        df['harmonic_imbalance'] = 2 / ((1 / self.data['bid_size']) + (1 / self.data['ask_size']))

        return df

    @print_log("Executed {func_name}, Elapsed time: {elapsed_time:.2f} seconds")
    def feature_version_imbalance_11(self, *args, version_name="feature_version_imbalance_11"):
        """Total book size, per-stock size statistics and a high-volume flag.

        Raises
        ------
        KeyError
            If a required per-stock statistic has not been computed by
            ``generate_global_features``.
        """
        df = pd.DataFrame(index=self.data.index)

        df['bid_plus_ask_sizes'] = self.data['bid_size'] + self.data['ask_size']

        # min_size/max_size are kept out of global_features["version_0"], because adding
        # them there would add columns to feature_version_time. Merge both sources here.
        size_features = {**global_features.get("version_0_size_extrema", {}), **global_features["version_0"]}
        for feature in ['median_size', 'std_size', 'min_size', 'max_size']:
            if feature not in size_features:
                raise KeyError(
                    f"Per-stock statistic '{feature}' is missing from global_features; "
                    "run FeatureEngineer.generate_global_features first.")
            df[feature] = self.data['stock_id'].map(size_features[feature].to_dict())

        df['high_volume'] = np.where(df['bid_plus_ask_sizes'] > df['median_size'], 1, 0)

        return df

    @print_log("Executed {func_name}, Elapsed time: {elapsed_time:.2f} seconds")
    def feature_version_imbalance_12(self, *args, version_name="feature_version_imbalance_12"):
        """Bid/ask size imbalance and imbalance/matched size imbalance (no EPS guard)."""
        df = pd.DataFrame(index=self.data.index)

        df['imb_s1'] = (self.data['bid_size'] - self.data['ask_size']) / (self.data['bid_size'] + self.data['ask_size'])
        df['imb_s2'] = (self.data['imbalance_size'] - self.data['matched_size']) / (
                self.data['matched_size'] + self.data['imbalance_size'])

        return df

    @print_log("Executed {func_name}, Elapsed time: {elapsed_time:.2f} seconds")
    def feature_version_order_flow(self, *args, version_name="feature_version_imbalance_order_flow"):
        """Order-flow imbalance (OFI) from consecutive bid/ask price and size changes."""
        df = pd.DataFrame(index=self.data.index)

        # NOTE(review): these diffs are taken over consecutive rows without grouping by
        # stock_id, so they compare rows of different stocks when rows are interleaved.
        # Left unchanged because it alters feature values models may have been trained on.
        bid_price_diff = self.data['bid_price'].diff()
        ask_price_diff = self.data['ask_price'].diff()
        bid_size_diff = self.data['bid_size'].diff()
        ask_size_diff = self.data['ask_size'].diff()

        ofi = ((bid_price_diff > 0) | (ask_price_diff < 0)) * bid_size_diff \
              - ((bid_price_diff < 0) | (ask_price_diff > 0)) * ask_size_diff

        df['OFI'] = ofi.fillna(0)

        return df

    @print_log("Executed {func_name}, Elapsed time: {elapsed_time:.2f} seconds")
    def feature_version_custom_weight(self, *args, version_name="feature_version_custom_weight"):
        """Share of matched notional (matched_size * wap) within each time_id, as ``weights``."""
        def get_stock_weight(data_batch):
            matched_volume = data_batch['matched_size'] * data_batch['wap']
            total_vol = matched_volume.sum()
            weights = matched_volume / total_vol
            data_batch['weights'] = weights
            return data_batch

        if MODE != "inference":
            # NOTE(review): groupby/apply + reset_index(drop=True) yields a 0..n-1 index in
            # time_id order. transform() concatenates on the index, so this is only row-aligned
            # when self.data already has a 0..n-1 index sorted by time_id - TODO confirm.
            data = self.data.groupby('time_id', as_index=False).apply(get_stock_weight)

            data.reset_index(drop=True, inplace=True)
            return data['weights'].to_frame()
        else:
            # Inference: the whole frame is treated as one group (presumably a single
            # time_id per call - TODO confirm).
            data = self.data.copy()
            return_value = get_stock_weight(data)['weights'].to_frame()

            del data
            gc.collect()

            return return_value

    # dependency: feature_version_imbalance_1 (args[0], mid_price), feature_version_imbalance_2_0 (args[1], price_spread)
    @print_log("Executed {func_name}, Elapsed time: {elapsed_time:.2f} seconds")
    def feature_version_ta_indicators_1(self, *args, version_name="feature_version_ta_indicators_1"):
        """Simple price/size ratios and the per-stock reference-price change per second."""
        df = pd.DataFrame(index=self.data.index)

        data = self.data.copy()
        df['price_relative_to_matched_size'] = data['reference_price'] / data['matched_size']
        df['price_volatility'] = args[1]['price_spread'] / args[0]['mid_price']
        df['price_change_per_second'] = data.groupby("stock_id")['reference_price'].diff() / \
                                        data.groupby("stock_id")['seconds_in_bucket'].diff()
        df['wap_to_reference_price_ratio'] = data['wap'] / data['reference_price']
        # Matched size as a percentage of bid + ask size
        df['matched_size_percent_of_total_volume'] = data['matched_size'] / (
                data['bid_size'] + data['ask_size']) * 100

        del data
        gc.collect()

        return df

    # dependency: feature_version_imbalance_2_0 (args[0], price_spread)
    @print_log("Executed {func_name}, Elapsed time: {elapsed_time:.2f} seconds")
    def feature_version_ta_indicators_2(self, *args, version_name="feature_version_ta_indicators_2"):
        """Size-weighted spread plus imbalance, bid/ask and price-difference ratios."""
        df = pd.DataFrame(index=self.data.index)

        data = self.data.copy()
        df['weighted_price_spread'] = args[0]['price_spread'] * (data['bid_size'] + data['ask_size']) / (
                data['bid_size'] * data['ask_size'])
        df['imbalance_ratio'] = data['imbalance_size'] / data['matched_size']
        df['price_movement_indicator'] = data['reference_price'] - data['wap']
        df['bid_ask_volume_ratio'] = data['bid_size'] / data['ask_size']
        df['relative_size_to_wap'] = data['matched_size'] / data['wap']

        del data
        gc.collect()

        return df

    @print_log("Executed {func_name}, Elapsed time: {elapsed_time:.2f} seconds")
    def feature_version_ta_indicators_3(self, *args, version_name="feature_version_ta_indicators_3"):
        """Order-book imbalance features and simple heuristic risk/likelihood proxies."""
        df = pd.DataFrame(index=self.data.index)

        data = self.data.copy()
        df['order_book_imbalance'] = data['bid_size'] - data['ask_size']
        df['relative_imbalance'] = df['order_book_imbalance'] / (data['bid_size'] + data['ask_size'])
        # Inventory risk proxy: per-stock change in imbalance size
        df['inventory_risk'] = data.groupby('stock_id')['imbalance_size'].diff().fillna(0)
        # Market-order likelihood placeholder: side flag times total book size
        df['market_order_likelihood'] = data['imbalance_buy_sell_flag'] * (
                data['bid_size'] + data['ask_size'])
        # Partial-execution risk proxy: larger when imbalance exceeds matched size
        df['limit_order_partial_exec_risk'] = data['imbalance_size'] - data['matched_size']
        # Poisson trade-model placeholder: book size per elapsed second (0 s treated as 1 s)
        df['poissonian_trade_model'] = (data['bid_size'] + data['ask_size']) / data[
            'seconds_in_bucket'].replace(0, 1)

        del data
        gc.collect()

        return df

    def _stock_info_feature(self, info_name):
        """Shared body of feature_market_cap/sector/industry; delegates to ``get_stock_info``."""
        df = pd.DataFrame(index=self.data.index)

        data = self.data.copy()
        df = get_stock_info(df, data, info_name)

        del data
        gc.collect()

        return df

    @print_log("Executed {func_name}, Elapsed time: {elapsed_time:.2f} seconds")
    def feature_market_cap(self, *args, version_name="feature_stock_info"):
        """Market-cap feature produced by the notebook-level ``get_stock_info`` helper."""
        return self._stock_info_feature("Market Cap")

    @print_log("Executed {func_name}, Elapsed time: {elapsed_time:.2f} seconds")
    def feature_sector(self, *args, version_name="feature_stock_info"):
        """Sector feature produced by the notebook-level ``get_stock_info`` helper."""
        return self._stock_info_feature("Sector")

    @print_log("Executed {func_name}, Elapsed time: {elapsed_time:.2f} seconds")
    def feature_industry(self, *args, version_name="feature_stock_info"):
        """Industry feature produced by the notebook-level ``get_stock_info`` helper."""
        return self._stock_info_feature("Industry")

    @print_log("Executed {func_name}, Elapsed time: {elapsed_time:.2f} seconds")
    def feature_real_price(self, *args, version_name="feature_real_price"):
        """Best-effort reconstruction of tick size, rescaled price and order quantity.

        For each (stock_id, date_id), the smallest non-zero absolute bid/ask price step is
        used as a tick-size estimate, and ``ask_price`` is rescaled by it.
        ``find_exact_two_decimal_divisors`` (defined elsewhere in the notebook) then yields a
        quantity and an ``integer_validated_value`` per row. A value is nudged by the estimated
        price step when it differs from the previous row's value by more than that step.
        Groups whose reconstruction raises are skipped. ``real_price`` is never filled.
        """
        df = pd.DataFrame(index=self.data.index)

        df['real_price'] = pd.NA
        df['estimated_price_diff'] = pd.NA
        df['estimated_price'] = pd.NA
        df['tick_size_min'] = pd.NA
        df['quantity'] = pd.NA

        data = self.data.copy()

        # Tick-size estimate: smallest non-zero absolute price change per (date_id, stock_id).
        data["bid_price_diff"] = data.groupby(["date_id", "stock_id"]).bid_price.diff().abs()
        data["ask_price_diff"] = data.groupby(["date_id", "stock_id"]).ask_price.diff().abs()
        tick_size_est_1 = data[data.bid_price_diff > 0].groupby(
            ["date_id", "stock_id"]).bid_price_diff.min().rename("tick_est_bid").to_frame()
        tick_size_est_2 = data[data.ask_price_diff > 0].groupby(
            ["date_id", "stock_id"]).ask_price_diff.min().rename("tick_est_ask")
        tick_size_est = tick_size_est_1.join(tick_size_est_2)
        tick_size_est["tick_size_min"] = tick_size_est.min(1)

        # Merge the estimate back while keeping the original row index.
        data = data.reset_index().merge(tick_size_est, on=["date_id", "stock_id"], how="left").set_index('index')

        for (stock_id, date_id), group in tqdm(data.groupby(['stock_id', 'date_id']), desc="Processing stock_id",
                                               disable=MODE == "inference"):
            try:
                tick_size_min = group["tick_size_min"].iloc[0]
                estimated_price = group['ask_price'] / tick_size_min * 0.01
                estimated_price_diff = estimated_price.diff().abs().fillna(0) * 100
                results = group.apply(
                    lambda x: find_exact_two_decimal_divisors(x['ask_size'], estimated_price[x.name]), axis=1)
                quantity, integer_validated_value = zip(*results)

                df.loc[group.index, 'tick_size_min'] = tick_size_min
                df.loc[group.index, 'estimated_price'] = estimated_price
                df.loc[group.index, 'estimated_price_diff'] = estimated_price_diff
                df.loc[group.index, 'quantity'] = quantity
                df.loc[group.index, 'integer_validated_value'] = integer_validated_value

                # Smooth jumps larger than the estimated price step, relative to the
                # previous row's (unsmoothed) value.
                for i in range(1, len(group)):
                    prev_val = integer_validated_value[i - 1]
                    curr_val = integer_validated_value[i]
                    estimated_diff = int(estimated_price_diff.iloc[i]) * 0.01

                    if abs(curr_val - prev_val) > estimated_diff:
                        if abs((curr_val + estimated_diff) - prev_val) < abs(
                                (curr_val - estimated_diff) - prev_val):
                            df.at[group.index[i], 'integer_validated_value'] = curr_val + estimated_diff
                        else:
                            df.at[group.index[i], 'integer_validated_value'] = curr_val - estimated_diff

            except Exception:
                # Best-effort: skip groups whose reconstruction raises; their remaining
                # features stay missing.
                continue

        # The pd.NA placeholder columns are object dtype; coerce everything to numeric.
        for col in df.columns:
            if df[col].dtype not in ['float64', 'int64', 'bool']:
                df[col] = pd.to_numeric(df[col], errors='coerce')

        del data
        gc.collect()

        return df

    # you can add more feature engineering versions like the ones above
    @print_log("Executed {func_name}, Elapsed time: {elapsed_time:.2f} seconds")
    def execute_feature_versions(self, save=False, load=False):
        """Run (or load) every version in ``self.feature_versions``, in order.

        Parameters
        ----------
        save : bool
            Write each computed version to ``<base_directory>/<version>.parquet``.
        load : bool
            Read each version from its parquet file instead of computing it.

        Returns
        -------
        dict
            ``{version_name: DataFrame}`` for the requested versions only.

        Raises
        ------
        AttributeError
            If a requested version is not a method of this class (when not loading).
        """
        results = {}

        for version in self.feature_versions:
            if load:
                df = self._load_from_parquet(version)
            else:
                method = getattr(self, version, None)
                if not callable(method):
                    # Previously this silently reused the prior version's frame.
                    raise AttributeError(f"Unknown feature version: {version}")
                args = []
                for dep in self.dependencies.get(version, []):
                    dep_result = results.get(dep)
                    if isinstance(dep_result, pd.DataFrame):
                        args.append(dep_result)
                    elif dep_result is None and hasattr(self, dep):
                        # Dependency not computed earlier: compute it on the fly.
                        # NOTE(review): it is called without its own dependencies, so a
                        # dependency that itself needs *args fails here.
                        dep_method = getattr(self, dep)
                        dep_result = dep_method()
                        results[dep] = dep_result
                        args.append(dep_result)
                    else:
                        args.append(None)
                df = method(*args)
                if save:
                    self._save_to_parquet(df, version)
            results[version] = df

        # On-the-fly dependencies are not returned unless they were requested themselves.
        return {k: v for k, v in results.items() if k in self.feature_versions}

    @print_log("Executed {func_name}, Elapsed time: {elapsed_time:.2f} seconds")
    def transform(self, save=False, load=False):
        """Run all feature versions and return ``self.data`` concatenated with their outputs.

        In training mode a ``date_id_copy`` column is added to ``self.data`` itself (the
        caller's frame is modified). This keeps a copy of ``date_id`` after ``row_id``,
        ``time_id`` and ``date_id`` are excluded below.
        """
        feature_versions_results = self.execute_feature_versions(save=save, load=load)
        if not self.infer:
            self.data["date_id_copy"] = self.data["date_id"]
        concat_df = pd.concat([self.data] + list(feature_versions_results.values()), axis=1)

        exclude_columns = ["row_id", "time_id", "date_id"]
        final_data = self.feature_selection(concat_df, exclude_columns)
        return final_data


## Model Class

In [26]:
from sklearn.metrics import mean_absolute_error


class OptunaWeights:
    """Learn blending weights for several prediction vectors with Optuna (CMA-ES sampler).

    Each trial samples one raw weight per model from [-2, 3] and clips it at 0, so a model can
    be switched off entirely. If every weight clips to 0, equal weights are used instead. The
    objective is the mean absolute error of the weighted average, so the study *minimizes* it.
    After ``fit`` the plain attribute ``weights`` holds the best weights, clipped the same way
    the objective evaluated them.

    Args:
        random_state: seed for the CMA-ES sampler.
        n_trials: number of Optuna trials run by ``fit``.
    """

    def __init__(self, random_state, n_trials=5000):
        self.study = None
        self.weights = None
        self.random_state = random_state
        self.n_trials = n_trials

    @staticmethod
    def _normalize_weights(raw_weights):
        """Clip weights at 0; fall back to equal weights if they all clip to 0.

        Without the fallback, ``np.average`` would divide by a zero weight sum.
        """
        weights = [max(0, w) for w in raw_weights]
        if sum(weights) == 0:
            weights = [1 / len(weights)] * len(weights)
        return weights

    def _objective(self, trial, y_true, y_preds):
        """Optuna objective: MAE of the weighted prediction for this trial's weights."""
        raw_weights = [trial.suggest_float(f"weight{n}", -2, 3) for n in range(len(y_preds))]
        weights = self._normalize_weights(raw_weights)
        # y_preds is a list with one prediction vector per model (assumed 1-D), so .T is
        # (n_samples, n_models) and the average runs across models.
        weighted_pred = np.average(np.array(y_preds).T, axis=1, weights=weights)
        return mean_absolute_error(y_true, weighted_pred)

    def fit(self, y_true, y_preds):
        """Search for the blend weights that minimize MAE against ``y_true``."""
        optuna.logging.set_verbosity(optuna.logging.ERROR)
        sampler = optuna.samplers.CmaEsSampler(seed=self.random_state)
        # NOTE: the objective never calls trial.report, so this pruner is effectively inactive.
        pruner = optuna.pruners.HyperbandPruner()
        # MAE is an error (lower is better), so the study must minimize it.
        self.study = optuna.create_study(sampler=sampler, pruner=pruner, study_name="OptunaWeights",
                                         direction='minimize')
        objective_partial = partial(self._objective, y_true=y_true, y_preds=y_preds)
        self.study.optimize(objective_partial, n_trials=self.n_trials)
        best_raw = [self.study.best_params[f"weight{n}"] for n in range(len(y_preds))]
        # Store exactly the weights the objective scored (clipped / equal-weight fallback).
        self.weights = self._normalize_weights(best_raw)

    def predict(self, y_preds):
        """Blend ``y_preds`` (one vector per model) with the fitted weights."""
        assert self.weights is not None, 'OptunaWeights error, must be fitted before predict'
        weighted_pred = np.average(np.array(y_preds).T, axis=1, weights=self.weights)
        return weighted_pred

    def fit_predict(self, y_true, y_preds):
        """``fit`` on (y_true, y_preds), then return the blended ``y_preds``."""
        self.fit(y_true, y_preds)
        return self.predict(y_preds)

In [27]:
# V3 ModelPipeline
class ModelPipeline:
    """Train, predict, stack, save and load per-fold models.

    Reads notebook-level globals defined elsewhere:
    - ``config``: keys "model_name", "n_splits", "stacking_mode", "stacking_algorithm",
      "optuna_random_state", "model_dir".
    - ``models_config``: model class and params per model name.
    - ``MODE`` and ``OptunaWeights``.

    Args:
        lgb_model_weights: per-fold multipliers for the averaged fold prediction in
            ``predict`` (indexed by fold ``idx``). When None, the notebook-level
            ``lgb_model_weights`` is used if it exists, otherwise 1.0.
    """

    def __init__(self, lgb_model_weights=None):
        self.models = []            # models of the fold currently being trained / predicted
        self.models_list = []       # one list of models per fold (the full-data model is appended last)

        self.predictions = []       # per-model predictions of the current fold
        self.predictions_list = []  # for single stacking: first model's prediction per fold

        self.inference_prediction = None

        self.optuna_weights = []    # stacking weights, one entry per stacking call

        self.lgb_model_weights = lgb_model_weights

    def _fold_weight(self, idx):
        """Multiplier for fold ``idx`` (see class docstring for the fallback order)."""
        weights = self.lgb_model_weights
        if weights is None:
            # Backward compatibility: earlier versions always read the notebook-level global.
            weights = globals().get("lgb_model_weights")
        return 1.0 if weights is None else weights[idx]

    @staticmethod
    def _lgb_train(model, X_train, y_train, X_valid, y_valid):
        """Fit a LightGBM sklearn-API model; add early stopping (100 rounds) when a validation set is given."""
        fit_params = {
            "callbacks": [lgb.callback.log_evaluation(period=100)]
        }
        if X_valid is not None and y_valid is not None:
            fit_params["eval_set"] = [(X_valid, y_valid)]
            fit_params["callbacks"].append(lgb.callback.early_stopping(stopping_rounds=100))

        model.fit(X_train, y_train, **fit_params)
        return model

    @staticmethod
    def _xgb_train(model, X_train, y_train, X_valid, y_valid):
        """Fit an XGBoost sklearn-API model with the MAE metric.

        ``X_valid``/``y_valid`` may be None; the final full-data model is trained that way.
        The eval set and early stopping are only passed when both are given, because early
        stopping requires an eval set.
        NOTE(review): ``eval_metric``/``early_stopping_rounds`` in ``fit`` follow the older
        XGBoost sklearn API (newer releases take them in the constructor); confirm against
        the installed version.
        """
        fit_params = {"eval_metric": "mae", "verbose": 100}
        if X_valid is not None and y_valid is not None:
            fit_params["eval_set"] = [(X_valid, y_valid)]
            fit_params["early_stopping_rounds"] = 100
        model.fit(X_train, y_train, **fit_params)
        return model

    @staticmethod
    def _cnn_train(model, X_train, y_train, X_valid, y_valid):
        """Placeholder, not implemented: returns None, so ``train`` stores None for this model."""
        pass

    def train(self, idx, X_train, y_train, X_valid, y_valid):
        """Train every model in ``config["model_name"]`` for fold ``idx``; append them to ``models_list``."""
        self.models = []  # models of this fold
        for model_name in config["model_name"]:
            model_cls = models_config[model_name]["model"]
            params = models_config[model_name]["params"]
            model = model_cls(**params)
            print(f"\n\n================== Training {model_name} ({idx}/{config['n_splits']})==================")
            if "lgb" in model_name:
                trained_model = self._lgb_train(model, X_train, y_train, X_valid, y_valid)
            elif "xgb" in model_name:
                trained_model = self._xgb_train(model, X_train, y_train, X_valid, y_valid)
            elif model_name == "pytorch_cnn":
                trained_model = self._cnn_train(model, X_train, y_train, X_valid, y_valid)
            else:
                raise ValueError("Invalid model name")
            self.models.append(trained_model)
            print(f"Successfully trained {model_name} ({idx}/{config['n_splits']})")
        self.models_list.append(self.models)  # models of all folds

    def predict(self, idx, X_test):
        """Predict ``X_test`` with fold ``idx``'s models.

        Sets ``self.predictions`` (one vector per model) and ``self.inference_prediction``
        (model average times the fold weight). In single-model stacking mode, also appends
        the prediction to ``predictions_list``.
        """
        verbose = MODE != "inference"
        self.predictions = []
        self.inference_prediction = None
        self.models = self.models_list[idx]  # models of fold idx
        model_name = None
        for model_name, model in zip(config["model_name"], self.models):
            if verbose:
                print(
                    f"\n\n================== Predict each model {model_name} ({idx}/{config['n_splits']})==================")
            self.predictions.append(model.predict(X_test))
        # Non-stacking prediction: average over models, scaled by the fold's weight.
        self.inference_prediction = np.mean(self.predictions, axis=0) * self._fold_weight(idx)
        if config["stacking_mode"] and len(config["model_name"]) == 1:  # for single stacking
            self.predictions_list.append(self.predictions[0])
        if verbose:
            print(f"Successfully predicted {model_name} ({idx}/{config['n_splits']})")

    def _optuna_stacking(self, idx, y_test, X_test, infer):
        """Blend model predictions with ``OptunaWeights``.

        infer=True: reuse ``self.optuna_weights[idx]``. With idx == -1 (single-model
        stacking) it blends ``predictions_list`` and then clears it; otherwise it blends the
        current fold's ``predictions``.
        infer=False: fit new weights against ``y_test`` and append them to
        ``optuna_weights``. With idx == -1, ``X_test`` is first re-predicted by every fold's
        models.
        """
        stacker = OptunaWeights(random_state=config["optuna_random_state"])
        if infer:
            self.inference_prediction = None
            stacker.weights = self.optuna_weights[idx]
            if idx == -1:
                self.inference_prediction = stacker.predict(self.predictions_list)
                self.predictions_list = []
            else:
                self.inference_prediction = stacker.predict(self.predictions)
            return

        if idx == -1:  # single model stacking: predict X_test with every fold's models
            self.predictions = []
            for fold_idx, models in enumerate(self.models_list):
                for model_name, model in zip(config["model_name"], models):
                    print(
                        f"\n\n================== Predict each model for stacking {model_name} ({fold_idx}/{config['n_splits']})==================")
                    self.predictions.append(model.predict(X_test))
                    print(f"Successfully predicted for stacking {model_name} ({fold_idx}/{config['n_splits']})")
        print(f"\n\n================== Stacking ({idx}/{config['n_splits']})==================")
        y_test_pred = stacker.fit_predict(y_test.values, self.predictions)
        score = mean_absolute_error(y_test, y_test_pred)
        print(f"Score for stacking ({idx}/{config['n_splits']}): {score}")
        self.optuna_weights.append(stacker.weights)
        print(f"Successfully stacking ({idx}/{config['n_splits']})")

    def stacking(self, idx, y_test=None, X_test=None, infer=False):
        """Dispatch to the stacking algorithm named by ``config["stacking_algorithm"]``."""
        if config["stacking_algorithm"] == "optuna":
            self._optuna_stacking(idx, y_test, X_test, infer=infer)
        else:
            raise ValueError("Invalid stacking algorithm")

    def save_models(self):
        """In train mode, dump every fold's models (plus the full-data model) with joblib."""
        if MODE == "train":
            for idx in range(config["n_splits"] + 1):
                for n_model, model_name in enumerate(config["model_name"]):
                    model = self.models_list[idx][n_model]
                    joblib.dump(model, f"{config['model_dir']}/{idx}_{model_name}.pkl")
                    print(f"Successfully saved model ({config['model_dir']}/{idx}_{model_name}.pkl)")

    def save_optuna_weights(self):
        """In train mode with stacking enabled, dump the stacking weights."""
        if MODE == "train":
            if config["stacking_mode"]:
                joblib.dump(self.optuna_weights, f"{config['model_dir']}/optuna_weights.pkl")
                print(f"Successfully saved optuna weights ({config['model_dir']}/optuna_weights.pkl)")

    def load_models(self):  # not needed in "both" mode
        """In inference mode, load n_splits + 1 model sets saved by ``save_models``."""
        if MODE == "inference":
            for idx in range(config["n_splits"] + 1):
                self.models = []
                for model_name in config["model_name"]:
                    model = joblib.load(f"{config['model_dir']}/{idx}_{model_name}.pkl")
                    self.models.append(model)
                    print(f"Successfully loaded model ({config['model_dir']}/{idx}_{model_name}.pkl)")
                self.models_list.append(self.models)

    def load_optuna_weights(self):
        """In inference mode with stacking enabled, load the stacking weights."""
        if MODE == "inference":
            if config["stacking_mode"]:
                self.optuna_weights = joblib.load(f"{config['model_dir']}/optuna_weights.pkl")
                print(f"Successfully loaded optuna weights ({config['model_dir']}/optuna_weights.pkl)")


# ## Main
## import data

In [28]:
# Feature version -> versions whose outputs it needs. Passed to FeatureEngineer(dependencies=...);
# execute_feature_versions hands those outputs to the version's method positionally, in this order.
dependencies = dict(
    feature_version_imbalance_2_1=["feature_version_imbalance_1", "feature_version_imbalance_2_0"],
    feature_version_imbalance_6_1=["feature_version_imbalance_6_0"],
    feature_version_imbalance_7=["feature_version_imbalance_1"],
    feature_version_imbalance_8=["feature_version_imbalance_3"],
    feature_version_imbalance_9=["feature_version_imbalance_3"],
    feature_version_imbalance_10=["feature_version_imbalance_1", "feature_version_imbalance_7"],
)

In [29]:
# # FOR VISUALIZE
# df = pd.read_csv(f"{config['data_dir']}/train.csv")
#
# # 데이터 전처리
# data_processor = DataPreprocessor(df)
# df = data_processor.transform()
# # Save할 피쳐 엔지니어링 함수 선택
# feature_engineer = FeatureEngineer(df, feature_versions=[
# ],
#                                    dependencies=dependencies)
# feature_engineer.generate_global_features(df)
# df = feature_engineer.transform()

In [30]:
# splitter = Splitter(method=config["split_method"], n_splits=config["n_splits"], correct=config["correct"],
#                     initial_fold_size_ratio=config["initial_fold_size_ratio"],
#                         train_test_ratio=config["train_test_ratio"], gap=config["gap"])
# for idx, (X_train, y_train, X_test, y_test) in enumerate(splitter.split(df)):
#     print(X_train.shape, y_train.shape, X_test.shape, y_test.shape)
# splitter.visualize_splits()

In [31]:
# Per-fold multipliers applied to the averaged fold prediction in ModelPipeline.predict
# (weighted_average is defined elsewhere in the notebook).
lgb_model_weights = weighted_average(config["n_splits"] + 1, equal_weight=False)
model_pipeline = ModelPipeline(lgb_model_weights)
if config["train_mode"]:
    # Load the data

    df = pd.read_csv(f"{config['data_dir']}/train.csv")

    # Preprocess the data
    data_processor = DataPreprocessor(data=df)
    df = data_processor.transform()

    # Choose the feature-engineering versions to use
    feature_engineer = FeatureEngineer(data=df, feature_versions=[
        'feature_version_time',
        'feature_version_imbalance_1',
        'feature_version_imbalance_2_0',
        'feature_version_imbalance_2_1',
        'feature_version_imbalance_3',
        'feature_version_imbalance_6_0',
        'feature_version_imbalance_6_1',
        'feature_version_imbalance_7',
        'feature_version_imbalance_8',
        'feature_version_imbalance_9',
        # 'feature_version_imbalance_10',
        # 'feature_version_imbalance_11',
        # 'feature_version_imbalance_12',
        # 'feature_version_custom_weight',
        # 'feature_version_order_flow',
    ],
                                       dependencies=dependencies)
    feature_engineer.generate_global_features(data=df)
    df = feature_engineer.transform(save=True)  # Run with save=True the first time; afterwards switch to transform(load=True) to reuse the saved files
    # Copy taken before splitting; used below to train the final model on all rows
    # (presumably kept separate in case splitter.split modifies df — confirm).
    df_copy = df.copy()
    splitter = Splitter(method=config["split_method"], n_splits=config["n_splits"], correct=config["correct"],
                        initial_fold_size_ratio=config["initial_fold_size_ratio"],
                        train_test_ratio=config["train_test_ratio"], gap=config["gap"])
    for idx, (X_train, y_train, X_test, y_test) in enumerate(splitter.split(data=df, p_gap=5)):
        print(X_train.shape, y_train.shape, X_test.shape, y_test.shape)
        model_pipeline.train(idx=idx, X_train=X_train, y_train=y_train, X_valid=X_test, y_valid=y_test)
        model_pipeline.predict(idx=idx, X_test=X_test)
        if config["stacking_mode"] and len(config["model_name"]) > 1:  # stacking within each fold
            model_pipeline.stacking(idx=idx, y_test=y_test)
    if config["stacking_mode"] and len(config["model_name"]) == 1:  # stacking for a single model type
        model_pipeline.stacking(idx=-1, y_test=y_test,
                                X_test=X_test)  # stacking with last fold. if you want you can stacking with all folds
    # print(df_copy.shape, df_copy["target"].shape)
    # Final model on all rows, no validation set. It lands in models_list at index n_splits.
    # NOTE(review): idx=n_splits + 1 is only used in the log messages here.
    model_pipeline.train(idx=config["n_splits"] + 1, X_train=df_copy.drop(columns=['target', 'date_id_copy']), y_train=df_copy["target"], X_valid=None, y_valid=None)
    model_pipeline.save_models()
    model_pipeline.save_optuna_weights()
    splitter.visualize_splits()

In [32]:
# Fill missing far_price / near_price values with their column means and return three results
def imputer(df):
    """Mean-impute ``far_price`` and ``near_price`` in place.

    Args:
        df: frame with ``far_price`` and ``near_price`` columns; it is modified in place.

    Returns:
        ``(df, far_price_mean, near_price_mean)``. The means are taken before filling, so
        they can be reused to impute other data the same way.
    """
    column_means = {}
    for column in ('far_price', 'near_price'):
        column_means[column] = df[column].mean()
        df[column] = df[column].fillna(column_means[column])

    return df, column_means['far_price'], column_means['near_price']

# Fill in missing (time_id, stock_id) rows
def add_missing_data(df, n_stocks=200):
    """Insert a row for every stock missing from a ``time_id`` and back-fill its values.

    Every ``time_id`` present in ``df`` is expanded to stock ids ``0 .. n_stocks-1``.
    Inserted rows take the group's ``date_id`` and ``seconds_in_bucket`` from its last row.
    Every other column is back-filled within the stock from that stock's next row.
    NOTE(review): back-filling copies values from *later* time steps (including any target
    column). Values stay NaN when a stock has no later row.

    Args:
        df: frame with stock_id, date_id, seconds_in_bucket and time_id columns.
        n_stocks: size of the stock universe (ids ``0 .. n_stocks-1``).

    Returns:
        A new frame sorted by (time_id, stock_id) with a fresh RangeIndex.
    """
    all_stock_ids = set(range(n_stocks))
    missing_frames = []

    # Group once so each time_id's rows are available without repeated filtering.
    for t, group in df.groupby('time_id'):
        missed_stock_id = sorted(all_stock_ids - set(group['stock_id'].to_list()))
        if not missed_stock_id:
            continue
        n_missing = len(missed_stock_id)
        missing_frames.append(pd.DataFrame({
            'stock_id': missed_stock_id,
            'date_id': [group['date_id'].iloc[-1]] * n_missing,
            'seconds_in_bucket': [group['seconds_in_bucket'].iloc[-1]] * n_missing,
            'time_id': [t] * n_missing,
        }))

    # Skipping empty frames also avoids pd.concat([]) raising when nothing is missing / df is empty.
    if missing_frames:
        all_missed_data = pd.concat(missing_frames, axis=0).reset_index(drop=True).astype(int)
        df = pd.concat([df, all_missed_data], axis=0)

    df = df.sort_values(by=['time_id', 'stock_id']).reset_index(drop=True)
    # Per-stock back-fill. groupby(...).bfill keeps the row order and leaves stock_id untouched,
    # replacing the deprecated groupby.apply(fillna(method='bfill')).
    fill_cols = [c for c in df.columns if c != 'stock_id']
    df[fill_cols] = df.groupby('stock_id')[fill_cols].bfill()

    return df

def sizesum_and_pricestd(df):
    """Add 10 per-stock rolling features (window of 8 rows, min_periods=1) to ``df`` in place.

    - ``<col>_rolled_sum`` for the four size columns.
    - ``<col>_rolled_std`` for the six price columns; NaN stds (e.g. a one-row window) become 0.

    Rolling runs in the current row order within each stock. Results are aligned back to
    ``df`` by index.

    Returns:
        The same ``df`` object.
    """
    rolling_plan = (
        ('sum', ['imbalance_size', 'matched_size', 'bid_size', 'ask_size']),
        ('std', ['reference_price', 'far_price', 'near_price', 'bid_price', 'ask_price', 'wap']),
    )
    for agg_name, columns in rolling_plan:
        roller = df[['stock_id'] + columns].groupby('stock_id').rolling(window=8, min_periods=1)
        stats = getattr(roller, agg_name)()
        if agg_name == 'std':
            stats = stats.fillna(0)
        # Drop the stock_id level so the index matches df's row labels again.
        stats = stats.reset_index(level=0, drop=True)
        for column in columns:
            df[f'{column}_rolled_{agg_name}'] = stats[column]

    return df

# Remove list elements
def remove_element(input_list, drop_list):
    """Return the items of ``input_list`` that are not in ``drop_list``, in their original order."""
    kept = []
    for item in input_list:
        if item in drop_list:
            continue
        kept.append(item)
    return kept

In [33]:
# Load the training set and keep only rows with a known target.
train = pd.read_csv(f"{config['data_dir']}/train.csv")
train = train.loc[train['target'].notna()]

# Mean-impute far/near price, then insert rows for stocks absent from a time_id.
train, far_price_mean, near_price_mean = imputer(train)
train = add_missing_data(train)
print('결측치：', train.isnull().sum().sum())  # number of remaining missing values

# Per-stock rolling size sums / price stds.
train = sizesum_and_pricestd(train)

no_feature_cols = ['date_id', 'row_id', 'time_id', 'target', 'currently_scored']

feature_cols = remove_element(train.columns, no_feature_cols)
target_col = 'target'

print('피처 수：', len(feature_cols))  # number of features

# Standardize (z-score) the features.
# NOTE(review): mean/std come from all rows, before the random train/valid split made
# further below, so validation statistics leak into the scaling — confirm this is acceptable.
avg = train[feature_cols].mean()
std = train[feature_cols].std()

train[feature_cols] = (train[feature_cols] - avg)/std

# Convert the data to float32.
# NOTE(review): this casts every column, not only feature_cols.
train = train.astype('float32')

seq_len = 8

# Group by stock_id (despite the variable name): each group is one stock's full history.
grouped_by_time = train.groupby('stock_id')

def generate_data(grouped_by_time, seq_len, columns=None):
    """Yield ``(windows, targets)`` for every group of ``grouped_by_time``.

    Each group (one stock, as grouped in the cell above) is sorted by ``time_id``. Row ``i``
    gets the window of the ``seq_len`` rows ending at ``i``. For the first ``seq_len - 1``
    rows, the missing history is filled by repeating the group's first row. For example,
    with ``seq_len=5`` the windows of rows 1, 2 and 3 are ``11111``, ``11112`` and ``11123``.
    Groups with fewer rows than ``seq_len`` are handled as well.

    Args:
        grouped_by_time: pandas GroupBy whose groups contain ``time_id``, ``target`` and the
            feature columns.
        seq_len: window length (>= 1).
        columns: feature columns to use; defaults to the notebook-level ``feature_cols``.

    Yields:
        ``windows`` of shape ``(n_rows, seq_len, n_features)`` and ``targets`` of shape
        ``(n_rows,)``, one pair per group. Yielding per group avoids holding every group's
        windows at once.
    """
    cols = feature_cols if columns is None else columns
    for _, group in grouped_by_time:
        group_sorted = group.sort_values(by='time_id')
        features = group_sorted[cols].values
        # Left-pad with seq_len - 1 copies of the first row. Then window i is
        # padded[i : i + seq_len], which ends at original row i.
        padding = np.repeat(features[:1], seq_len - 1, axis=0)
        padded = np.concatenate([padding, features], axis=0)
        # sliding_window_view appends the window axis last: (n_rows, n_features, seq_len).
        windows = np.lib.stride_tricks.sliding_window_view(padded, seq_len, axis=0)
        # Reorder to (n_rows, seq_len, n_features) and copy out of the read-only view.
        features_array = np.ascontiguousarray(windows.transpose(0, 2, 1))

        target = group_sorted['target'].values
        yield features_array, target

# Materialise the per-stock (windows, targets) pairs yielded by generate_data.
data_generator = generate_data(grouped_by_time, seq_len=seq_len)

# One (features_array, target) pair per stock.
datas, labels = zip(*data_generator)

print(len(datas),datas[0].shape,datas[1].shape)

# Concatenate along the sample axis. Unlike np.array(datas), this never produces a ragged
# object array if stocks end up with different row counts.
data = np.concatenate(datas, axis=0).reshape(-1, seq_len, len(feature_cols))
label = np.concatenate(labels, axis=0).reshape(-1,)
print('data_seq_to_reshaped', data.shape, 'label_shape', label.shape)
#del train, datas, labels, grouped_by_time

device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
print('device：', device)


data = torch.tensor(data, dtype=torch.float32).to(device)
label = torch.tensor(label, dtype=torch.float32).to(device)

print('데이터 형태', data.shape)  # data shape
print('라벨 형태', label.shape)  # label shape

# Split into train / validation sets.
# NOTE(review): random_split shuffles individual windows. Windows of neighbouring time steps
# overlap (see generate_data), so near-duplicates land in both sets and the validation MAE is
# likely optimistic. A split by time_id/date_id would avoid this.
torch.manual_seed(42)

dataset = TensorDataset(data, label)

train_ratio = 0.8
train_size = int(train_ratio * len(dataset))
valid_size = len(dataset) - train_size
train_dataset, valid_dataset = random_split(dataset, [train_size, valid_size])

batch_size = 4096

train_loader = DataLoader(train_dataset, batch_size=batch_size)
valid_loader = DataLoader(valid_dataset, batch_size=batch_size)

print('batch size：', next(iter(train_loader))[0].shape)

결측치： 0
피처 수： 23
200 (26455, 8, 23) (26455, 8, 23)
data_seq_to_reshaped (5291000, 8, 23) label_shape (5291000,)
device： cpu
데이터 형태 torch.Size([5291000, 8, 23])
라벨 형태 torch.Size([5291000])
batch size： torch.Size([4096, 8, 23])


In [34]:
class MyModel(nn.Module):
    """Transformer regressor over fixed-length feature sequences.

    ``forward`` takes ``x`` of shape ``(batch, seq_len, feature_num)`` (batch_first) and
    returns ``(batch, 1)``. Data flow: linear embedding, then ``tf1``'s encoder over the
    sequence, then the last time step, then dropout, then ``tf2``'s encoder applied per sample,
    then a linear head.

    Only ``embedding``, ``tf1.encoder``, ``dropout``, ``tf2.encoder`` and ``decoder`` are
    used. ``tf0``, ``fc``, ``decoder_0``, ``decoder_1``, ``ffnn`` and the decoder halves of
    the ``nn.Transformer`` modules are unused. They are kept so that parameter names,
    initialisation order and saved checkpoints stay compatible.
    """

    def __init__(self, feature_num, d_model, nhead, num_layers):
        super(MyModel, self).__init__()
        self.embedding = nn.Linear(feature_num, d_model)
        self.tf0 = nn.Transformer(d_model=d_model, nhead=nhead, num_encoder_layers=2, batch_first=True)
        self.tf1 = nn.Transformer(d_model=d_model, nhead=nhead, num_encoder_layers=num_layers, batch_first=True)
        # With batch_first=True, nn.Transformer expects [batch_size, seq_len, d_model].
        self.fc = nn.Linear(d_model, d_model)
        self.dropout = nn.Dropout(0.5)
        self.tf2 = nn.Transformer(d_model=d_model, nhead=nhead, num_encoder_layers=num_layers, batch_first=True)
        self.decoder_0 = nn.Linear(d_model, d_model//2)
        self.decoder_1 = nn.Linear(d_model//2 , 1)
        self.decoder = nn.Linear(d_model, 1)

        self.ffnn = nn.Sequential(
            nn.Linear(d_model, 2*d_model),
            nn.ReLU(),
            nn.Linear(2*d_model, d_model)
        )

    def forward(self, x):
        x = self.embedding(x)       # (B, S, d_model)
        x = self.tf1.encoder(x)     # (B, S, d_model)
        x = x[:, -1, :]             # last time step -> (B, d_model)
        x = self.dropout(x)
        # Re-add a length-1 sequence axis. A 2-D tensor would be read by the encoder as one
        # *unbatched* sequence of length B, so samples in a batch would attend to each other
        # and predictions would depend on batch composition.
        x = self.tf2.encoder(x.unsqueeze(1)).squeeze(1)  # (B, d_model)
        x = self.decoder(x)         # (B, 1)

        return x

In [35]:
if is_train:
    input_size = data.shape[-1]  # features per time step (last axis of the window tensor)
    print(input_size)
    n_epochs = 50
    lr = 1e-03

    # Previous epoch's validation MAE (used by the LR schedule below)
    pre_epoch_valid_mae = np.inf

    # Counts epochs whose validation MAE rose above the previous epoch's. On every second such
    # epoch the LR is multiplied by 0.75. The counter is not reset when MAE improves.
    patience_counter = 0

    model = MyModel(feature_num=input_size, d_model=64, nhead=8, num_layers=1).to(device)
    
    # optimizer = optim.Adam(model.parameters(), lr=lr, weight_decay=1e-5)
    optimizer = optim.Adam(model.parameters(), lr=lr, weight_decay=1e-5)
    loss = nn.L1Loss().to(device)  # L1 loss = mean absolute error

    # out_path = "model/"
    # if not os.path.exists(out_path):
    #     os.makedirs(out_path)
    best_mae = np.inf

    print(f'Train start...')
    for epoch in range(n_epochs):
        model.train()
        train_maes = []
        batch_num = len(train_loader)  # NOTE(review): unused

        # Training
        for X, y in train_loader:
            optimizer.zero_grad()
            outputs = model(X).squeeze()  # (B, 1) -> (B,)
            l = loss(outputs, y)
            l.backward()
            nn.utils.clip_grad_norm_(model.parameters(), max_norm=1)
            optimizer.step()
            mae = l.item()
            train_maes.append(mae)
        epoch_train_mae = np.mean(train_maes)  # mean of per-batch MAEs
        print(f'Epoch [{epoch+1}/{n_epochs}] Training average MAE: {epoch_train_mae:.4f}')
        train_maes = []

        # Validation
        model.eval()
        with torch.no_grad():
            valid_maes = []
            for X_v, y_v in valid_loader:
                preds = model(X_v).squeeze()
                mae = torch.abs(preds - y_v).mean().item()
                valid_maes.append(mae)
            epoch_valid_mae = np.mean(valid_maes)
            print(f'Epoch [{epoch+1}/{n_epochs}] Validation average MAE: {epoch_valid_mae:.4f}')
            
            if epoch_valid_mae < best_mae:
                best_mae = epoch_valid_mae
                # torch.save(model, os.path.join(out_path, f"model_epoch_{epoch+1}.pt"))
    
                now = datetime.datetime.now()
                time_string = now.strftime("%Y%m%d_%H%M")  # NOTE(review): unused
                # Pickles the whole module (not just its state_dict) every time validation MAE improves.
                torch.save(model, f"{config['model_dir']}/model_epoch_{epoch+1}.pt")
                # torch.save(model, f"transformer_model.pt")
                
        # LR schedule: see the patience_counter comment above.
        if epoch_valid_mae - pre_epoch_valid_mae > 0:
            patience_counter += 1

            if patience_counter == 2:
                lr = lr * 0.75
                patience_counter = 0
                for param_group in optimizer.param_groups:
                    param_group['lr'] = lr  # update the learning rate
                    print(f'renew lr to {lr}')

        # Remember this epoch's validation MAE
        pre_epoch_valid_mae = epoch_valid_mae

        # Stop when validation MAE exceeds training MAE by more than 0.03, or the LR drops below 1e-7.
        if (epoch_valid_mae - epoch_train_mae > 0.03) or (lr <1e-7):
            print('Early stop now.')
            break
    print(f'Train over.')

In [36]:
train = pd.read_csv("/kaggle/input/optiver-trading-at-the-close/train.csv")

# Per-stock statistics of the total quoted size (bid_size + ask_size). They are computed on the
# full CSV, before rows with a missing target are dropped, and mapped onto rows by feature_eng.
_by_stock = train.groupby('stock_id')
median_sizes = _by_stock['bid_size'].median() + _by_stock['ask_size'].median()
std_sizes = _by_stock['bid_size'].std() + _by_stock['ask_size'].std()
max_sizes = _by_stock['bid_size'].max() + _by_stock['ask_size'].max()
min_sizes = _by_stock['bid_size'].min() + _by_stock['ask_size'].min()
mean_sizes = _by_stock['bid_size'].mean() + _by_stock['ask_size'].mean()
first_sizes = _by_stock['bid_size'].first() + _by_stock['ask_size'].first()
last_sizes = _by_stock['bid_size'].last() + _by_stock['ask_size'].last()
del _by_stock  # release the GroupBy (it references the unfiltered frame)
# TODO: date-level aggregates could be added as well (the date column does not appear to be dropped).

train = train.dropna(subset=['target']).reset_index(drop=True)

def feature_eng(df, size_stats=None):
    """Build hand-crafted order-book features.

    Drops ``row_id``/``time_id`` and works on a copy, so the caller's frame is untouched.

    Args:
        df: frame with ``stock_id``, the size columns (imbalance_size, matched_size, bid_size,
            ask_size) and the price columns reference_price, far_price, near_price, ask_price,
            bid_price and wap.
        size_stats: optional mapping from ``median_size``, ``std_size``, ``max_size``,
            ``min_size``, ``mean_size``, ``first_size`` and ``last_size`` to a per-stock Series
            (indexed by stock_id). Defaults to the notebook-level ``median_sizes`` ...
            ``last_sizes`` computed in the cell above.

    Returns:
        The feature frame.
    """
    cols = [c for c in df.columns if c not in ['row_id', 'time_id']]
    # Explicit copy: adding columns to a column subset of another frame otherwise triggers
    # pandas' SettingWithCopyWarning.
    df = df[cols].copy()

    # Ratio of unmatched (imbalance) size to matched size
    df['imbalance_ratio'] = df['imbalance_size'] / df['matched_size']
    # Difference between the ask side and the bid side
    df['bid_ask_volume_diff'] = df['ask_size'] - df['bid_size']
    # Total size on both sides
    df['bid_plus_ask_sizes'] = df['bid_size'] + df['ask_size']

    # Mid price between bid and ask
    df['mid_price'] = (df['ask_price'] + df['bid_price']) / 2

    # Per-stock statistics of total quoted size
    if size_stats is None:
        size_stats = {
            'median_size': median_sizes,
            'std_size': std_sizes,
            'max_size': max_sizes,
            'min_size': min_sizes,
            'mean_size': mean_sizes,
            'first_size': first_sizes,
            'last_size': last_sizes,
        }
    for name in ('median_size', 'std_size', 'max_size', 'min_size', 'mean_size', 'first_size', 'last_size'):
        df[name] = df['stock_id'].map(size_stats[name].to_dict())

    # 1 when the current total size exceeds the stock's median total size
    df['high_volume'] = np.where(df['bid_plus_ask_sizes'] > df['median_size'], 1, 0)

    prices = ['reference_price', 'far_price', 'near_price', 'ask_price', 'bid_price', 'wap']

    # Pairwise price differences, and difference / sum
    for a, b in combinations(prices, 2):
        df[f'{a}_minus_{b}'] = (df[a] - df[b]).astype(np.float32)
        df[f'{a}_{b}_imb'] = df.eval(f'({a} - {b})/({a} + {b})')

    # For each price triple: (max - mid) / (mid - min), with 1e-4 guarding the denominator
    for c in combinations(prices, 3):
        max_ = df[list(c)].max(axis=1)
        min_ = df[list(c)].min(axis=1)
        mid_ = df[list(c)].sum(axis=1) - min_ - max_

        df[f'{c[0]}_{c[1]}_{c[2]}_imb2'] = (max_-mid_)/(mid_-min_ + 1e-4)

    gc.collect()

    return df

y = train['target'].values
X = feature_eng(train.drop(columns='target'))
# Keep date_id aside as X_date_id and remove it from the model inputs.
X_date_id = X.pop('date_id')

# Target range (min / max of y).
y_min, y_max = np.min(y), np.max(y)

In [37]:
def feat_eng_nn(df: pd.DataFrame):
    """Split a feature frame into continuous and discrete (categorical) parts for the NN.

    Adds two columns to ``df`` in place:
    - ``stage``: 1 after second 300 of the bucket, else 0.
    - ``min_in_bucket``: minute index 0-8 of ``seconds_in_bucket``.

    Args:
        df: feature frame; every int64 column is treated as categorical.

    Returns:
        ``(X_ctg, X_dsc, cat_num)``:
        - ``X_ctg``: the non-int64 columns with NaN filled by 0.
        - ``X_dsc``: the int64 columns except ``seconds_in_bucket``, each shifted so its
          minimum is >= 0.
        - ``cat_num``: the number of unique values per ``X_dsc`` column.

    NOTE(review): the shift uses the minimum of *this* frame. A frame that lacks the training
    minimum (e.g. an inference batch without any imbalance_buy_sell_flag == -1) is encoded
    differently from training. Consider shifting with minima taken from the training data.
    """
    df["stage"] = np.where(df["seconds_in_bucket"] > 300, 1, 0)
    seconds = df["seconds_in_bucket"]
    # Minute index: [0, 60) -> 0, ..., [420, 480) -> 7, [480, 541) -> 8; values outside [0, 541) are kept.
    in_range = (seconds >= 0) & (seconds < 541)
    df["min_in_bucket"] = np.where(in_range, np.minimum(seconds // 60, 8), seconds)

    # Discrete features: every int64 column
    int_feat = list(df.dtypes[df.dtypes == "int64"].index)

    # Copy: columns are shifted in place below (avoids SettingWithCopy on a slice of df).
    X_dsc = df[int_feat].copy()
    for f in int_feat:
        mv = np.min(X_dsc[f])
        if mv < 0:
            # Shift negative codes (e.g. a -1 flag) so they are valid embedding indices.
            X_dsc[f] += 0 - mv
    X_dsc = X_dsc.drop(columns="seconds_in_bucket")
    assert not X_dsc.isnull().any().any()
    cat_num = X_dsc.nunique()

    X_ctg = df.drop(columns=int_feat)
    X_ctg = X_ctg.fillna(0)

    return X_ctg, X_dsc, cat_num

X_ctg, X_dsc, cat_num = feat_eng_nn(X)
# Inspect the split: dtypes of both parts, a preview, per-column minima and cardinalities.
for summary in (X_ctg.dtypes, X_dsc.dtypes, X_dsc.head(), X_dsc.min(), cat_num):
    print(summary)

imbalance_size                               float64
reference_price                              float64
matched_size                                 float64
far_price                                    float64
near_price                                   float64
bid_price                                    float64
bid_size                                     float64
ask_price                                    float64
ask_size                                     float64
wap                                          float64
imbalance_ratio                              float64
bid_ask_volume_diff                          float64
bid_plus_ask_sizes                           float64
mid_price                                    float64
median_size                                  float64
std_size                                     float64
max_size                                     float64
min_size                                     float64
mean_size                                    f

In [40]:
# Map every continuous feature to an approximately normal distribution.
# NOTE(review): the scaler is fit on all rows; if a validation split is taken later, its rows
# influence the quantiles.
scaler = QuantileTransformer(
    output_distribution='normal',
    n_quantiles=30000,
    subsample=500000,
)
scaled_data = scaler.fit_transform(X_ctg)
X_ctg = pd.DataFrame(data=scaled_data, columns=X_ctg.columns)

In [41]:
class NNDataset(Dataset):
    """Map-style dataset of continuous features, categorical codes and an optional target.

    Args:
        X_c: continuous features (DataFrame or array), one row per sample.
        X_d: integer categorical codes (DataFrame or array), one row per sample.
        y_nn: targets (array or Series, indexed positionally), or None for inference.

    Items are ``(x_cont float32, x_cat int64)``, or ``(x_cont, x_cat, y float32)`` when
    targets are given.
    """

    def __init__(self, X_c, X_d, y_nn):
        self.X_c = X_c
        self.X_d = X_d
        self.y_nn = y_nn
        # Convert once to numpy. Per-item DataFrame.iloc lookups are slow, and positional
        # indexing avoids label lookups (KeyError) when y_nn is a Series with a non-default index.
        self._xc = np.asarray(X_c)
        self._xd = np.asarray(X_d)
        self._y = None if y_nn is None else np.asarray(y_nn)

    def __len__(self):
        return len(self.X_d)

    def __getitem__(self, idx):
        x_cont = torch.tensor(self._xc[idx]).float()
        x_cat = torch.tensor(self._xd[idx]).long()
        if self._y is None:
            return x_cont, x_cat
        return x_cont, x_cat, torch.tensor(self._y[idx]).float()

In [42]:
class CNN(nn.Module):
    """1-D CNN regressor over numeric features plus categorical embeddings.

    Numeric features are concatenated with one embedding per categorical column. The result
    is projected to ``hidden_size`` and reshaped into ``channel_1`` channels of length
    ``hidden_size // channel_1``. It then goes through a convolution stack, an optional gated
    second stage, max-pooling and a linear head with one output.

    Args:
        num_features: number of numeric input columns.
        hidden_size: width of the expansion layer; must be a multiple of ``channel_1``.
        n_categories: cardinality of each categorical column (one embedding table each).
        emb_dim: embedding size per categorical column.
        dropout_cat: dropout on the concatenated embeddings.
        channel_1: channels after the reshape.
        channel_2: channels produced by the first conv stage.
        channel_3: channels produced by the second stage; must equal ``channel_2`` when
            ``two_stage`` is set (``conv2(x) * x`` is element-wise).
        dropout_top: dropout before the expansion layer and inside the first conv stage.
        dropout_mid: dropout inside the second stage.
        dropout_bottom: dropout inside the second stage and before the head.
        weight_norm: wrap conv/head layers in weight normalisation. The expansion layer is
            always weight-normalised.
        two_stage: enable the gated second conv stage.
        celu: CELU(0.06) instead of ReLU after the expansion layer.
        kernel1: kernel size of the first convolution.
        leaky_relu: add a LeakyReLU after the output layer.

    Raises:
        ValueError: if ``hidden_size`` is not a multiple of ``channel_1``, or if
            ``two_stage`` is set with ``channel_2 != channel_3``.
    """

    def __init__(self,
                 num_features: int,
                 hidden_size: int,
                 n_categories: List[int],
                 emb_dim: int = 10,
                 dropout_cat: float = 0.2,
                 channel_1: int = 256,
                 channel_2: int = 512,
                 channel_3: int = 512,
                 dropout_top: float = 0.1,
                 dropout_mid: float = 0.3,
                 dropout_bottom: float = 0.2,
                 weight_norm: bool = True,
                 two_stage: bool = True,
                 celu: bool = True,
                 kernel1: int = 5,
                 leaky_relu: bool = False):
        super().__init__()

        if hidden_size % channel_1 != 0:
            raise ValueError(f"hidden_size ({hidden_size}) must be a multiple of channel_1 ({channel_1})")
        if two_stage and channel_2 != channel_3:
            raise ValueError(f"two_stage requires channel_2 == channel_3 (got {channel_2} and {channel_3})")

        num_targets = 1

        cha_1_reshape = int(hidden_size / channel_1)
        cha_po_1 = int(hidden_size / channel_1 / 2)
        # Channels leaving the conv stack: conv2 outputs channel_3, conv1 alone outputs channel_2.
        out_channels = channel_3 if two_stage else channel_2
        # The final max-pool halves the length again, so flattened width = (length / 4) * out_channels.
        cha_po_2 = int(hidden_size / channel_1 / 2 / 2) * out_channels

        self.cat_dim = emb_dim * len(n_categories)
        self.cha_1 = channel_1
        self.cha_2 = channel_2
        self.cha_3 = channel_3
        self.cha_1_reshape = cha_1_reshape
        self.cha_po_1 = cha_po_1
        self.cha_po_2 = cha_po_2
        self.two_stage = two_stage

        self.expand = nn.Sequential(
            nn.BatchNorm1d(num_features + self.cat_dim),
            nn.Dropout(dropout_top),
            nn.utils.weight_norm(nn.Linear(num_features + self.cat_dim, hidden_size), dim=None),
            nn.CELU(0.06) if celu else nn.ReLU()
        )

        def _norm(layer, dim=None):
            return nn.utils.weight_norm(layer, dim=dim) if weight_norm else layer

        self.conv1 = nn.Sequential(
            nn.BatchNorm1d(channel_1),
            nn.Dropout(dropout_top),
            _norm(nn.Conv1d(channel_1, channel_2, kernel_size=kernel1, stride=1, padding=kernel1 // 2, bias=False)),
            nn.ReLU(),
            nn.AdaptiveAvgPool1d(output_size=cha_po_1),
            nn.BatchNorm1d(channel_2),
            nn.Dropout(dropout_top),
            _norm(nn.Conv1d(channel_2, channel_2, kernel_size=3, stride=1, padding=1, bias=True)),
            nn.ReLU()
        )

        if self.two_stage:
            self.conv2 = nn.Sequential(
                nn.BatchNorm1d(channel_2),
                nn.Dropout(dropout_mid),
                _norm(nn.Conv1d(channel_2, channel_2, kernel_size=3, stride=1, padding=1, bias=True)),
                nn.ReLU(),
                nn.BatchNorm1d(channel_2),
                nn.Dropout(dropout_bottom),
                _norm(nn.Conv1d(channel_2, channel_3, kernel_size=5, stride=1, padding=2, bias=True)),
                nn.ReLU()
            )

        self.max_po_c2 = nn.MaxPool1d(kernel_size=4, stride=2, padding=1)

        self.flt = nn.Flatten()

        # Same layer indices as before: the optional LeakyReLU is appended last.
        dense_layers = [
            nn.BatchNorm1d(cha_po_2),
            nn.Dropout(dropout_bottom),
            _norm(nn.Linear(cha_po_2, num_targets), dim=0),
        ]
        if leaky_relu:
            dense_layers.append(nn.LeakyReLU())
        self.dense = nn.Sequential(*dense_layers)

        self.embs = nn.ModuleList([nn.Embedding(x, emb_dim) for x in n_categories])
        self.dropout_cat = nn.Dropout(dropout_cat)

    def forward(self, x_num, x_cat):
        """Predict one value per row.

        Args:
            x_num: float tensor (batch, num_features).
            x_cat: long tensor (batch, len(n_categories)) of category codes.

        Returns:
            ``torch.squeeze`` of the (batch, 1) output: shape (batch,), or 0-d when batch == 1.
        """
        embs = [embedding(x_cat[:, i]) for i, embedding in enumerate(self.embs)]
        x_cat_emb = self.dropout_cat(torch.cat(embs, 1))
        x = torch.cat([x_num, x_cat_emb], 1)

        x = self.expand(x)

        x = x.reshape(x.shape[0], self.cha_1, self.cha_1_reshape)

        x = self.conv1(x)

        if self.two_stage:
            x = self.conv2(x) * x  # gated: element-wise product with the first-stage output

        x = self.max_po_c2(x)
        x = self.flt(x)
        x = self.dense(x)

        return torch.squeeze(x)

In [43]:
from sklearn.model_selection._split import _BaseKFold, indexable, _num_samples
from sklearn.utils.validation import _deprecate_positional_args

# modified code for group gaps; source
# https://github.com/getgaurav2/scikit-learn/blob/d4a3af5cc9da3a76f0266932644b884c99724c57/sklearn/model_selection/_split.py#L2243
class PurgedGroupTimeSeriesSplit(_BaseKFold):
    """Time-series cross-validator over non-overlapping groups, with a purge gap.

    Groups (e.g. a date id) are ordered by first appearance. The last
    ``n_splits * group_test_size`` groups are cut into ``n_splits`` consecutive
    test blocks. For each block, the training set is built from the groups that
    precede it. ``group_gap`` groups are left unused between the end of training
    and the start of the test block, so windowed / lag features cannot leak from
    train into test. Test indices increase from split to split, so the splitter
    never shuffles. The same group never appears in both train and test of a
    split; the number of distinct groups must be at least ``n_splits + 1``.

    Parameters
    ----------
    n_splits : int, default=5
        Number of splits. Must be at least 2.
    max_train_group_size : int, default=np.inf
        Maximum number of groups in a single training set (the groups closest
        to the gap are kept).
    max_test_group_size : int, default=np.inf
        Maximum number of groups in a single test set.
    group_gap : int or None, default=None
        Number of groups skipped between the training and the test set.
        ``None`` means no gap. The gap is applied on the training side only;
        test sets always contain whole groups.
    verbose : bool, default=False
        Stored for API compatibility; currently unused.

    Adapted from
    https://github.com/getgaurav2/scikit-learn/blob/d4a3af5cc9da3a76f0266932644b884c99724c57/sklearn/model_selection/_split.py#L2243
    """

    @_deprecate_positional_args
    def __init__(self,
                 n_splits=5,
                 *,
                 max_train_group_size=np.inf,
                 max_test_group_size=np.inf,
                 group_gap=None,
                 verbose=False
                 ):
        super().__init__(n_splits, shuffle=False, random_state=None)
        self.max_train_group_size = max_train_group_size
        self.group_gap = group_gap
        self.max_test_group_size = max_test_group_size
        self.verbose = verbose

    @staticmethod
    def _indices_of(group_dict, group_labels):
        """Return the sorted sample positions owned by ``group_labels`` as Python ints."""
        if len(group_labels) == 0:
            return []
        # Groups are disjoint, so one concatenation + sort equals the old
        # repeated concatenate/unique, without its quadratic cost.
        positions = np.concatenate([group_dict[g] for g in group_labels])
        return np.sort(positions).tolist()

    def split(self, X, y=None, groups=None):
        """Generate indices to split data into training and test set.

        Parameters
        ----------
        X : array-like of shape (n_samples, n_features)
            Training data, where n_samples is the number of samples
            and n_features is the number of features.
        y : array-like of shape (n_samples,)
            Always ignored, exists for compatibility.
        groups : array-like of shape (n_samples,)
            Group labels for the samples used while splitting the dataset into
            train/test set.

        Yields
        ------
        train : list of int
            The training set positional indices for that split.
        test : list of int
            The testing set positional indices for that split.

        Raises
        ------
        ValueError
            If ``groups`` is None, or there are fewer distinct groups than
            ``n_splits + 1``.
        """
        if groups is None:
            raise ValueError(
                "The 'groups' parameter should not be None")
        X, y, groups = indexable(X, y, groups)
        # Positional array view: a pandas Series with a non-default index would
        # otherwise be looked up by label when mapping samples to groups.
        groups = np.asarray(groups)
        n_splits = self.n_splits
        group_gap = 0 if self.group_gap is None else self.group_gap
        n_folds = n_splits + 1

        # Distinct groups in order of first appearance.
        u, ind = np.unique(groups, return_index=True)
        unique_groups = u[np.argsort(ind)]
        n_groups = _num_samples(unique_groups)
        if n_folds > n_groups:
            raise ValueError(
                ("Cannot have number of folds={0} greater than"
                 " the number of groups={1}").format(n_folds,
                                                     n_groups))

        # group label -> ascending list of the sample positions it owns.
        group_dict = {}
        for idx, group in enumerate(groups):
            group_dict.setdefault(group, []).append(idx)

        group_test_size = int(min(n_groups // n_folds, self.max_test_group_size))
        group_test_starts = range(n_groups - n_splits * group_test_size,
                                  n_groups, group_test_size)
        for group_test_start in group_test_starts:
            # Training stops `group_gap` groups before the test block. Clamp at 0
            # so a large gap cannot become a negative (wrap-around) slice end,
            # which would pull test groups into the training set.
            train_stop = max(0, group_test_start - group_gap)
            train_start = int(max(0, train_stop - self.max_train_group_size))
            train_groups = unique_groups[train_start:train_stop]
            test_groups = unique_groups[group_test_start:
                                        group_test_start + group_test_size]
            yield (self._indices_of(group_dict, train_groups),
                   self._indices_of(group_dict, test_groups))

In [44]:
# CNN cross-validation / training settings.
# NOTE: N_Folds and is_train are also set in the config cell at the top of the
# notebook; the assignments below override them for the remaining cells.
N_Folds = 5
gkf = PurgedGroupTimeSeriesSplit(n_splits=N_Folds, group_gap=5)

is_train = False

params_nn = {
    "batch_size": 1200,
    "min_lr": 1e-7,
    "lr": 1e-3,
    "epochs": 35,
    "val_iter": 1500,          # validate every N training batches (and on the last batch)
    "train_log_step": 500,     # log the running training loss every N batches
    "scheduler_factor": 0.5,   # LR multiplier applied by ReduceLROnPlateau
    "scd_patience": 2,         # scheduler patience (epochs) before reducing the LR
    "patience": 5,             # consecutive non-improving epochs before early stopping
    "weight_decay": 6.5e-6
}

# Folds skipped in this run (e.g. when resuming after fold 0 was already trained).
SKIP_FOLDS = {0}
# Cap on validation rows per fold, keeping each validation pass cheap.
MAX_VALID_ROWS = 495000

N, D_c = X_ctg.shape
print(N, D_c)
embed_dims = list(cat_num.to_dict().values())
device = "cuda:0" if torch.cuda.is_available() else "cpu"
print(f"device: {device}")
output_dir = "cnn_models_ScalerVer_Quantile_30000_1200"


def evaluate_nn(model, loader, criterion, device):
    """Return the mean per-batch loss of ``model`` over ``loader``.

    The model is switched to eval mode for the pass, so dropout is off and
    BatchNorm uses (and does not update) its running statistics. Otherwise
    validation data would leak into those statistics. Train mode is restored
    afterwards.
    """
    model.eval()
    batch_losses = []
    with torch.no_grad():
        for x_c, x_d, y_val in tqdm(loader, total=len(loader)):
            x_c, x_d, y_val = x_c.to(device), x_d.to(device), y_val.to(device)
            batch_losses.append(criterion(model(x_c, x_d), y_val).item())
    model.train()
    return np.mean(batch_losses)


if is_train:
    os.makedirs(output_dir, exist_ok=True)
    for fold_i, (train_idx, valid_idx) in enumerate(gkf.split(X, y, X_date_id)):

        if fold_i in SKIP_FOLDS:
            continue

        # data
        tr_X_ctg, tr_X_dsc, tr_y = X_ctg.iloc[train_idx], X_dsc.iloc[train_idx], y[train_idx]
        valid_idx_small = valid_idx[:MAX_VALID_ROWS]
        val_X_ctg, val_X_dsc, val_y = X_ctg.iloc[valid_idx_small], X_dsc.iloc[valid_idx_small], y[valid_idx_small]

        # build torch dataset and dataloader
        dataset_tr = NNDataset(tr_X_ctg, tr_X_dsc, tr_y)
        dataset_val = NNDataset(val_X_ctg, val_X_dsc, val_y)

        loader_tr = DataLoader(dataset_tr, batch_size=params_nn["batch_size"], shuffle=False, num_workers=10)
        loader_val = DataLoader(dataset_val, batch_size=params_nn["batch_size"], shuffle=False, num_workers=10)

        # build model and related modules
        model = CNN(num_features=D_c,
                    n_categories=embed_dims,
                    hidden_size=8*200,
                    emb_dim=30,
                    dropout_cat=0,
                    channel_1=200,
                    channel_2=3*200,
                    channel_3=3*200,
                    dropout_top=0.35,
                    dropout_mid=0.35,
                    dropout_bottom=0.35,
                    weight_norm=True,
                    two_stage=False,
                    celu=False,
                    kernel1=5,
                    leaky_relu=False)

        model.to(device)
        criterion = nn.L1Loss()
        optimizer = optim.Adam(model.parameters(), lr=params_nn["lr"], weight_decay=params_nn["weight_decay"])
        # `scd_patience` is the scheduler's own patience; `patience` is reserved
        # for early stopping below.
        scheduler = optim.lr_scheduler.ReduceLROnPlateau(optimizer, "min", min_lr=params_nn["min_lr"],
                                                         factor=params_nn["scheduler_factor"],
                                                         patience=params_nn["scd_patience"], verbose=True)

        # begin training
        n_iter = len(loader_tr)
        best_loss = np.inf
        last_epoch_loss = np.inf
        p = 0
        for epoch in range(params_nn["epochs"]):
            model.train()
            loss_epoch = []
            # Guard for an empty loader: no validation -> counts as no improvement.
            loss_val_avg = np.inf
            for i, (x_c, x_d, y_tr) in enumerate(loader_tr):
                optimizer.zero_grad()
                x_c, x_d, y_tr = x_c.to(device), x_d.to(device), y_tr.to(device)
                y_hat = model(x_c, x_d)
                loss = criterion(y_hat, y_tr)
                loss.backward()
                optimizer.step()

                loss_epoch.append(loss.detach().item())

                # log
                if i != 0 and i % params_nn["train_log_step"] == 0:
                    print(f"Fold {fold_i:1}, "
                          f"Epoch {epoch:2}/{params_nn['epochs']:2}, "
                          f"iter {i:5}/{n_iter:5}, "
                          f"loss {np.mean(loss_epoch[-params_nn['train_log_step']:]):8.4f}")

                # validate periodically and always on the last batch of the epoch
                if (i != 0 and i % params_nn["val_iter"] == 0) or i == n_iter - 1:
                    loss_val_avg = evaluate_nn(model, loader_val, criterion, device)
                    print(f"==> Val current best loss {best_loss:8.4f}")
                    print(f"Fold {fold_i:1}, "
                          f"Epoch {epoch:2}/{params_nn['epochs']:2}, "
                          f"Valid loss {loss_val_avg:8.4f}")
                    if loss_val_avg < best_loss:
                        best_loss = loss_val_avg
                        print(f"Loss decreases! current best loss {best_loss:8.4f}")
                        torch.save(model, os.path.join(output_dir, f"fold{fold_i}.pt"))
                    print()

            # early stop (compares against the previous epoch's validation loss)
            if loss_val_avg >= last_epoch_loss:
                p += 1
                print(f"Best loss doesn't decrease in this epoch, patience {p}/{params_nn['patience']}")
                if p >= params_nn["patience"]:
                    print(f"Reach patience, quit training of fold {fold_i}")
                    print()
                    break
            else:
                print(f"Epoch loss decreases from {last_epoch_loss:8.4f} to {loss_val_avg:8.4f}")
                last_epoch_loss = loss_val_avg
                p = 0

            # scheduler
            scheduler.step(loss_val_avg)
            print()

5237892 71
device: cpu


### upload kaggle dataset

In [45]:
# Optional diagnostic: per-fold feature-importance plots for the tree models.
# Disabled by default; set to True after training to inspect the models.
SHOW_FEATURE_IMPORTANCE = False


def plot_feature_importances(pipeline, model_names):
    """Plot gain-based feature importance for every fold's LightGBM / XGBoost model.

    Parameters
    ----------
    pipeline : object
        Assumed to expose ``models_list``: one sequence of fitted models per
        fold, aligned with ``model_names`` (TODO confirm against the pipeline class).
    model_names : sequence of str
        A name containing ``"lgb"`` or ``"xgb"`` selects the plotting backend.

    Raises
    ------
    ValueError
        If a model name contains neither ``"lgb"`` nor ``"xgb"``.
    """
    for fold_models in pipeline.models_list:
        for model_name, fitted_model in zip(model_names, fold_models):
            # Create the axes up front: xgb.plot_importance has no `figsize`
            # argument (extra kwargs are forwarded to `barh`).
            fig, ax = plt.subplots(figsize=(20, 20))
            if "lgb" in model_name:
                lgb.plot_importance(fitted_model, importance_type="gain", ax=ax)
            elif "xgb" in model_name:
                xgb.plot_importance(fitted_model, importance_type="gain", ax=ax)
            else:
                plt.close(fig)
                raise ValueError("Invalid model name")
            ax.set_title(f"Feature Importance ({model_name})")
            plt.show()
            plt.close(fig)


if SHOW_FEATURE_IMPORTANCE:
    plot_feature_importances(model_pipeline, config["model_name"])

# The trained model directory is published to Kaggle by the MODE == "train"
# cell below. It fills in the dataset metadata before `kaggle datasets create`.
# Running `init` followed directly by `create` here would submit the template's
# placeholder title/slug, and would do so in every MODE.

In [46]:
# KAGGLE_DATASET_NAME = "model-version-31"

In [47]:
if MODE == "train":
    ! /usr/local/bin/kaggle datasets init -p {config['model_dir']}
    import json

    with open(f"{config['model_dir']}/dataset-metadata.json", "r") as file:
        data = json.load(file)

    data["title"] = data["title"].replace("INSERT_TITLE_HERE", f"{KAGGLE_DATASET_NAME}")
    data["id"] = data["id"].replace("INSERT_SLUG_HERE", f"{KAGGLE_DATASET_NAME}")

    with open(f"{config['model_dir']}/dataset-metadata.json", "w") as file:
        json.dump(data, file, indent=2)

    ! /usr/local/bin/kaggle datasets create -p {config['model_dir']}

    # !/usr/local/bin/kaggle datasets version -p {config['model_dir']} -m 'Updated data'

In [48]:
class TestStack:
    """Rolling buffer of recent test snapshots for the sequence (TF) model.

    Each call to ``test_stack`` adds one snapshot, tagged with a ``time_id``
    column. The buffer keeps the last ``2 * window_size`` snapshots so that
    ``df_to_seq`` can later cut the trailing ``window_size`` rows per stock.
    On the very first call, the buffer is warm-started by replicating that
    snapshot with ``time_id`` values ``-(n-1) .. 0``.
    """

    def __init__(self, window_size=6):
        # Twice the model's sequence length is buffered.
        self.window_size = window_size * 2
        self.stock_cache = []  # list of snapshot DataFrames, oldest first

    def test_stack(self, test, time_id):
        """Add ``test`` as the snapshot for ``time_id`` and return the buffered window.

        Parameters
        ----------
        test : pandas.DataFrame or list of dict
            One snapshot; must contain a ``stock_id`` column. It is copied, so the
            caller's frame is left unchanged.
        time_id : int
            Ordering key stored in the snapshot's ``time_id`` column.

        Returns
        -------
        pandas.DataFrame
            All buffered snapshots concatenated, sorted by ``time_id`` then ``stock_id``.
        """
        if isinstance(test, list):
            test = pd.DataFrame(test)

        # Work on a copy: tagging the caller's frame (and caching that very
        # object) would leak the extra column and later mutations across calls.
        snapshot = test.copy()
        snapshot['time_id'] = time_id
        self.stock_cache.append(snapshot)

        if len(self.stock_cache) > self.window_size:
            # Keep only the most recent `window_size` snapshots.
            self.stock_cache = self.stock_cache[-self.window_size:]
        else:
            # First call: not enough history yet, so replicate the snapshot
            # `window_size` times with time_ids -(window_size-1) .. 0.
            self.stock_cache = [
                snapshot.assign(time_id=t - self.window_size + 1)
                for t in range(self.window_size)
            ]

        stacked = pd.concat(self.stock_cache, axis=0).reset_index(drop=True)
        return stacked.sort_values(['time_id', 'stock_id'])

# Normalised feature columns; reassigned by the inference cells that call df_to_seq.
test_cols = None


def df_to_seq(test, seq_len):
    """Convert a stacked snapshot frame into a per-stock sequence array.

    Parameters
    ----------
    test : pandas.DataFrame
        Stacked snapshots with ``stock_id`` and ``time_id`` columns (as produced
        by ``TestStack.test_stack`` plus feature engineering).
    seq_len : int
        Number of most recent rows kept per stock.

    Returns
    -------
    numpy.ndarray
        ``np.stack`` of one ``(rows, n_features)`` block per stock, ordered by
        ``stock_id``. The shape is ``(n_stocks, seq_len, n_features)`` when every
        stock has at least ``seq_len`` rows; ``np.stack`` raises if the blocks differ in size.
    """
    # The feature columns do not depend on the stock, so resolve them once
    # instead of once per group.
    cols = remove_element(test.columns, no_feature_cols)

    datas = []
    for _, group in test.groupby('stock_id'):
        group_sorted = group.sort_values(by='time_id')
        # Keep only the trailing `seq_len` time steps for this stock.
        datas.append(group_sorted[cols].values[-seq_len:])

    return np.stack(datas)

def zero_sum(prices, volumes):
    """Shift predictions so that their total becomes zero, weighted by sqrt(volume).

    Each prediction is reduced by ``sqrt(volume) * step``, where ``step`` is
    the sum of predictions divided by the sum of ``sqrt(volume)``. The adjusted
    predictions therefore sum to zero.

    Parameters
    ----------
    prices : array-like of float
        Raw predictions, one per row.
    volumes : array-like of float
        Per-row volume aligned with ``prices``. Missing (NaN) or negative
        volumes get zero weight, so those rows are left unadjusted instead of
        becoming NaN.

    Returns
    -------
    The result of ``prices - sqrt(volumes) * step``: a pandas Series when
    either input is a Series, otherwise a numpy array.
    """
    # np.fmax ignores NaN, so missing volumes become 0; being a ufunc it also
    # keeps a pandas Series a Series (callers rely on `.values`).
    std_error = np.sqrt(np.fmax(volumes, 0))
    total_error = np.sum(std_error)
    # With no usable volume there is nothing to redistribute against:
    # leave the predictions unchanged instead of dividing by zero.
    step = np.nansum(prices) / total_error if total_error > 0 else 0.0
    out = prices - std_error * step
    return out

In [49]:
if is_infer:
    # Sequence (TF) models used by the INFER_USE_TF branch of the inference loop.
    model_names = ['model_epoch_50_3.pt']

    models = []
    for model_name in model_names:
        # NOTE(review): torch.load unpickles a whole nn.Module; newer torch
        # versions default to weights_only=True and may refuse this — confirm
        # against the runtime's torch version.
        tf_model = torch.load(f"/kaggle/input/model-epoch-50-3/{model_name}", map_location=device)
        # A loaded module keeps the train/eval mode it was saved in; force
        # inference mode so dropout / batch-norm behave deterministically.
        tf_model.eval()
        models.append(tf_model)

In [50]:
if is_pre_test:
    # Pre-submission dry run of the TF inference path on the example test file.
    main_dir = '/kaggle/input/optiver-trading-at-the-close/'

    test_df = pd.read_csv(main_dir + 'example_test_files/test.csv')
    test_group = test_df.groupby(['time_id'])
    tdp = TestStack(window_size=seq_len)

    # Inference mode: no dropout, batch-norm uses running statistics.
    for model in models:
        model.eval()

    counter = 0
    for _, test in test_group:
        test = test.drop(columns=['time_id'])

        # Volumes used for the zero-sum adjustment.
        volumes = test.loc[:, 'bid_size'] + test.loc[:, 'ask_size']

        # Fill missing values with the mean values.
        test['far_price'] = test['far_price'].fillna(far_price_mean)
        test['near_price'] = test['near_price'].fillna(near_price_mean)

        # Stack this snapshot onto the rolling history.
        test_stack = tdp.test_stack(test, counter)

        # Feature engineering.
        test = sizesum_and_pricestd(test_stack)

        # Normalisation.
        test_cols = remove_element(test.columns, no_feature_cols)
        test[test_cols] = (test[test_cols] - avg) / std

        # Convert to per-stock sequences.
        test = df_to_seq(test, seq_len)

        # Prediction: build the input tensor once; every model sees the same input.
        test_tensor = torch.tensor(test, dtype=torch.float32).squeeze().to(device)
        predictions = np.zeros((test.shape[0],))
        with torch.no_grad():
            for model in models:
                predictions += model(test_tensor).squeeze().cpu().numpy()

        predictions /= len(models)
        # Zero-sum adjustment.
        predictions = zero_sum(predictions, volumes)

        print(predictions.shape)

        counter += 1

In [51]:
# Feature-version prerequisites passed to FeatureEngineer: each key lists the
# feature versions it is declared to depend on.
dependencies = dict(
    feature_version_imbalance_2_1=["feature_version_imbalance_1", "feature_version_imbalance_2_0"],
    feature_version_imbalance_6_1=["feature_version_imbalance_6_0"],
    feature_version_imbalance_7=["feature_version_imbalance_1"],
    feature_version_imbalance_8=["feature_version_imbalance_3"],
    feature_version_imbalance_9=["feature_version_imbalance_3"],
    feature_version_imbalance_10=["feature_version_imbalance_1", "feature_version_imbalance_7"],
)

In [60]:
if config["infer_mode"]:
    import optiver2023

    optiver2023.make_env.func_dict['__called__'] = False
    env = optiver2023.make_env()
    iter_test = env.iter_test()

    y_min, y_max = -64, 64
    qps = []
    counter = 0
    cache = pd.DataFrame()

    # Blend weights per sub-model. They are renormalised over the enabled
    # models, so switching an INFER_USE_* flag off does not break the blend.
    BLEND_WEIGHTS = {"tf": 0.1, "cnn": 0.2, "ml": 0.7}
    if not (INFER_USE_TF or INFER_USE_CNN or INFER_USE_ML):
        raise ValueError("At least one of INFER_USE_TF / INFER_USE_CNN / INFER_USE_ML must be True")

    # Feature versions used by the ML (GBDT) branch.
    ML_FEATURE_VERSIONS = [
        'feature_version_time',
        'feature_version_imbalance_1',
        'feature_version_imbalance_2_0',
        'feature_version_imbalance_2_1',
        'feature_version_imbalance_3',
        'feature_version_imbalance_6_0',
        'feature_version_imbalance_6_1',
        'feature_version_imbalance_7',
        'feature_version_imbalance_8',
        'feature_version_imbalance_9',
        # 'feature_version_imbalance_10',
        # 'feature_version_imbalance_11',
        # 'feature_version_imbalance_12',
        # 'feature_version_custom_weight',
        # 'feature_version_order_flow',
    ]

    model_pipeline.load_models()
    model_pipeline.load_optuna_weights()

    batch_size_test = 200

    tdp = TestStack(window_size=seq_len)

    # NOTE(review): computed but never applied below (the `* lgb_model_weights[i]`
    # factor in the fold loop is commented out) — confirm the intended fold weighting.
    lgb_model_weights = weighted_average(config["n_splits"] + 1, equal_weight=False)

    # This is for the generate_global_features (only need to run once)
    df = pd.read_csv(f"{config['data_dir']}/train.csv")
    data_processor = DataPreprocessor(data=df)
    df = data_processor.transform()
    feature_engineer = FeatureEngineer(data=df)
    feature_engineer.generate_global_features(data=df)

    if INFER_USE_TF:
        # Inference mode for the sequence models (no dropout, running BN stats).
        for model in models:
            model.eval()

    # Load the CNN fold models once, rather than re-reading every fold from
    # disk on each test batch, and put them in inference mode.
    cnn_models = []
    if INFER_USE_CNN:
        for fold in range(0, N_Folds):
            model_filename = f"/kaggle/input/cnn-save-world/fold{fold}.pt"
            cnn_model = torch.load(model_filename, map_location="cpu")
            cnn_model.eval()
            cnn_models.append(cnn_model)

    for (test, revealed_targets, sample_prediction) in iter_test:

        now_time = time.time()

        if not test.currently_scored.iloc[0]:
            sample_prediction['target'] = 0
            env.predict(sample_prediction)
            counter += 1
            qps.append(time.time() - now_time)
            if counter % 10 == 0:
                print(counter, 'qps:', np.mean(qps))
            continue

        # Sub-model name -> clipped 1-D prediction array, positionally aligned
        # with the rows of `test` / `sample_prediction`.
        preds_by_model = {}

        test_ = test.copy()

        if INFER_USE_TF:
            volumes = test.loc[:, 'bid_size'] + test.loc[:, 'ask_size']

            # Fill missing values with the mean values.
            test['far_price'] = test['far_price'].fillna(far_price_mean)
            test['near_price'] = test['near_price'].fillna(near_price_mean)

            test_stack = tdp.test_stack(test, counter)

            test = sizesum_and_pricestd(test_stack)

            test_cols = remove_element(test.columns, no_feature_cols)
            test[test_cols] = (test[test_cols] - avg) / std

            testseq = df_to_seq(test, seq_len)

            # Build the input tensor once; every TF model sees the same input.
            tf_input = torch.tensor(testseq, dtype=torch.float32).squeeze().to(device)
            predictions = np.zeros((testseq.shape[0],))
            with torch.no_grad():
                for model in models:
                    predictions += model(tf_input).squeeze().cpu().numpy()
            predictions /= len(models)

            predictions = zero_sum(predictions, volumes)
            # Clip like the other sub-models.
            preds_by_model["tf"] = np.clip(np.asarray(predictions), y_min, y_max)

        if INFER_USE_CNN:
            cnn_frame = test_.drop(columns=['currently_scored'])
            feat2 = feature_eng(cnn_frame)
            feat2 = feat2.drop(columns=['date_id'])

            # Local names: do not clobber the global training frames X_ctg / X_dsc / cat_num.
            cnn_ctg, cnn_dsc, _ = feat_eng_nn(feat2)
            cnn_ctg = pd.DataFrame(scaler.transform(cnn_ctg), columns=cnn_ctg.columns)

            test_dataset = NNDataset(cnn_ctg, cnn_dsc, None)
            test_loader = DataLoader(test_dataset, batch_size=batch_size_test, shuffle=False)

            fold_prediction = np.zeros((cnn_frame.shape[0],))
            with torch.no_grad():
                for cnn_model in cnn_models:
                    for batch_i, (x_c, x_d) in enumerate(test_loader):
                        y_hat = cnn_model(x_c, x_d)
                        start = batch_i * batch_size_test
                        fold_prediction[start:start + batch_size_test] += y_hat.cpu().numpy()

            fold_prediction /= len(cnn_models)
            fold_prediction = zero_sum(fold_prediction, cnn_frame.loc[:, 'bid_size'] + cnn_frame.loc[:, 'ask_size'])
            # np.asarray: a Series here would be index-aligned (not positional)
            # when assigned to sample_prediction.
            preds_by_model["cnn"] = np.clip(np.asarray(fold_prediction), y_min, y_max)

        if INFER_USE_ML:
            test = test_.copy()

            cache = pd.concat([cache, test], ignore_index=True, axis=0)
            if counter > 0:
                cache = cache.groupby(['stock_id']).tail(21).sort_values(
                    by=['date_id', 'seconds_in_bucket', 'stock_id']).reset_index(drop=True)

            # preprocessing
            data_processor = DataPreprocessor(data=cache, infer=True)
            cache_df = data_processor.transform()

            # feature engineering (pass a fresh list in case it is modified downstream)
            feature_engineer = FeatureEngineer(data=cache_df, infer=True,
                                               feature_versions=list(ML_FEATURE_VERSIONS),
                                               dependencies=dependencies)
            cache_df = feature_engineer.transform()

            feat = cache_df[-len(test):]

            feat = feat.drop(columns=["currently_scored"])

            test_predss = np.zeros(feat.shape[0])
            # prediction
            for i in range(config["n_splits"]):
                model_pipeline.predict(idx=i, X_test=feat)
                if config["stacking_mode"] and len(config["model_name"]) > 1:
                    model_pipeline.stacking(idx=i, infer=True)
                # NOTE(review): fold predictions are summed, not averaged — confirm intended scale.
                test_predss += model_pipeline.inference_prediction
            if config["stacking_mode"] and len(config["model_name"]) == 1:  # stacking for a single model
                model_pipeline.stacking(idx=-1, infer=True)
                # Copy so the `+=` below cannot modify the pipeline's own array.
                test_predss = np.array(model_pipeline.inference_prediction)
            # whole data trained model
            model_pipeline.predict(idx=config["n_splits"], X_test=feat)
            test_predss += model_pipeline.inference_prediction
            preds_by_model["ml"] = np.clip(np.asarray(test_predss), y_min, y_max)

        # Weighted blend of the enabled sub-models. The previous formula indexed
        # `clipped_predictions[2]` (one scalar) instead of the ML prediction array.
        total_weight = sum(BLEND_WEIGHTS[name] for name in preds_by_model)
        blended = sum(BLEND_WEIGHTS[name] * preds for name, preds in preds_by_model.items())
        sample_prediction['target'] = blended / total_weight

        env.predict(sample_prediction)
        counter += 1
        qps.append(time.time() - now_time)
        if counter % 10 == 0:
            print(counter, 'qps:', np.mean(qps))

    time_cost = 1.146 * np.mean(qps)
    print(f"The code will take approximately {np.round(time_cost, 4)} hours to reason about")

Successfully loaded model (/kaggle/input/model-version-52/0_lgb_b.pkl)
Successfully loaded model (/kaggle/input/model-version-52/1_lgb_b.pkl)
Successfully loaded model (/kaggle/input/model-version-52/2_lgb_b.pkl)
Successfully loaded model (/kaggle/input/model-version-52/3_lgb_b.pkl)
Successfully loaded model (/kaggle/input/model-version-52/4_lgb_b.pkl)
Successfully loaded model (/kaggle/input/model-version-52/5_lgb_b.pkl)

----------------------------------------------------------------------------------------------------
Executed handle_missing_data, Elapsed time: 0.51 seconds, shape((5237892, 17))
----------------------------------------------------------------------------------------------------


----------------------------------------------------------------------------------------------------
Executed transform, Elapsed time: 0.51 seconds, shape((5237892, 17))
----------------------------------------------------------------------------------------------------


-----------------