# 数据理解

- stock_id  股票代码
- date_id 日期
- seconds_in_bucket 收盘集合竞价开始后过了多少秒，从0开始
- imbalance_size 以当前的reference price成交的话，不能成交的规模
- imbalance_buy_sell_flag 不平衡指标
  - buy-side imbalance; 1
  - sell-side imbalance; -1
  - no imbalance; 0
- reference_price “参考价”，在达到该价格时，可配对股票总数被最大化，股票买卖不平衡被最小化。也可以认为是买一和卖一价之间最接近的价格
- matched_size 当前reference price下可成交的数量
- far_price 集合竞价中，能使得成交量最大的意向报价价格
- near_price 集合竞价和连续交易中，成交量最大的价格
- [bid/ask]_price 买一/卖一 价
- [bid/ask]_size 买一/卖一 规模
- wap  
  - weighted average price 加权均价
  - $$\frac{ {BidPrice * AskSize + AskPrice * BidSize}}{BidSize + AskSize}$$
- target 
  - 股票未来60秒的wap走势 - 合成指数未来60秒wap走势
  - 单位为基点（0.01%）BP 
  - $$Target = (\frac{StockWAP_{t+60}}{StockWAP_{t}} - \frac{IndexWAP_{t+60}}{IndexWAP_{t}}) * 10000$$

# 准备工作

In [2]:
import gc
import os
import time
import warnings
from itertools import combinations
from warnings import simplefilter

import joblib
import lightgbm as lgb
import numpy as np
import pandas as pd
from sklearn.metrics import mean_absolute_error
from sklearn.model_selection import KFold, TimeSeriesSplit

warnings.filterwarnings("ignore")
simplefilter(action="ignore", category=pd.errors.PerformanceWarning)

is_offline = False
is_train = True
is_infer = True
max_lookback = np.nan
split_day = 435

## 导入数据

In [3]:
Df = pd.read_csv("D:/kaggle project/optiver competition/data/train.csv")
Df = Df.dropna(subset=["target"]) # 删除空值
Df.reset_index(drop=True, inplace=True)  # 删除空值后重置行index的顺序

df = Df.copy()
cols = [c for c in df.columns if c not in ["row_id", "time_id", "target"]]
df = df[cols]

df.shape

(5237892, 14)

# 内存压缩

In [12]:
def reduce_mem_usage(df, verbose=0):
    """
    Iterate through all numeric columns of a dataframe and modify the data type
    to reduce memory usage.
    """

    start_mem = df.memory_usage().sum() / 1024**2

    for col in df.columns:
        col_type = df[col].dtype

        if col_type != object:
            c_min = df[col].min()
            c_max = df[col].max()
            if str(col_type)[:3] == "int":
                if c_min > np.iinfo(np.int8).min and c_max < np.iinfo(np.int8).max:
                    df[col] = df[col].astype(np.int8)
                elif c_min > np.iinfo(np.int16).min and c_max < np.iinfo(np.int16).max:
                    df[col] = df[col].astype(np.int16)
                elif c_min > np.iinfo(np.int32).min and c_max < np.iinfo(np.int32).max:
                    df[col] = df[col].astype(np.int32)
                elif c_min > np.iinfo(np.int64).min and c_max < np.iinfo(np.int64).max:
                    df[col] = df[col].astype(np.int64)
            else:
                if c_min > np.finfo(np.float16).min and c_max < np.finfo(np.float16).max:
                    df[col] = df[col].astype(np.float32)
                elif c_min > np.finfo(np.float32).min and c_max < np.finfo(np.float32).max:
                    df[col] = df[col].astype(np.float32)
                else:
                    df[col] = df[col].astype(np.float32)

    if verbose:
        print(f"Memory usage of dataframe is {start_mem:.2f} MB")
        end_mem = df.memory_usage().sum() / 1024**2
        print(f"Memory usage after optimization is: {end_mem:.2f} MB")
        decrease = 100 * (start_mem - end_mem) / start_mem
        print(f"Decreased by {decrease:.2f}%")

    return df


In [None]:
df_train_feats = reduce_mem_usage(df)

# 特征工程

In [4]:
# 创建全局特征

global_stock_id_feats = {
    "median_size": df.groupby("stock_id")["bid_size"].median() + df.groupby("stock_id")["ask_size"].median(),
    "std_size": df.groupby("stock_id")["bid_size"].std() + df.groupby("stock_id")["ask_size"].std(),
    "ptp_size": df.groupby("stock_id")["bid_size"].max() - df.groupby("stock_id")["bid_size"].min(),
    "median_price": df.groupby("stock_id")["bid_price"].median() + df.groupby("stock_id")["ask_price"].median(),
    "std_price": df.groupby("stock_id")["bid_price"].std() + df.groupby("stock_id")["ask_price"].std(),
    "ptp_price": df.groupby("stock_id")["bid_price"].max() - df.groupby("stock_id")["ask_price"].min(),
}

for key, value in global_stock_id_feats.items():
    df[f"global_{key}"] = df["stock_id"].map(value.to_dict())

In [5]:
# 创建双重不平衡特征
prices = ["reference_price", "far_price", "near_price", "ask_price", "bid_price", "wap"]
sizes = ["matched_size", "bid_size", "ask_size", "imbalance_size"]

# V1
df["volume"] = df.eval("ask_size + bid_size")  # 买卖订单总量
df["mid_price"] = df.eval("(ask_price + bid_price) / 2")  # 中间价
df["liquidity_imbalance"] = df.eval("(bid_size-ask_size)/(bid_size+ask_size)")  # 流动性不平衡 [-1,1]
df["matched_imbalance"] = df.eval("(imbalance_size-matched_size)/(matched_size+imbalance_size)")  # 成交量不平衡[-1,1]
df["size_imbalance"] = df.eval("bid_size / ask_size")  # 买压

# 价格两两组合，计算价格间不平衡指标
for c in combinations(prices, 2):
    df[f"{c[0]}_{c[1]}_imb"] = df.eval(f"({c[0]} - {c[1]})/({c[0]} + {c[1]})")

In [7]:
# 创建三重不平衡特征
from numba import njit, prange
@njit(parallel=True)
def compute_triplet_imbalance(df_values, comb_indices):
    num_rows = df_values.shape[0]
    num_combinations = len(comb_indices)
    imbalance_features = np.empty((num_rows, num_combinations))

    for i in prange(num_combinations):
        a, b, c = comb_indices[i]
        for j in range(num_rows):
            max_val = max(df_values[j, a], df_values[j, b], df_values[j, c])  # 最大
            min_val = min(df_values[j, a], df_values[j, b], df_values[j, c])  # 最小
            mid_val = df_values[j, a] + df_values[j, b] + df_values[j, c] - min_val - max_val  # 中数
            if mid_val == min_val:  # Prevent division by zero
                imbalance_features[j, i] = np.nan
            else:
                imbalance_features[j, i] = (max_val - mid_val) / (mid_val - min_val)

    return imbalance_features


def calculate_triplet_imbalance_numba(price, df):
    # Convert DataFrame to numpy array for Numba compatibility
    df_values = df[price].values
    comb_indices = [(price.index(a), price.index(b), price.index(c)) for a, b, c in combinations(price, 3)]

    # Calculate the triplet imbalance
    features_array = compute_triplet_imbalance(df_values, comb_indices)

    # Create a DataFrame from the results
    columns = [f"{a}_{b}_{c}_imb2" for a, b, c in combinations(price, 3)]
    features = pd.DataFrame(features_array, columns=columns)

    return features

for c in [['ask_price', 'bid_price', 'wap', 'reference_price'], sizes]:
    triplet_feature = calculate_triplet_imbalance_numba(c, df)
    df[triplet_feature.columns] = triplet_feature.values

In [8]:
# 创建时序相关不平衡特征
df["imbalance_momentum"] = df.groupby(['stock_id'])['imbalance_size'].diff(periods=1) / df['matched_size']  # 不平衡量趋势
df['mid_price_movement'] = df['mid_price'].diff(periods=5).apply(lambda x: 1 if x > 0 else (-1 if x < 0 else 0))  # 中间价移动
df["price_spread"] = df["ask_price"] - df["bid_price"]
df["spread_intensity"] = df.groupby(['stock_id'])['price_spread'].diff()  # spread 趋势 趋势变大，说明波动变大

In [10]:
# 创建价量组合特征
df['price_pressure'] = df['imbalance_size'] * (df['ask_price'] - df['bid_price'])  # 价格压力，不平衡量大，且价差大的时候，不平衡规模变大
df['market_urgency'] = df['price_spread'] * df['liquidity_imbalance']  # 单边预警  价差 * (bid_size-ask_size)/(bid_size+ask_size)
df['depth_pressure'] = (df['ask_size'] - df['bid_size']) * (df['far_price'] - df['near_price'])
df['spread_depth_ratio'] = (df['ask_price'] - df['bid_price']) / (df['bid_size'] + df['ask_size'])  # spread/总量 市场越活跃，量大，价差小，该值越小
df['relative_spread'] = (df['ask_price'] - df['bid_price']) / df['wap']  # 相对价差=spread/wap

for func in ["mean", "std", "skew", "kurt"]:
    df[f"all_prices_{func}"] = df[prices].agg(func, axis=1)
    df[f"all_sizes_{func}"] = df[sizes].agg(func, axis=1)

for col in ['matched_size', 'imbalance_size', 'reference_price', 'imbalance_buy_sell_flag']:
    for window in [1, 2, 3, 5, 10]:
        df[f"{col}_shift_{window}"] = df.groupby('stock_id')[col].shift(window)  # 往后移动windos个时间窗口
        df[f"{col}_ret_{window}"] = df.groupby('stock_id')[col].pct_change(window)  # 相较于前面window个窗口百分比变动

for col in ['ask_price', 'bid_price', 'ask_size', 'bid_size','wap', 'near_price', 'far_price']:
    for window in [1, 2, 3, 5, 10]:
        df[f"{col}_diff_{window}"] = df.groupby("stock_id")[col].diff(window) # 价量差分
        
df = df.replace([np.inf, -np.inf], 0)

In [11]:
df["dow"] = df["date_id"] % 5
df["dom"] = df["date_id"] % 20
df["seconds"] = df["seconds_in_bucket"] % 60
df["minute"] = df["seconds_in_bucket"] // 60

# 建模

In [13]:
split_day = 435
offline_split = df['date_id'] > (split_day - 45)
df_offline_train = df[~offline_split]
df_offline_valid = df[offline_split]
df_offline_train_target = Df['target'][~offline_split]
df_offline_valid_target = Df['target'][offline_split]

In [14]:
lgb_params = {
    "objective": "mae",
    "n_estimators": 3000,
    "num_leaves": 128,
    "subsample": 0.6,
    "colsample_bytree": 0.6,
    "learning_rate": 0.05,
    "n_jobs": 4,
    "device": "gpu",
    "verbosity": -1,
    "importance_type": "gain",
}

lgb_model = lgb.LGBMRegressor(**lgb_params)

In [15]:
feature_name = [i for i in df.columns if i not in ["row_id", "target", "time_id", "date_id"]]
lgb_model.fit(
    df_offline_train[feature_name],
    df_offline_train_target,
    eval_set=[(df_offline_valid[feature_name], df_offline_valid_target)],
    callbacks=[
        lgb.callback.early_stopping(stopping_rounds=100),
        lgb.callback.log_evaluation(period=100),
    ],
)

del df_offline_train, df_offline_valid, df_offline_train_target, df_offline_valid_target
gc.collect()

LightGBMError: GPU Tree Learner was not enabled in this build.
Please recompile with CMake option -DUSE_GPU=1

In [None]:
# infer
df_train_target = Df["target"]
infer_params = lgb_params.copy()
infer_params["n_estimators"] = int(1.2 * lgb_model.best_iteration_)
infer_lgb_model = lgb.LGBMRegressor(**infer_params)
infer_lgb_model.fit(df[feature_name], df_train_target)