In [1]:
import os
import gc
import time
import warnings
import collections
from six import with_metaclass
from abc import ABCMeta, abstractmethod
from datetime import datetime

from numba import njit, jit
from tqdm import tqdm
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from numpy import iinfo, finfo, int8, int16, int32, int64, float32, float64

import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
from tensorflow.keras import losses
from tensorflow.keras import optimizers
from tensorflow.keras import callbacks
from tensorflow.keras import models
from tensorflow.keras import activations

from sklearn.model_selection import KFold
from sklearn.metrics import roc_auc_score, accuracy_score
from sklearn.model_selection._split import _BaseKFold, indexable, _num_samples
from sklearn.utils.validation import _deprecate_positional_args
from sklearn.decomposition import PCA
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import permutation_test_score

from plotly.subplots import make_subplots
import plotly.graph_objects as go
import plotly.io as pio
from IPython.display import HTML

# 设置plotly为暗黑模式
pio.templates.default = "plotly_dark"
plot_config = dict({'scrollZoom': True, 'displayModeBar': True, 'displaylogo': False})
sns.set(style="ticks", font_scale=1.2, palette='deep', color_codes=True)
colors = ["C" + str(i) for i in range(0, 9+1)]

# 默认plotly色号
default_color_list = [
    '#1f77b4',  # muted blue
    '#ff7f0e',  # safety orange
    '#2ca02c',  # cooked asparagus green
    '#d62728',  # brick red
    '#9467bd',  # muted purple
    '#8c564b',  # chestnut brown
    '#e377c2',  # raspberry yogurt pink
    '#7f7f7f',  # middle gray
    '#bcbd22',  # curry yellow-green
    '#17becf'   # blue-teal
    ]

# 设定全局随机种子，并且屏蔽warnings
GLOBAL_RANDOM_SEED = 2022
np.random.seed(GLOBAL_RANDOM_SEED)
tf.random.set_seed(GLOBAL_RANDOM_SEED)

warnings.filterwarnings('ignore')
pd.set_option('display.max_columns', 50)

# 检查GPU设备情况
gpus = tf.config.experimental.list_physical_devices('GPU')
if gpus:
    try:
        # 限制只能使用第一块GPU（通过GPU的List的id指定）
        tf.config.experimental.set_visible_devices(gpus[0], 'GPU')

        # Currently, memory growth needs to be the same across GPUs
        for gpu in gpus:
            tf.config.experimental.set_memory_growth(gpu, True)
        logical_gpus = tf.config.experimental.list_logical_devices('GPU')
        print(len(gpus), "Physical GPUs,", len(logical_gpus), "Logical GPUs")
    except RuntimeError as e:
        # Memory growth must be set before GPUs have been initialized
        print(e)

1 Physical GPUs, 1 Logical GPUs


In [2]:
# 导入数据
load_data_start_time = time.time()
train_df  = pd.read_csv(
    './data/jane-street-market-prediction/train.csv', nrows=None)
feat_df = pd.read_csv(
    './data/jane-street-market-prediction/features.csv')
example_test_df = pd.read_csv(
    './data/jane-street-market-prediction/example_test.csv')
example_prediction_df = pd.read_csv(
    './data/jane-street-market-prediction/example_sample_submission.csv')
load_data_end_time = time.time()

# 打印数据基本情况
print("[INFO] {} End Reading ! It took {:.2f} seconds !".format(
    str(datetime.now())[:-4], load_data_end_time-load_data_start_time))
print("[INFO] {} Basic data description: ".format(str(datetime.now())[:-4]))
print("    -- train_df shape: {}".format(
    train_df.shape))
print("    -- example_test_df shape: {}".format(
    example_test_df.shape))
print("    -- feat_df shape: {}".format(
    feat_df.shape))
print("    -- example_prediction_df shape: {}".format(
    example_prediction_df.shape))

[INFO] 2021-01-14 11:44:44.32 End Reading ! It took 71.04 seconds !
[INFO] 2021-01-14 11:44:44.32 Basic data description: 
    -- train_df shape: (2390491, 138)
    -- example_test_df shape: (15219, 133)
    -- feat_df shape: (130, 30)
    -- example_prediction_df shape: (15219, 2)


In [3]:
def gen_test_data(test_df=None, pred_df=None):
    """测试数据生成器。用于模拟测试数据生成过程，测试模型提交正确性与效率。"""
    n_test = len(test_df)

    for i in range(n_test):
        yield test_df.iloc[i], pred_df.iloc[i]


@jit
def njit_fillna(array, values):
    """利用即时编译（jit）对array数组的NaN值借助values进行填充。

    @References:
    ----------
    [1] https://www.kaggle.com/gogo827jz/optimise-speed-of-filling-nan-function
    """
    if np.isnan(array.sum()):
        array = np.where(np.isnan(array), values, array)
    return array


def custom_metric(dates_array=None,
                  weights_array=None,
                  resp_array=None,
                  action_label_array=None):
    """依据官方要求的Metric，计算分数。

    @References:
    ----------
    [1] https://www.kaggle.com/c/jane-street-market-prediction/discussion/199107
    [2] https://www.kaggle.com/c/jane-street-market-prediction/overview/evaluation
    [3] 
    """
    tmp_df = pd.DataFrame({"date": dates_array,
                           "weight": weights_array,
                           "resp": resp_array,
                           "action": action_label_array})
    tmp_df["p"] = tmp_df["weight"]  * tmp_df["resp"] * tmp_df["action"]
    # tmp_df = tmp_df.query("weight != 0").reset_index(drop=True)
    p_i_val = tmp_df.groupby(["date"])["p"].sum().values

    n_dates = len(p_i_val)
    t = np.sum(p_i_val) / np.sqrt(np.sum(p_i_val ** 2)) * (np.sqrt(250 / n_dates))
    return min(max(t, 0), 6) * np.sum(p_i_val)


In [4]:
class PurgedGroupTimeSeriesSplit(_BaseKFold):
    """针对带有Group id（组id）数据的时间序列交叉验证集合生成类。

    生成针对带有Group id的数据的时序交叉验证集。其中训练与验证的
    Group之间可以指定group_gap，用来隔离时间上的关系。这种情况下
    group_id通常是时间id，例如天或者小时。

    @Parameters:
    ----------
        n_splits: {int-like}, default=5
            切分的集合数目。
        max_train_group_size: {int-like}, default=+inf
            训练集单个组的最大样本数据限制。
        group_gap: {int-like}, default=None
            依据group_id切分组时，训练组与测试组的id的gap数目。
        max_test_group_size: {int-like}, default=+inf
            测试集单个组的最大样本数据限制。

    @References:
    ----------
    [1] https://www.kaggle.com/gogo827jz/jane-street-ffill-xgboost-purgedtimeseriescv
    """

    @_deprecate_positional_args
    def __init__(self, n_splits=5,
                 *,
                 max_train_group_size=np.inf,
                 max_test_group_size=np.inf,
                 group_gap=None,
                 verbose=False
                 ):
        super().__init__(n_splits, shuffle=False, random_state=None)
        self.max_train_group_size = max_train_group_size
        self.group_gap = group_gap
        self.max_test_group_size = max_test_group_size
        self.verbose = verbose

    def split(self, X, y=None, groups=None):
        """生成训练组与测试组的id索引，返回组索引的生成器。

        @Parameters:
        ----------
            X: {array-like} {n_samples, n_features}
                训练数据，输入形状为{n_samples, n_features}。
            y: {array-like} {n_samples, }
                标签数据，形状为{n_samples, }。
            groups: {array-like} {n_samples, }
                用来依据组来划分训练集与测试集的组id，必须为连续的组id。

        @Yields:
        ----------
            train: ndarray
                依据group_id切分的训练组id。
            test: ndarray
                依据group_id切分的测试组id。
        """
        if groups is None:
            raise ValueError(
                "The 'groups' parameter should not be None ！")
        for i in range(1, len(groups)):
            if groups[i] < groups[i-1]:
                raise ValueError("groups must be a monotone increasing sequence !")

        # 初始化基本参数信息
        X, y, groups = indexable(X, y, groups)
        n_samples, n_splits, group_gap = X.shape[0], self.n_splits, self.group_gap
        n_folds = n_splits + 1

        max_test_group_size = self.max_test_group_size
        max_train_group_size = self.max_train_group_size

        # 使得groups的id取值从0顺序开始（假定groups是递增的）
        groups_reid, groupid2reid, index_tmp = [], {}, -1
        for _, item in enumerate(groups):
            if item not in groupid2reid:
                index_tmp += 1
                groupid2reid[item] = index_tmp
            groups_reid.append(index_tmp)

        group_dict = {}
        u, ind = np.unique(groups_reid, return_index=True)
        unique_groups = np.argsort(ind)
        n_groups = _num_samples(unique_groups)

        # 扫描整个数据id list，构建group_dcit，{group_id: 属于该group的样本的idx}
        for idx in np.arange(n_samples):
            if groups_reid[idx] in group_dict:
                group_dict[groups_reid[idx]].append(idx)
            else:
                group_dict[groups_reid[idx]] = [idx]

        if n_folds > n_groups:
            raise ValueError(
                ("Cannot have number of folds={0} greater than"
                 " the number of groups={1}").format(n_folds, n_groups))

        # group_test_size: 每个fold预留的test group的大小
        group_test_size = min(n_groups // n_folds, max_test_group_size)
        group_test_starts = range(n_groups - n_splits * group_test_size,
                                  n_groups, group_test_size)
        for group_test_start in group_test_starts:
            train_array, test_array = [], []

            group_st = max(0, group_test_start - group_gap - max_train_group_size)
            for train_group_idx in unique_groups[group_st:(group_test_start - group_gap)]:
                train_array_tmp = group_dict[train_group_idx]
                train_array = np.sort(np.unique(
                    np.concatenate((train_array, train_array_tmp)),
                    axis=None), axis=None)
 
            for test_group_idx in unique_groups[group_test_start:
                                                group_test_start +
                                                group_test_size]:
                test_array_tmp = group_dict[test_group_idx]
                test_array = np.sort(np.unique(
                    np.concatenate((test_array, test_array_tmp)),
                    axis=None), axis=None)
            test_array  = test_array[group_gap:]

            if self.verbose > 0:
                    pass
            yield [int(i) for i in train_array], [int(i) for i in test_array]


def test_purged_group_time_series_split():
    X = train_df.query('date > 85').reset_index(drop=True)[["ts_id", "feature_0"]].values
    y = train_df.query('date > 85').reset_index(drop=True)["resp"].values
    groups = train_df.query('date > 85').reset_index(drop=True)["date"].values

    group_ts_kfolds = PurgedGroupTimeSeriesSplit(
        n_splits=7, group_gap=20, max_train_group_size=80, max_test_group_size=60)
    train_idx, valid_idx = [], []
    for train_idx_tmp, valid_idx_tmp in group_ts_kfolds.split(X=X, y=y, groups=groups):
        train_idx.append(train_idx_tmp)
        valid_idx.append(valid_idx_tmp)

        print("train range: {}, valid range: {}".format(
            [min(groups[train_idx_tmp]), max(groups[train_idx_tmp])],
            [min(groups[valid_idx_tmp]), max(groups[valid_idx_tmp])]
        ))

# test_purged_group_time_series_split()

In [5]:
class LiteModel:
    """将模型转换为Tensorflow Lite模型，提升推理速度。目前仅支持Keras模型转换。

    @Attributes:
    ----------
    interpreter: {Tensorflow lite transformed object}
        利用tf.lite.interpreter转换后的Keras模型。

    @References:
    ----------
    [1] https://medium.com/@micwurm/using-tensorflow-lite-to-speed-up-predictions-a3954886eb98
    """

    @classmethod
    def from_file(cls, model_path):
        """类方法。用于model_path下的模型，一般为*.h5模型。"""
        return LiteModel(tf.lite.Interpreter(model_path=model_path))

    @classmethod
    def from_keras_model(cls, kmodel):
        """类方法。用于直接转换keras模型。不用实例化类可直接调用该方法，返回
        被转换为tf.lite形式的Keras模型。

        @Attributes:
        ----------
        kmodel: {tf.keras model}
            待转换的Keras模型。

        @Returens:
        ----------
        经过转换的Keras模型。
        """
        converter = tf.lite.TFLiteConverter.from_keras_model(kmodel)
        tflite_model = converter.convert()
        return LiteModel(tf.lite.Interpreter(model_content=tflite_model))

    def __init__(self, interpreter):
        """为经过tf.lite.interpreter转换的模型构建构造输入输出的关键参数。

        @TODO(zhuoyin94@163.com):
        ----------
        [1] 可添加关键字，指定converter选择采用INT8量化还是混合精度量化。
        [2] 可添加关键字，指定converter选择量化的方式：低延迟还是高推理速度？
        """
        self.interpreter = interpreter
        self.interpreter.allocate_tensors()

        input_det = self.interpreter.get_input_details()[0]
        output_det = self.interpreter.get_output_details()[0]
        self.input_index = input_det["index"]
        self.output_index = output_det["index"]
        self.input_shape = input_det["shape"]
        self.output_shape = output_det["shape"]
        self.input_dtype = input_det["dtype"]
        self.output_dtype = output_det["dtype"]

    def predict(self, inp):
        inp = inp.astype(self.input_dtype)
        count = inp.shape[0]
        out = np.zeros((count, self.output_shape[1]), dtype=self.output_dtype)
        for i in range(count):
            self.interpreter.set_tensor(self.input_index, inp[i:i+1])
            self.interpreter.invoke()
            out[i] = self.interpreter.get_tensor(self.output_index)[0]
        return out

    def predict_single(self, inp):
        """ Like predict(), but only for a single record. The input data can be a Python list. """
        inp = np.array([inp], dtype=self.input_dtype)
        self.interpreter.set_tensor(self.input_index, inp)
        self.interpreter.invoke()
        out = self.interpreter.get_tensor(self.output_index)
        return out[0]

In [6]:
"""
数据预处理部分。包括标签生成、数据统计值获取。
"""
# 挑选策略变化之后的数据
train = train_df.query('date > 85').reset_index(drop=True)
train = train.query('weight > 0').reset_index(drop = True)

target_threshold = 0.00001

# 构造标签
train['action'] =  ((train['resp_1'] > target_threshold) & \
                    (train['resp_2'] > target_threshold) & \
                    (train['resp_3'] > target_threshold) & \
                    (train['resp_4'] > target_threshold) &  \
                    (train['resp'] > target_threshold)).astype('int')
feature_name_list = [c for c in train.columns if 'feature' in c]
resp_name_list = ["resp", "resp_1", "resp_2", "resp_3", "resp_4"]

# 使用均值填充缺失值
mean_val_list = []
for name in feature_name_list:
    mean_val = train[name].mean()
    train[name].fillna(mean_val, inplace=True)
    mean_val_list.append(mean_val)
mean_val_array = np.array(mean_val_list)

# 构造后续神经网络模型的输入输出
X = train[feature_name_list].values.astype("float32")
y = np.hstack([(train[c] > target_threshold).astype(
    'int').values.reshape(-1, 1) for c in resp_name_list])

train_dates = train["date"].values
train_weights = train["weight"].values
train_resp = train["resp"].values

print("[INFO] {} Data prepared !".format(
    str(datetime.now())[:-4]))

[INFO] 2021-01-14 11:44:47.86 Data prepared !


In [7]:
"""
特征工程辅助工具。
"""
def build_tabular_autoencoder(verbose=False, is_compile=True,
                              stddev=0.05, **kwargs):
    """降噪自编码器实现。针对表格形式数据的降噪自编码器，噪声等级由高斯噪声的stddev参数指定"""
    input_dim = kwargs.pop("input_dim", None)
    n_labels = kwargs.pop("n_labels", None)

    # 构建降噪自编码器
    layer_input = layers.Input(input_dim, dtype='float32')

    layer_encoded = layers.BatchNormalization()(layer_input)
    layer_encoded = layers.GaussianNoise(stddev=stddev)(layer_encoded)
    layer_encoded = layers.Dense(256, activation='relu')(layer_encoded)

    # 解码层1：针对输入的重构
    layer_decoded = layers.Dropout(0.3)(layer_encoded)
    layer_decoded = layers.Dense(input_dim, name='reconstruct_output')(layer_decoded)

    # 解码层2：针对resp的重构
    layer_output = layers.Dense(128, activation='relu')(layer_decoded)
    layer_output = layers.BatchNormalization()(layer_output)
    layer_output = layers.Dropout(0.3)(layer_output)

    layer_output_labels = layers.Dense(n_labels, activation='sigmoid', name='layer_output_labels')(layer_output)

    # 输出层
    encoder_model = models.Model(inputs=layer_input, outputs=layer_decoded)
    autoencoder_model = models.Model(inputs=layer_input, outputs=[layer_decoded, layer_output_labels])

    if verbose:
        autoencoder_model.summary()
    if is_compile:
        autoencoder_model.compile(loss={'reconstruct_output': 'mse', 'layer_output_labels':'binary_crossentropy'},
                                  metrics={'layer_output_labels':'acc'}, 
                                  optimizer=optimizers.Adam(0.001))
    return encoder_model, autoencoder_model


def feat_pca(feat_array=None, n_dims=30):
    """利用PCA，将feat_array降维至n_dims维度。"""
    if feat_array.shape[1] <= n_dims:
        raise ValueError("n_dims must smaller than the dim of feat_array !")

    # 归一化feat_array
    X_sc = StandardScaler()
    X_sc.fit(feat_array)
    feat_array = X_sc.transform(feat_array)

    # 降维
    pca = PCA(n_components=n_dims)
    pca.fit(feat_array)
    feat_array_pca = pca.transform(feat_array)

    return X_sc, pca, feat_array_pca


@njit
def njit_search_best_thresold_acc(y_pred_proba, y_true):
    """通过阈值搜索最优的准确率切分阈值."""
    best_acc, best_threshold = 0, 0
    for threshold in range(4500, 5800):
        thresold_tmp = threshold / 10000
        y_pred_label = np.where(y_pred_proba > thresold_tmp, 1, 0)
        score_tmp = np.sum(np.where(y_true == y_pred_label, 1, 0)) / len(y_true)

        if score_tmp > best_acc:
            best_acc = score_tmp
            best_threshold = thresold_tmp
    return best_acc, best_threshold


@njit
def njit_custom_metric(dates_array=None,
                       weights_array=None,
                       resp_array=None,
                       action_label_array=None):
    """利用njit装饰器与numpy来计算Kaggle官方要求的Metric。

    @References:
    ----------
    [1] https://www.kaggle.com/c/jane-street-market-prediction/discussion/199107
    [2] https://www.kaggle.com/c/jane-street-market-prediction/overview/evaluation
    """
    p_array = weights_array * resp_array * action_label_array

    n_unique_dates = np.max(dates_array) - np.min(dates_array) + 1
    dates_array = dates_array - np.min(dates_array)

    p_i_val = np.zeros((n_unique_dates, ))
    for ind, item in enumerate(dates_array):
        p_i_val[item] += p_array[ind]

    t = np.sum(p_i_val) / np.sqrt(np.sum(p_i_val ** 2)) * (np.sqrt(250 / n_unique_dates))
    return min(max(t, 0), 6) * np.sum(p_i_val)


@njit
def njit_search_best_thresold_custom(y_pred_proba=None,
                                     dates_array=None,
                                     weights_array=None,
                                     resp_array=None):
    """通过阈值搜索最优的kaggle官方评分的切分阈值."""
    best_acc, best_threshold = 0, 0
    for threshold in range(4500, 5800):
        thresold_tmp = threshold / 10000
        y_pred_label = np.where(y_pred_proba > thresold_tmp, 1, 0)
        score_tmp = njit_custom_metric(dates_array=dates_array,
                                       weights_array=weights_array,
                                       resp_array=resp_array,
                                       action_label_array=y_pred_label)

        if score_tmp > best_acc:
            best_acc = score_tmp
            best_threshold = thresold_tmp
    return best_acc, best_threshold


class RollingBase(with_metaclass(ABCMeta, object)):
    """对于流数据（stream data）的统计量高效抽取的基类。

    通过双端队列存储第i时刻前k个时刻的数组值，并以O(1)时间复杂度计算队内的需求指标。
    每当队满的时候，队尾元素出队，并同时修改累加统计值。队内指标的计算通过push方法来实时更新。

    @Attributes:
    ----------
    n_elements_in_deque: {int-like}
        当前队内元素的个数。
    n_feats: {int-like}
        队内元素的特征维度。
    max_deque_size: {int-like}
        双端队列的大小。
    deque: {collections deque object}
        collections模块的双端队列实现。

    @References:
    ----------
    [1] https://www.kaggle.com/lucasmorin/running-algos-fe-for-fast-inference
    """
    def __init__(self, max_deque_size=10, n_feats=1, **kwargs):
        if n_feats is None:
            self.n_feats = 1
        else:
            self.n_feats = n_feats
        self.n_elements_in_deque = 0
        self.max_deque_size = max_deque_size
        self.window_cum_sum = np.zeros(self.n_feats)
        self.deque = collections.deque(maxlen=max_deque_size + 1)

    @abstractmethod
    def push(self, x):
        """将输入元素x入队，同时根据是否队满执行出队操作，并更新统计指标。"""
        pass

    def clear(self):
        self.n_elements_in_deque = 0
        self.deque.clear()  # 清空队内元素

    def get_n_elements_in_deque(self):
        return self.n_elements_in_deque


class RollingStats(RollingBase):
    """对steam类型的数据执行滑动窗口平均。

    通过双端队列存储第i时刻前k个时刻的数组值，并以O(1)时间复杂度计算队内的平均值。
    每当队满的时候，队尾元素出队，并同时修改累加统计值。

    @Attributes:
    ----------
    window_mean: {array-like}
        队列内元素的平均值。
    window_std: {array-like}
        队列内元素的方差。
    window_skew: {array-like}
        队内元素的偏度。
    window_kurtosis: {array-like}
        队内元素的峰度。
    **kwargs:
        见基类说明。

    @References:
    ----------
    [1] https://www.kaggle.com/lucasmorin/running-algos-fe-for-fast-inference
    [2] https://en.wikipedia.org/wiki/Algorithms_for_calculating_variance
    [3] Rakthanmanon, Thanawin, et al. "Searching and mining trillions of time series subsequences under dynamic time warping." Proceedings of the 18th ACM SIGKDD international conference on Knowledge discovery and data mining. 2012.
    """
    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self.window_mean = np.zeros(self.n_feats)
        self.window_std = np.zeros(self.n_feats)
        self.window_skew = np.zeros(self.n_feats)
        self.window_kurtosis = np.zeros(self.n_feats)
        self.window_cum_sum_square = np.zeros(self.n_feats)

    def push(self, x):
        """入队一个元素x，并提取统计指标。"""
        self.deque.append(x)
        self.window_cum_sum += x
        self.window_cum_sum_square += np.square(x)

        # 指标抽取：均值，方差，峰度，偏度
        # TODO(zhuoyin94@163.com)：使用One-pass算法计算峰度与偏度
        if self.n_elements_in_deque < self.max_deque_size:
            self.n_elements_in_deque += 1

            self.window_mean = self.window_cum_sum / self.n_elements_in_deque
            self.window_std = self.window_cum_sum_square / self.n_elements_in_deque - np.square(self.window_mean)
        else:
            poped_element = self.deque.popleft()
            self.window_cum_sum -= poped_element
            self.window_cum_sum_square -= np.square(poped_element)

            self.window_mean = self.window_cum_sum / self.n_elements_in_deque
            self.window_std = self.window_cum_sum_square / self.n_elements_in_deque - np.square(self.window_mean)

    def get_mean(self):
        """获取队内元素的平均值。"""
        if self.n_elements_in_deque:
            return self.window_mean
        else:
            return np.zeros(self.n_feats)

    def get_std(self):
        """获取队内元素的方差。"""
        if self.n_elements_in_deque:
            return self.window_std
        else:
            return np.zeros(self.n_feats)


print("[INFO] {} Tools prepared !".format(
    str(datetime.now())[:-4]))

[INFO] 2021-01-14 11:44:47.96 Tools prepared !


In [8]:
"""
特征抽取工具的集中单元测试。
"""
def test_rolling_correctness():
    tmp_data = np.array([[-1, -1],
                         [1, 1],
                         [2, 2],
                         [3, 3],
                         [4, 4]])

    rolling_average_2 = RollingStats(max_deque_size=2, n_feats=tmp_data.shape[1])
    for i in range(tmp_data.shape[0]):
        rolling_average_2.push(tmp_data[i])
        print(rolling_average_2.get_mean())
        print(rolling_average_2.get_std())
        print("---")

    print("\n")
    rolling_average_3 = RollingStats(max_deque_size=4, n_feats=tmp_data.shape[1])
    for i in range(tmp_data.shape[0]):
        rolling_average_3.push(tmp_data[i])
        print(rolling_average_3.get_mean())
        print(rolling_average_3.get_std())
        print("---")

    tmp_data = np.array([-1, 1, 2, 3, 4, 5])
    print("\n")
    rolling_average_3 = RollingStats(max_deque_size=3, n_feats=None)
    for i in range(tmp_data.shape[0]):
        rolling_average_3.push(tmp_data[i])
        print(rolling_average_3.get_mean())
        print(rolling_average_3.get_std())
        print("---")


def test_rolling_performance():
    tmp_data = X.copy()

    rolling_average_4 = RollingStats(max_deque_size=4, n_feats=tmp_data.shape[1])
    print("\nRolling window size: 4")
    for i in tqdm(range(len(X))):
        rolling_average_4.push(tmp_data[i])   

    rolling_average_10000 = RollingStats(max_deque_size=10000, n_feats=tmp_data.shape[1])
    print("\nRolling window size: 10000")
    for i in tqdm(range(len(X))):
        rolling_average_10000.push(tmp_data[i])


In [9]:
"""
模型构建与模型训练部分。
"""
def build_model(verbose=False, is_compile=True, encoder_model=None, **kwargs):
    """针对二分类任务的MLP模型，使用自编码器的编码层作为预训练层。"""
    input_dim = kwargs.pop("input_dim", None)
    output_dim = kwargs.pop("output_dim", None)
    n_labels = kwargs.pop("n_labels", None)

    # 构造网络结构
    layer_input = layers.Input(input_dim, dtype='float32')
    layer_encoded = encoder_model(layer_input)

    layer_feats = layers.concatenate([layer_input, layer_encoded])
    layer_feats = layers.BatchNormalization()(layer_feats)

    # 特征抽取
    layer_dense = layers.Dense(64, activation="relu")(layer_feats)
    layer_dense = layers.BatchNormalization()(layer_dense)
    layer_dense = layers.Dropout(0.3)(layer_dense)

    layer_dense = layers.Dense(32, activation="relu")(layer_dense)
    layer_dense = layers.BatchNormalization()(layer_dense)
    layer_dense = layers.Dropout(0.3)(layer_dense)

    # 输出层构造与模型构造
    layer_output = layers.Dense(output_dim, activation='sigmoid', name="label_output")(layer_dense)
    model = models.Model(layer_input, layer_output)

    if verbose:
        model.summary()
    if is_compile:
        model.compile(loss={'label_output':'binary_crossentropy'},
                      metrics=[tf.keras.metrics.AUC(name='auc')],
                      optimizer=optimizers.Adam(0.003))
    return model


def build_model_1d_cnn(verbose=False, is_compile=True, encoder_model=None, **kwargs):
    pass
    x = tf.keras.layers.Dense(4096, activation='relu')(x)
    x = tf.keras.layers.Reshape((256, 16))(x)
    x = tf.keras.layers.Conv1D(filters=16,
                      kernel_size=5,
                      strides=1,
                      activation='relu')(x)
    x = tf.keras.layers.MaxPooling1D(pool_size=2)(x)
    x = tf.keras.layers.Flatten()(x)


def test_build_model():
    # 构造mlp模型
    mlp_model = build_model(verbose=False, encoder_model=encoder_model,
                            input_dim=X.shape[1], output_dim=y.shape[1])

print("[INFO] {} MLP Model prepared !".format(
    str(datetime.now())[:-4]))

[INFO] 2021-01-14 11:44:48.08 MLP Model prepared !


In [10]:
"""
特征工程的pipline。
"""

X_feats = []

# STEP 1: 时间序列特征
# -----------------------
ts_feats_extractor = [RollingStats(max_deque_size=300, n_feats=len(feature_name_list)),
                      RollingStats(max_deque_size=1000, n_feats=len(feature_name_list)),
                      RollingStats(max_deque_size=5000, n_feats=len(feature_name_list))]

for i in tqdm(range(X.shape[0])):
    x_tmp = X[i]

    x_feats_tmp = []
    for _, extractor in enumerate(ts_feats_extractor):
        extractor.push(x_tmp)
        x_feats_tmp.append(extractor.get_mean())
        x_feats_tmp.append(extractor.get_std())
    X_feats.append(np.concatenate(x_feats_tmp).astype("float16"))

del ts_feats_extractor
gc.collect()

# 选择相关性最高的特征，并将时序特征与实际特征进行组合
def yield_batch_idx(total_size=1023, batsch_size=20):
    for i in range(0, total_size, batch_size):
        yield [i, min(total_size-1, i+batch_size)]

batch_size = 20
keep_top_k = 15

corr_scores = []
for start, end in yield_batch_idx(len(X_feats[0]), 20):
    curr_feats = np.vstack([item[start:end] for item in X_feats]).astype("float16")

    for i in range(curr_feats.shape[1]):
        corr_scores.append([np.corrcoef(curr_feats[:, i], y[:, 0]).ravel()[1], i])

selected_feat_ids = [item[1] for item in sorted(corr_scores, key=lambda x: abs(x[0]))][-keep_top_k:]
X_feats = [item[selected_feat_ids] for item in X_feats]
X_feats = np.hstack([X_feats, X]).astype("float32")

print("[INFO] {} Feature extraction completed !".format(
    str(datetime.now())[:-4]))

100%|██████████| 1571415/1571415 [01:59<00:00, 13196.98it/s]
[INFO] 2021-01-14 11:49:27.20 Feature extraction completed !


In [11]:
# 训练前全局参数准备
N_SPLITS = 7
MODELS = []
early_stop = callbacks.EarlyStopping(monitor="val_auc", mode="max",
                                     verbose=0, patience=15,
                                     restore_best_weights=True)
group_ts_kfolds = PurgedGroupTimeSeriesSplit(
        n_splits=N_SPLITS, group_gap=5, max_test_group_size=60)

# 开始训练模型
valid_acc_total, valid_roc_auc_total, valid_custom_total = [], [], []

print("[INFO] {} Model training start:".format(str(datetime.now())[:-4]))
print("=========================================")
tf.keras.backend.clear_session()
gc.collect()
for fold, (train_idx, valid_idx) in enumerate(group_ts_kfolds.split(X=X_feats, y=y, groups=train_dates)):
    #####################################################
    # Cross validation的数据准备
    X_train, X_val = X_feats[train_idx], X_feats[valid_idx]
    y_train, y_val = y[train_idx], y[valid_idx]

    X_train_weight, X_val_weight = train_weights[train_idx], train_weights[valid_idx]
    X_train_resp, X_val_resp = train_resp[train_idx], train_resp[valid_idx]
    X_train_dates, X_val_dates = train_dates[train_idx], train_dates[valid_idx]

    #####################################################
    # STEP 1: 进行特征工程
    std_scaler, pca_transformer, X_train_pca = feat_pca(feat_array=X_train, n_dims=30)

    X_val_pca = std_scaler.transform(X_val)
    X_val_pca = pca_transformer.transform(X_val)

    X_train = np.hstack([X_train, X_train_pca])
    X_val = np.hstack([X_val, X_val_pca])

    #####################################################
    # STEP 2: 在训练数据上进行预训练
    encoder_model, pretrain_model = build_tabular_autoencoder(
            input_dim=X_train.shape[1], n_labels=y_train.shape[1],
            stddev=0.03, verbose=False)
    pretrain_model.fit(x=X_train, y=[X_train, y_train],
                       batch_size=32758,
                       epochs=20,
                       verbose=0)
    encoder_model.trainable = True

    #####################################################
    # STEP 3: 在训练数据上进行训练
    mlp_model = build_model(verbose=False, encoder_model=encoder_model,
                            input_dim=X_train.shape[1], output_dim=y_train.shape[1])

    mlp_model.fit(x=X_train, y=y_train,
                  validation_data=(X_val, y_val),
                  batch_size=32758,
                  epochs=100,
                  verbose=0,
                  callbacks=[early_stop])

    # 转换模型为Tensorflow Lite模型，节约内存与推理时间
    mlp_model = LiteModel.from_keras_model(mlp_model)

    #####################################################
    # STEP 4: 寻找最优valid的阈值
    valid_pred_proba = np.mean(mlp_model.predict(X_val), axis=1)
    best_custom, THRESHOLD = njit_search_best_thresold_custom(
            y_pred_proba=valid_pred_proba,
            dates_array=X_val_dates,
            weights_array=X_val_weight,
            resp_array=X_val_resp)

    #####################################################
    # STEP 5: valid data上按照官方metric进行结果评估
    valid_pred_label = np.where(
            valid_pred_proba>=THRESHOLD, 1, 0).astype(int)
    valid_custom_metric = custom_metric(dates_array=X_val_dates,
                                        weights_array=X_val_weight,
                                        action_label_array=valid_pred_label,
                                        resp_array=X_val_resp)
    valid_acc = accuracy_score(y_val[:, 0].reshape(-1, 1),
                               valid_pred_label.reshape(-1, 1))
    valid_roc_auc = roc_auc_score(y_val[:, 0].reshape(-1, 1),
                                  valid_pred_label.reshape(-1, 1))

    # 标准打印训练信息
    print("-- folds {}({})(train_range: {}->{}, valid_range: {}->{}), valid_acc: {:.4f}, valid_roc_auc: {:.4f}, valid_custom: {:.4f}".format(
            fold+1, N_SPLITS, min(X_train_dates), max(X_train_dates), min(X_val_dates), max(X_val_dates), valid_acc, valid_roc_auc, valid_custom_metric))

    #####################################################
    # STEP 6: 保存模型与关键训练指标
    MODELS.append([THRESHOLD, std_scaler, pca_transformer, mlp_model])
    valid_acc_total.append(valid_acc)
    valid_roc_auc_total.append(valid_roc_auc)
    valid_custom_total.append(valid_custom_metric)

    # 强制内存回收
    del X_train, X_val, y_train, y_val, X_train_pca, X_val_pca
    del X_train_weight, X_val_weight, X_train_resp, X_val_resp, X_train_dates, X_val_dates
    gc.collect()

# 打印总体分数指标
print("-- total metric, valid_acc: {:.4f}, valid_roc_auc: {:.4f}, valid_custom: {:.4f}".format(
        np.mean(valid_acc_total), np.mean(valid_roc_auc_total), np.mean(valid_custom_total)))

print("=========================================")
print("[INFO] {} Model training end.".format(str(datetime.now())[:-4]))

[INFO] 2021-01-14 11:49:27.27 Model training start:
-- folds 1(7)(train_range: 86->137, valid_range: 143->193), valid_acc: 0.5172, valid_roc_auc: 0.5175, valid_custom: 1235.5659
-- folds 2(7)(train_range: 86->188, valid_range: 194->244), valid_acc: 0.5196, valid_roc_auc: 0.5198, valid_custom: 1247.4478
-- folds 3(7)(train_range: 86->239, valid_range: 245->295), valid_acc: 0.5173, valid_roc_auc: 0.5174, valid_custom: 505.0948
-- folds 4(7)(train_range: 86->290, valid_range: 296->346), valid_acc: 0.5194, valid_roc_auc: 0.5199, valid_custom: 907.0717
-- folds 5(7)(train_range: 86->341, valid_range: 347->397), valid_acc: 0.5154, valid_roc_auc: 0.5161, valid_custom: 924.3744
-- folds 6(7)(train_range: 86->392, valid_range: 398->448), valid_acc: 0.5191, valid_roc_auc: 0.5162, valid_custom: 473.3621
-- folds 7(7)(train_range: 86->443, valid_range: 449->499), valid_acc: 0.5218, valid_roc_auc: 0.5218, valid_custom: 2092.8594
-- total metric, valid_acc: 0.5185, valid_roc_auc: 0.5184, valid_custo

In [12]:
"""
预测标签按CV结果依据概率加权策略：根据CV结果，以CV的custom metric的结果对预测概率进行加权。
"""
# 预备提交预测结果
count_0, count_1 = 0, 0

if True:
    # 初始化环境
    env = gen_test_data(example_test_df, example_prediction_df)
    valid_custom_total_norm = np.array(valid_custom_total) / np.sum(valid_custom_total)

    ts_feats_extractor = [RollingStats(max_deque_size=300, n_feats=len(feature_name_list)),
                          RollingStats(max_deque_size=1000, n_feats=len(feature_name_list)),
                          RollingStats(max_deque_size=5000, n_feats=len(feature_name_list)),
                          RollingStats(max_deque_size=15000, n_feats=len(feature_name_list)),
                          RollingStats(max_deque_size=30000, n_feats=len(feature_name_list))]

    # 开始预测
    THRESHOLD = 0.512
    for (test_df, pred_df) in tqdm(env):
        if test_df['weight'].item() > 0:
            x_test_val = test_df[feature_name_list].values
            x_test_val = njit_fillna(x_test_val, mean_val_array)

            # 特征工程pipeline
            # --------------
            x_test_feats = []
            for _, extractor in enumerate(ts_feats_extractor):
                extractor.push(x_test_val)
                x_test_feats.append(extractor.get_mean())
                x_test_feats.append(extractor.get_std())
            x_test_feats = np.concatenate(x_test_feats)[selected_feat_ids]
            x_test_val = np.concatenate([x_test_feats, x_test_val]).reshape(1, -1)

            # 利用MODELS里训练好的神经网络进行训练
            pred_proba_list = []
            for i in range(len(MODELS)):
                x_test_val_pca = MODELS[i][1].transform(x_test_val)
                x_test_val_pca = MODELS[i][2].transform(x_test_val_pca)
                x_test_val_pca = np.hstack([x_test_val, x_test_val_pca])

                pred_proba = MODELS[i][3].predict(x_test_val_pca)
                pred_proba_list.append(np.mean(pred_proba))

            pred = np.average(pred_proba_list, weights=valid_custom_total_norm)
            pred_label = int(np.where(pred >= THRESHOLD, 1, 0))

            if pred_label == 0:
                count_0 += 1
            else:
                count_1 += 1

            pred_df.action = pred_label
        else:
            pred_df.action = 0

    print(count_0/(count_0+count_1), count_1/(count_0+count_1))
    print(np.sum(y[:, 0] == 0)/(np.sum(y[:, 0] == 0) + np.sum(y[:, 0] == 1)),
        np.sum(y[:, 0] == 1)/(np.sum(y[:, 0] == 0) + np.sum(y[:, 0] == 1)))

15219it [00:43, 352.28it/s]0.5131187908635476 0.4868812091364524
0.4981943025871587 0.5018056974128413



In [13]:
"""
预测标签众数生成策略：选择预测的标签的众数作为样本标签。
"""
# 预备提交预测结果
IS_PREDICT = False

if IS_PREDICT:
    # 初始化环境
    env = gen_test_data(example_test_df, example_prediction_df)

    # 预测
    for (test_df, pred_df) in tqdm(env):
        if test_df['weight'].item() > 0:
            x_test_val = test_df[feature_name_list].values
            x_test_val = njit_fillna(x_test_val, mean_val_array).reshape(1, -1)

            # 利用MODELS里训练好的神经网络进行训练
            pred_label_list = []
            for i in range(len(MODELS)):
                x_test_val_pca = MODELS[i][1].transform(x_test_val)
                x_test_val_pca = MODELS[i][2].transform(x_test_val_pca)
                x_test_val_pca = np.hstack([x_test_val, x_test_val_pca])

                pred_proba = MODELS[i][3].predict(x_test_val_pca)
                pred = np.mean(pred_proba)
                pred_label_list.append(int(np.where(pred >= MODELS[i][0], 1, 0)))

            # 选取预测最频繁的样本作为标签
            # https://stackoverflow.com/questions/6252280/find-the-most-frequent-number-in-a-numpy-array
            pred_df.action = max(map(lambda val: (pred_label_list.count(val), val), set(pred_label_list)))[1]
        else:
            pred_df.action = 0