# **Jane Street Real-Time Market Data Forecasting**

Jane Street Group在Kaggle平台发布了数据挖掘算法竞赛——Jane Street Real-Time Market Data Forecasting，其提供多个交易品种的真实交易历史数据，要求参赛者充分结合数据特征分布及机器学习技术进行建模，以应对交易数据中存在的厚尾效应、非平稳变化以及市场行为的突变效应等，进而提升对指定交易品种交易指标的预测精度及交易胜率。

数据部分包括训练数据集、测试数据集、滞后数据集、数据特征集、交易指标集、预测结果集等6个部分，均采用parquet或csv格式，具体描述如下：

（1）训练数据集——包括date_id、time_id、symbol_id、weight、feature_i、responder_i等字段，date_id和time_id提供数据时间结构，symbol_id为交易品种标识，weight用于计算评分函数的权重，feature_i(i=0,1,...,78)表示79个数据特征，responder_i(i=0,1,...,8)表示9个交易指标。

（2）测试数据集——包括date_id、time_id、symbol_id、weight、is_scored、feature_i等字段，其中is_scored表示样本是否参与评分函数计算，其他参数与训练数据中含义相同。

（3）滞后数据集——字段与训练数据集保持一致，其内容为date_id滞后一天的真实交易数据。

（4）数据特征集——表征feature_i(i=0,1,...,78)与tag_i(i=0,1,...,16)之间的布尔关系。

（5）交易指标集——表征responder_i(i=0,1,...,8)与tag_i(i=0,1,...,4)之间的布尔关系。

（6）预测结果集——表征预测交易指标responder_6的结果输出格式，预测精度利用拟合优度指标进行评估。

**解题思路**：首先对原始数据集进行数据探索及预处理，分析（4）和（5）内容表征的含义并完成数据对齐；其次探索数据分布关系构造新生特征，根据特征重要性和相关系数法优选数据特征库；再次选取向量自回归、高斯过程回归、随机森林、梯度提升树、xgboost、lightgbm、多层感知机、LSTM、RNN共九种机器学习算法对交易数据特征进行建模分析，并保存优选的数据模型；最后对测试数据进行特征构建，并加载优选数据模型进行预测存档。

**问题补充**：在实际编码过程中，针对庞大数据集的处理操作容易导致内存溢出问题。因此，考虑针对每一个交易品种独立训练数据模型，在测试过程中对于已知交易品种调用对应模型进行预测，对于未知交易品种则遍历调用模型库进行均值预测。

In [None]:
# 导入三方库
import itertools
import math
import os
import traceback
import warnings

import matplotlib
import numpy as np
import pandas as pd
import polars as pl
import seaborn as sns
from matplotlib import pyplot as plt
from sklearn.ensemble import IsolationForest
from sklearn.neighbors import KNeighborsRegressor

# Configure pandas so that every row and every column of a DataFrame is displayed
pd.set_option("display.max_rows", None)
pd.set_option("display.max_columns", None)

# Configure matplotlib for Chinese text: use the SimHei font family and turn off
# the Unicode minus glyph (axes.unicode_minus)
matplotlib.rcParams["font.family"] = "SimHei"
matplotlib.rcParams["axes.unicode_minus"] = False

# Silence every warning for the remainder of the session
warnings.filterwarnings("ignore")

In [None]:
# Global settings shared by the whole notebook, in key order: IsolationForest estimator
# count, Z-score outlier multiplier, number of subplots per row, trading symbol ids,
# samples per trading day for each symbol, model bag for each symbol, responder to
# predict, exploration result file name, source data directory, result directory and
# the dictionary of collected data files.
global_params = dict(
    max_Iso_estimators=200,
    z_score_threshold=3,
    subplot_col_num=3,
    symbol_ids=set(),
    symbol_day_sample={},
    symbol_ids_model_bag={},
    predict_responder="responder_6",
    data_result_file1="data_exploring_result.xlsx",
    root_dir="D:/Kaggle_competitions/Real_time_market_forecasting/data",
    result_dir="D:/Kaggle_competitions/Real_time_market_forecasting/result",
    # Alternative directories for running on Kaggle:
    # root_dir="/kaggle/input/jane-street-real-time-market-data-forecasting",
    # result_dir="/kaggle/working/jane-street-real-time-market-data-forecasting",
    data_files_dictionary={"training": [], "testing": [], "lagging": []},
)

In [None]:
# 校验数据文件目录
def init_checking():
    """
    Validate the source data directory and make sure the result directory exists.

    Reads ``root_dir`` and ``result_dir`` from ``global_params``. A missing result
    directory is created, including any missing parent directories. A status line
    is always printed at the end, whether or not the check succeeded.
    :return: None
    :raises Exception: when the source data directory does not exist
    :raises ValueError: when the result directory still does not exist after creation
    """
    root_dir, result_dir = global_params["root_dir"], global_params["result_dir"]
    try:
        if not os.path.exists(root_dir):
            raise Exception("数据源文件目录不存在！")
        if not os.path.exists(result_dir):
            print("数据存储目录不存在，正在创建数据存储目录...")
            # makedirs also creates missing parents, which os.mkdir cannot do
            os.makedirs(result_dir, exist_ok=True)
            if not os.path.exists(result_dir):
                raise ValueError("数据存储目录无法创建!")
            print("数据存储目录创建成功！")
    finally:
        # Report the final state even when an exception is propagating
        if os.path.exists(root_dir) and os.path.exists(result_dir):
            print("数据源文件目录和数据存储目录正常,开始数据分析建模任务...")
        else:
            print("数据源文件目录或数据存储目录异常,请核查源数据文件...")

In [None]:
# 归集源文件信息
def data_file_gathering():
    """
    Collect the data files and the trading symbol ids.

    Walks the ``train.parquet``, ``test.parquet`` and ``lags.parquet`` directories
    under ``global_params["root_dir"]`` and stores the file paths in
    ``global_params["data_files_dictionary"]``. Then reads the ``symbol_id`` column
    of every testing file and initialises ``symbol_ids``, ``symbol_day_sample`` and
    ``symbol_ids_model_bag`` in ``global_params``.
    :return: None
    """
    def files_under(dataset_dir_name: str) -> list:
        """Return the sorted paths of all files below one dataset directory."""
        dataset_dir = os.path.join(global_params["root_dir"], dataset_dir_name)
        # Sorting makes the file order independent of the filesystem walk order
        return sorted(
            os.path.join(root, file)
            for root, _, files in os.walk(dataset_dir)
            for file in files
        )

    # Collect the data files
    data_files = global_params["data_files_dictionary"]
    data_files["training"] = files_under("train.parquet")
    data_files["testing"] = files_under("test.parquet")
    data_files["lagging"] = files_under("lags.parquet")

    # Collect the symbol ids; only the symbol_id column is needed, so only it is read
    symbol_ids = set()
    for data_file in data_files["testing"]:
        current_symbol_ids = pl.read_parquet(data_file, columns=["symbol_id"])["symbol_id"].unique()
        symbol_ids.update(current_symbol_ids.to_list())
    global_params["symbol_ids"] = sorted(symbol_ids)
    global_params["symbol_day_sample"] = {symbol_id: 0 for symbol_id in global_params["symbol_ids"]}
    global_params["symbol_ids_model_bag"] = {symbol_id: [] for symbol_id in global_params["symbol_ids"]}

In [None]:
# 抽取训练数据
def data_extracting(symbol_id: int=None) -> pl.DataFrame:
    """
    Extract the training data of one trading symbol.

    Every file listed in ``global_params["data_files_dictionary"]["training"]`` is
    scanned lazily with the symbol filter applied before collecting, so rows of
    other symbols do not have to be kept in memory. The result is de-duplicated and
    sorted by ``date_id`` and ``time_id``. As a side effect,
    ``global_params["symbol_day_sample"][symbol_id]`` is set to the largest
    ``time_id`` plus one, or 0 when the symbol has no rows.
    :param symbol_id: trading symbol id
    :return: training data of the trading symbol
    :raises ValueError: when symbol_id is None or no training file has been collected
    """
    if symbol_id is None:
        raise ValueError("symbol_id参数不能为空！")
    training_data_files = global_params["data_files_dictionary"]["training"]
    if not training_data_files:
        raise ValueError("训练数据文件列表为空，请先归集源文件信息！")

    sub_block_data_list = [
        pl.scan_parquet(training_data_file).filter(pl.col("symbol_id") == symbol_id).collect()
        for training_data_file in training_data_files
    ]
    symbol_training_data = pl.concat(sub_block_data_list).unique().sort(by=["date_id", "time_id"])

    # max() is None for an empty frame; record 0 samples instead of failing on None + 1
    max_time_id = symbol_training_data["time_id"].max()
    global_params["symbol_day_sample"][symbol_id] = 0 if max_time_id is None else max_time_id + 1

    return symbol_training_data

In [None]:
# 数据缺失率计算
def missing_ratio_computing(source_data: pl.DataFrame=None, data_type: str="training", missing_phase: int=0, symbol_id: int=None) -> pl.DataFrame:
    """
    Compute the missing ratio of every column and plot it as a bar chart.

    For float columns both NaN and null count as missing; for all other columns
    only null counts. The chart is saved as a PNG under
    ``global_params["result_dir"]`` and then shown.
    :param source_data: original data
    :param data_type: data type, "training" or anything else for testing (only used in the title)
    :param missing_phase: processing phase, 1 original / 2 truncated / 3 marked (only used in the title)
    :param symbol_id: trading symbol id (only used in the title)
    :return: data frame holding the missing ratio of every column
    """
    data_rows, missing_ratio_data = source_data.shape[0], {}
    for col in source_data:
        # NaN exists only in floating point columns, so is_nan() is restricted to them
        if col.dtype in (pl.Float32, pl.Float64):
            missing_count = (col.is_nan() | col.is_null()).sum()
        else:
            missing_count = col.is_null().sum()
        # An empty frame has nothing missing; avoid dividing by zero
        missing_ratio_data[col.name] = missing_count / data_rows if data_rows else 0.0
    plt.figure(figsize=(20, 4))
    plt_name = f"{symbol_id if symbol_id is not None else ''}{'-原始' if missing_phase==1 else '-截断' if missing_phase==2  else '-标记' if missing_phase==3 else ''}{'训练' if data_type=='training' else '测试'}数据特征缺失率"
    sns.barplot(x=source_data.columns, y=list(missing_ratio_data.values()), color="red", alpha=0.8, edgecolor="black", linewidth=0.5)
    plt.title(plt_name, fontsize=16)
    plt.xticks(rotation=90, fontsize=10)
    plt.yticks(fontsize=10)
    plt.xlabel("数据特征名称", fontsize=12)
    plt.ylabel("数据特征缺失率", fontsize=12)
    plt_path = os.path.join(global_params["result_dir"], f"{plt_name}.png")
    plt.savefig(plt_path)
    plt.show()

    return pl.DataFrame(data=missing_ratio_data)

In [None]:
# 相关特征提取
def correlation_factor_extracting(source_data: pl.DataFrame=None, max_corr_param: float=0.8, min_factor_num: int=9) -> list:
    """
    Build the correlation matrix and extract the columns strongly correlated with
    the responder named by ``global_params["predict_responder"]``.

    The threshold starts at ``max_corr_param`` and is lowered in steps of 0.05
    until at least ``min_factor_num`` columns exceed it. The required count is
    capped at the number of columns with a finite correlation, so the search always
    terminates. Columns whose correlation is not finite (for example constant
    columns) are never selected.
    :param source_data: feature data, must contain the predicted responder column
    :param max_corr_param: initial absolute correlation threshold
    :param min_factor_num: minimum number of columns to select when enough are available
    :return: [correlation matrix, names of the strongly correlated columns]
    """
    predict_responder = global_params["predict_responder"]
    correlation_matrix = source_data.corr()
    feature_names_series = pl.Series(correlation_matrix.columns)
    abs_correlation = correlation_matrix[predict_responder].abs()

    # How NaN compares against a number is library-defined, so NaN is excluded explicitly
    is_usable = abs_correlation.is_finite().fill_null(False)
    required_num = min(min_factor_num, int(is_usable.sum()))
    while True:
        is_stronger_correlation_tag = is_usable & (abs_correlation > max_corr_param).fill_null(False)
        # Once the threshold is negative every usable column passes, so this always breaks
        if is_stronger_correlation_tag.sum() >= required_num:
            break
        max_corr_param -= 0.05
    stronger_correlation_factor = feature_names_series.filter(is_stronger_correlation_tag).to_list()

    return [correlation_matrix, stronger_correlation_factor]

In [None]:
# 数据关联探索
def correlation_analyzing(source_data: pl.DataFrame=None, data_symbol: str=None, data_phase: int=None, is_double: bool=None, is_sum: bool=None, symbol_id: int=None) -> pl.DataFrame:
    """
    Explore the correlation between the predicted responder and the other columns.

    With ``is_double`` false a heat map of the full correlation matrix is drawn.
    Otherwise one hexbin panel per strongly correlated column is drawn against the
    predicted responder. The figure is saved as a PNG under
    ``global_params["result_dir"]`` and shown. A plotting failure is reported on
    stdout and does not raise.
    :param source_data: original data
    :param data_symbol: data tag, one of "features", "responders", "responder&responders", "responder&features"
    :param data_phase: data phase, 1 global / 2 local (only used in the title)
    :param is_double: whether to analyse pair by pair (hexbin panels) instead of as a group (heat map)
    :param is_sum: whether the columns are cumulative (only used in the title)
    :param symbol_id: trading symbol id (only used in the title)
    :return: correlation matrix of source_data, with the predicted responder moved to the last column
    """
    sub_type_dict = {"features": "数据特征", "responders": "交易指标", "responder&responders": "预测交易指标&辅助交易指标", "responder&features": "预测交易指标&数据特征"}
    plt_name = f"{str(symbol_id) + '-' if symbol_id is not None else ''}{'全局' if data_phase==1 else '局部' if data_phase==2 else ''}{'累积' + sub_type_dict[data_symbol] if is_sum else sub_type_dict[data_symbol]}间的{'成对' if is_double else '成组'}相关性分析"
    # Cell annotations and explicit tick labels are only readable on small matrices
    is_large_plt = source_data.shape[1] > 18
    # Move the predicted responder to the end; the pair plots below rely on it being last
    data_cols = list(source_data.columns)
    if global_params["predict_responder"] in data_cols:
        data_cols.remove(global_params["predict_responder"])
        data_cols.append(global_params["predict_responder"])
    source_data = source_data[data_cols]
    correlation_matrix, stronger_correlation_factor = correlation_factor_extracting(source_data=source_data, max_corr_param=0.8)
    source_data = source_data[stronger_correlation_factor]
    try:
        if not is_double:
            plt.figure(figsize=(12, 12))
            sns.heatmap(correlation_matrix, square=True, cmap="coolwarm", alpha =0.5, vmin=-1, vmax=1, center= 0, linewidths=0.5, linecolor="white", annot=not is_large_plt, fmt=".3f")
            plt.xticks(fontsize=9)
            plt.yticks(fontsize=9)
            if not is_large_plt:
                plt.xticks(ticks=range(len(data_cols)), labels=data_cols, rotation=45, va="top")
                plt.yticks(ticks=range(len(data_cols)), labels=data_cols, rotation=0, ha="right")
            plt.xlabel(data_symbol, fontsize=12)
            plt.ylabel(data_symbol, fontsize=12)
            plt.title(plt_name, fontsize=14)
        else:
            # One panel per strongly correlated column, excluding the predicted responder itself
            plt_nums = source_data.shape[1] - 1
            plt_col_nums = global_params["subplot_col_num"]
            plt_row_nums = math.ceil(plt_nums / plt_col_nums)
            fig, axes = plt.subplots(nrows=plt_row_nums, ncols=plt_col_nums, figsize=(15, (plt_col_nums + 1) * plt_row_nums), squeeze=False)
            plt.suptitle(plt_name, ha="center", va="bottom", fontsize=16, y=0.95 if data_symbol!="responder&features" else 0.96)
            for plt_idx in range(plt_col_nums * plt_row_nums):
                row_idx = plt_idx // plt_col_nums
                col_idx = plt_idx % plt_col_nums
                if plt_idx < plt_nums:
                    # Explore the correlation between the predicted responder and the i-th column
                    col = source_data.columns[plt_idx]
                    ax = axes[row_idx, col_idx]
                    ax.grid(color="grey")
                    ax.set_xlabel(f"{col}", fontsize=10)
                    ax.set_ylabel(f"{global_params['predict_responder']}", fontsize=10)
                    ax.set_title(f"{col}&{global_params['predict_responder']}", fontsize=12)
                    ax.hexbin(source_data[col], source_data[global_params['predict_responder']], gridsize=500, cmap="coolwarm", bins="log", alpha = 0.2)
                else:
                    # Hide the unused cells of the subplot grid
                    axes[row_idx, col_idx].axis("off")
            fig.patch.set_linewidth(3)
            fig.patch.set_edgecolor("#000000")
            fig.patch.set_facecolor("#eeeeee")
        plt.tight_layout(rect=[0, 0, 1, 0.96])
        plt_path = os.path.join(global_params["result_dir"], f"{plt_name}.png")
        plt.savefig(plt_path)
        plt.show()
    except Exception:
        # format_exc() returns the traceback text; print_exc() returns None
        print(f"{plt_name}绘制失败，原因：{traceback.format_exc()}")

    return correlation_matrix

In [None]:
# 数据分布探索
def distribution_exploring(source_data: pl.DataFrame=None, is_division: bool=None, data_phase: int=None, is_responder: bool=False, is_feature: bool=False, symbol_id: int=None):
    """
    Explore how the strongly correlated columns are distributed.

    With ``is_division`` false the cumulative sums of all selected columns are
    drawn into one chart. Otherwise every selected column gets one row of three
    panels: raw values, cumulative values and density histogram. The figure is
    saved as a PNG under ``global_params["result_dir"]`` and shown. A plotting
    failure is reported on stdout and does not raise.
    :param source_data: original data, must contain the predicted responder column
    :param is_division: whether each column is plotted in its own row of panels
    :param data_phase: data phase, 1 global / 2 local (only used in the title)
    :param is_responder: whether the data are responder columns
    :param is_feature: whether the data are feature columns
    :param symbol_id: trading symbol id (only used in the title)
    :return: None
    """
    plt_name = f"{symbol_id}{'-单一交易' if is_division and is_responder else '-不同交易' if not is_division else ''}{'全局' if data_phase==1 else '局部' if data_phase==2 else ''}{'-特征' if is_feature else '指标' if is_responder else ''}数据分布关系探索"
    correlation_matrix, stronger_correlation_factor = correlation_factor_extracting(source_data=source_data, max_corr_param=0.8)
    source_data = source_data[stronger_correlation_factor]
    try:
        if not is_division:
            plt.figure(figsize=(16, 6))
            for col in source_data.columns:
                # The predicted responder is highlighted in red with a thicker line
                if col == global_params["predict_responder"]:
                    plt.plot(range(source_data.shape[0]), source_data[col].cum_sum(), color="red", linewidth=1.2)
                else:
                    plt.plot(range(source_data.shape[0]), source_data[col].cum_sum(), linewidth=1)
            # Chart decoration only needs to be applied once, after all lines are drawn
            plt.legend(source_data.columns, fontsize=14)
            plt.xticks(fontsize=12)
            plt.yticks(fontsize=12)
            plt.xlabel("时间", fontsize=14)
            plt.ylabel("交易指标值", fontsize=14)
            plt.title(plt_name, fontsize=16)
        else:
            plt_col_nums = global_params["subplot_col_num"]
            # For feature data the last selected column is skipped
            # (presumably the predicted responder, which sorts after feature_* names — verify against caller)
            plt_row_nums = source_data.shape[1] if not is_feature else source_data.shape[1] - 1
            fig, axes = plt.subplots(nrows=plt_row_nums, ncols=plt_col_nums, figsize=(15, (plt_col_nums + 1) * plt_row_nums), squeeze=False)
            plt.suptitle(plt_name, ha="center", va="bottom", fontsize=16, y=0.96)
            for plt_idx in range(plt_row_nums * plt_col_nums):
                row_idx = plt_idx // plt_col_nums
                col_idx = plt_idx % plt_col_nums
                if is_responder:
                    # Explore the distribution of the i-th responder column
                    col = source_data.columns[row_idx]
                    ax = axes[row_idx, col_idx]
                    ax.grid(color="grey")
                    plt_color = "blue" if col != global_params["predict_responder"] else "red"
                    line_color = "red" if col != global_params["predict_responder"] else "blue"
                    if col_idx == 0:
                        ax.set_ylim([-6, 6])
                        ax.set_xlabel("时间", fontsize=10)
                        ax.set_ylabel("交易指标值", fontsize=10)
                        ax.plot(range(source_data.shape[0]), source_data[col], color=plt_color, linewidth=0.08)
                    # elif keeps the histogram out of the raw-value panel (col_idx == 0)
                    elif col_idx == 1:
                        ax.set_xlabel("时间", fontsize=10)
                        ax.set_ylabel("累积交易指标值", fontsize=10)
                        ax.plot(range(source_data.shape[0]), source_data[col].cum_sum(), color=plt_color, linewidth=0.8)
                    else:
                        ax.set_xlabel("交易指标值", fontsize=10)
                        ax.set_ylabel("交易指标值频度", fontsize=10)
                        ax.hist(source_data[col], bins=1000, color=plt_color, density=True, histtype="step")
                        ax.hist(source_data[col], bins=1000, color="lightgrey", density=True)
                    ax.axhline(0, color=line_color, linestyle="-", linewidth=1)
                    ax.set_title(f"{col}-{'原始分布' if col_idx == 0 else '累积分布' if col_idx == 1 else '密度分布'}", fontsize=12)
                elif is_feature:
                    # Explore the distribution of the i-th feature column
                    col = source_data.columns[row_idx]
                    ax = axes[row_idx, col_idx]
                    ax.grid(color="grey")
                    if col_idx == 0:
                        ax.set_xlabel("时间", fontsize=10)
                        ax.set_ylabel("特征值", fontsize=10)
                        ax.plot(range(source_data.shape[0]), source_data[col], color="blue")
                    # elif keeps the histogram out of the raw-value panel (col_idx == 0)
                    elif col_idx == 1:
                        ax.set_xlabel("时间", fontsize=10)
                        ax.set_ylabel("累积特征值", fontsize=10)
                        ax.plot(range(source_data.shape[0]), source_data[col].cum_sum(), color="blue")
                    else:
                        ax.set_xlabel("特征值", fontsize=10)
                        ax.set_ylabel("特征值频度", fontsize=10)
                        ax.hist(source_data[col], bins=1000, color="blue", density=True, histtype="step")
                        ax.hist(source_data[col], bins=1000, color="lightgrey", density=True)
                    ax.axhline(0, color="red", linestyle="-", linewidth=1)
                    ax.set_title(f"{col}-{'原始分布' if col_idx == 0 else '累积分布' if col_idx == 1 else '密度分布'}", fontsize=12)
            fig.patch.set_linewidth(3)
            fig.patch.set_edgecolor("#000000")
            fig.patch.set_facecolor("#eeeeee")
        plt.tight_layout(rect=[0, 0, 1, 0.96])
        plt_path = os.path.join(global_params["result_dir"], f"{plt_name}.png")
        plt.savefig(plt_path)
        plt.show()
    except Exception:
        # format_exc() returns the traceback text; print_exc() returns None
        print(f"{plt_name}绘制失败，原因：{traceback.format_exc()}")

In [None]:
# 分析交易指标相关性
def responder_correlation_analyzing(sub_data: pl.DataFrame=None, correlation_phase: int=None, is_sum: bool=None, symbol_id: int=None) -> pl.DataFrame:
    """
    Analyse how the predicted responder of one trading symbol correlates with the
    other responders.

    For raw responders the distribution plots are drawn first; for cumulative
    responders only the correlation plots are drawn.
    :param sub_data: responder data of the trading symbol
    :param correlation_phase: correlation analysis phase
    :param is_sum: whether the cumulative responder columns are analysed
    :param symbol_id: trading symbol id
    :return: correlation matrix between the responders
    """
    use_cumulative = bool(is_sum)
    predict_responder = global_params["predict_responder"]

    # Pick the responder columns for the requested mode
    if use_cumulative:
        picked_cols = [
            name for name in sub_data.columns
            if (name.startswith("responder_") and "_cumsum" in name) or name == predict_responder
        ]
    else:
        picked_cols = [
            name for name in sub_data.columns
            if name.startswith("responder_") and "_cumsum" not in name
        ]
    picked_cols.sort()

    if not use_cumulative:
        # Explore the distribution of the responders, first jointly and then one by one
        for separate_panels in (False, True):
            distribution_exploring(source_data=sub_data[picked_cols], is_division=separate_panels, data_phase=correlation_phase, is_responder=True, symbol_id=symbol_id)

    # Explore the correlation between the responders: pair plots first, then the heat map
    correlation_analyzing(source_data=sub_data[picked_cols], data_symbol="responder&responders", data_phase=correlation_phase, is_double=True, is_sum=use_cumulative, symbol_id=symbol_id)
    group_correlation_matrix = correlation_analyzing(source_data=sub_data[picked_cols], data_symbol="responder&responders", data_phase=correlation_phase, is_double=False, is_sum=use_cumulative, symbol_id=symbol_id)

    return group_correlation_matrix

In [None]:
# 分析指标特征相关性
def feature_correlation_analyzing(sub_data: pl.DataFrame=None, correlation_phase: int=None, is_sum: bool=None, symbol_id: int=None) -> pl.DataFrame:
    """
    Analyse how the predicted responder of one trading symbol correlates with the
    data features.

    For raw features the per-feature distribution plots are drawn first; for
    cumulative features only the correlation plots are drawn.
    :param sub_data: feature data of the trading symbol
    :param correlation_phase: correlation analysis phase
    :param is_sum: whether the cumulative feature columns are analysed
    :param symbol_id: trading symbol id
    :return: correlation matrix between the predicted responder and the features
    """
    use_cumulative = bool(is_sum)
    predict_responder = global_params["predict_responder"]

    # Pick the feature columns for the requested mode, always keeping the predicted responder
    picked_cols = []
    for name in sub_data.columns:
        is_wanted_feature = name.startswith("feature_") and (("_cumsum" in name) == use_cumulative)
        if is_wanted_feature or name == predict_responder:
            picked_cols.append(name)
    picked_cols.sort()

    if not use_cumulative:
        # Explore the distribution of every feature
        distribution_exploring(source_data=sub_data[picked_cols], is_division=True, data_phase=correlation_phase, is_feature=True, symbol_id=symbol_id)

    # Explore the correlation between the predicted responder and the features:
    # pair plots first, then the heat map
    correlation_analyzing(source_data=sub_data[picked_cols], data_symbol="responder&features", data_phase=correlation_phase, is_double=True, is_sum=use_cumulative, symbol_id=symbol_id)
    group_correlation_matrix = correlation_analyzing(source_data=sub_data[picked_cols], data_symbol="responder&features", data_phase=correlation_phase, is_double=False, is_sum=use_cumulative, symbol_id=symbol_id)

    return group_correlation_matrix

In [None]:
# 截断对齐数据
def data_truncating_aligning(source_data: pl.DataFrame=None, symbol_id: int=None) -> pl.DataFrame:
    """
    Truncate the data to the longest run of consecutive trading dates and align it
    to a complete (date_id, time_id) grid.

    The grid holds ``global_params["symbol_day_sample"][symbol_id]`` time points for
    every date of the run. Grid points without a matching row in source_data keep
    null values after the left join.
    :param source_data: original data
    :param symbol_id: trading symbol id
    :return: truncated and aligned data, still containing the missing values
    """
    def longest_subsequence_searching(date_ids: list=None):
        """
        Find the longest run of consecutive trading dates.
        :param date_ids: sorted trading dates
        :return: longest run of consecutive trading dates
        """
        # Consecutive dates share the same (value - position) difference
        groups = []
        for key, group in itertools.groupby(enumerate(date_ids), lambda x: x[1] - x[0]):
            groups.append(list(map(lambda x: x[1], group)))
        longest_trading_subsequence = max(groups, key=len)

        return longest_trading_subsequence

    # Nothing to align when there is no trading date at all
    date_ids = sorted(source_data["date_id"].unique())
    if not date_ids:
        return source_data

    # Build the regular time index
    longest_subsequence = longest_subsequence_searching(date_ids=date_ids)
    start_date, end_date = min(longest_subsequence), max(longest_subsequence)
    date_dtype, time_dtype = source_data["date_id"].dtype, source_data["time_id"].dtype
    # Both index columns use the same samples-per-day value so they always have equal length
    day_sample_num = global_params["symbol_day_sample"][symbol_id]
    day_num = end_date - start_date + 1
    time_index_data = pl.DataFrame({
        "date_id": pl.Series(values=np.arange(start_date, end_date + 1).repeat(day_sample_num),
                             dtype=date_dtype),
        "time_id": pl.Series(values=np.tile(np.arange(day_sample_num), day_num),
                             dtype=time_dtype)
    })
    source_data = time_index_data.join(source_data, on=["date_id", "time_id"], how="left")

    return source_data

In [None]:
# 区分数据分布
def data_category_recognizing(source_data: pl.DataFrame=None) -> pl.DataFrame:
    """
    Classify the distribution of every column from its absolute kurtosis and skewness.
    1.0 —— 2 <= |kurtosis| <= 20 and |skewness| <= 2 (treated as near normal)
    2.0 —— |kurtosis| < 2 and |skewness| <= 10 (treated as near uniform)
    3.0 —— everything else (unknown distribution)
    :param source_data: original data
    :return: one-row data frame holding the distribution category of every column
    """
    def col_data_categorizing(data_series: pl.Series=None) -> float:
        """
        Classify the distribution of a single column.
        :param data_series: one data column
        :return: distribution category
        """
        # Temporarily patch missing entries with the mean of the observed values (0 when none exist)
        observed_values = data_series.drop_nans().drop_nulls()
        if observed_values.is_empty():
            placeholder = 0
        else:
            placeholder = observed_values.mean()
        patched_series = data_series.fill_null(placeholder).fill_nan(placeholder)

        # Shape statistics of the patched column
        kurtosis_size = abs(patched_series.kurtosis())
        skewness_size = abs(patched_series.skew())

        if 2 <= kurtosis_size <= 20 and skewness_size <= 2:
            return 1.0
        if kurtosis_size < 2 and skewness_size <= 10:
            return 2.0
        return 3.0

    distribution_category_dict = {}
    for col_name in source_data.columns:
        distribution_category_dict[col_name] = col_data_categorizing(data_series=source_data[col_name])

    return pl.DataFrame(distribution_category_dict)

In [None]:
# 标记异常数据
def data_detecting(source_data: pl.DataFrame = None, density_category: pl.DataFrame = None) -> pl.DataFrame:
    """
    Mark abnormal values as NaN.

    Only ``feature_*`` and ``responder_*`` columns with more than one distinct value
    are processed; all other columns are returned unchanged. Categories 1.0 and 3.0
    use a Z-score test with ``global_params["z_score_threshold"]``; category 2.0
    uses an IsolationForest with ``global_params["max_Iso_estimators"]`` estimators.
    Null, NaN and infinite entries of a processed column are written back as NaN.
    :param source_data: original data
    :param density_category: distribution category of every column of source_data
    :return: data in which the abnormal values have been replaced by NaN
    """
    def col_outlier_detecting(data_series: pl.Series=None, data_category: float=None, needing_cols: list=None) -> pl.Series:
        """
        Mark the abnormal values of one column.
        :param data_series: original column
        :param data_category: distribution category of the column
        :param needing_cols: names of the columns to process
        :return: column in which abnormal values have been replaced by NaN
        """
        print(f"正在处理列：{data_series.name}")
        if data_series.name not in needing_cols:
            return data_series

        # Work on a writable copy
        # (assumes to_numpy() exposes nulls of a float column as NaN — TODO confirm for the installed polars)
        values = np.array(data_series.to_numpy(), copy=True)
        if not np.issubdtype(values.dtype, np.floating):
            # NaN can only be stored in a floating point array
            values = values.astype(np.float64)

        # Entries that are already unusable: null, NaN, +inf, -inf
        invalid_flag = ~np.isfinite(values)
        valid_values = values[~invalid_flag]

        if data_category in [1.0, 3.0]:
            # Mark outliers with the Z-score method, using the sample standard deviation
            if valid_values.size > 1:
                mean_value = valid_values.mean(dtype=np.float64)
                std_value = valid_values.std(dtype=np.float64, ddof=1)
                # A zero or undefined deviation gives meaningless z-scores; flag nothing then
                if np.isfinite(std_value) and std_value > 0:
                    with np.errstate(invalid="ignore"):
                        z_scores = (values - mean_value) / std_value
                        invalid_flag |= np.abs(z_scores) > global_params["z_score_threshold"]
        elif data_category == 2.0:
            # Mark outliers with an isolation forest fitted on the valid values
            if valid_values.size > 0:
                iso_forest = IsolationForest(n_estimators=global_params["max_Iso_estimators"], contamination="auto", n_jobs=-1)
                labels = iso_forest.fit_predict(valid_values.reshape(-1, 1))
                # Labels follow the order of the valid entries; -1 denotes an outlier
                valid_positions = np.flatnonzero(~invalid_flag)
                invalid_flag[valid_positions] = labels == -1

        # Blank out all flagged entries in one vectorised write
        values[invalid_flag] = np.nan

        return pl.Series(data_series.name, values)

    # Mark the abnormal values column by column
    needing_cols = {col_name for col_name in source_data.columns if (col_name.startswith("feature_") or col_name.startswith("responder_")) and source_data[col_name].n_unique() > 1}
    source_data = source_data.select([col_outlier_detecting(data_series=source_data[col_name], data_category=density_category[col_name].item(), needing_cols=needing_cols) for col_name in source_data.columns])

    return source_data

In [None]:
# 填补缺失数据
def data_filling(source_data: pl.DataFrame=None, missing_ratio_data: pl.DataFrame=None, is_predict_phase: bool=False) -> pl.DataFrame:
    """
    Fill the missing values of the data.

    In the prediction phase NaN and null are simply replaced by 0. Otherwise every
    column with a non-zero missing ratio is filled by a KNN regression on the row
    position; observed values are kept as they are.
    :param source_data: original data
    :param missing_ratio_data: missing ratio of every column of source_data
    :param is_predict_phase: whether this is the prediction phase
    :return: complete data after filling
    """
    def col_null_filling(data_series: pl.Series = None, missing_ratio_data: pl.DataFrame=None) -> pl.Series:
        """
        Fill the missing values of one column from its nearest observed neighbours.
        :param data_series: original column
        :param missing_ratio_data: missing ratio of every column
        :return: complete column; Float64 whenever the column had to be processed
        :raises ValueError: when data_series is not a polars Series
        """
        if not isinstance(data_series, pl.Series):
           raise ValueError("data_series应为Polars Series类型")

        if missing_ratio_data[data_series.name].item() == 0:
            return data_series

        # Nulls of the column become NaN in the float array
        # (assumes to_numpy() converts nulls to NaN — TODO confirm for the installed polars)
        values = data_series.to_numpy().astype(np.float64)
        known_mask = ~np.isnan(values)
        n_samples = int(known_mask.sum())

        # Nothing observed: fall back to zeros, keeping the column name
        if n_samples == 0:
            return pl.Series(data_series.name, np.zeros(data_series.shape[0]))

        missing_positions = np.flatnonzero(~known_mask)
        if missing_positions.size > 0:
            # Choose the neighbour count from the share of observed values
            if n_samples < data_series.shape[0] * 0.0001:
                knn_neighbors_num = n_samples - 1
            else:
                knn_neighbors_num = 5
            # KNN needs at least one neighbour and cannot use more than the observed samples
            knn_neighbors_num = max(1, min(knn_neighbors_num, n_samples))

            # Train the KNN regressor on (row position -> observed value)
            x_known = np.flatnonzero(known_mask).reshape(-1, 1)
            knn_regressor = KNeighborsRegressor(n_neighbors=knn_neighbors_num)
            knn_regressor.fit(x_known, values[known_mask])

            # Predict only the missing positions so observed values are not overwritten
            values[missing_positions] = knn_regressor.predict(missing_positions.reshape(-1, 1))

        return pl.Series(data_series.name, values).fill_nan(0).fill_null(0)

    if is_predict_phase:
        source_data = source_data.fill_nan(0).fill_null(0)
    else:
        # Fill the missing values column by column
        source_data = source_data.select([col_null_filling(data_series=source_data[col_name], missing_ratio_data=missing_ratio_data) for col_name in source_data.columns])

    return source_data

In [None]:
# 组合统计信息
def statistic_info_combining(source_data: pl.DataFrame=None, density_category_description: pl.DataFrame=None):
    """
    Combine the distribution category of every column with two of its statistics.

    Each column of density_category_description is replaced by the list
    [category, mean, std] when the category is 2.0, and by [category, min, max]
    for every other category. The statistics are taken from source_data.describe().
    NOTE(review): category 2.0 is described elsewhere as "near uniform", for which
    min/max might have been the expected pair — confirm that this pairing is intended.
    :param source_data: complete data after filling
    :param density_category_description: distribution category of every column
    :return: statistic information of the data
    """
    source_data_description = source_data.describe()

    def combine_col_info(col_category: float=None, col_name: str=None, statistic_description: pl.DataFrame=None):
        """
        Combine the statistics of one column.
        :param col_category: distribution category of the column
        :param col_name: column name
        :param statistic_description: statistics table produced by describe()
        :return: [category, first statistic, second statistic]
        """
        def statistic_of(statistic_name: str):
            """Look up one statistic of the column in the describe() table."""
            matching_row = statistic_description.filter(pl.col("statistic") == statistic_name)
            return matching_row.select(col_name).item()

        first_name, second_name = ("mean", "std") if col_category == 2.0 else ("min", "max")
        first_value = statistic_of(first_name)
        second_value = statistic_of(second_name)

        return [col_category, first_value, second_value]

    # Replace the category of every column by its combined statistic information
    combined_description = density_category_description
    for col_name in source_data.columns:
        current_category = combined_description[col_name].item()
        current_info = combine_col_info(current_category, col_name, source_data_description)
        combined_description = combined_description.with_columns(pl.lit(current_info).alias(col_name))

    return combined_description

In [None]:
# 构造数据特征
def feature_constructing(source_data: pl.DataFrame=None, symbol_id: int=None):
    """
    Construct derived features.

    For every ``feature_*`` and ``responder_*`` column three columns are added:
    ``<name>_cumsum`` (cumulative sum), ``<name>_time_lagging_1`` (shifted by one
    row) and ``<name>_date_lagging_1`` (shifted by one trading day of rows).
    NOTE(review): other code in this file spells the lag suffixes
    ``_time_id_lagging_1`` / ``_date_id_lagging_1`` — confirm which naming is intended.
    :param source_data: original data
    :param symbol_id: trading symbol id used to look up the number of samples per
        trading day in ``global_params["symbol_day_sample"]``; when omitted the
        number is derived from the largest ``time_id`` of source_data
    :return: data with the constructed features appended
    """
    # Number of rows that make up one trading day
    if symbol_id is not None:
        day_sample_num = global_params["symbol_day_sample"][symbol_id]
    else:
        max_time_id = source_data["time_id"].max()
        day_sample_num = 0 if max_time_id is None else max_time_id + 1

    # Names of the columns that vary over time
    not_fixed_cols = [col_name for col_name in source_data.columns if col_name.startswith("feature_") or col_name.startswith("responder_")]

    # Construct the cumulative features
    source_data = source_data.with_columns([pl.col(col_name).cum_sum().alias(f"{col_name}_cumsum") for col_name in not_fixed_cols])

    # Construct the lagging features: one time step back and one trading day back
    source_data = source_data.with_columns([pl.col(col_name).shift(1).alias(f"{col_name}_time_lagging_1") for col_name in not_fixed_cols])
    source_data = source_data.with_columns([pl.col(col_name).shift(day_sample_num).alias(f"{col_name}_date_lagging_1") for col_name in not_fixed_cols])

    return source_data

In [None]:
# Raw data exploration
def training_data_exploring():
    """
    Run the full exploration / preprocessing pipeline for every trading symbol.

    For each id in ``global_params["symbol_ids"]`` the function extracts the raw data, computes
    missing ratios and correlation matrices, truncates/aligns, classifies column distributions,
    flags anomalies, fills missing values, constructs features and writes the constructed frame
    to a per-symbol parquet file under ``global_params["result_dir"]``. One ``symbol_id_<id>``
    column per symbol is appended to each result table, and all tables are finally written as
    sheets of ``数据分析结果.xlsx`` in the same directory.

    :return: None
    """
    # Initialise the result accumulators
    factor_name_setting = False
    truncated_dates_result = pl.DataFrame({"factor_name": ["start_date", "end_date"]})
    missing_description_result1, missing_description_result2, missing_description_result3, partial_density_description_result = [pl.DataFrame() for _ in range(4)]
    entire_individual_correlation_analyzing_result, partial_individual_correlation_analyzing_result, partial_cumsum_correlation_analyzing_result = [pl.DataFrame() for _ in range(3)]

    # Explore the data one trading symbol at a time
    for symbol_id in global_params["symbol_ids"]:
        print(f"交易品种-{symbol_id}-数据探索开始")

        # Extract the raw data
        current_training_data = data_extracting(symbol_id=symbol_id)

        # Set the factor-name column of the result tables (only done for the first symbol)
        if not factor_name_setting:
            entire_factor_name = pl.DataFrame({"factor_name": current_training_data.columns})
            missing_description_result1 = missing_description_result1.with_columns(entire_factor_name.select("factor_name"))
            missing_description_result2 = missing_description_result2.with_columns(entire_factor_name.select("factor_name"))
            missing_description_result3 = missing_description_result3.with_columns(entire_factor_name.select("factor_name"))
            partial_density_description_result = partial_density_description_result.with_columns(
                entire_factor_name.select("factor_name"))

            # Define the feature factor names (base column plus its three derived variants)
            # NOTE(review): these labels use "_time_id_lagging_1"/"_date_id_lagging_1" while
            # feature_constructing names its columns "_time_lagging_1"/"_date_lagging_1" — confirm
            # the labels still line up with the rows returned by the correlation helpers.
            factor_name = [[f"{col_name}", f"{col_name}_cumsum", f"{col_name}_time_id_lagging_1", f"{col_name}_date_id_lagging_1"] for col_name in current_training_data.columns if any([col_name.startswith("feature_"), col_name.startswith("responder_")])]
            factor_name = sorted([col_name for sub_partial_factor in factor_name for col_name in sub_partial_factor])

            # Labels for the raw (whole-data) individual correlation table: base columns only,
            # excluding the predicted responder
            partial_factor_name1 = [col_name for col_name in factor_name if all([col_name.startswith("feature_") or (col_name.startswith("responder_") and col_name != global_params["predict_responder"]), "cumsum" not in col_name, "time_id_lagging_1" not in col_name, "date_id_lagging_1" not in col_name])]
            correlation_factor_name = pl.DataFrame({"factor_name": partial_factor_name1})
            entire_individual_correlation_analyzing_result = entire_individual_correlation_analyzing_result.with_columns(correlation_factor_name.select("factor_name"))

            # Labels for the truncated individual correlation table: every non-cumsum name
            # except the predicted responder
            partial_factor_name2 = [col_name for col_name in factor_name if "cumsum" not in col_name and col_name != global_params["predict_responder"]]
            correlation_factor_name = pl.DataFrame({"factor_name": partial_factor_name2})
            partial_individual_correlation_analyzing_result = partial_individual_correlation_analyzing_result.with_columns(correlation_factor_name.select("factor_name"))

            # Labels for the truncated cumulative correlation table: cumsum names only
            partial_factor_name3 = [col_name for col_name in factor_name if "cumsum" in col_name]
            correlation_factor_name = pl.DataFrame({"factor_name": partial_factor_name3})
            partial_cumsum_correlation_analyzing_result = partial_cumsum_correlation_analyzing_result.with_columns(correlation_factor_name.select("factor_name"))

        factor_name_setting = True

        # Source-data exploration: missing ratio (phase 1, raw data); NaN/null become 0
        missing_description = missing_ratio_computing(source_data=current_training_data, missing_phase=1, symbol_id=symbol_id).fill_nan(0).fill_null(0)

        # Source-data exploration: responder correlation analysis; keep the predicted responder's
        # column and drop its last row
        responders_correlation_matrix = responder_correlation_analyzing(sub_data=current_training_data, correlation_phase=1, is_sum=False, symbol_id=symbol_id).fill_nan(0)
        individual_responders_correlation_data = responders_correlation_matrix.select(pl.col(global_params["predict_responder"]).alias(f"symbol_id_{symbol_id}"))[:-1]

        # Source-data exploration: feature correlation analysis (same column selection as above)
        features_correlation_matrix = feature_correlation_analyzing(sub_data=current_training_data, correlation_phase=1, is_sum=False, symbol_id=symbol_id).fill_nan(0)
        individual_features_correlation_data = features_correlation_matrix.select(pl.col(global_params["predict_responder"]).alias(f"symbol_id_{symbol_id}"))[:-1]

        # Source-data preprocessing: truncation and alignment
        truncated_training_data = data_truncating_aligning(source_data=current_training_data)

        # Source-data preprocessing: missing ratio (phase 2, truncated data)
        truncated_missing_description = missing_ratio_computing(source_data=truncated_training_data, missing_phase=2, symbol_id=symbol_id)

        # Source-data preprocessing: distribution category recognition
        density_category_description = data_category_recognizing(source_data=truncated_training_data)

        # Source-data preprocessing: anomaly flagging
        detected_training_data = data_detecting(source_data=truncated_training_data, density_category=density_category_description)

        # Source-data preprocessing: missing ratio (phase 3, after anomaly flagging)
        detected_missing_description = missing_ratio_computing(source_data=detected_training_data, missing_phase=3, symbol_id=symbol_id)

        # Source-data preprocessing: missing-value filling
        filled_training_data = data_filling(source_data=detected_training_data, missing_ratio_data=detected_missing_description, is_predict_phase=False)

        # Source-data preprocessing: combine statistics into the category description
        density_category_description = statistic_info_combining(source_data=filled_training_data, density_category_description=density_category_description)

        # Source-data preprocessing: feature construction; rows containing NaN/null are dropped,
        # then the frame is persisted per symbol
        constructed_training_data = feature_constructing(source_data=filled_training_data).drop_nans().drop_nulls()
        constructed_training_data.write_parquet(os.path.join(global_params["result_dir"], f"交易品种-{symbol_id}-的数据特征.parquet"))

        # Source-data exploration: individual responder correlation analysis (phase 2)
        truncated_individual_responders_correlation_matrix = responder_correlation_analyzing(sub_data=constructed_training_data,correlation_phase=2, is_sum=False,symbol_id=symbol_id).fill_nan(0)
        partial_individual_responders_correlation_data = truncated_individual_responders_correlation_matrix.select(pl.col(global_params["predict_responder"]).alias(f"symbol_id_{symbol_id}"))[:-1]

        # Source-data exploration: cumulative responder correlation analysis (phase 2)
        truncated_cumsum_responders_correlation_matrix = responder_correlation_analyzing(sub_data=constructed_training_data, correlation_phase=2, is_sum=True, symbol_id=symbol_id).fill_nan(0)
        partial_cumsum_responders_correlation_data = truncated_cumsum_responders_correlation_matrix.select(pl.col(global_params["predict_responder"]).alias(f"symbol_id_{symbol_id}"))[:-1]

        # Source-data exploration: individual feature correlation analysis (phase 2)
        truncated_individual_features_correlation_matrix = feature_correlation_analyzing(sub_data=constructed_training_data, correlation_phase=2, is_sum=False, symbol_id=symbol_id).fill_nan(0)
        partial_individual_features_correlation_data = truncated_individual_features_correlation_matrix.select(pl.col(global_params["predict_responder"]).alias(f"symbol_id_{symbol_id}"))[:-1]

        # Source-data exploration: cumulative feature correlation analysis (phase 2)
        truncated_cumsum_features_correlation_matrix = feature_correlation_analyzing(sub_data=constructed_training_data, correlation_phase=2, is_sum=True, symbol_id=symbol_id).fill_nan(0)
        partial_cumsum_features_correlation_data = truncated_cumsum_features_correlation_matrix.select(pl.col(global_params["predict_responder"]).alias(f"symbol_id_{symbol_id}"))[:-1]

        # Record the analysis results: each table gains one "symbol_id_<id>" column
        truncated_dates_result = truncated_dates_result.with_columns(pl.Series([constructed_training_data["date_id"].min(), constructed_training_data["date_id"].max()]).alias(f"symbol_id_{symbol_id}"))
        missing_ratio_data1 = missing_description.select(pl.concat_list(pl.all()).alias(f"symbol_id_{symbol_id}")).explode(f"symbol_id_{symbol_id}")
        missing_description_result1 = missing_description_result1.with_columns([missing_ratio_data1[f"symbol_id_{symbol_id}"].alias(f"symbol_id_{symbol_id}")])
        missing_ratio_data2 = truncated_missing_description.select(pl.concat_list(pl.all()).alias(f"symbol_id_{symbol_id}")).explode(f"symbol_id_{symbol_id}")
        missing_description_result2 = missing_description_result2.with_columns([missing_ratio_data2[f"symbol_id_{symbol_id}"].alias(f"symbol_id_{symbol_id}")])
        missing_ratio_data3 = detected_missing_description.select(pl.concat_list(pl.all()).alias(f"symbol_id_{symbol_id}")).explode(f"symbol_id_{symbol_id}")
        missing_description_result3 = missing_description_result3.with_columns([missing_ratio_data3[f"symbol_id_{symbol_id}"].alias(f"symbol_id_{symbol_id}")])
        # The first row of the category description is turned into a column of per-factor values
        density_category_data = density_category_description.select(pl.Series(density_category_description.to_dicts()[0].values()).alias(f"symbol_id_{symbol_id}"))
        partial_density_description_result = partial_density_description_result.with_columns([density_category_data[f"symbol_id_{symbol_id}"].alias(f"symbol_id_{symbol_id}")])
        # Feature rows are stacked above responder rows for each correlation table
        entire_individual_correlation_data = pl.concat([individual_features_correlation_data, individual_responders_correlation_data], how="vertical")
        entire_individual_correlation_analyzing_result = entire_individual_correlation_analyzing_result.with_columns([entire_individual_correlation_data[f"symbol_id_{symbol_id}"].alias(f"symbol_id_{symbol_id}")])
        truncated_individual_correlation_data = pl.concat([partial_individual_features_correlation_data, partial_individual_responders_correlation_data], how="vertical")
        partial_individual_correlation_analyzing_result = partial_individual_correlation_analyzing_result.with_columns([truncated_individual_correlation_data[f"symbol_id_{symbol_id}"].alias(f"symbol_id_{symbol_id}")])
        partial_cumsum_correlation_data = pl.concat([partial_cumsum_features_correlation_data, partial_cumsum_responders_correlation_data], how="vertical")
        partial_cumsum_correlation_analyzing_result = partial_cumsum_correlation_analyzing_result.with_columns([partial_cumsum_correlation_data[f"symbol_id_{symbol_id}"].alias(f"symbol_id_{symbol_id}")])

        print(f"交易品种-{symbol_id}-数据探索完成")
        # Drop the references to this symbol's large frames before the next iteration
        del current_training_data
        del truncated_training_data
        del constructed_training_data

    # Store the analysis results, one sheet per result table
    with pd.ExcelWriter(path=os.path.join(global_params["result_dir"], "数据分析结果.xlsx"), engine="openpyxl") as writer:
        truncated_dates_result.to_pandas().to_excel(writer, sheet_name="截断日期区间", index=False)
        missing_description_result1.to_pandas().to_excel(writer, sheet_name="原始数据缺失率", index=False)
        missing_description_result2.to_pandas().to_excel(writer, sheet_name="截断对齐数据缺失率", index=False)
        missing_description_result3.to_pandas().to_excel(writer, sheet_name="异常标记数据缺失率", index=False)
        partial_density_description_result.to_pandas().to_excel(writer, sheet_name="截断对齐数据密度分类", index=False)
        entire_individual_correlation_analyzing_result.to_pandas().to_excel(writer, sheet_name="原始独立特征相关性", index=False)
        partial_individual_correlation_analyzing_result.to_pandas().to_excel(writer, sheet_name="截断独立特征相关性", index=False)
        partial_cumsum_correlation_analyzing_result.to_pandas().to_excel(writer, sheet_name="截断累积特征相关性", index=False)

In [None]:
# Initialisation: configuration checks, then data-file gathering
init_checking()
data_file_gathering()
# The all-symbol batch run is left disabled here; the following cells repeat the same
# steps interactively for a single symbol.
# training_data_exploring()

In [None]:
# Initialise the result accumulators for the single-symbol walkthrough
factor_name_setting = False
truncated_dates_result = pl.DataFrame({"factor_name": ["start_date", "end_date"]})
missing_description_result1 = pl.DataFrame()
missing_description_result2 = pl.DataFrame()
missing_description_result3 = pl.DataFrame()
partial_density_description_result = pl.DataFrame()
entire_individual_correlation_analyzing_result = pl.DataFrame()
partial_individual_correlation_analyzing_result = pl.DataFrame()
partial_cumsum_correlation_analyzing_result = pl.DataFrame()

# Explore one fixed trading symbol
symbol_id = 18
print(f"交易品种-{symbol_id}-数据探索开始")
# Extract the raw data for that symbol
current_training_data = data_extracting(symbol_id=symbol_id)

# Give every result table its leading "factor_name" label column
if not factor_name_setting:
    entire_factor_name = pl.DataFrame({"factor_name": current_training_data.columns})
    missing_description_result1 = missing_description_result1.with_columns(
        entire_factor_name.select("factor_name")
    )
    missing_description_result2 = missing_description_result2.with_columns(
        entire_factor_name.select("factor_name")
    )
    missing_description_result3 = missing_description_result3.with_columns(
        entire_factor_name.select("factor_name")
    )
    partial_density_description_result = partial_density_description_result.with_columns(
        entire_factor_name.select("factor_name")
    )

    # Feature factor names: each feature/responder column plus its three derived variants, sorted
    factor_name = sorted(
        derived_name
        for col_name in current_training_data.columns
        if col_name.startswith(("feature_", "responder_"))
        for derived_name in (f"{col_name}", f"{col_name}_cumsum", f"{col_name}_time_id_lagging_1", f"{col_name}_date_id_lagging_1")
    )

    # Labels for the raw individual correlation table: base names only, predicted responder excluded
    partial_factor_name1 = [
        col_name
        for col_name in factor_name
        if (col_name.startswith("feature_") or (col_name.startswith("responder_") and col_name != global_params["predict_responder"]))
        and "cumsum" not in col_name
        and "time_id_lagging_1" not in col_name
        and "date_id_lagging_1" not in col_name
    ]
    correlation_factor_name = pl.DataFrame({"factor_name": partial_factor_name1})
    entire_individual_correlation_analyzing_result = entire_individual_correlation_analyzing_result.with_columns(
        correlation_factor_name.select("factor_name")
    )

    # Labels for the truncated individual correlation table: non-cumsum names, predicted responder excluded
    partial_factor_name2 = [
        col_name
        for col_name in factor_name
        if not ("cumsum" in col_name or col_name == global_params["predict_responder"])
    ]
    correlation_factor_name = pl.DataFrame({"factor_name": partial_factor_name2})
    partial_individual_correlation_analyzing_result = partial_individual_correlation_analyzing_result.with_columns(
        correlation_factor_name.select("factor_name")
    )

    # Labels for the truncated cumulative correlation table: cumsum names only
    partial_factor_name3 = [col_name for col_name in factor_name if "cumsum" in col_name]
    correlation_factor_name = pl.DataFrame({"factor_name": partial_factor_name3})
    partial_cumsum_correlation_analyzing_result = partial_cumsum_correlation_analyzing_result.with_columns(
        correlation_factor_name.select("factor_name")
    )

    factor_name_setting = True

In [None]:
# Source-data exploration: descriptive statistics of the extracted raw data
current_training_data_description = current_training_data.describe()
current_training_data_description

In [None]:
# Source-data exploration: missing ratio of the raw data (phase 1); NaN and null become 0
missing_description = (
    missing_ratio_computing(
        source_data=current_training_data,
        missing_phase=1,
        symbol_id=symbol_id,
    )
    .fill_nan(0)
    .fill_null(0)
)
missing_description

In [None]:
# Source-data exploration: responder correlation analysis on the raw data (phase 1)
responders_correlation_matrix = responder_correlation_analyzing(
    sub_data=current_training_data,
    correlation_phase=1,
    is_sum=False,
    symbol_id=symbol_id,
).fill_nan(0)
# Keep the predicted responder's column under this symbol's name, without its last row
individual_responders_correlation_data = responders_correlation_matrix.select(
    pl.col(global_params["predict_responder"]).alias(f"symbol_id_{symbol_id}")
)[:-1]
responders_correlation_matrix

In [None]:
# Source-data exploration: feature correlation analysis on the raw data (phase 1)
features_correlation_matrix = feature_correlation_analyzing(
    sub_data=current_training_data,
    correlation_phase=1,
    is_sum=False,
    symbol_id=symbol_id,
).fill_nan(0)
# Keep the predicted responder's column under this symbol's name, without its last row
individual_features_correlation_data = features_correlation_matrix.select(
    pl.col(global_params["predict_responder"]).alias(f"symbol_id_{symbol_id}")
)[:-1]
features_correlation_matrix

In [None]:
# Source-data preprocessing: truncation and alignment of the raw data
truncated_training_data = data_truncating_aligning(
    source_data=current_training_data,
)
truncated_training_data

In [None]:
# Source-data preprocessing: missing ratio of the truncated data (phase 2)
truncated_missing_description = missing_ratio_computing(
    source_data=truncated_training_data,
    missing_phase=2,
    symbol_id=symbol_id,
)
truncated_missing_description

In [None]:
# Source-data preprocessing: distribution category recognition on the truncated data
density_category_description = data_category_recognizing(
    source_data=truncated_training_data,
)
density_category_description

In [None]:
# Source-data preprocessing: anomaly flagging, driven by the recognised distribution categories
detected_training_data = data_detecting(
    source_data=truncated_training_data,
    density_category=density_category_description,
)
detected_training_data

In [None]:
# Source-data preprocessing: missing ratio after anomaly flagging (phase 3)
detected_missing_description = missing_ratio_computing(
    source_data=detected_training_data,
    missing_phase=3,
    symbol_id=symbol_id,
)
detected_missing_description

In [None]:
# Source-data preprocessing: missing-value filling (training phase, not prediction)
filled_training_data = data_filling(
    source_data=detected_training_data,
    missing_ratio_data=detected_missing_description,
    is_predict_phase=False,
)
filled_training_data

In [None]:
# Source-data preprocessing: combine statistics of the filled data into the category description
density_category_description = statistic_info_combining(
    source_data=filled_training_data,
    density_category_description=density_category_description,
)
density_category_description

In [None]:
# Source-data preprocessing: feature construction, then drop rows containing NaN or null
constructed_training_data = (
    feature_constructing(source_data=filled_training_data)
    .drop_nans()
    .drop_nulls()
)
# Persist the constructed features for this symbol
constructed_training_data.write_parquet(
    os.path.join(global_params["result_dir"], f"交易品种-{symbol_id}-的数据特征.parquet")
)
constructed_training_data

In [None]:
# Source-data exploration: individual responder correlation on the constructed data (phase 2)
truncated_individual_responders_correlation_matrix = responder_correlation_analyzing(
    sub_data=constructed_training_data,
    correlation_phase=2,
    is_sum=False,
    symbol_id=symbol_id,
).fill_nan(0)
# Keep the predicted responder's column under this symbol's name, without its last row
partial_individual_responders_correlation_data = truncated_individual_responders_correlation_matrix.select(
    pl.col(global_params["predict_responder"]).alias(f"symbol_id_{symbol_id}")
)[:-1]
truncated_individual_responders_correlation_matrix

In [None]:
# Source-data exploration: cumulative responder correlation on the constructed data (phase 2)
truncated_cumsum_responders_correlation_matrix = responder_correlation_analyzing(
    sub_data=constructed_training_data,
    correlation_phase=2,
    is_sum=True,
    symbol_id=symbol_id,
).fill_nan(0)
# Keep the predicted responder's column under this symbol's name, without its last row
partial_cumsum_responders_correlation_data = truncated_cumsum_responders_correlation_matrix.select(
    pl.col(global_params["predict_responder"]).alias(f"symbol_id_{symbol_id}")
)[:-1]
truncated_cumsum_responders_correlation_matrix

In [None]:
# Source-data exploration: individual feature correlation on the constructed data (phase 2)
truncated_individual_features_correlation_matrix = feature_correlation_analyzing(
    sub_data=constructed_training_data,
    correlation_phase=2,
    is_sum=False,
    symbol_id=symbol_id,
).fill_nan(0)
# Keep the predicted responder's column under this symbol's name, without its last row
partial_individual_features_correlation_data = truncated_individual_features_correlation_matrix.select(
    pl.col(global_params["predict_responder"]).alias(f"symbol_id_{symbol_id}")
)[:-1]
truncated_individual_features_correlation_matrix

In [None]:
# Source-data exploration: cumulative feature correlation on the constructed data (phase 2)
truncated_cumsum_features_correlation_matrix = feature_correlation_analyzing(
    sub_data=constructed_training_data,
    correlation_phase=2,
    is_sum=True,
    symbol_id=symbol_id,
).fill_nan(0)
# Keep the predicted responder's column under this symbol's name, without its last row
partial_cumsum_features_correlation_data = truncated_cumsum_features_correlation_matrix.select(
    pl.col(global_params["predict_responder"]).alias(f"symbol_id_{symbol_id}")
)[:-1]
truncated_cumsum_features_correlation_matrix

In [None]:
# Record this symbol's analysis results: every result table gains one column named after the symbol
_symbol_column = f"symbol_id_{symbol_id}"

# First and last date_id that survived preprocessing
truncated_dates_result = truncated_dates_result.with_columns(
    pl.Series(
        [constructed_training_data["date_id"].min(), constructed_training_data["date_id"].max()]
    ).alias(_symbol_column)
)

# Missing ratios: gather each description's columns into a list, then explode into one value per row
missing_ratio_data1 = missing_description.select(
    pl.concat_list(pl.all()).alias(_symbol_column)
).explode(_symbol_column)
missing_description_result1 = missing_description_result1.with_columns(
    [missing_ratio_data1[_symbol_column].alias(_symbol_column)]
)

missing_ratio_data2 = truncated_missing_description.select(
    pl.concat_list(pl.all()).alias(_symbol_column)
).explode(_symbol_column)
missing_description_result2 = missing_description_result2.with_columns(
    [missing_ratio_data2[_symbol_column].alias(_symbol_column)]
)

missing_ratio_data3 = detected_missing_description.select(
    pl.concat_list(pl.all()).alias(_symbol_column)
).explode(_symbol_column)
missing_description_result3 = missing_description_result3.with_columns(
    [missing_ratio_data3[_symbol_column].alias(_symbol_column)]
)

# Category description: the values of its first row become this symbol's column
density_category_data = density_category_description.select(
    pl.Series(density_category_description.to_dicts()[0].values()).alias(_symbol_column)
)
partial_density_description_result = partial_density_description_result.with_columns(
    [density_category_data[_symbol_column].alias(_symbol_column)]
)

# Correlations: feature rows are stacked above responder rows
entire_individual_correlation_data = pl.concat(
    [individual_features_correlation_data, individual_responders_correlation_data],
    how="vertical",
)
entire_individual_correlation_analyzing_result = entire_individual_correlation_analyzing_result.with_columns(
    [entire_individual_correlation_data[_symbol_column].alias(_symbol_column)]
)

truncated_individual_correlation_data = pl.concat(
    [partial_individual_features_correlation_data, partial_individual_responders_correlation_data],
    how="vertical",
)
partial_individual_correlation_analyzing_result = partial_individual_correlation_analyzing_result.with_columns(
    [truncated_individual_correlation_data[_symbol_column].alias(_symbol_column)]
)

partial_cumsum_correlation_data = pl.concat(
    [partial_cumsum_features_correlation_data, partial_cumsum_responders_correlation_data],
    how="vertical",
)
partial_cumsum_correlation_analyzing_result = partial_cumsum_correlation_analyzing_result.with_columns(
    [partial_cumsum_correlation_data[_symbol_column].alias(_symbol_column)]
)

print(f"交易品种-{symbol_id}-数据探索完成")

In [None]:
# Store the analysis results: one Excel workbook, one sheet per result table.
# os.path.join takes the path components as separate arguments; passing them wrapped in a
# single list (as this cell did before) raises TypeError before any sheet is written.
result_workbook_path = os.path.join(global_params["result_dir"], global_params["data_result_file1"])
with pd.ExcelWriter(path=result_workbook_path, engine="openpyxl") as writer:
    truncated_dates_result.to_pandas().to_excel(writer, sheet_name="截断日期区间", index=False)
    missing_description_result1.to_pandas().to_excel(writer, sheet_name="原始数据缺失率", index=False)
    missing_description_result2.to_pandas().to_excel(writer, sheet_name="截断对齐数据缺失率", index=False)
    missing_description_result3.to_pandas().to_excel(writer, sheet_name="异常标记数据缺失率", index=False)
    partial_density_description_result.to_pandas().to_excel(writer, sheet_name="截断对齐数据统计信息", index=False)
    entire_individual_correlation_analyzing_result.to_pandas().to_excel(writer, sheet_name="原始独立特征相关性", index=False)
    partial_individual_correlation_analyzing_result.to_pandas().to_excel(writer, sheet_name="截断独立特征相关性", index=False)
    partial_cumsum_correlation_analyzing_result.to_pandas().to_excel(writer, sheet_name="截断累积特征相关性", index=False)