In [1]:
# import os

# os.environ['http_proxy'] = 'http://127.0.0.1:11000'
# os.environ['https_proxy'] = 'http://127.0.0.1:11000'

In [2]:
import pandas as pd   # == 2.2.3
import numpy as np    # == 2.2.6
import torch 
import scipy.stats
import statsmodels.tsa.api as tsa 
from statsmodels.tsa.ar_model import AutoReg
import antropy 
import sklearn
from tsfresh.feature_extraction import feature_calculators as tsfresh_fe
import ruptures as rpt

import lightgbm as lgb  # == 4.6.0
import catboost as cat
import xgboost as xgb
from sklearn.metrics import roc_auc_score
from sklearn.model_selection import StratifiedKFold

import os
import re
import sys
import json
import time
import logging
import inspect
import typing
import joblib
from itertools import combinations
from pathlib import Path
from tqdm.auto import tqdm
from datetime import datetime
from joblib import Parallel, delayed
from typing import List, Dict, Tuple, Optional
import warnings
from statsmodels.tools.sm_exceptions import InterpolationWarning

  from .autonotebook import tqdm as notebook_tqdm


In [3]:
import crunch

# Load the Crunch Toolings
crunch = crunch.load_notebook()

loaded inline runner with module: <module '__main__'>

cli version: 6.6.1
available ram: 15.73 gb
available cpu: 16 core
----


In [4]:
# @crunch/keep:on
warnings.filterwarnings("ignore", category=UserWarning)
warnings.simplefilter("ignore", InterpolationWarning)
warnings.simplefilter("ignore", FutureWarning)
warnings.simplefilter('ignore', np.exceptions.RankWarning)

In [5]:
# @crunch/keep:on
def get_logger(name: str, log_dir: Path, verbose: bool = True):
    """
    获取一个配置好的 logger 实例，它会生成带时间戳的详细日志。
    """
    # 确保日志目录存在
    log_dir.mkdir(exist_ok=True, parents=True)
    
    # 1. 创建带时间戳的详细日志文件名
    timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
    detail_log_file = log_dir / f'{name.lower()}_{timestamp}.log'

    # 2. 为 logger 设置一个唯一的名称（基于时间戳），避免冲突
    logger = logging.getLogger(f"{name}-{timestamp}")
    logger.setLevel(logging.INFO)

    # 防止将日志消息传播到根 logger
    logger.propagate = False

    # 如果已经有处理器，则不重复添加
    if logger.hasHandlers():
        logger.handlers.clear()

    # 3. 创建详细日志的文件处理器
    detail_handler = logging.FileHandler(detail_log_file, mode='a', encoding='utf-8')
    detail_formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s', datefmt='%Y-%m-%d %H:%M:%S')
    detail_handler.setFormatter(detail_formatter)
    logger.addHandler(detail_handler)
    
    # 4. 创建控制台处理器
    # 控制台 - INFO级别 (受verbose控制)
    if verbose:
        info_handler = logging.StreamHandler(sys.stdout)
        info_handler.setLevel(logging.INFO)
        info_handler.addFilter(lambda record: record.levelno == logging.INFO)
        info_formatter = logging.Formatter('%(message)s')
        info_handler.setFormatter(info_formatter)
        logger.addHandler(info_handler)

    # 控制台 - WARNING及以上 (始终输出)
    warn_handler = logging.StreamHandler(sys.stdout)
    warn_handler.setLevel(logging.WARNING)
    warn_formatter = logging.Formatter('%(levelname)s: %(message)s')
    warn_handler.setFormatter(warn_formatter)
    logger.addHandler(warn_handler)

    return logger, detail_log_file # 返回 logger 和日志文件路径 

logger = None
log_file_path = None

In [None]:
# @crunch/keep:on
class Config:
    # --- Feature Engineer ---
    N_JOBS = -1
    SEED = 42

    # --- Data Enhancement ---
    # 数据增强配置，指定要加载的增强数据ID列表
    # 如果为'0'，则只使用原始数据
    ENHANCEMENT_IDS = ["0"] 

    # --- Model ---
    TRAIN_STRATEGY = 'cv'   # 'cv' or 'multi'
    MODEL = ['LGB', 'CAT', 'XGB']  # ['LGB']
    LGBM_PARAMS = {
        # --- 基础设定 ---
        'objective': 'binary',
        'metric': 'auc',
        'boosting_type': 'gbdt',
        'n_estimators': 3600, 
        'learning_rate': 0.005,
        'num_leaves': 29,
        'random_state': 42,
        'n_jobs': N_JOBS,
        'verbosity': 0, 

        # --- 正则化和采样 ---
        'reg_alpha': 3,            # L1 正则化
        'reg_lambda': 3,           # L2 正则化
        # 'min_child_samples': 50,   # 叶子节点样本量
        'colsample_bytree': 0.8,   # 构建树时对特征的列采样率
        'subsample': 0.8,          # 训练样本的采样率
    }
    CAT_PARAMS = {
        # --- 基础设定 ---
        'bootstrap_type': 'Bernoulli',
        'loss_function': 'Logloss',
        'eval_metric': 'AUC',
        'grow_policy': 'Lossguide',
        # 'task_type': 'GPU',
        'iterations': 3600, 
        'learning_rate': 0.01,
        'num_leaves': 29,
        'random_seed': 114514,
        'thread_count': N_JOBS,
        
        # --- 正则化和采样 ---
        'subsample': 0.8,
        # 'rsm': 0.7,
        'l2_leaf_reg': 9,
    }
    XGB_PARAMS = {
        # --- 基础设定 ---
        'objective': 'binary:logistic',
        'eval_metric': 'auc',
        # 'device': 'cuda', 
        'n_estimators': 3600,
        'learning_rate': 0.0075,
        'max_leaves': 29,
        'random_state': 2025,
        'n_jobs': N_JOBS,
        'verbosity': 0, 
        
        # --- 正则化和采样 ---
        'reg_alpha': 3,          # L1 正则化
        'reg_lambda': 3,         # L2 正则化
        'colsample_bytree': 0.8, # 构建树时对特征的列采样率
        'subsample': 0.8,        # 训练样本的采样率
    }

    # --- Early Stopping ---
    # 设置为 >0 启用早停；设置为 0 禁用早停
    EARLY_STOPPING_ROUNDS = 0

    # --- CV ---
    CV_PARAMS = [{
        'n_splits': 5,
        'shuffle': True,
        'random_state': 42
    }, {
        'n_splits': 5,
        'shuffle': True,
        'random_state': 114514
    }, {
        'n_splits': 5,
        'shuffle': True,
        'random_state': 2025
    } ]

    # --- Exclude Features ---
    # 在这里定义不希望在 "一键生成所有特征" 时运行的函数名称
    # 如果要运行这些特征，需要在命令行中通过 --funcs 参数明确指定
    # 例如: python -m experiment.main gen-feats --funcs ar_model_features
    EXPERIMENTAL_FEATURES = [
    ] 

    # --- Top Features ---
    TOP_FEATURES = [
    ]

    # --- Interaction Operators ---
    OPERATOR_FLAGS = [
        'mul', 'sqmul', 'sub', 'add', 'div', 'sq'
    ]
    FEAT_FLAGS = [
        '_left', '_right', '_whole', '_diff', '_ratio', '_contribution_left', '_contribution_right', '_ratio_to_whole_left', '_ratio_to_whole_right'
    ]

    # --- Remain Features ---
    REMAIN_FEATURES = [
        'sqmul_RAW_1_stats_mean_whole_RAW_3_detrend_volatility_normalized_whole',
        'mul_RAW_3_detrend_volatility_normalized_whole_RAW_1_stats_std_whole',
        'mul_RAW_1_stats_cv_whole_RAW_1_stats_std_whole',
        'sub_RAW_8_ratio_value_number_to_time_series_length_ratio_to_whole_right_CUMSUM_4_autocorr_lag1_whole',
        'div_RAW_8_energy_ratio_by_chunks_num_segments_10_segment_focus_9_left_RAW_8_agg_linear_trend_attr_intercept_chunk_len_10_f_agg_max_ratio_to_whole_right',
        'div_RAW_2_ad_stat_RAW_8_percentage_of_reoccurring_values_to_all_values_contribution_left',
        'div_RAW_8_percentage_of_reoccurring_datapoints_to_all_datapoints_whole_RAW_8_percentage_of_reoccurring_values_to_all_values_diff',
        'add_RAW_8_index_mass_quantile_q_0_8_ratio_to_whole_left_RAW_8_index_mass_quantile_q_0_1_right',
        'mul_CUMSUM_2_ad_pvalue_DIFF_8_change_quantiles_f_agg_var_isabs_True_qh_0_4_ql_0_2_whole',
        'div_RAW_8_ratio_value_number_to_time_series_length_whole_CUMSUM_2_ad_pvalue',
        'div_RAW_1_stats_median_right_CUMSUM_1_stats_max_diff',
        'mul_RAW_2_bartlett_stat_DIFF_2_jb_pvalue_left',
        'sqmul_CUMSUM_1_stats_max_ratio_to_whole_left_RAW_8_percentage_of_reoccurring_values_to_all_values_ratio_to_whole_left',
        'mul_RAW_8_ratio_beyond_r_sigma_3_left_CUMSUM_1_stats_theil_sen_slope_whole',
        'mul_RAW_8_ratio_beyond_r_sigma_3_left_CUMSUM_1_stats_max_diff',
        'div_RAW_8_agg_linear_trend_attr_intercept_chunk_len_10_f_agg_max_ratio_to_whole_right_RAW_8_energy_ratio_by_chunks_num_segments_10_segment_focus_9_left',
        'div_RAW_7_sample_entropy_left_DIFF_2_jb_pvalue_left',
        'mul_RAW_8_agg_linear_trend_attr_slope_chunk_len_10_f_agg_mean_contribution_left_CUMSUM_1_stats_max_diff',
        'sqmul_RAW_8_index_mass_quantile_q_0_1_right_RAW_8_quantile_0_4_contribution_left',
        'sub_RAW_8_percentage_of_reoccurring_values_to_all_values_contribution_left_RAW_7_sample_entropy_left',
        'add_RAW_7_sample_entropy_left_CUMSUM_8_first_location_of_maximum_whole',
        'sub_DIFF_7_hjorth_complexity_contribution_right_RAW_7_higuchi_fd_contribution_right',
        'div_RAW_1_stats_median_whole_RAW_8_percentage_of_reoccurring_values_to_all_values_contribution_left',
        'div_RAW_8_percentage_of_reoccurring_values_to_all_values_contribution_left_RAW_8_percentage_of_reoccurring_datapoints_to_all_datapoints_whole',
        'div_RAW_8_percentage_of_reoccurring_values_to_all_values_contribution_left_RAW_8_percentage_of_reoccurring_values_to_all_values_ratio_to_whole_left',
        'sqmul_RAW_8_energy_ratio_by_chunks_num_segments_10_segment_focus_9_left_RAW_8_index_mass_quantile_q_0_1_right',
        'div_RAW_8_benford_correlation_whole_CUMSUM_2_ad_pvalue',
        'div_CUMSUM_8_ar_coefficient_coeff_2_k_10_ratio_RAW_1_stats_kurt_left',
        'sqmul_RAW_7_sample_entropy_left_RAW_8_count_above_0_whole',
        'mul_CUMSUM_2_wilcoxon_stat_RAW_8_change_quantiles_f_agg_var_isabs_True_qh_0_6_ql_0_4_ratio_to_whole_right',
        'add_RAW_8_ratio_beyond_r_sigma_3_left_DIFF_7_hjorth_complexity_ratio_to_whole_right',
        'sqmul_RAW_8_index_mass_quantile_q_0_1_right_RAW_8_index_mass_quantile_q_0_8_ratio_to_whole_left',
        'add_RAW_8_index_mass_quantile_q_0_1_right_RAW_8_index_mass_quantile_q_0_8_left',
        'sqmul_RAW_8_linear_trend_attr_pvalue_ratio_to_whole_left_CUMSUM_8_change_quantiles_f_agg_var_isabs_False_qh_0_8_ql_0_0_ratio_to_whole_right',
        'add_RAW_8_index_mass_quantile_q_0_1_right_RAW_8_energy_ratio_by_chunks_num_segments_10_segment_focus_9_left',
        'div_DIFF_8_change_quantiles_f_agg_var_isabs_False_qh_0_6_ql_0_4_whole_DIFF_8_change_quantiles_f_agg_var_isabs_True_qh_0_4_ql_0_2_whole',
        'div_RAW_8_change_quantiles_f_agg_var_isabs_False_qh_0_8_ql_0_2_diff_CUMSUM_9_ar_residuals_s1_pred_mean',
        'div_RAW_1_stats_min_whole_CUMSUM_8_friedrich_coefficients_coeff_3_m_3_r_30_ratio_to_whole_left',
        'sub_CUMSUM_8_change_quantiles_f_agg_var_isabs_False_qh_0_8_ql_0_0_whole_CUMSUM_2_ks_pvalue',
        'add_DIFF_2_bartlett_pvalue_DIFF_8_ar_coefficient_coeff_2_k_10_left',
        'sub_RAW_1_stats_std_whole_RAW_8_ratio_value_number_to_time_series_length_whole',
        'mul_RAW_8_index_mass_quantile_q_0_8_ratio_to_whole_left_RAW_8_ratio_value_number_to_time_series_length_ratio_to_whole_left',
        'div_RAW_3_detrend_volatility_normalized_whole_RAW_8_percentage_of_reoccurring_values_to_all_values_contribution_left',
        'sqmul_RAW_2_ks_stat_RAW_8_percentage_of_reoccurring_values_to_all_values_ratio_to_whole_left',
        'mul_RAW_8_index_mass_quantile_q_0_1_right_RAW_10_rpt_cost_cosine_whole',
        'mul_RAW_10_rpt_cost_cosine_whole_RAW_1_stats_median_ratio',
        'div_RAW_2_bartlett_stat_CUMSUM_7_cond_entropy_whole',
        'div_RAW_1_stats_mean_whole_RAW_8_ratio_beyond_r_sigma_3_left',
        'div_RAW_8_change_quantiles_f_agg_var_isabs_True_qh_0_6_ql_0_2_whole_RAW_1_stats_min_whole',
        'CUMSUM_8_benford_correlation_left',
        'sqmul_CUMSUM_3_detrend_volatility_normalized_right_RAW_8_quantile_0_4_right',
        'div_RAW_8_percentage_of_reoccurring_values_to_all_values_contribution_left_CUMSUM_5_dominant_freq_ratio_to_whole_right',
        'DIFF_7_spectral_entropy_ratio_to_whole_left',
        'div_RAW_7_approx_entropy_left_CUMSUM_2_ad_pvalue',
        'div_RAW_7_sample_entropy_left_RAW_8_ratio_beyond_r_sigma_1_left',
        'div_RAW_7_sample_entropy_whole_RAW_8_ratio_beyond_r_sigma_1_5_whole',
        'sqmul_RAW_8_linear_trend_attr_pvalue_ratio_to_whole_left_DIFF_8_change_quantiles_f_agg_var_isabs_False_qh_1_0_ql_0_2_ratio',
        'mul_RAW_8_change_quantiles_f_agg_var_isabs_False_qh_0_6_ql_0_4_ratio_to_whole_right_CUMSUM_2_wilcoxon_stat',
        'sub_RAW_8_index_mass_quantile_q_0_8_ratio_to_whole_left_RAW_8_ratio_beyond_r_sigma_1_left',
        'mul_RAW_8_ratio_value_number_to_time_series_length_ratio_RAW_8_ratio_value_number_to_time_series_length_whole',
        'div_CUMSUM_1_stats_theil_sen_slope_whole_RAW_8_percentage_of_reoccurring_values_to_all_values_contribution_left',
        'mul_CUMSUM_2_ad_pvalue_RAW_8_change_quantiles_f_agg_var_isabs_False_qh_0_8_ql_0_0_right',
        'mul_CUMSUM_3_detrend_volatility_normalized_right_RAW_8_change_quantiles_f_agg_var_isabs_True_qh_0_6_ql_0_2_whole',
        'div_RAW_8_change_quantiles_f_agg_var_isabs_False_qh_0_6_ql_0_4_left_RAW_8_percentage_of_reoccurring_values_to_all_values_contribution_left',
        'div_RAW_8_energy_ratio_by_chunks_num_segments_10_segment_focus_9_left_DIFF_8_change_quantiles_f_agg_var_isabs_False_qh_1_0_ql_0_2_ratio',
        'div_DIFF_2_levene_pvalue_RAW_1_stats_median_right',
        'mul_RAW_8_ratio_value_number_to_time_series_length_diff_CUMSUM_3_trend_normalized_slope_whole',
        'sub_RAW_8_energy_ratio_by_chunks_num_segments_10_segment_focus_9_left_RAW_8_quantile_0_1_contribution_right',
        'sqmul_RAW_8_agg_linear_trend_attr_intercept_chunk_len_10_f_agg_max_ratio_to_whole_right_RAW_8_ratio_beyond_r_sigma_1_5_whole',
        'div_RAW_2_ks_stat_RAW_8_change_quantiles_f_agg_var_isabs_False_qh_0_8_ql_0_0_right',
        'add_RAW_7_sample_entropy_left_DIFF_2_bartlett_pvalue',
        'sqmul_RAW_8_quantile_0_4_whole_RAW_7_katz_fd_whole',
        'mul_RAW_8_agg_linear_trend_attr_slope_chunk_len_10_f_agg_mean_contribution_left_DIFF_2_jb_pvalue_left',
        'add_RAW_7_sample_entropy_left_RAW_2_bartlett_pvalue',
        'sqmul_RAW_8_ratio_value_number_to_time_series_length_ratio_to_whole_right_RAW_8_ratio_value_number_to_time_series_length_ratio_to_whole_left',
        'sqmul_RAW_2_shapiro_pvalue_whole_RAW_2_bartlett_stat',
        'sqmul_RAW_8_change_quantiles_f_agg_var_isabs_True_qh_0_6_ql_0_4_ratio_to_whole_right_CUMSUM_7_hjorth_complexity_whole',
        'sub_RAW_8_percentage_of_reoccurring_values_to_all_values_contribution_left_DIFF_7_sample_entropy_right',
        'sqmul_CUMSUM_7_cond_entropy_whole_RAW_1_stats_kurt_left',
        'add_RAW_8_count_above_0_whole_CUMSUM_8_change_quantiles_f_agg_mean_isabs_True_qh_1_0_ql_0_4_left',
        'mul_DIFF_2_levene_pvalue_CUMSUM_3_detrend_volatility_normalized_right',
        'div_RAW_1_stats_kurt_left_RAW_1_stats_std_whole',
        'add_RAW_8_index_mass_quantile_q_0_1_right_RAW_8_ratio_value_number_to_time_series_length_whole',
        'div_RAW_8_quantile_0_4_right_RAW_8_change_quantiles_f_agg_var_isabs_False_qh_0_8_ql_0_4_whole',
        'mul_RAW_8_energy_ratio_by_chunks_num_segments_10_segment_focus_9_left_RAW_8_quantile_0_4_contribution_left',
        'DIFF_8_change_quantiles_f_agg_var_isabs_False_qh_0_8_ql_0_2_ratio_to_whole_left',
        'sqmul_RAW_8_ratio_value_number_to_time_series_length_ratio_to_whole_left_DIFF_8_benford_correlation_whole',
        'div_RAW_8_agg_linear_trend_attr_intercept_chunk_len_10_f_agg_max_ratio_to_whole_right_RAW_8_agg_linear_trend_attr_slope_chunk_len_10_f_agg_mean_contribution_left',
        'mul_RAW_8_benford_correlation_whole_RAW_8_count_above_0_whole',
        'div_RAW_8_ratio_value_number_to_time_series_length_whole_RAW_7_katz_fd_whole',
        'sub_RAW_8_percentage_of_reoccurring_values_to_all_values_ratio_to_whole_left_RAW_8_percentage_of_reoccurring_datapoints_to_all_datapoints_ratio_to_whole_left',
        'RAW_8_ratio_beyond_r_sigma_0_5_contribution_left',
        'sqmul_RAW_8_change_quantiles_f_agg_var_isabs_True_qh_0_6_ql_0_4_contribution_left_CUMSUM_7_katz_fd_whole',
        'sqmul_RAW_8_friedrich_coefficients_coeff_3_m_3_r_30_ratio_CUMSUM_8_friedrich_coefficients_coeff_3_m_3_r_30_ratio_to_whole_left',
        'sub_RAW_1_stats_min_whole_RAW_7_approx_entropy_whole',
        'mul_RAW_8_index_mass_quantile_q_0_1_right_RAW_8_energy_ratio_by_chunks_num_segments_10_segment_focus_9_left',
        'div_RAW_2_bartlett_pvalue_CUMSUM_2_kpss_pvalue_ratio_to_whole_left',
        'add_DIFF_2_bartlett_pvalue_RAW_7_sample_entropy_whole',
        'sub_RAW_8_ratio_value_number_to_time_series_length_ratio_to_whole_right_RAW_2_shapiro_pvalue_whole',
        'CUMSUM_2_adf_pvalue_ratio_to_whole_right',
        'div_DIFF_2_bartlett_pvalue_RAW_8_agg_linear_trend_attr_rvalue_chunk_len_50_f_agg_max_ratio_to_whole_left',
        'div_RAW_7_sample_entropy_left_CUMSUM_2_ad_pvalue',
        'div_CUMSUM_2_adf_stat_diff_RAW_8_friedrich_coefficients_coeff_3_m_3_r_30_ratio',
        'div_CUMSUM_3_detrend_volatility_normalized_right_CUMSUM_1_stats_range_right',
        'div_RAW_8_percentage_of_reoccurring_values_to_all_values_ratio_to_whole_left_RAW_8_percentage_of_reoccurring_values_to_all_values_contribution_left',
        'add_DIFF_7_hjorth_complexity_contribution_right_RAW_1_stats_mean_right',
        'div_RAW_1_stats_median_right_RAW_8_percentage_of_reoccurring_values_to_all_values_ratio',
    ]

config = Config()

In [7]:
# @crunch/keep:on
# --- 时序变换函数注册表 ---
TRANSFORM_REGISTRY = {}

def register_transform(_func=None, *, output_mode_names=[]):
    """一个用于注册时序变换函数的装饰器。"""
    def decorator_register(func):
        TRANSFORM_REGISTRY[func.__name__] = {
            "func": func, 
            "output_mode_names": output_mode_names
        }
        return func

    if _func is None:
        # Used as @register_transform(output_mode_names=...)
        return decorator_register
    else:
        # Used as @register_transform
        return decorator_register(_func)

@register_transform(output_mode_names=['RAW'])
def no_transformation(X_df: pd.DataFrame) -> List[pd.DataFrame]:
    """
    原始时序
    """
    result_dfs = []
    result_dfs.append(X_df)

    return result_dfs

@register_transform(output_mode_names=['CUMSUM'])
def cumsum_transformation(X_df: pd.DataFrame) -> List[pd.DataFrame]:
    """
    累计和变换
    Args:
        X_df: 输入数据框，包含MultiIndex (id, time) 和 columns ['value', 'period']
    Returns:
        List[pd.DataFrame]: 包含一个数据框的列表 [累计和值]
    """
    X_df_sorted = X_df.sort_index()
    result_dfs = []

    result_df = X_df_sorted.copy()
    result_df['value'] = np.nan
    
    for series_id in X_df_sorted.index.get_level_values('id').unique():
        series_data = X_df_sorted.loc[series_id]
        series_data = series_data.sort_index()
        values = series_data['value'].values
        
        cumsum_values = np.cumsum(values)
        result_df.loc[series_id, 'value'] = cumsum_values
    
    result_dfs.append(result_df)
    return result_dfs

@register_transform(output_mode_names=['DIFF'])
def diff_transformation(X_df: pd.DataFrame) -> List[pd.DataFrame]:
    """
    差分变换
    Args:
        X_df: 输入数据框，包含MultiIndex (id, time) 和 columns ['value', 'period']
    Returns:
        List[pd.DataFrame]: 包含一个数据框的列表 [差分值]
    """
    X_df_sorted = X_df.sort_index()
    result_dfs = []
    
    result_df = X_df_sorted.copy()
    result_df['value'] = np.nan
    
    for series_id in X_df_sorted.index.get_level_values('id').unique():
        series_data = X_df_sorted.loc[series_id]
        series_data = series_data.sort_index()
        values = series_data['value'].values
        
        diff_values = np.diff(values, prepend=0)  # 使用prepend=0使长度保持一致
        result_df.loc[series_id, 'value'] = diff_values
    
    result_dfs.append(result_df)
    return result_dfs

In [8]:
# @crunch/keep:on
# --- 特征函数注册表 ---
FEATURE_REGISTRY = {}

def register_feature(_func=None, *, parallelizable=True, func_id=""):
    """一个用于注册特征函数的装饰器，可以标记特征是否可并行化。"""
    def decorator_register(func):
        FEATURE_REGISTRY[func.__name__] = {
            "func": func, 
            "parallelizable": parallelizable,
            "func_id": func_id
        }
        return func

    if _func is None:
        # Used as @register_feature(parallelizable=...)
        return decorator_register
    else:
        # Used as @register_feature
        return decorator_register(_func)

def _add_diff_ratio_feats(feats: dict, name: str, left, right):
    """
    一个辅助函数，用于向特征字典中添加差异和比例特征。

    Args:
        feats (dict): 要更新的特征字典。
        name (str): 特征的基础名称 (例如, 'stats_mean')。
        left (float): 左侧分段的特征值。
        right (float): 右侧分段的特征值。
    """
    # check nan/None 
    if np.isnan(left) or np.isnan(right) or left is None or right is None:
        feats[f'{name}_diff'] = 0.0
        feats[f'{name}_ratio'] = 0.0
        return
    # 做差
    feats[f'{name}_diff'] = right - left
    # 做比
    feats[f'{name}_ratio'] = right / (left + 1e-6)

def _add_contribution_ratio_feats(feats: dict, name: str, left, right, whole):
    """
    一个辅助函数，用于向特征字典中添加贡献度和与整体的比例特征。

    Args:
        feats (dict): 要更新的特征字典。
        name (str): 特征的基础名称 (例如, 'stats_mean')。
        left (float): 左侧分段的特征值。
        right (float): 右侧分段的特征值。
        whole (float): 整个序列的特征值。
    """
    # check nan/None 
    if np.isnan(left) or np.isnan(right) or np.isnan(whole) or left is None or right is None or whole is None :
        feats[f'{name}_contribution_left'] = 0.0
        feats[f'{name}_contribution_right'] = 0.0
        feats[f'{name}_ratio_to_whole_left'] = 0.0
        feats[f'{name}_ratio_to_whole_right'] = 0.0
        return
    # 特征贡献度
    feats[f'{name}_contribution_left'] = left / (left + right + 1e-6)
    feats[f'{name}_contribution_right'] = right / (left + right + 1e-6)
    # 与整体特征的比例
    feats[f'{name}_ratio_to_whole_left'] = left / (whole + 1e-6)
    feats[f'{name}_ratio_to_whole_right'] = right / (whole + 1e-6)

# --- 1. 分布统计特征 ---
def safe_cv(s):
    s = pd.Series(s)
    m = s.mean()
    std = s.std()
    return std / m if abs(m) > 1e-6 else 0.0

def rolling_std_mean(s, window=50):
    s = pd.Series(s)
    if len(s) < window:
        return 0.0
    return s.rolling(window=window).std().dropna().mean()

def slope_theil_sen(s):
    s = pd.Series(s)
    if len(s) < 2:
        return 0.0
    try:
        slope, intercept, _, _ = scipy.stats.theilslopes(s.values, np.arange(len(s)))
        return slope
    except Exception:
        return 0.0

class STATSFeatureExtractor:
    def __init__(self, selected_features):
        # 所有可用的func类及其名称
        self.func_classes = {
            'mean': np.mean,
            'median': np.median,
            'max': np.max,
            'min': np.min,
            'range': lambda x: np.max(x) - np.min(x),
            'std': np.std,
            'skew': scipy.stats.skew,
            'kurt': scipy.stats.kurtosis,
            'cv': safe_cv,
            'mean_of_rolling_std': rolling_std_mean,
            'theil_sen_slope': slope_theil_sen,
        }
        self.selected_features = selected_features
    
    def fit(self, signal):
        self.signal = np.asarray(signal)
        self.n = len(signal)

    def calculate(self, func, start, end):
        result = func(self.signal[start:end])
        if isinstance(result, float) or isinstance(result, int):
            return result
        else:
            return result.item()

    def should_keep(self, name):
        if self.selected_features is None:
            return True
        keep = f'stats_{name}' in self.selected_features
        return keep

    def extract(self, signal, boundary):
        """
        输入：
            signal: 1D numpy array，单变量时间序列
            boundary: int，分割点
        输出：
            result: dict，格式为 {func_name: {'left': value, 'right': value}}
        """
        n = self.n
        result = {}
        for name, func in self.func_classes.items():
            if not self.should_keep(name):
                continue
            try:
                left = self.calculate(func, 0, boundary)
                right = self.calculate(func, boundary, n)
                whole = self.calculate(func, 0, n)
                # diff = right - left
                # ratio = right / (left + 1e-6)
            except Exception:
                left = None
                right = None
                whole = None
                # diff = None
                # ratio = None
            # Move to _add_diff_ratio_feats, 'diff': diff, 'ratio': ratio
            result[name] = {'left': left, 'right': right, 'whole': whole}   
        return result

@register_feature(func_id="1")
def distribution_stats_features(u: pd.DataFrame, selected_features: set = None) -> dict:
    """统计量的分段值、Diff值、Ratio值"""
    value = u['value'].values.astype(np.float32)
    period = u['period'].values.astype(np.float32)
    boundary = np.where(np.diff(period) != 0)[0].item()
    feats = {}

    extractor = STATSFeatureExtractor(selected_features)
    extractor.fit(value)
    features = extractor.extract(value, boundary)

    feats = {}
    for k, v in features.items():
        for seg, value in v.items():
            feats[f'stats_{k}_{seg}'] = value
        _add_diff_ratio_feats(feats, f'stats_{k}', v['left'], v['right'])
        _add_contribution_ratio_feats(feats, f'stats_{k}', v['left'], v['right'], v['whole'])

    return {k: float(v) if not np.isnan(v) else 0 for k, v in feats.items()}
    
# --- 2. 假设检验统计量特征 ---
@register_feature(func_id="2")
def test_stats_features_first(u: pd.DataFrame, selected_features: set = None) -> dict:
    s1 = u['value'][u['period'] == 0]
    s2 = u['value'][u['period'] == 1]
    s_whole = u['value']
    feats = {}

    def should_keep(candidates):
        if selected_features is None:
            return True
        keep = any(c in selected_features for c in candidates)
        return keep

    """假设检验统计量"""
    # KS检验
    if should_keep(['ks_stat', 'ks_pvalue']):
        ks_stat, ks_pvalue = scipy.stats.ks_2samp(s1, s2)
        feats['ks_stat'] = ks_stat
        feats['ks_pvalue'] = -ks_pvalue

    # T检验
    if should_keep(['ttest_stat', 'ttest_pvalue']):
        ttest_stat, ttest_pvalue = scipy.stats.ttest_ind(s1, s2, equal_var=False)
        feats['ttest_stat'] = ttest_stat
        feats['ttest_pvalue'] = -ttest_pvalue if not np.isnan(ttest_pvalue) else 1

    # AD检验
    if should_keep(['ad_stat', 'ad_pvalue']):
        ad_stat, _, ad_pvalue = scipy.stats.anderson_ksamp([s1.to_numpy(), s2.to_numpy()])
        feats['ad_stat'] = ad_stat
        feats['ad_pvalue'] = -ad_pvalue

    # Mann-Whitney U检验 (非参数，不假设分布)
    if should_keep(['mannwhitney_stat', 'mannwhitney_pvalue']):
        mw_stat, mw_pvalue = scipy.stats.mannwhitneyu(s1, s2, alternative='two-sided')
        feats['mannwhitney_stat'] = mw_stat if not np.isnan(mw_stat) else 0
        feats['mannwhitney_pvalue'] = -mw_pvalue if not np.isnan(mw_pvalue) else 1
        
    # Wilcoxon秩和检验
    if should_keep(['wilcoxon_stat', 'wilcoxon_pvalue']):
        w_stat, w_pvalue = scipy.stats.ranksums(s1, s2)
        feats['wilcoxon_stat'] = w_stat if not np.isnan(w_stat) else 0
        feats['wilcoxon_pvalue'] = -w_pvalue if not np.isnan(w_pvalue) else 1

    # Levene检验
    if should_keep(['levene_stat', 'levene_pvalue']):
        levene_stat, levene_pvalue = scipy.stats.levene(s1, s2)
        feats['levene_stat'] = levene_stat if not np.isnan(levene_stat) else 0
        feats['levene_pvalue'] = -levene_pvalue if not np.isnan(levene_pvalue) else 1
    
    # Bartlett检验
    if should_keep(['bartlett_stat', 'bartlett_pvalue']):
        bartlett_stat, bartlett_pvalue = scipy.stats.bartlett(s1, s2)
        feats['bartlett_stat'] = bartlett_stat if not np.isnan(bartlett_stat) else 0
        feats['bartlett_pvalue'] = -bartlett_pvalue if not np.isnan(bartlett_pvalue) else 1

    return {k: float(v) if not np.isnan(v) else 0 for k, v in feats.items()}

# --- 2. 假设检验统计量特征 ---
@register_feature(func_id="2")
def test_stats_features_second(u: pd.DataFrame, selected_features: set = None) -> dict:
    s1 = u['value'][u['period'] == 0]
    s2 = u['value'][u['period'] == 1]
    s_whole = u['value']
    feats = {}

    def should_keep(candidates):
        if selected_features is None:
            return True
        keep = any(c in selected_features for c in candidates)
        return keep
    
    # """分段假设检验的分段值、Diff值、Ratio值"""
    # Shapiro-Wilk检验
    if should_keep(['shapiro_stat', 'shapiro_pvalue']):
        sw1_stat, sw1_pvalue, sw2_stat, sw2_pvalue, sw_whole_stat, sw_whole_pvalue = (np.nan,)*6
        try:
            sw1_stat, sw1_pvalue = scipy.stats.shapiro(s1)
            sw2_stat, sw2_pvalue = scipy.stats.shapiro(s2)
            sw_whole_stat, sw_whole_pvalue = scipy.stats.shapiro(s_whole)
        except Exception as e:
            pass 
        feats['shapiro_pvalue_left'] = sw1_pvalue
        feats['shapiro_pvalue_right'] = sw2_pvalue
        feats['shapiro_pvalue_whole'] = sw_whole_pvalue
        # _add_diff_ratio_feats(feats, 'shapiro_pvalue', sw1_pvalue, sw2_pvalue)
        _add_contribution_ratio_feats(feats, 'shapiro_pvalue', sw1_pvalue, sw2_pvalue, sw_whole_pvalue)

    # Jarque-Bera检验差异
    if should_keep(['jb_stat', 'jb_pvalue']):
        jb1_stat, jb1_pvalue, jb2_stat, jb2_pvalue, jb_whole_stat, jb_whole_pvalue = (np.nan,)*6
        try:
            jb1_stat, jb1_pvalue = scipy.stats.jarque_bera(s1)
            jb2_stat, jb2_pvalue = scipy.stats.jarque_bera(s2)
            jb_whole_stat, jb_whole_pvalue = scipy.stats.jarque_bera(s_whole)
        except Exception as e:
            pass 
        feats['jb_pvalue_left'] = jb1_pvalue
        feats['jb_pvalue_right'] = jb2_pvalue
        # feats['jb_pvalue_whole'] = jb_whole_pvalue
        _add_diff_ratio_feats(feats, 'jb_pvalue', jb1_pvalue, jb2_pvalue)
        # _add_contribution_ratio_feats(feats, 'jb_pvalue', jb1_pvalue, jb2_pvalue, jb_whole_pvalue)

    # KPSS检验
    def extract_kpss_features(s):
        if len(s) <= 12:
            return {'p': 0.1, 'stat': 0.0, 'lag': 0, 'crit_5pct': 0.0, 'reject_5pct': 0}
        kpss = tsa.stattools.kpss(s, regression='c', nlags='auto')
        stat, p, lag, crit = kpss
        crit_5pct = crit['5%']
        return {
            'p': p,
            'stat': stat,
            'lag': lag,
            'crit_5pct': crit_5pct,
            'reject_5pct': int(stat > crit_5pct)  # KPSS原假设是"平稳"，所以 > 临界值 拒绝平稳
        }
    if should_keep(['kpss_stat', 'kpss_pvalue']):
        kpss1_stat, kpss1_pvalue, kpss2_stat, kpss2_pvalue, kpss_whole_stat, kpss_whole_pvalue = (np.nan,)*6
        try:
            k1 = extract_kpss_features(s1)
            k2 = extract_kpss_features(s2)
            k_whole = extract_kpss_features(s_whole)

            kpss1_stat, kpss1_pvalue = k1['stat'], k1['p']
            kpss2_stat, kpss2_pvalue = k2['stat'], k2['p']
            kpss_whole_stat, kpss_whole_pvalue = k_whole['stat'], k_whole['p']
        except Exception as e:
            pass 
        # feats['kpss_pvalue_left'] = kpss1_pvalue
        # feats['kpss_pvalue_right'] = kpss2_pvalue
        # feats['kpss_pvalue_whole'] = kpss_whole_pvalue
        # _add_diff_ratio_feats(feats, 'kpss_pvalue', kpss1_pvalue, kpss2_pvalue)
        _add_contribution_ratio_feats(feats, 'kpss_pvalue', kpss1_pvalue, kpss2_pvalue, kpss_whole_pvalue)
        # feats['kpss_stat_left'] = kpss1_stat
        # feats['kpss_stat_right'] = kpss2_stat
        # feats['kpss_stat_whole'] = kpss_whole_stat
        # _add_diff_ratio_feats(feats, 'kpss_stat', kpss1_stat, kpss2_stat)
        # _add_contribution_ratio_feats(feats, 'kpss_stat', kpss1_stat, kpss2_stat, kpss_whole_stat)

    # 平稳性检验 (ADF)
    def extract_adf_features(s):
        if len(s) <= 12:
            return {'p': np.nan, 'stat': np.nan, 'lag': np.nan, 'ic': np.nan, 'crit_5pct': np.nan, 'reject_5pct': 0}
        adf = tsa.stattools.adfuller(s, autolag='AIC')
        stat, p, lag, _, crit, ic = adf
        crit_5pct = crit['5%']
        return {
            'p': p,
            'stat': stat,
            'lag': lag,
            'ic': ic,
            'crit_5pct': crit_5pct,
            'reject_5pct': int(stat < crit_5pct)  # ADF 原假设是“非平稳”，stat < 临界值 ⇒ 拒绝非平稳 ⇒ 平稳
        }
    if should_keep(['adf_stat', 'adf_pvalue', 'adf_icbest']):
        adf1_stat, adf1_pvalue, adf2_stat, adf2_pvalue, adf_whole_stat, adf_whole_pvalue = (np.nan,) * 6
        adf1_ic, adf2_ic, adf_whole_ic = (np.nan,) * 3
        try:
            f1 = extract_adf_features(s1)
            f2 = extract_adf_features(s2)
            f_whole = extract_adf_features(s_whole)
            adf1_stat, adf1_pvalue, adf1_ic = f1['stat'], f1['p'], f1['ic']
            adf2_stat, adf2_pvalue, adf2_ic = f2['stat'], f2['p'], f2['ic']
            adf_whole_stat, adf_whole_pvalue, adf_whole_ic = f_whole['stat'], f_whole['p'], f_whole['ic']
        except Exception as e:
            pass 
        feats['adf_pvalue_left'] = adf1_pvalue
        feats['adf_pvalue_right'] = adf2_pvalue
        feats['adf_pvalue_whole'] = adf_whole_pvalue
        _add_diff_ratio_feats(feats, 'adf_pvalue', adf1_pvalue, adf2_pvalue)
        _add_contribution_ratio_feats(feats, 'adf_pvalue', adf1_pvalue, adf2_pvalue, adf_whole_pvalue)
        feats['adf_stat_left'] = adf1_stat
        feats['adf_stat_right'] = adf2_stat
        feats['adf_stat_whole'] = adf_whole_stat
        _add_diff_ratio_feats(feats, 'adf_stat', adf1_stat, adf2_stat)
        # _add_contribution_ratio_feats(feats, 'adf_stat', adf1_stat, adf2_stat, adf_whole_stat)
        # feats['adf_icbest_left'] = adf1_ic
        # feats['adf_icbest_right'] = adf2_ic
        # feats['adf_icbest_whole'] = adf_whole_ic
        # _add_diff_ratio_feats(feats, 'adf_icbest', adf1_ic, adf2_ic)
        # _add_contribution_ratio_feats(feats, 'adf_icbest', adf1_ic, adf2_ic, adf_whole_ic)

    return {k: float(v) if not np.isnan(v) else 0 for k, v in feats.items()}

# --- 3. 趋势特征 ---
@register_feature(func_id="3")
def trend_features(u: pd.DataFrame, selected_features: set = None) -> dict:
    s1 = u['value'][u['period'] == 0]
    s2 = u['value'][u['period'] == 1]
    s_whole = u['value']
    feats = {}

    def analyze_trend(series, seg):
        """分析时间序列的趋势特征"""
        trend_feats = {}
        x = np.arange(len(series))
        
        try:
            # 1. 线性趋势分析 (使用scipy.stats.linregress)
            slope, intercept, r_value, p_value, std_err = scipy.stats.linregress(x, series)
            # trend_feats[f'linear_trend_slope_{seg}'] = slope
            # trend_feats[f'linear_trend_intercept_{seg}'] = intercept
            # trend_feats[f'linear_trend_r_value_{seg}'] = r_value
            trend_feats[f'linear_trend_r2_{seg}'] = r_value ** 2
            # trend_feats[f'linear_trend_pvalue_{seg}'] = p_value
            # trend_feats[f'linear_trend_std_err_{seg}'] = std_err
        except Exception as e:
            logger.error(f"Error in linear trend analysis for {seg}: {e}")

        #     trend_feats[f'linear_trend_slope_{seg}'] = 0
        #     trend_feats[f'linear_trend_intercept_{seg}'] = 0
        #     trend_feats[f'linear_trend_r_value_{seg}'] = 0
            trend_feats[f'linear_trend_r2_{seg}'] = 0
        #     trend_feats[f'linear_trend_pvalue_{seg}'] = 1
        #     trend_feats[f'linear_trend_std_err_{seg}'] = 0

        try:
            # 2. 去趋势分析 (detrended features)
            linear_trend = slope * x + intercept
            detrended = series - linear_trend
            # trend_feats[f'detrend_mean_{seg}'] = np.mean(detrended)
            # trend_feats[f'detrend_volatility_{seg}'] = np.std(detrended)
            trend_feats[f'detrend_volatility_normalized_{seg}'] = np.std(detrended) / (np.abs(np.mean(series)) + 1e-6)
            # trend_feats[f'detrend_max_deviation_{seg}'] = np.max(np.abs(detrended))
        except Exception as e:
            logger.error(f"Error in detrending analysis for {seg}: {e}")

            # trend_feats[f'detrend_mean_{seg}'] = 0
            # trend_feats[f'detrend_volatility_{seg}'] = 0
            trend_feats[f'detrend_volatility_normalized_{seg}'] = 0
            # trend_feats[f'detrend_max_deviation_{seg}'] = 0

        try:
        #     # 3. 趋势变化率
        #     trend_feats[f'trend_change_rate_{seg}'] = slope / (np.mean(np.abs(series)) + 1e-6)  # 相对变化率
            trend_feats[f'trend_normalized_slope_{seg}'] = slope / (np.std(series) + 1e-6)  # 标准化斜率
        except Exception as e:
            logger.error(f"Error in trend change rate analysis for {seg}: {e}")

        #     trend_feats[f'trend_change_rate_{seg}'] = 0
            trend_feats[f'trend_normalized_slope_{seg}'] = 0
        
        return trend_feats
    
    feats.update(analyze_trend(s1, 'left'))
    feats.update(analyze_trend(s2, 'right'))
    feats.update(analyze_trend(s_whole, 'whole'))
    # _add_diff_ratio_feats(feats, 'linear_trend_slope', feats['linear_trend_slope_left'] if 'linear_trend_slope_left' in feats else 0, feats['linear_trend_slope_right'] if 'linear_trend_slope_right' in feats else 0)
    _add_diff_ratio_feats(feats, 'linear_trend_r2', feats['linear_trend_r2_left'] if 'linear_trend_r2_left' in feats else 0, feats['linear_trend_r2_right'] if 'linear_trend_r2_right' in feats else 0)
    # _add_diff_ratio_feats(feats, 'linear_trend_pvalue', feats['linear_trend_pvalue_left'] if 'linear_trend_pvalue_left' in feats else 0, feats['linear_trend_pvalue_right'] if 'linear_trend_pvalue_right' in feats else 0)
    # _add_diff_ratio_feats(feats, 'detrend_mean', feats['detrend_mean_left'] if 'detrend_mean_left' in feats else 0, feats['detrend_mean_right'] if 'detrend_mean_right' in feats else 0)
    # _add_diff_ratio_feats(feats, 'detrend_volatility_normalized', feats['detrend_volatility_normalized_left'] if 'detrend_volatility_normalized_left' in feats else 0, feats['detrend_volatility_normalized_right'] if 'detrend_volatility_normalized_right' in feats else 0)
    # _add_diff_ratio_feats(feats, 'detrend_max_deviation', feats['detrend_max_deviation_left'] if 'detrend_max_deviation_left' in feats else 0, feats['detrend_max_deviation_right'] if 'detrend_max_deviation_right' in feats else 0)
    
    return {k: float(v) if not np.isnan(v) else 0 for k, v in feats.items()}

# --- 4. 振荡特征 ---
@register_feature(func_id="4")
def oscillation_features(u: pd.DataFrame, selected_features: set = None) -> dict:
    s1 = u['value'][u['period'] == 0].reset_index(drop=True)
    s2 = u['value'][u['period'] == 1].reset_index(drop=True)
    s_whole = u['value'].reset_index(drop=True)
    feats = {}

    # def count_zero_crossings(series: pd.Series):
    #     if len(series) < 2: return 0
    #     centered_series = series - series.mean()
    #     if centered_series.eq(0).all(): return 0
    #     return np.sum(np.diff(np.sign(centered_series)) != 0)
    # zc1, zc2, zc_whole = count_zero_crossings(s1), count_zero_crossings(s2), count_zero_crossings(s_whole)
    # feats['zero_cross_left'] = zc1
    # feats['zero_cross_right'] = zc2
    # feats['zero_cross_whole'] = zc_whole
    # _add_diff_ratio_feats(feats, 'zero_cross', zc1, zc2)
    # _add_contribution_ratio_feats(feats, 'zero_cross', zc1, zc2, zc_whole)
    
    def autocorr_lag1(s):
        if len(s) < 2: return 0.0
        ac = s.autocorr(lag=1)
        return ac if not np.isnan(ac) else 0.0
    # ac1 = autocorr_lag1(s1)
    # ac2 = autocorr_lag1(s2)
    ac_whole = autocorr_lag1(s_whole)
    # feats['autocorr_lag1_left'] = ac1
    # feats['autocorr_lag1_right'] = ac2
    feats['autocorr_lag1_whole'] = ac_whole
    # _add_diff_ratio_feats(feats, 'autocorr_lag1', ac1, ac2)
    # _add_contribution_ratio_feats(feats, 'autocorr_lag1', ac1, ac2, ac_whole)

    # var1, var2, var_whole = s1.diff().var(), s2.diff().var(), s_whole.diff().var()
    # feats['diff_var_left'] = var1
    # feats['diff_var_right'] = var2
    # feats['diff_var_whole'] = var_whole
    # # _add_diff_ratio_feats(feats, 'diff_var', var1, var2)
    # _add_contribution_ratio_feats(feats, 'diff_var', var1, var2, var_whole)
    
    return {k: float(v) if not np.isnan(v) else 0 for k, v in feats.items()}

# --- 5. 频域特征 ---
@register_feature(func_id="5")
def cyclic_features(u: pd.DataFrame, selected_features: set = None) -> dict:
    s1 = u['value'][u['period'] == 0]
    s2 = u['value'][u['period'] == 1]
    s_whole = u['value']
    feats = {}

    def get_fft_props(series):
        if len(series) < 2: return 0.0, 0.0
        
        N = len(series)
        yf = np.fft.fft(series.values)
        power = np.abs(yf[1:N//2])**2
        xf = np.fft.fftfreq(N, 1)[1:N//2]
        
        if len(power) == 0: return 0.0, 0.0
            
        dominant_freq = xf[np.argmax(power)]
        max_power = np.max(power)
        return dominant_freq, max_power

    freq1, power1 = get_fft_props(s1)
    freq2, power2 = get_fft_props(s2)
    freq_whole, power_whole = get_fft_props(s_whole)
    
    feats['dominant_freq_left'] = freq1
    feats['dominant_freq_right'] = freq2
    feats['dominant_freq_whole'] = freq_whole
    # _add_diff_ratio_feats(feats, 'dominant_freq', freq1, freq2)
    _add_contribution_ratio_feats(feats, 'dominant_freq', freq1, freq2, freq_whole)

    # feats['max_power_left'] = power1
    # feats['max_power_right'] = power2
    # feats['max_power_whole'] = power_whole
    # _add_diff_ratio_feats(feats, 'max_power', power1, power2)
    # _add_contribution_ratio_feats(feats, 'max_power', power1, power2, power_whole)
    
    return {k: float(v) if not np.isnan(v) else 0 for k, v in feats.items()}

# --- 6. 振幅特征 ---
@register_feature(func_id="6")
def amplitude_features(u: pd.DataFrame, selected_features: set = None) -> dict:
    s1 = u['value'][u['period'] == 0]
    s2 = u['value'][u['period'] == 1]
    s_whole = u['value']
    feats = {}
    
    # ptp1, ptp2, ptp_whole = np.ptp(s1), np.ptp(s2), np.ptp(s_whole)
    # iqr1, iqr2, iqr_whole = scipy.stats.iqr(s1), scipy.stats.iqr(s2), scipy.stats.iqr(s_whole)

    # feats['ptp_left'] = ptp1
    # feats['ptp_right'] = ptp2
    # feats['ptp_whole'] = ptp_whole
    # _add_diff_ratio_feats(feats, 'ptp', ptp1, ptp2)
    # _add_contribution_ratio_feats(feats, 'ptp', ptp1, ptp2, ptp_whole)

    # feats['iqr_left'] = iqr1
    # feats['iqr_right'] = iqr2
    # feats['iqr_whole'] = iqr_whole
    # _add_diff_ratio_feats(feats, 'iqr', iqr1, iqr2)
    # _add_contribution_ratio_feats(feats, 'iqr', iqr1, iqr2, iqr_whole)
    
    return {k: float(v) if not np.isnan(v) else 0 for k, v in feats.items()}

# --- 7. 熵信息 ---
@register_feature(func_id="7")
def entropy_features_first(u: pd.DataFrame, selected_features: set = None) -> dict:
    s1 = u['value'][u['period'] == 0].to_numpy()
    s2 = u['value'][u['period'] == 1].to_numpy()
    s_whole = u['value'].to_numpy()
    feats = {}

    def should_keep(name):
        if selected_features is None:
            return True
        keep = name in selected_features
        return keep

    def compute_entropy(x):
        hist, _ = np.histogram(x, bins='auto', density=True)
        hist = hist[hist > 0]
        return scipy.stats.entropy(hist)
    
    entropy_funcs = {
        'shannon_entropy': compute_entropy,
        'perm_entropy': lambda x: antropy.perm_entropy(x, normalize=True),
        'spectral_entropy': lambda x: antropy.spectral_entropy(x, sf=1.0, normalize=True),
        'svd_entropy': lambda x: antropy.svd_entropy(x, normalize=True),
        'approx_entropy': antropy.app_entropy,
        'sample_entropy': antropy.sample_entropy,
        'petrosian_fd': antropy.petrosian_fd,
        'katz_fd': antropy.katz_fd,
        'higuchi_fd': antropy.higuchi_fd,
        'detrended_fluctuation': antropy.detrended_fluctuation,
    }

    for name, func in entropy_funcs.items():
        if not should_keep(name):
            continue
        try:
            v1, v2, v_whole = func(s1), func(s2), func(s_whole)
            feats[f'{name}_left'] = v1
            feats[f'{name}_right'] = v2
            feats[f'{name}_whole'] = v_whole
            _add_diff_ratio_feats(feats, name, v1, v2)
            _add_contribution_ratio_feats(feats, name, v1, v2, v_whole)
        except Exception:
            feats.update({f'{name}_left': 0, f'{name}_right': 0, f'{name}_whole': 0, f'{name}_diff': 0, f'{name}_ratio': 0})
    
    return {k: float(v) if not np.isnan(v) else 0 for k, v in feats.items()}

# --- 7. 熵信息 ---
@register_feature(func_id="7")
def entropy_features_second(u: pd.DataFrame, selected_features: set = None) -> dict:
    s1 = u['value'][u['period'] == 0].to_numpy()
    s2 = u['value'][u['period'] == 1].to_numpy()
    s_whole = u['value'].to_numpy()
    feats = {}

    def should_keep(candidates):
        if selected_features is None:
            return True
        keep = any(c in selected_features for c in candidates)
        return keep

    if should_keep(['hjorth_mobility', 'hjorth_complexity']):
        try:
            m1, c1 = antropy.hjorth_params(s1)
            m2, c2 = antropy.hjorth_params(s2)
            m_whole, c_whole = antropy.hjorth_params(s_whole)
            feats.update({
                # 'hjorth_mobility_left': m1, 
                # 'hjorth_mobility_right': m2, 
                # 'hjorth_mobility_whole': m_whole,
                'hjorth_complexity_left': c1, 
                'hjorth_complexity_right': c2, 
                'hjorth_complexity_whole': c_whole,
            })
            # _add_diff_ratio_feats(feats, 'hjorth_mobility', m1, m2)
            # _add_contribution_ratio_feats(feats, 'hjorth_mobility', m1, m2, m_whole)
            _add_diff_ratio_feats(feats, 'hjorth_complexity', c1, c2)
            _add_contribution_ratio_feats(feats, 'hjorth_complexity', c1, c2, c_whole)
        except Exception:
            feats.update({'hjorth_mobility_left':0, 'hjorth_mobility_right':0, 'hjorth_mobility_whole':0, 'hjorth_mobility_diff':0, 'hjorth_mobility_ratio':0,
                        'hjorth_complexity_left':0, 'hjorth_complexity_right':0, 'hjorth_complexity_whole':0, 'hjorth_complexity_diff':0, 'hjorth_complexity_ratio':0})

    def series_to_binary_str(x, method='median'):
        if method == 'median':
            threshold = np.median(x)
            return ''.join(['1' if val > threshold else '0' for val in x])
        return None
    if should_keep(['lziv_complexity']):
        try:
            bin_str1 = series_to_binary_str(s1)
            bin_str2 = series_to_binary_str(s2)
            bin_str_whole = series_to_binary_str(s_whole)
            lz1, lz2, lz_whole = antropy.lziv_complexity(bin_str1, normalize=True), antropy.lziv_complexity(bin_str2, normalize=True), antropy.lziv_complexity(bin_str_whole, normalize=True)
            feats.update({
                'lziv_complexity_left': lz1, 'lziv_complexity_right': lz2, 'lziv_complexity_whole': lz_whole,
            })
            _add_diff_ratio_feats(feats, 'lziv_complexity', lz1, lz2)
            _add_contribution_ratio_feats(feats, 'lziv_complexity', lz1, lz2, lz_whole)
        except Exception:
            feats.update({'lziv_complexity_left':0, 'lziv_complexity_right':0, 'lziv_complexity_whole':0, 'lziv_complexity_diff':0, 'lziv_complexity_ratio':0})

    def estimate_cond_entropy(x, lag=1):
        x = x - np.mean(x)
        x_lag = x[:-lag]
        x_now = x[lag:]
        bins = 10
        joint_hist, _, _ = np.histogram2d(x_lag, x_now, bins=bins, density=True)
        joint_hist = joint_hist[joint_hist > 0]
        H_xy = -np.sum(joint_hist * np.log(joint_hist))
        H_x = -np.sum(np.histogram(x_lag, bins=bins, density=True)[0] * \
                      np.log(np.histogram(x_lag, bins=bins, density=True)[0] + 1e-12))
        return H_xy - H_x
    if should_keep(['cond_entropy']):
        try:
            # ce1 = estimate_cond_entropy(s1)
            # ce2 = estimate_cond_entropy(s2)
            ce_whole = estimate_cond_entropy(s_whole)
            feats.update({
                # 'cond_entropy_left': ce1, 
                # 'cond_entropy_right': ce2, 
                'cond_entropy_whole': ce_whole,
            })
            # _add_diff_ratio_feats(feats, 'cond_entropy', ce1, ce2)
            # _add_contribution_ratio_feats(feats, 'cond_entropy', ce1, ce2, ce_whole)
        except Exception:
            feats.update({'cond_entropy_left':0, 'cond_entropy_right':0, 'cond_entropy_whole':0, 'cond_entropy_diff':0, 'cond_entropy_ratio':0})
        
    return {k: float(v) if not np.isnan(v) else 0 for k, v in feats.items()}

# --- 8. tsfresh --- 
@register_feature(func_id="8")
def tsfresh_features_first(u: pd.DataFrame, selected_features: set = None) -> dict:
    """基于tsfresh的特征工程"""
    s1 = u['value'][u['period'] == 0].to_numpy()
    s2 = u['value'][u['period'] == 1].to_numpy()
    s_whole = u['value'].to_numpy()
    feats = {}

    funcs = {
        tsfresh_fe.ratio_value_number_to_time_series_length: None,
        tsfresh_fe.sum_of_reoccurring_data_points: None,
        tsfresh_fe.percentage_of_reoccurring_values_to_all_values: None,
        tsfresh_fe.percentage_of_reoccurring_datapoints_to_all_datapoints: None,
        tsfresh_fe.last_location_of_maximum: None,
        tsfresh_fe.first_location_of_maximum: None,
        tsfresh_fe.has_duplicate: None,
        tsfresh_fe.benford_correlation: None,
        tsfresh_fe.ratio_beyond_r_sigma: [
            6, 
            3, 1.5, 1, 
            0.5
        ],
        tsfresh_fe.quantile: [
            0.6, 
            0.4, 
            0.1
        ],
        tsfresh_fe.count_above: [0],
        tsfresh_fe.number_peaks: [
            25, 
            50
        ],
        tsfresh_fe.partial_autocorrelation: [
            {"lag": 2}, 
            {"lag": 4},
            {"lag": 6}
        ],
        tsfresh_fe.index_mass_quantile: [
            {"q": 0.1}, 
            {"q": 0.6}, 
            {"q": 0.8}
        ],
        tsfresh_fe.ar_coefficient: [
            {"coeff": 0, "k": 10}, 
            {"coeff": 2, "k": 10}, 
            {"coeff": 8, "k": 10}
        ],
        tsfresh_fe.linear_trend: [
            {"attr": "slope"}, 
            {"attr": "rvalue"}, 
            {"attr": "pvalue"}, 
            {"attr": "intercept"}
        ],
        tsfresh_fe.fft_coefficient: [
            {"attr": "imag", "coeff": 3}, 
            {"attr": "imag", "coeff": 2}, 
            {"attr": "imag", "coeff": 1}
        ],
        tsfresh_fe.energy_ratio_by_chunks: [
            {"num_segments": 10, "segment_focus": 9},
            {"num_segments": 20, "segment_focus": 16},
        ],
        tsfresh_fe.friedrich_coefficients: [
            {"coeff": 2, "m": 3, "r": 30}, 
            {"coeff": 3, "m": 3, "r": 30}
        ],
        tsfresh_fe.change_quantiles: [
            {"f_agg": "var", "isabs": True,  "qh": 1.0, "ql": 0.4},
            {"f_agg": "var", "isabs": True,  "qh": 1.0, "ql": 0.2},
            {"f_agg": "var", "isabs": True,  "qh": 0.8, "ql": 0.6},
            {"f_agg": "var", "isabs": True,  "qh": 0.8, "ql": 0.4},
            {"f_agg": "var", "isabs": True,  "qh": 0.8, "ql": 0.2},
            {"f_agg": "var", "isabs": True,  "qh": 0.6, "ql": 0.4},
            {"f_agg": "var", "isabs": True,  "qh": 0.6, "ql": 0.2},
            {"f_agg": "var", "isabs": True,  "qh": 0.4, "ql": 0.2},
            {"f_agg": "var", "isabs": False, "qh": 1.0, "ql": 0.4},
            {"f_agg": "var", "isabs": False, "qh": 1.0, "ql": 0.2},
            {"f_agg": "var", "isabs": False, "qh": 0.8, "ql": 0.4},
            {"f_agg": "var", "isabs": False, "qh": 0.8, "ql": 0.2},
            {"f_agg": "var", "isabs": False, "qh": 0.8, "ql": 0.0},
            {"f_agg": "var", "isabs": False, "qh": 0.6, "ql": 0.4},
            {"f_agg": "var", "isabs": False, "qh": 0.6, "ql": 0.2},
            {"f_agg": "var", "isabs": False, "qh": 0.4, "ql": 0.2},
            {"f_agg": "mean","isabs": True,  "qh": 1.0, "ql": 0.4},
            {"f_agg": "mean","isabs": True,  "qh": 0.6, "ql": 0.4},
        ],
        tsfresh_fe.agg_linear_trend: [
            {"attr": "slope", "chunk_len": 50, "f_agg": "mean"},
            {"attr": "slope", "chunk_len": 5,  "f_agg": "mean"},
            {"attr": "slope", "chunk_len": 10, "f_agg": "mean"},
            {"attr": "rvalue", "chunk_len": 50, "f_agg": "mean"},
            {"attr": "rvalue", "chunk_len": 50, "f_agg": "max"},
            {"attr": "rvalue", "chunk_len": 5,  "f_agg": "mean"},
            {"attr": "rvalue", "chunk_len": 5,  "f_agg": "max"},
            {"attr": "rvalue", "chunk_len": 10, "f_agg": "mean"},
            {"attr": "rvalue", "chunk_len": 10, "f_agg": "max"},
            {"attr": "intercept", "chunk_len": 50, "f_agg": "mean"},
            {"attr": "intercept", "chunk_len": 50, "f_agg": "max"},
            {"attr": "intercept", "chunk_len": 5,  "f_agg": "mean"},
            {"attr": "intercept", "chunk_len": 5,  "f_agg": "max"},
            {"attr": "intercept", "chunk_len": 10, "f_agg": "mean"},
            {"attr": "intercept", "chunk_len": 10, "f_agg": "max"},
        ],
    }
    
    def param_to_str(param):
        if isinstance(param, dict):
            return '_'.join([f"{k}_{v}" for k, v in param.items()])
        else:
            return str(param)

    def calculate_stats_for_feature(func, param=None):
        results = {}
        base_name = func.__name__
        if param is not None:
            base_name += f"_{param_to_str(param)}"

        try:
            # Prepare arguments for each segment
            args_s1 = [s1]
            args_s2 = [s2]
            args_s_whole = [s_whole]
            is_combiner = False

            if param is None: # Simple function, no params
                pass
            elif isinstance(param, dict):
                # Check if it's a combiner function or a function with kwargs
                sig = inspect.signature(func)
                if 'param' in sig.parameters: # Combiner function
                    is_combiner = True
                    args_s1.append([param])
                    args_s2.append([param])
                    args_s_whole.append([param])
                else: # Function with kwargs
                    args_s1.append(param)
                    args_s2.append(param)
                    args_s_whole.append(param)
            else: # Simple function with a single parameter
                args_s1.append(param)
                args_s2.append(param)
                args_s_whole.append(param)

            # Execute function for each segment
            if is_combiner:
                v1_dict = {k: v for k, v in func(*args_s1)}
                v2_dict = {k: v for k, v in func(*args_s2)}
                v_whole_dict = {k: v for k, v in func(*args_s_whole)}
                
                for key in v1_dict:
                    v1, v2, v_whole = v1_dict[key], v2_dict[key], v_whole_dict[key]
                    feat_name_base = f"{func.__name__}_{key}"
                    results[f'{feat_name_base}_left'] = v1
                    results[f'{feat_name_base}_right'] = v2
                    results[f'{feat_name_base}_whole'] = v_whole
                    _add_diff_ratio_feats(feats, feat_name_base, v1, v2)
                    _add_contribution_ratio_feats(results, feat_name_base, v1, v2, v_whole)
                return results

            else:
                if isinstance(param, dict) and not is_combiner:
                    v1, v2, v_whole = func(args_s1[0], **args_s1[1]), func(args_s2[0], **args_s2[1]), func(args_s_whole[0], **args_s_whole[1])
                else:
                    v1, v2, v_whole = func(*args_s1), func(*args_s2), func(*args_s_whole)

                results[f'{base_name}_left'] = v1
                results[f'{base_name}_right'] = v2
                results[f'{base_name}_whole'] = v_whole
                _add_diff_ratio_feats(feats, base_name, v1, v2)
                _add_contribution_ratio_feats(results, base_name, v1, v2, v_whole)
        
        except Exception:
            # For combiner functions, need to know keys to create nulls
            if 'param' in locals() and inspect.isfunction(func) and 'param' in inspect.signature(func).parameters:
                 # It's a combiner, but we can't get keys without running it. Skip for now on error.
                 pass
            else:
                results[f'{base_name}_left'] = np.nan
                results[f'{base_name}_right'] = np.nan
                results[f'{base_name}_whole'] = np.nan
                results[f'{base_name}_diff'] = np.nan
                results[f'{base_name}_ratio'] = np.nan
                
        return results

    def should_keep(func, param):
        if selected_features is None:
            return True
        base_name = func.__name__
        if param is not None:
            base_name += f"_{param_to_str(param)}"
        base_name = clean_feature_name(base_name)
        candidates = [
            f"{base_name}"
        ]
        keep = any(c in selected_features for c in candidates)
        return keep

    for func, params in funcs.items():
        if params is None:
            if should_keep(func, None):
                feats.update(calculate_stats_for_feature(func))
        else:
            for param in params:
                if should_keep(func, param):
                    feats.update(calculate_stats_for_feature(func, param))

    return {k: float(v) if not np.isnan(v) else 0 for k, v in feats.items()}

# --- 9. 时间序列建模 ---
@register_feature(func_id="9")
def ar_model_features(u: pd.DataFrame, selected_features: set = None) -> dict:
    """
    基于AR模型派生特征。
    1. 在 period 0 上训练模型，预测 period 1，计算残差统计量。
    2. 在 period 1 上训练模型，预测 period 0，计算残差统计量。
    3. 分别在 period 0 和 1 上训练模型，比较模型参数、残差和信息准则(AIC/BIC)。
    """
    s1 = u['value'][u['period'] == 0].to_numpy()
    s2 = u['value'][u['period'] == 1].to_numpy()
    s_whole = u['value'].to_numpy()
    feats = {}
    lags = 5 # 固定阶数以保证可比性

    def should_keep(candidates):
        if selected_features is None:
            return True
        keep = any(c in selected_features for c in candidates)
        return keep

    # --- 特征组1: 用 s1 训练，预测 s2 ---
    if should_keep(['ar_residuals_s2_pred_mean', 'ar_residuals_s2_pred_std', 'ar_residuals_s2_pred_skew', 'ar_residuals_s2_pred_kurt']):
        try:
            model1_fit = AutoReg(s1, lags=lags).fit()
            predictions = model1_fit.predict(start=len(s1), end=len(s1) + len(s2) - 1, dynamic=True)
            residuals = s2 - predictions
            feats['ar_residuals_s2_pred_mean'] = np.mean(residuals)
            feats['ar_residuals_s2_pred_std'] = np.std(residuals)
            feats['ar_residuals_s2_pred_skew'] = pd.Series(residuals).skew()
            feats['ar_residuals_s2_pred_kurt'] = pd.Series(residuals).kurt()
        except Exception:
            # 宽泛地捕获异常，防止因数值问题中断
            feats.update({'ar_residuals_s2_pred_mean': 0, 'ar_residuals_s2_pred_std': 0, 'ar_residuals_s2_pred_skew': 0, 'ar_residuals_s2_pred_kurt': 0})

    # --- 特征组2: 用 s2 训练，预测 s1 ---
    if should_keep(['ar_residuals_s1_pred_mean', 'ar_residuals_s1_pred_std', 'ar_residuals_s1_pred_skew', 'ar_residuals_s1_pred_kurt']):
        try:
            model2_fit = AutoReg(s2, lags=lags).fit()
            predictions_on_s1 = model2_fit.predict(start=len(s2), end=len(s2) + len(s1) - 1, dynamic=True)
            residuals_s1_pred = s1 - predictions_on_s1
            feats['ar_residuals_s1_pred_mean'] = np.mean(residuals_s1_pred)
            feats['ar_residuals_s1_pred_std'] = np.std(residuals_s1_pred)
            feats['ar_residuals_s1_pred_skew'] = pd.Series(residuals_s1_pred).skew()
            feats['ar_residuals_s1_pred_kurt'] = pd.Series(residuals_s1_pred).kurt()
        except Exception:
            feats.update({'ar_residuals_s1_pred_mean': 0, 'ar_residuals_s1_pred_std': 0, 'ar_residuals_s1_pred_skew': 0, 'ar_residuals_s1_pred_kurt': 0})

    # --- 特征组3: 分别建模，比较差异 ---
    if should_keep(['ar_param_0', 'ar_param_1', 'ar_param_2', 'ar_param_3', 'ar_param_4', 'ar_param_5']):
        s1_resid_std, s1_params = np.nan, np.full(lags + 1, np.nan)
        s1_aic, s1_bic = np.nan, np.nan
        if len(s1) > lags:
            try:
                fit1 = AutoReg(s1, lags=lags).fit()
                s1_resid_std = np.std(fit1.resid)
                s1_params = fit1.params
                s1_aic = fit1.aic
                s1_bic = fit1.bic
            except Exception:
                pass

        s2_resid_std, s2_params = np.nan, np.full(lags + 1, np.nan)
        s2_aic, s2_bic = np.nan, np.nan
        if len(s2) > lags:
            try:
                fit2 = AutoReg(s2, lags=lags).fit()
                s2_resid_std = np.std(fit2.resid)
                s2_params = fit2.params
                s2_aic = fit2.aic
                s2_bic = fit2.bic
            except Exception:
                pass

        swhole_resid_std, swhole_params = np.nan, np.full(lags + 1, np.nan)
        swhole_aic, swhole_bic = np.nan, np.nan
        if len(s_whole) > lags:
            try:
                fit_whole = AutoReg(s_whole, lags=lags).fit()
                swhole_resid_std = np.std(fit_whole.resid)
                swhole_params = fit_whole.params
                swhole_aic = fit_whole.aic
                swhole_bic = fit_whole.bic
            except Exception:
                pass
                
        # feats['ar_resid_std_left'] = s1_resid_std
        # feats['ar_resid_std_right'] = s2_resid_std
        # feats['ar_resid_std_whole'] = swhole_resid_std
        # _add_diff_ratio_feats(feats, 'ar_resid_std', s1_resid_std, s2_resid_std)
        # _add_contribution_ratio_feats(feats, 'ar_resid_std', s1_resid_std, s2_resid_std, swhole_resid_std)
        
        # feats['ar_aic_left'] = s1_aic
        # feats['ar_aic_right'] = s2_aic
        # feats['ar_aic_whole'] = swhole_aic
        # _add_diff_ratio_feats(feats, 'ar_aic', s1_aic, s2_aic)
        # _add_contribution_ratio_feats(feats, 'ar_aic', s1_aic, s2_aic, swhole_aic)

        # feats['ar_bic_left'] = s1_bic
        # feats['ar_bic_right'] = s2_bic
        # feats['ar_bic_whole'] = swhole_bic
        # _add_diff_ratio_feats(feats, 'ar_bic', s1_bic, s2_bic)
        # _add_contribution_ratio_feats(feats, 'ar_bic', s1_bic, s2_bic, swhole_bic)
        
        # 比较模型系数
        for i in range(lags + 1):
            if should_keep([f'ar_param_{i}']):
                feats[f'ar_param_{i}_left'] = s1_params[i]
                feats[f'ar_param_{i}_right'] = s2_params[i]
                feats[f'ar_param_{i}_whole'] = swhole_params[i]
                _add_diff_ratio_feats(feats, f'ar_param_{i}', s1_params[i], s2_params[i])
                _add_contribution_ratio_feats(feats, f'ar_param_{i}', s1_params[i], s2_params[i], swhole_params[i])

    return {k: float(v) if not np.isnan(v) else 0 for k, v in feats.items()}

# --- 10. 分段损失 ---
class RPTFeatureExtractor:
    def __init__(self, selected_features):
        # 所有可用的cost类及其名称
        self.cost_classes = {
            'l1': rpt.costs.CostL1,               # 中位数
            'l2': rpt.costs.CostL2,               # 均值
            'clinear': rpt.costs.CostCLinear,     # 线性协方差
            'rbf': rpt.costs.CostRbf,             # RBF核
            'normal': rpt.costs.CostNormal,       # 协方差
            'ar': rpt.costs.CostAR,               # 自回归
            'mahalanobis': rpt.costs.CostMl,      # 马氏距离
            'rank': rpt.costs.CostRank,           # 排名
            'cosine': rpt.costs.CostCosine,       # 余弦距离
        }
        self.selected_features = selected_features

    def calculate(self, cost, start, end):
        result = cost.error(start, end)
        if isinstance(result, (np.ndarray, list)) and np.array(result).size == 1:
            return float(np.array(result).squeeze())
        return result

    def should_keep(self, name):
        if self.selected_features is None:
            return True
        keep = f'rpt_cost_{name}' in self.selected_features
        return keep

    def extract(self, signal, boundary):
        """
        输入：
            signal: 1D numpy array，单变量时间序列
            boundary: int，分割点
        输出：
            result: dict，格式为 {cost_name: {'left': value, 'right': value}}
        """
        signal = np.asarray(signal)
        n = len(signal)
        result = {}
        for name, cls in self.cost_classes.items():
            if not self.should_keep(name):
                continue
            try:
                if name == 'ar':
                    cost = cls(order=4)
                else:
                    cost = cls()
                cost.fit(signal)
                left = self.calculate(cost, 0, boundary)
                right = self.calculate(cost, boundary, n)
                whole = self.calculate(cost, 0, n)
                # diff = right - left if left is not None and right is not None else None
                # ratio = right / (left + 1e-6) if left is not None and right is not None else None
            except Exception:
                left = None
                right = None
                whole = None
                # diff = None
                # ratio = None
            # Move to _add_diff_ratio_feats, 'diff': diff, 'ratio': ratio
            result[name] = {'left': left, 'right': right, 'whole': whole}
        return result

@register_feature(func_id="10")
def rupture_cost_features(u: pd.DataFrame, selected_features: set = None) -> dict:
    value = u['value'].values.astype(np.float32)
    period = u['period'].values.astype(np.float32)
    boundary = np.where(np.diff(period) != 0)[0].item()
    feats = {}

    extractor = RPTFeatureExtractor(selected_features)
    features = extractor.extract(value, boundary)

    feats = {}
    for k, v in features.items():
        for seg, value in v.items():
            feats[f'rpt_cost_{k}_{seg}'] = value
        _add_diff_ratio_feats(feats, f'rpt_cost_{k}', v['left'], v['right'])
        _add_contribution_ratio_feats(feats, f'rpt_cost_{k}', v['left'], v['right'], v['whole'])

    return {k: float(v) if not np.isnan(v) else 0 for k, v in feats.items()}

In [9]:
# --- feature.py 特征生成工具 ---
def _apply_feature_func_sequential(
        func, 
        X_df: pd.DataFrame, 
        use_tqdm: bool = False,
        selected_features: set | None = None
    ) -> pd.DataFrame:
    """顺序应用单个特征函数"""

    all_ids = X_df.index.get_level_values("id").unique()
    iterator = (
        tqdm(all_ids, desc=f"Running {func.__name__} (sequentially)") 
        if use_tqdm else all_ids
    )

    results = [
        {**{'id': id_val}, **func(X_df.loc[id_val], selected_features=selected_features)}
        for id_val in iterator
    ]

    return pd.DataFrame(results).set_index('id')

def _apply_feature_func_parallel(
        func, 
        X_df: pd.DataFrame, 
        use_tqdm: bool = False,
        selected_features: set | None = None
    ) -> pd.DataFrame:
    """并行应用单个特征函数"""
    all_ids = X_df.index.get_level_values("id").unique()
    iterator = (
        tqdm(all_ids, desc=f"Running {func.__name__} (parallel)")
        if use_tqdm else all_ids
    )
    results = Parallel(n_jobs=config.N_JOBS)(
        delayed(lambda df_id, id_val: {**{'id': id_val}, **func(df_id, selected_features=selected_features)})(X_df.loc[id_val], id_val)
        for id_val in iterator
    )
    return pd.DataFrame(results).set_index('id')

def _apply_transform_func(func, X_df: pd.DataFrame) -> List[pd.DataFrame]:
    """执行变换函数"""
    return func(X_df)

def apply_transformation(X_df: pd.DataFrame, transform_funcs: List[str] = None) -> Dict[str, pd.DataFrame]:
    """
    应用时序变换
    
    Args:
        X_df: 输入数据框
        transform_funcs: 要应用的变换函数名称列表，如果为None则应用所有注册的变换函数
        
    Returns:
        Dict[str, pd.DataFrame]: 键为模态名称，值为对应的数据框
    """
    if transform_funcs is None:
        transform_funcs = list(TRANSFORM_REGISTRY.keys())
    
    # 验证变换函数是否存在
    valid_transform_funcs = []
    for func_name in transform_funcs:
        if func_name not in TRANSFORM_REGISTRY:
            pass
            # logger.warning(f"变换函数 {func_name} 未在注册表中找到，已跳过。")
        else:
            valid_transform_funcs.append(func_name)
    
    transform_funcs = valid_transform_funcs
    
    # 存储所有模态的数据框
    transformed_data = {}
    
    for func_name in transform_funcs:
        # logger.info(f"--- 开始应用变换函数: {func_name} ---")
        start_time = time.time()
        
        transform_info = TRANSFORM_REGISTRY[func_name]
        func = transform_info['func']
        output_mode_names = transform_info['output_mode_names']
        
        # 执行变换
        transformed_results = _apply_transform_func(func, X_df)
        
        # 存储结果
        for mode_name, mode_df in zip(output_mode_names, transformed_results):
            transformed_data[mode_name] = mode_df
        
        duration = time.time() - start_time
        # logger.info(f"'{func_name}' 变换完毕，耗时: {duration:.2f} 秒，生成模态: {output_mode_names}")
    
    return transformed_data

def clean_feature_name(name: str, prefix: str = "f") -> str:
    """清理单个特征名称，确保它是合法的标识符。"""
    # 替换非法字符为 _
    cleaned = re.sub(r"[^\w]", "_", name)
    # 防止开头是数字
    if re.match(r"^\d", cleaned):
        cleaned = f"{prefix}_{cleaned}"
    # 多个连续 _ 合并为一个
    cleaned = re.sub(r"__+", "_", cleaned)
    return cleaned

def clean_feature_names(df: pd.DataFrame, prefix: str = "f") -> pd.DataFrame:
    """清理特征名称，确保它们是合法的列名。"""
    cleaned_columns = []
    for i, col in enumerate(df.columns):
        # 替换非法字符为 _
        cleaned = re.sub(r'[^\w]', '_', col)
        # 防止开头是数字（如 "123_feature"）非法
        if re.match(r'^\d', cleaned):
            cleaned = f"{prefix}_{cleaned}"
        # 多个连续 _ 合并为一个
        cleaned = re.sub(r'__+', '_', cleaned)
        cleaned_columns.append(cleaned)
    df.columns = cleaned_columns
    return df

In [10]:
# --- feature.py 特征管理工具 ---
def _get_latest_feature_file() -> Path | None:
    """查找并返回最新的特征文件路径"""
    # 获取特征文件目录下的所有特征文件
    feature_files = list(config.FEATURE_DIR.glob('features_*.parquet'))
    # 如果没有特征文件，返回None
    if not feature_files:
        return None
    return max(feature_files, key=lambda p: p.stat().st_mtime)

def _load_feature_file(file_path: Path):
    """加载指定的特征文件及其元数据。"""
    if not file_path or not file_path.exists():
        return pd.DataFrame(), {}
    try:
        table = pd.read_parquet(file_path)
        metadata_str = table.attrs.get('feature_metadata', '{}')
        metadata = json.loads(metadata_str)
        return table, metadata
    except Exception as e:
        logger.warning(f"无法加载特征文件 {file_path}: {e}。")
        return pd.DataFrame(), {}

def _load_feature_dict_file(file_path: Path):
    """加载字典格式的特征文件及其元数据。"""
    if not file_path or not file_path.exists():
        return {}, {}
    try:
        # 加载主文件获取元数据
        main_table = pd.read_parquet(file_path)
        metadata_str = main_table.attrs.get('feature_metadata', '{}')
        metadata = json.loads(metadata_str)
        
        # 加载字典格式的特征数据
        feature_dict = {}
        base_name = file_path.stem  # 去掉扩展名
        
        # 查找所有相关的特征文件
        for data_id_file in file_path.parent.glob(f"{base_name}_id_*.parquet"):
            # 从文件名提取数据ID
            data_id = data_id_file.stem.split('_id_')[-1]
            feature_dict[data_id] = pd.read_parquet(data_id_file)
        
        # 如果没有找到分离的文件，尝试从主文件加载（向后兼容）
        if not feature_dict and not main_table.empty:
            feature_dict["0"] = main_table
            
        return feature_dict, metadata
    except Exception as e:
        logger.warning(f"无法加载字典格式特征文件 {file_path}: {e}。")
        return {}, {}

def load_features(feature_file: str = None, data_ids: list = None) -> tuple[pd.DataFrame | None, str | None]:
    """加载指定的或最新的特征文件，并拼接指定数据ID的特征数据。
    
    Args:
        feature_file (str, optional): 特征文件名。如果未指定，将加载最新版本。
        data_ids (list, optional): 要使用的数据ID列表，例如["0", "1"]。如果未指定，默认使用["0"]。
    
    Returns:
        tuple: (拼接后的特征数据, 文件名) 或 (None, None)
    """
    # 使用一个临时的logger，避免依赖全局logger
    import logging
    temp_logger = logging.getLogger('load_features')
    if not temp_logger.handlers:
        temp_logger.addHandler(logging.StreamHandler())
        temp_logger.setLevel(logging.INFO)

    if feature_file:
        path_to_load = config.FEATURE_DIR / feature_file
    else:
        temp_logger.info("未指定特征文件，将尝试加载最新版本。")
        path_to_load = _get_latest_feature_file()

    if not path_to_load or not path_to_load.exists():
        temp_logger.error(f"无法找到要加载的特征文件: {path_to_load}")
        return None, None

    temp_logger.info(f"正在从 {path_to_load.name} 加载特征...")
    
    # 如果未指定data_ids，默认使用["0"]
    if data_ids is None:
        data_ids = ["0"]
    
    # 尝试加载字典格式的特征文件
    try:
        feature_dict, _ = _load_feature_dict_file(path_to_load)
        temp_logger.info(f"加载字典格式特征文件成功，包含数据ID: {list(feature_dict.keys())}")
        
        # 检查请求的数据ID是否存在
        available_ids = list(feature_dict.keys())
        missing_ids = [id for id in data_ids if id not in available_ids]
        if missing_ids:
            temp_logger.warning(f"请求的数据ID {missing_ids} 在特征文件中不存在，可用的ID: {available_ids}")
            # 只使用存在的ID
            data_ids = [id for id in data_ids if id in available_ids]
            if not data_ids:
                temp_logger.error("没有可用的数据ID")
                return None, None
        
        # 拼接指定数据ID的特征数据
        feature_dfs = []
        for data_id in data_ids:
            df = feature_dict[data_id].copy()
            feature_dfs.append(df)
        
        # 按行拼接（concat along axis=0），保持特征列数不变
        if len(feature_dfs) == 1:
            concatenated_df = feature_dfs[0]
        else:
            concatenated_df = pd.concat(feature_dfs, axis=0, ignore_index=False)
        
        total_features = len(concatenated_df.columns)
        total_rows = len(concatenated_df)
        temp_logger.info(f"特征拼接成功，使用数据ID: {data_ids}，共 {total_features} 个特征，{total_rows} 行数据。")
        return concatenated_df, path_to_load.name
                
    except Exception:
        # 回退到旧格式
        feature_df, _ = _load_feature_file(path_to_load)
        
        if feature_df.empty:
            return None, None
        
        # 对于旧格式，只能返回单个DataFrame（相当于数据ID "0"）
        if "0" in data_ids:
            temp_logger.info(f"特征加载成功（旧格式），共 {len(feature_df.columns)} 个特征。")
            return feature_df, path_to_load.name
        else:
            temp_logger.warning(f"旧格式特征文件只支持数据ID '0'，但请求的是 {data_ids}")
            return None, None

In [11]:
# --- [Train&Infer] 特征工具 ---
def extract_raw_features(feat):
    raw_parts = []
    trans_mode_to_run = []
    for func_name in TRANSFORM_REGISTRY.keys():
        trans_mode_to_run.extend(TRANSFORM_REGISTRY[func_name]["output_mode_names"])
    
    # 找到所有mode_flag的位置
    flag_positions = []
    for flag in trans_mode_to_run:
        start = 0
        while True:
            pos = feat.find(f'_{flag}_', start)
            if pos == -1:
                break
            flag_positions.append((pos + 1, flag))  # +1 to skip the leading underscore
            start = pos + 1
    
    # 按位置排序
    flag_positions.sort()
    
    # 根据位置切分特征
    for i, (pos, flag) in enumerate(flag_positions):
        # 确定当前特征的结束位置
        if i + 1 < len(flag_positions):
            end_pos = flag_positions[i + 1][0] - 1  # -1 to exclude the underscore
            raw_part = feat[pos:end_pos]
        else:
            raw_part = feat[pos:]
        raw_parts.append(raw_part)
    
    # print(raw_parts)
    return raw_parts

def extract_trans_funcs_dict(
        trans_mode_to_run: list = None, 
    ):
    if trans_mode_to_run is None:
        trans_mode_to_run = []
        for func_name in TRANSFORM_REGISTRY.keys():
            trans_mode_to_run.extend(TRANSFORM_REGISTRY[func_name]["output_mode_names"])
        logger.warning(f'变换模式: {trans_mode_to_run}')

    # 1. 提取trans-funcs对
    trans_funcs_dict = {}
    trans_feats_dict = {}
    raw_feat_name = []
    # 提取原始特征名-去除交互操作
    operator_flags = sorted(config.OPERATOR_FLAGS, key=len, reverse=True)
    for feat in config.REMAIN_FEATURES:
        matched_flag = next((flag for flag in operator_flags if feat.startswith(flag)), None)
        if matched_flag is not None:
            raw_parts = extract_raw_features(feat)
            raw_feat_name.extend(raw_parts)
        else:
            raw_feat_name.append(feat)

    # 提取原始特征名-去除特征计算操作
    feats_flags = sorted(config.FEAT_FLAGS, key=len, reverse=True)
    sorted_feats_flags = sorted(feats_flags, key=len, reverse=True)
    def clean_end(feat: str) -> str:
        for flag in sorted_feats_flags:
            if feat.endswith(flag):
                return feat[: -len(flag)]
        return feat  # 如果没有匹配到后缀，原样返回
    raw_feat_name = [clean_end(feat) for feat in raw_feat_name]

    # 记录
    for feat in raw_feat_name:
        parts = feat.split('_')
        trans_mode, func_mode = parts[0], parts[1]
        feat_name = '_'.join(parts[2:])
        if trans_mode in trans_mode_to_run:
            trans_funcs_dict.setdefault(trans_mode, set()).add(func_mode)
            trans_feats_dict.setdefault(trans_mode, set()).add(feat_name)
    # trans_funcs_dict = {k: sorted(list(v)) for k, v in trans_funcs_dict.items()}
    # trans_feats_dict = {k: sorted(list(v)) for k, v in trans_feats_dict.items()}
    logger.warning(f'变换-特征函数匹配: {trans_funcs_dict}')
    logger.warning(f'变换-特征名称匹配: {trans_feats_dict}')
    return trans_funcs_dict, trans_feats_dict

In [12]:
# --- [Train] 加载已计算特征 ---
def load_precalculated_features(model_directory_path):
    """Load all LightGBM model files saved with joblib and prepare them for ensemble"""
    # feature
    feature_file_name = None
    data_ids = None
    feature_df, loaded_feature_name = load_features(feature_file_name, data_ids=data_ids)
    logger.info(f"Successfully loaded features from: {loaded_feature_name}")
    if feature_df is not None:
        missing_features = [f for f in config.REMAIN_FEATURES if f not in feature_df.columns]
        if missing_features:
            logger.warning(f"Missing REMAIN_FEATURES in <{data_id}> before filter: {missing_features}")
        feature_df = feature_df[config.REMAIN_FEATURES]
    else:
        feature_df = pd.DataFrame()

    # y_train
    try:
        y_train_loaded = pd.read_parquet(f"{model_directory_path}/y_train_head9901.parquet")
    except:
        y_train_loaded = pd.Series()

    return feature_df, loaded_feature_name, y_train_loaded

def filter_unloaded_ids(X_train, y_train, loaded_index):
    """过滤掉已计算过特征的 id，同时保持 MultiIndex 格式 (X_train) 和对齐的 y_train。"""
    remaining_ids = X_train.index.get_level_values("id").unique().difference(loaded_index)

    X_train_filtered = X_train.loc[remaining_ids]
    y_train_filtered = y_train.loc[remaining_ids]

    return X_train_filtered, y_train_filtered

In [13]:
# --- [Train] 生成特征 ---
def generate_features(
        X_data, 
        funcs_to_run: list = None, 
        trans_to_run: list = None, 
        use_tqdm: bool = False,
        parallel: bool = False,
        trans_funcs_dict: dict = None,
        trans_feats_dict: dict = None,
    ):
    """
    生成指定的特征，或者如果未指定，则生成所有已注册的特征。
    可以基于一个现有的特征文件进行增量更新。
    现在支持字典格式的输入数据和特征存储。

    Args:
        X_data: 输入数据，可以是:
            - pd.DataFrame: 单个数据框（向后兼容）
            - dict: 字典格式，键为数据ID（"0"表示原始数据，"1"、"2"等表示增强数据），值为对应的数据框
        funcs_to_run (list, optional): 要运行的特征函数名称列表。
            如果为 None，则运行所有在 `FEATURE_REGISTRY` 中注册的、且不在 `EXPERIMENTAL_FEATURES` 中的函数。
        trans_to_run (list, optional): 要运行的变换函数名称列表。
        base_feature_file (str, optional): 基础特征文件名。如果提供，
            将加载此文件并在此基础上添加或更新特征。否则，将创建一个新的特征集。
    """
    # utils.ensure_feature_dirs()
    
    # 处理输入数据格式
    if isinstance(X_data, pd.DataFrame):
        # 向后兼容：单个数据框转换为字典格式
        X_data_dict = {"0": X_data}
        logger.info("输入为单个数据框，已转换为字典格式（数据ID: '0'）")
    elif isinstance(X_data, dict):
        X_data_dict = X_data
        logger.info(f"输入为字典格式，包含数据ID: {list(X_data_dict.keys())}")
    else:
        raise ValueError("X_data必须是pd.DataFrame或dict类型")
    
    if funcs_to_run is None:
        # 如果未指定函数，则运行所有非实验性特征
        funcs_to_run = [
            f for f in FEATURE_REGISTRY.keys() 
            if f not in config.EXPERIMENTAL_FEATURES
        ]
        logger.info(f"未指定特征函数，将运行所有 {len(funcs_to_run)} 个非实验性特征。")
    
    # 验证请求的函数是否都已注册
    valid_funcs_to_run = []
    for func_name in funcs_to_run:
        if func_name not in FEATURE_REGISTRY:
            logger.warning(f"函数 {func_name} 未在注册表中找到，已跳过。")
        else:
            valid_funcs_to_run.append(func_name)
    
    funcs_to_run = valid_funcs_to_run
    feature_dict, metadata = {}, {}
    
    # 确保每个数据ID都有对应的特征DataFrame
    for data_id in X_data_dict.keys():
        if data_id not in feature_dict:
            # 获取该数据ID的唯一ID列表
            unique_ids = X_data_dict[data_id].index.get_level_values('id').unique()
            feature_dict[data_id] = pd.DataFrame(index=unique_ids)
            logger.info(f"为数据ID '{data_id}' 创建新的特征DataFrame，包含 {len(unique_ids)} 个样本")
    
    logger.info(f"基础特征字典包含数据ID: {list(feature_dict.keys())}")
    for data_id, df in feature_dict.items():
        logger.info(f"  数据ID '{data_id}': {df.shape}")

    if trans_funcs_dict is None or trans_feats_dict is None:
        trans_funcs_dict, trans_feats_dict = extract_trans_funcs_dict()

    # 3. 为每个数据ID生成特征
    initial_feature_counts = {data_id: len(df.columns) for data_id, df in feature_dict.items()}
    
    for data_id, X_df in X_data_dict.items():
        logger.info(f"=== 开始为数据ID '{data_id}' 生成特征 ===")
        
        # 时序分解
        logger.info(f"--- 开始时序分解（数据ID: {data_id}） ---")
        transformed_data = apply_transformation(X_df, trans_to_run)
        logger.info(f"分解完成，共生成 {len(transformed_data)} 个模态: {list(transformed_data.keys())}")
        
        # 获取当前数据ID的特征DataFrame
        current_feature_df = feature_dict[data_id]
        loaded_features = current_feature_df.columns.tolist()
        
        # 逐个生成新特征并更新
        for mode_name, mode_df in transformed_data.items():
            logger.info(f"=== 开始为数据ID '{data_id}' 的模态 '{mode_name}' 生成特征 ===")
            selected_features = trans_feats_dict.get(mode_name, set())
            for func_name in funcs_to_run:
                logger.info(f"--- 开始生成特征: {func_name} ---")
                start_time = time.time()
                
                feature_info = FEATURE_REGISTRY[func_name]
                func = feature_info['func']
                is_parallelizable = feature_info['parallelizable']
                func_id = feature_info['func_id']
                if func_id not in trans_funcs_dict[mode_name]:
                    logger.info(f"函数 '{func_name}' 已跳过。")
                    continue
                
                if is_parallelizable and parallel:
                    new_features_df = _apply_feature_func_parallel(func, mode_df, use_tqdm, selected_features=selected_features)
                else:
                    logger.info(f"函数 '{func_name}' 不可并行化，将顺序执行。")
                    new_features_df = _apply_feature_func_sequential(func, mode_df, use_tqdm, selected_features=selected_features)
                new_features_df.columns = [f"{mode_name}_{func_id}_{col}" for col in new_features_df.columns]
                new_features_df = clean_feature_names(new_features_df)

                # 记录日志
                duration = time.time() - start_time
                logger.info(f"'{func_name}' 生成完毕，耗时: {duration:.2f} 秒。")
                logger.info(f"  新生成特征列名: {new_features_df.columns.tolist()}")
                
                for col in new_features_df.columns:
                    null_ratio = new_features_df[col].isnull().sum() / len(new_features_df)
                    zero_ratio = (new_features_df[col] == 0).sum() / len(new_features_df)
                    logger.info(f"    - '{col}': 空值比例={null_ratio:.2%}, 零值比例={zero_ratio:.2%}")

                # 删除旧版本特征（如果存在），然后合并
                current_feature_df = current_feature_df.drop(columns=new_features_df.columns, errors='ignore')
                current_feature_df = current_feature_df.merge(new_features_df, left_index=True, right_index=True, how='left')
                loaded_features = current_feature_df.columns.tolist()
        
        # 更新特征字典
        feature_dict[data_id] = current_feature_df
        logger.info(f"数据ID '{data_id}' 特征生成完成，最终特征数: {len(current_feature_df.columns)}")

    return feature_dict, metadata

In [14]:
# --- [Train&Infer] 特征交互核心逻辑 ---
def extract_and_generate_interaction_features(
        feature_dict: dict, 
    ):
    """
    根据特征重要性文件生成交互特征。
    支持字典格式的特征数据。

    Args:
        feature_dict (dict): 特征数据框。
    """
    # 1. 提取交互对
    raw_feat_name = []
    interaction_pairs = {}
    operator_flags = sorted(config.OPERATOR_FLAGS, key=len, reverse=True)
    for flag in operator_flags:
        interaction_pairs[flag] = []
    for feat in config.REMAIN_FEATURES:
        matched_flag = next((flag for flag in operator_flags if feat.startswith(flag)), None)
        if matched_flag is not None:
            raw_parts = extract_raw_features(feat)
            raw_feat_name.extend(raw_parts)
            interaction_pairs[matched_flag].append(tuple(raw_parts))  # 转为元组
        else:
            raw_feat_name.append(feat)

    # 2. 检查是否有特征缺失
    for data_id, feature_df in feature_dict.items():
        missing_features = [f for f in raw_feat_name if f not in feature_df.columns]
        if missing_features:
            logger.warning(f"Missing 【RAW FEATURES】 in <{data_id}>: {missing_features}")

    # 3. 为每个数据ID生成交互特征
    updated_feature_dict = {}
    all_interaction_features = []
    epsilon = 1e-6
    zscore_cache = {}
    asinh_feature_cache = {}
    asinh_zscore_cache = {}
    def compute_zscore(series: pd.Series) -> pd.Series:
        std = series.std(ddof=0)
        if pd.isna(std) or std < epsilon:
            return pd.Series(0.0, index=series.index)
        return (series - series.mean()) / std
    def get_zscore(name: str) -> pd.Series:
        if name not in zscore_cache:
            zscore_cache[name] = compute_zscore(feature_df[name].astype(float))
        return zscore_cache[name]
    def get_asinh_series(name: str) -> pd.Series:
        if name not in asinh_feature_cache:
            asinh_feature_cache[name] = np.arcsinh(feature_df[name].astype(float))
            asinh_zscore_cache[name] = compute_zscore(asinh_feature_cache[name])
        return asinh_feature_cache[name]
    def get_asinh_zscore(name: str) -> pd.Series:
        if name not in asinh_feature_cache:
            get_asinh_series(name)
        return asinh_zscore_cache[name]

    for data_id, feature_df in feature_dict.items():
        logger.info(f"\n为数据ID '{data_id}' 生成交互特征...")
        
        # 创建交互特征 - 使用字典收集所有特征，避免DataFrame碎片化
        interaction_features_dict = {}

        # 根据提取的交互对进行高效特征交互
        for operator, pairs in interaction_pairs.items():
            for f1, f2 in pairs:
                if operator == 'mul':
                    interaction_features_dict[f'mul_{f1}_{f2}'] = feature_df[f1] * feature_df[f2]
                elif operator == 'sqmul':
                    interaction_features_dict[f'sqmul_{f1}_{f2}'] = feature_df[f1] * (feature_df[f2] ** 2)
                elif operator == 'sub':
                    interaction_features_dict[f'sub_{f1}_{f2}'] = feature_df[f1] - feature_df[f2]
                elif operator == 'add':
                    interaction_features_dict[f'add_{f1}_{f2}'] = feature_df[f1] + feature_df[f2]
                elif operator == 'div':
                    interaction_features_dict[f'div_{f1}_{f2}'] = feature_df[f1] / (feature_df[f2] + epsilon)
                elif operator == 'sq':
                    interaction_features_dict[f'sq_{f1}'] = feature_df[f1] ** 2
                elif operator == 'cross_mul':
                    interaction_features_dict[f'cross_mul_{f1}_{f2}'] = feature_df[f1] * feature_df[f2]
                elif operator == 'cross_sqmul':
                    interaction_features_dict[f'cross_sqmul_{f1}_{f2}'] = feature_df[f1] * (feature_df[f2] ** 2)
                elif operator == 'cross_add':
                    interaction_features_dict[f'cross_add_{f1}_{f2}'] = feature_df[f1] + feature_df[f2]
                elif operator == 'cross_sub':
                    interaction_features_dict[f'cross_sub_{f1}_{f2}'] = feature_df[f1] - feature_df[f2]
                elif operator == 'cross_div':
                    interaction_features_dict[f'cross_div_{f1}_{f2}'] = feature_df[f1] / (feature_df[f2] + epsilon)
                elif operator == 'asinh_add':
                    asinh_z_f1 = get_asinh_zscore(f1)
                    asinh_z_f2 = get_asinh_zscore(f2)
                    interaction_features_dict[f'asinh_add_{f1}_{f2}'] = asinh_z_f1 + asinh_z_f2
                elif operator == 'asinh_sub':
                    asinh_z_f1 = get_asinh_zscore(f1)
                    asinh_z_f2 = get_asinh_zscore(f2)
                    interaction_features_dict[f'asinh_sub_{f1}_{f2}'] = asinh_z_f1 - asinh_z_f2
                elif operator == 'asinh_add_raw':
                    asinh_z_f1 = get_asinh_zscore(f1)
                    z_f2 = get_zscore(f2)
                    interaction_features_dict[f'asinh_add_raw_{f1}_{f2}'] = asinh_z_f1 + z_f2
                elif operator == 'asinh_sub_raw':
                    asinh_z_f1 = get_asinh_zscore(f1)
                    z_f2 = get_zscore(f2)
                    interaction_features_dict[f'asinh_sub_raw_{f1}_{f2}'] = asinh_z_f1 - z_f2
                elif operator == 'raw_add_asinh':
                    z_f1 = get_zscore(f1)
                    asinh_z_f2 = get_asinh_zscore(f2)
                    interaction_features_dict[f'raw_add_asinh_{f1}_{f2}'] = z_f1 + asinh_z_f2
                elif operator == 'raw_sub_asinh':
                    z_f1 = get_zscore(f1)
                    asinh_z_f2 = get_asinh_zscore(f2)
                    interaction_features_dict[f'raw_sub_asinh_{f1}_{f2}'] = z_f1 - asinh_z_f2
                elif operator == 'asinh_mul':
                    asinh_f1 = get_asinh_series(f1)
                    interaction_features_dict[f'asinh_mul_{f1}_{f2}'] = asinh_f1 * feature_df[f2]
                elif operator == 'raw_mul_asinh':
                    asinh_f2 = get_asinh_series(f2)
                    interaction_features_dict[f'raw_mul_asinh_{f1}_{f2}'] = feature_df[f1] * asinh_f2
                elif operator == 'norm_add':
                    norm_f1 = get_zscore(f1)
                    norm_f2 = get_zscore(f2)
                    interaction_features_dict[f'norm_add_{f1}_{f2}'] = norm_f1 + norm_f2
                elif operator == 'norm_sub':
                    norm_f1 = get_zscore(f1)
                    norm_f2 = get_zscore(f2)
                    interaction_features_dict[f'norm_sub_{f1}_{f2}'] = norm_f1 - norm_f2

        # 一次性创建DataFrame，避免碎片化
        if interaction_features_dict:
            interaction_features = pd.DataFrame(interaction_features_dict, index=feature_df.index)
        else:
            interaction_features = pd.DataFrame(index=feature_df.index)
        
        if interaction_features.empty:
            logger.info(f"数据ID '{data_id}' 没有选择任何交互项类型，跳过。")
            updated_feature_dict[data_id] = feature_df.copy()
            continue
        
        logger.info(f"  数据ID '{data_id}' 成功创建 {len(interaction_features.columns)} 个交互特征")
        
        # 合并特征
        updated_feature_df = feature_df.drop(columns=interaction_features.columns, errors='ignore')
        updated_feature_df = updated_feature_df.merge(interaction_features, left_index=True, right_index=True, how='left')
        updated_feature_df = clean_feature_names(updated_feature_df, prefix="f_inter")
        
        updated_feature_dict[data_id] = updated_feature_df

    return updated_feature_dict

In [15]:
def train(
        X_train: pd.DataFrame,
        y_train: pd.Series,
        model_directory_path: str,
    ): 
    global logger, log_file_path
    logger, log_file_path = get_logger('Train', Path(os.path.join(model_directory_path, 'train_logs')), verbose=False)
    global config
    config.PROJECT_ROOT = Path(model_directory_path)
    config.FEATURE_DIR = config.PROJECT_ROOT
    run_output_dir = Path(model_directory_path)

    try:
        # precalculated features
        loaded_feature_df, loaded_feature_name, loaded_y_train = load_precalculated_features(model_directory_path)
        loaded_id = loaded_feature_df.index
        
        # data.py
        X_data = {}
        y_data = {}
        X_train, y_train = filter_unloaded_ids(X_train, y_train, loaded_id)
        X_data["0"] = X_train
        if isinstance(y_train, pd.Series):
            y_train = y_train.to_frame('structural_breakpoint')
        if isinstance(loaded_y_train, pd.Series):
            loaded_y_train = loaded_y_train.to_frame('structural_breakpoint')
        y_data["0"] = y_train
        logger.warning(f"训练数据切片. Load -> feature shape: {loaded_feature_df.shape}, y shape: {loaded_y_train.shape}")
        logger.warning(f"训练数据切片. New -> X shape: {X_train.shape}, y shape: {y_train.shape}")

        # feature.py
        if X_train.shape[0] > 0:
            feature_dict, metadata = generate_features(X_data, use_tqdm=True, parallel=True)
            feature_dict = extract_and_generate_interaction_features(feature_dict)
            for data_id, feature_df in feature_dict.items():
                missing_features = [f for f in config.REMAIN_FEATURES if f not in feature_df.columns]
                if missing_features:
                    logger.warning(f"Missing REMAIN_FEATURES in <{data_id}> before filter: {missing_features}")

            # 拼接特征数据
            data_ids = list(feature_dict.keys())
            feature_dfs = []
            for data_id in data_ids:
                df = feature_dict[data_id].copy()
                feature_dfs.append(df)
            if len(feature_dfs) == 1:
                concatenated_df = feature_dfs[0]
            else:
                concatenated_df = pd.concat(feature_dfs, axis=0, ignore_index=False)
            feature_df = concatenated_df[config.REMAIN_FEATURES]
        else:
            feature_df = pd.DataFrame(index=X_train.index)
            data_ids = ["0"]

        if loaded_feature_df.shape[0] > 0:
            logger.warning(f"加载特征id范围: {loaded_id.min()} ~ {loaded_id.max()}")
            logger.warning(f"新增特征id范围: {feature_df.index.min()} ~ {feature_df.index.max()}")
            feature_df = pd.concat([loaded_feature_df, feature_df], axis=0, ignore_index=False)
        logger.warning("--- 完整特征形状 ---")
        logger.warning(feature_df.shape)
        logger.info("-----------------------------")
        logger.info(f"生成/更新完成。总特征数: {len(feature_df.columns)}")

        start_time = time.time()
        logger.info("Starting training and evaluation pipeline...")
        logger.info(f"Model Parameters: {json.dumps(config.LGBM_PARAMS, indent=4)}")

        # train.py
        # 1. 加载特征和标签
        y_train = pd.concat(list(y_data.values()), axis=0, ignore_index=False)
        if loaded_y_train.shape[0] > 0:
            logger.warning(f"加载标签id范围: {loaded_y_train.index.min()} ~ {loaded_y_train.index.max()}")
            logger.warning(f"新增标签id范围: {y_train.index.min()} ~ {y_train.index.max()}")
            y_train = pd.concat([loaded_y_train, y_train], axis=0, ignore_index=False)
        # 确保对齐
        common_index = feature_df.index.intersection(y_train.index)
        feature_df = feature_df.loc[common_index]
        y_train = y_train.loc[common_index]['structural_breakpoint'].astype(int)
        logger.warning(f"训练数据已对齐. X shape: {feature_df.shape}, y shape: {y_train.shape}")
        
        # # 2. 
        # 特征选择
        if len(config.REMAIN_FEATURES) > 0:
            feature_df = feature_df[config.REMAIN_FEATURES]
            for col in feature_df.columns:
                null_ratio = feature_df[col].isnull().sum() / len(feature_df)
                zero_ratio = (feature_df[col] == 0).sum() / len(feature_df)
                if null_ratio > 0.0 or zero_ratio > 0.5:
                    logger.warning(f"    - '{col}': 空值比例={null_ratio:.2%}, 零值比例={zero_ratio:.2%}")
        if feature_df is None:
            logger.error("特征加载失败，训练中止。")
            return None, None

        logger.info(f"--- 使用的特征列表 (共 {len(feature_df.columns)} 个) ---")
        logger.info(feature_df.columns.tolist())
        logger.info("-" * min(50, len(str(feature_df.columns.tolist()))))
        
        # 3. 模型训练
        oof_preds_dict = {}
        for n, model_name in enumerate(config.MODEL):
            logger.warning(f"Model: {model_name}")
            # 3. 交叉验证
            if config.TRAIN_STRATEGY == 'cv':
                logger.info("Starting 5-fold cross-validation with enhanced data strategy...")

                oof_preds = np.zeros(len(feature_df))
                models = []
                feature_importances = pd.DataFrame(index=feature_df.columns)
                permutation_results = pd.DataFrame(index=feature_df.columns)
                fold_metrics = []
                
                # 使用增强数据交叉验证策略
                cv_params = config.CV_PARAMS[n]
                cv_iterator = StratifiedKFold(
                    n_splits=cv_params['n_splits'],
                    shuffle=cv_params['shuffle'],
                    random_state=cv_params['random_state']
                ).split(feature_df, y_train)
                for fold, (train_idx, val_idx) in enumerate(cv_iterator):
                    logger.info(f"--- Fold {fold+1}/{config.CV_PARAMS[n]['n_splits']} ---")
                    fold_start_time = time.time()

                    X_train_fold, y_train_fold = feature_df.iloc[train_idx], y_train.iloc[train_idx]
                    X_val_fold, y_val_fold = feature_df.iloc[val_idx], y_train.iloc[val_idx]
                    logger.warning(f"训练数据: {X_train_fold.shape}, 验证数据: {X_val_fold.shape}")

                    # 配置模型
                    if model_name == 'LGB':
                        model = lgb.LGBMClassifier(**config.LGBM_PARAMS)
                        callbacks = []
                        if getattr(config, 'EARLY_STOPPING_ROUNDS', 0) and config.EARLY_STOPPING_ROUNDS > 0:
                            callbacks.append(lgb.early_stopping(config.EARLY_STOPPING_ROUNDS, verbose=False))
                        model.fit(
                            X_train_fold, y_train_fold,
                            eval_set=[(X_train_fold, y_train_fold), (X_val_fold, y_val_fold)],
                            eval_names=['train', 'valid'],
                            eval_metric='auc',
                            callbacks=callbacks
                        )
                        train_auc = model.best_score_['train']['auc']
                    elif model_name == 'CAT':
                        model = cat.CatBoostClassifier(**config.CAT_PARAMS)
                        model.fit(
                            X_train_fold, y_train_fold, 
                            eval_set=[(X_val_fold, y_val_fold)],
                            early_stopping_rounds=(config.EARLY_STOPPING_ROUNDS if getattr(config, 'EARLY_STOPPING_ROUNDS', 0) and config.EARLY_STOPPING_ROUNDS > 0 else None),
                            verbose=False
                        )
                        train_preds = model.predict_proba(X_train_fold)[:, 1]
                        # 确保y_train_fold是NumPy格式，兼容cuDF
                        y_train_fold_numpy = y_train_fold.to_numpy() if hasattr(y_train_fold, 'to_numpy') else y_train_fold
                        train_auc = roc_auc_score(y_train_fold_numpy, train_preds)
                    elif model_name == 'XGB':
                        if getattr(config, 'EARLY_STOPPING_ROUNDS', 0) and config.EARLY_STOPPING_ROUNDS > 0:
                            config.XGB_PARAMS['early_stopping_rounds'] = config.EARLY_STOPPING_ROUNDS
                        model = xgb.XGBClassifier(**config.XGB_PARAMS)
                        model.fit(
                            X_train_fold, y_train_fold, 
                            eval_set=[(X_train_fold, y_train_fold), (X_val_fold, y_val_fold)],
                            verbose=False
                        )
                        train_preds = model.predict_proba(X_train_fold)[:, 1]
                        train_auc = roc_auc_score(y_train_fold, train_preds)
                    else:
                        raise ValueError("Unknown model_name")

                    # 预测验证集
                    preds = model.predict_proba(X_val_fold)[:, 1]
                    if hasattr(preds, 'get'):
                        preds = preds.get()

                    oof_preds[val_idx] = preds
                    models.append(model)
                    feature_importances[f'fold_{fold+1}'] = model.feature_importances_
                    
                    fold_auc = roc_auc_score(y_val_fold, preds)
                    logger.warning(f"Fold {fold+1} Train AUC: {train_auc:.5f}, Val AUC: {fold_auc:.5f}")

                    # 记录早停的 step（best_iteration）
                    best_iteration = None
                    if model_name == 'LGB':
                        best_iteration = getattr(model, 'best_iteration_', None)
                    elif model_name == 'CAT':
                        try:
                            best_iteration = model.get_best_iteration()
                        except Exception:
                            best_iteration = getattr(model, 'best_iteration_', None)
                    elif model_name == 'XGB':
                        best_iteration = getattr(model, 'best_iteration', None)
                    logger.warning(f"Fold {fold+1} Early stopping step (best_iteration): {best_iteration}")

                    # 保存到元数据结构中
                    fold_metrics.append({
                        'fold': fold + 1,
                        'train_auc': float(train_auc),
                        'val_auc': float(fold_auc),
                        'best_iteration': int(best_iteration) if best_iteration is not None else None,
                    })

                    fold_duration = time.time() - fold_start_time
                    logger.warning(f"Fold {fold+1} finished in {fold_duration:.2f}s")

                overall_oof_auc = roc_auc_score(y_train, oof_preds)
                logger.warning(f"Overall OOF AUC: {overall_oof_auc:.5f}")
                oof_preds_dict[model_name] = oof_preds

                # 6. 保存模型
                for i, model in tqdm(enumerate(models), total=len(models), desc="Saving models"):
                    joblib.dump(model, run_output_dir / f'online_{model_name}_model_fold_{i+1}.pkl')
                logger.info("Models saved.")
        
            # 3. 多个全量模型
            elif config.TRAIN_STRATEGY == 'multi':
                logger.info("Starting single model training...")
                
                # 配置模型
                if model_name == 'LGB':
                    for i, params in enumerate(config.LGBM_PARAMS):
                        model = lgb.LGBMClassifier(**params)
                        model.fit(
                            feature_df, y_train,
                            eval_set=[(feature_df, y_train)],
                            eval_names=['train', 'valid'],
                            eval_metric='auc',
                        )
                        train_auc = model.best_score_['train']['auc']
                        logger.warning(f"Train AUC: {train_auc:.5f}")
                        joblib.dump(model, run_output_dir / f'online_{model_name}_model_{i}.pkl')
                        logger.info(f"Model {model_name} {i} saved.")
                elif model_name == 'CAT':
                    for i, params in enumerate(config.CAT_PARAMS):
                        model = cat.CatBoostClassifier(**params)
                        model.fit(
                            feature_df, y_train, 
                            eval_set=[(feature_df, y_train)],
                            verbose=False
                        )
                        train_preds = model.predict_proba(feature_df)[:, 1]
                        # 确保y_train_fold是NumPy格式，兼容cuDF
                        y_train_numpy = y_train.to_numpy() if hasattr(y_train, 'to_numpy') else y_train
                        train_auc = roc_auc_score(y_train_numpy, train_preds)
                        logger.warning(f"Train AUC: {train_auc:.5f}")
                        joblib.dump(model, run_output_dir / f'online_{model_name}_model_{i}.pkl')
                        logger.info(f"Model {model_name} {i} saved.")
                elif model_name == 'XGB':
                    for i, params in enumerate(config.XGB_PARAMS):
                        model = xgb.XGBClassifier(**params)
                        model.fit(
                            feature_df, y_train, 
                            eval_set=[(feature_df, y_train)],
                            verbose=False
                        )
                        train_preds = model.predict_proba(feature_df)[:, 1]
                        train_auc = roc_auc_score(y_train, train_preds)
                        logger.warning(f"Train AUC: {train_auc:.5f}")
                        joblib.dump(model, run_output_dir / f'online_{model_name}_model_{i}.pkl')
                        logger.info(f"Model {model_name} {i} saved.")
                else:
                    raise ValueError("Unknown model_name")

        # ensemble
        if config.TRAIN_STRATEGY == 'cv':
            oof_preds = np.zeros(len(feature_df))
            for model_name in config.MODEL:
                oof_preds += oof_preds_dict[model_name] / len(config.MODEL)
            overall_oof_auc = roc_auc_score(y_train, oof_preds)
            logger.warning(f"Ensemble: Overall OOF AUC: {overall_oof_auc:.5f}")
        
        duration = time.time() - start_time
        logger.warning(f"训练流程结束，总耗时: {duration:.2f} 秒。")
    
    except Exception as e:
        logger.error(f"训练过程中发生错误: {e}")

In [16]:
# --- [Infer] 加载模型 ---
def load_models(model_directory_path):
    """Load all LightGBM model files saved with joblib and prepare them for ensemble"""
    local_models = []
    online_models = []
    dirpath = Path(model_directory_path)
    model_files = list(dirpath.glob('*.pkl'))
    
    if not model_files:
        logger.warning(f"Warning: No model files found under {model_directory_path}!")
        return 
    logger.warning(f"Found a total of {len(model_files)} model files.")
    
    for model_path in model_files:
        try:
            logger.warning(f"Loading model: {model_path}")
            model = joblib.load(model_path)
            if 'local' in model_path.name:
                local_models.append(model)
            elif 'online' in model_path.name:
                online_models.append(model)
        except Exception as e:
            logger.warning(f"Error loading model {model_path}: {e}")
    
    if len(online_models) > 0:
        logger.warning(f"Loaded {len(online_models)} online models.")
        return online_models
    else:
        logger.warning("Warning: No online models loaded!")
        logger.warning(f"Loaded {len(local_models)} local models.")
        return local_models


In [17]:
# --- [Infer] 生成特征 ---
def generate_features_infer_parallel(
        X_data,
        funcs_to_run: list = None,
        trans_to_run: list = None,
        use_tqdm: bool = False,
        trans_funcs_dict: dict = None,
        trans_feats_dict: dict = None,
    ):
    """
    推理阶段：以“特征函数”为并行单元生成特征（一次只处理当前批的数据，通常只有一个样本）。
    返回结构与 generate_features 保持一致：{data_id: feature_df}, metadata
    """
    # 输入规范化
    if isinstance(X_data, pd.DataFrame):
        X_data_dict = {"0": X_data}
    elif isinstance(X_data, dict):
        X_data_dict = X_data
    else:
        raise ValueError("X_data必须是pd.DataFrame或dict类型")

    # 选择要运行的特征函数（默认：跳过实验性函数）
    if funcs_to_run is None:
        funcs_to_run = [
            f for f in FEATURE_REGISTRY.keys()
            if f not in config.EXPERIMENTAL_FEATURES
        ]
    valid_funcs_to_run = [f for f in funcs_to_run if f in FEATURE_REGISTRY]

    # trans-funcs 对齐
    if trans_funcs_dict is None or trans_feats_dict is None:
        trans_funcs_dict, trans_feats_dict = extract_trans_funcs_dict()

    feature_dict = {}
    metadata = {}

    for data_id, X_df in X_data_dict.items():
        unique_ids = X_df.index.get_level_values('id').unique()
        current_feature_df = pd.DataFrame(index=unique_ids)

        # 先进行时序变换（按变换函数顺序执行，计算量主要在后续特征函数）
        transformed_data = apply_transformation(X_df, trans_to_run)

        # for each mode 按(func)并行地生成特征
        for mode_name, mode_df in transformed_data.items():
            # s = time.time()
            allowed_func_ids = trans_funcs_dict.get(mode_name, {})
            funcs_for_mode = [
                fname for fname in valid_funcs_to_run
                if FEATURE_REGISTRY[fname]['func_id'] in allowed_func_ids
            ]
            if not funcs_for_mode:
                continue
            selected_features = trans_feats_dict.get(mode_name, set())

            def run_single_feature(func_name):
                feature_info = FEATURE_REGISTRY[func_name]
                func = feature_info['func']
                func_id = feature_info['func_id']
                try:
                    # 逐 id 顺序计算，避免在小样本上产生额外进程/序列化开销
                    df_res = _apply_feature_func_sequential(func, mode_df, use_tqdm=False, selected_features=selected_features)
                    df_res.columns = [f"{mode_name}_{func_id}_{col}" for col in df_res.columns]
                    df_res = clean_feature_names(df_res)
                except Exception as e:
                    logger.warning(f"特征函数 {func_name} 失败: {e}")
                    df_res = pd.DataFrame(index=mode_df.index.get_level_values('id').unique())
                return df_res

            new_feature_dfs = Parallel(n_jobs=config.N_JOBS, prefer="threads")(
                delayed(run_single_feature)(fname) for fname in funcs_for_mode
            )

            if len(new_feature_dfs) > 0:
                merged_mode_df = pd.concat(new_feature_dfs, axis=1)
                current_feature_df = current_feature_df.drop(columns=merged_mode_df.columns, errors='ignore')
                current_feature_df = current_feature_df.merge(merged_mode_df, left_index=True, right_index=True, how='left')
            # e = time.time()
            # print(f"数据ID '{data_id}' 的模态 '{mode_name}' 特征生成完成，耗时: {e - s:.2f} 秒，共生成 {len(merged_mode_df.columns) if len(new_feature_dfs) > 0 else 0} 个特征")

        feature_dict[data_id] = current_feature_df

        # # 按(mode, func)并行地生成特征
        # tasks = []
        # for mode_name, mode_df in transformed_data.items():
        #     allowed_func_ids = set(trans_funcs_dict.get(mode_name, []))
        #     funcs_for_mode = [
        #         fname for fname in valid_funcs_to_run
        #         if FEATURE_REGISTRY[fname]['func_id'] in allowed_func_ids
        #     ]
        #     for func_name in funcs_for_mode:
        #         tasks.append((mode_name, mode_df, func_name))

        # def run_mode_func(mode_name, mode_df, func_name):
        #     feature_info = FEATURE_REGISTRY[func_name]
        #     func = feature_info['func']
        #     func_id = feature_info['func_id']
        #     try:
        #         df_res = _apply_feature_func_sequential(func, mode_df, use_tqdm=False)
        #         df_res.columns = [f"{mode_name}_{func_id}_{col}" for col in df_res.columns]
        #         df_res = clean_feature_names(df_res)
        #     except Exception as e:
        #         logger.warning(f"特征函数 {func_name} 在模态 {mode_name} 上失败: {e}")
        #         df_res = pd.DataFrame(index=mode_df.index.get_level_values('id').unique())
        #     return df_res

        # # 并行执行 (mode, func) 任务
        # # s = time.time()
        # new_feature_dfs = Parallel(n_jobs=config.N_JOBS, prefer="threads")(
        #     delayed(run_mode_func)(mode_name, mode_df, func_name) for mode_name, mode_df, func_name in tasks
        # )

        # # 合并结果
        # if len(new_feature_dfs) > 0:
        #     merged_mode_df = pd.concat(new_feature_dfs, axis=1)
        #     current_feature_df = current_feature_df.drop(columns=merged_mode_df.columns, errors='ignore')
        #     current_feature_df = current_feature_df.merge(
        #         merged_mode_df, left_index=True, right_index=True, how='left'
        #     )
        # # e = time.time()
        # # print(f"数据ID '{data_id}' 全部模态特征生成完成，耗时: {e - s:.2f} 秒，"
        # #       f"共生成 {len(merged_mode_df.columns) if len(new_feature_dfs) > 0 else 0} 个特征")
        
        # feature_dict[data_id] = current_feature_df

    return feature_dict, metadata

In [18]:
def infer(
        X_test: typing.Iterable[pd.DataFrame],
        model_directory_path: str,
    ):
    global logger, log_file_path
    logger, log_file_path = get_logger('Inference', Path(os.path.join(model_directory_path, 'infer_logs')), verbose=False)
    global config
    config.PROJECT_ROOT = Path(model_directory_path)
    config.FEATURE_DIR = config.PROJECT_ROOT
    
    # 加载模型
    models = load_models(model_directory_path)
    # 加载各变换应运行的函数映射
    trans_funcs_dict, trans_feats_dict = extract_trans_funcs_dict()

    yield  # Ready

    # X_test 只能迭代一次；拿到一条就立刻算、立刻推理
    for X_df in tqdm(X_test, desc="Inference Progress"):
        X_data = {"0": X_df}
        # st = time.time()
        feature_dict, metadata = generate_features_infer_parallel(
            X_data, use_tqdm=False, trans_funcs_dict=trans_funcs_dict, trans_feats_dict=trans_feats_dict
        )
        feature_dict = extract_and_generate_interaction_features(feature_dict)
        for data_id, feature_df in feature_dict.items():
            missing_features = [f for f in config.REMAIN_FEATURES if f not in feature_df.columns]
            if missing_features:
                logger.warning(f"Missing REMAIN_FEATURES in <{data_id}> before filter: {missing_features}")
        # et = time.time()
        # print(f'特征用时 {et-st}')

        # 拼接特征数据
        data_ids = list(feature_dict.keys())
        feature_dfs = [feature_dict[data_id].copy() for data_id in data_ids]
        concatenated_df = feature_dfs[0] if len(feature_dfs) == 1 else pd.concat(feature_dfs, axis=0, ignore_index=False)
        feature_df = concatenated_df[config.REMAIN_FEATURES]
        logger.info(feature_df)
        logger.info("--- 生成后完整特征列表 ---")
        logger.info(f"{feature_df.columns.tolist()}")
        logger.info("-----------------------------")
        logger.info(f"生成/更新完成。总特征数: {len(feature_df.columns)}")

        def ensemble_predict(models, X):
            preds = [model.predict_proba(X)[:, 1] for model in models]
            if len(preds) == 0:
                logger.warning("No predictions generated, returning zeros.")
                return np.zeros(len(X))
            return np.mean(preds, axis=0)
        # st = time.time()
        prediction = ensemble_predict(models, feature_df)
        # et = time.time()
        # print(f'推理用时 {et-st}')
        # prediction = 1 - prediction
        yield prediction

In [19]:
crunch.test(
    # Uncomment to disable the train
    force_first_train=True,

    # Uncomment to disable the determinism check
    # no_determinism_check=True,
)

[32m20:13:57[0m [33mno forbidden library found[0m
[32m20:13:57[0m [33m[0m
[32m20:14:02[0m started
[32m20:14:02[0m running local test
[32m20:14:02[0m [33minternet access isn't restricted, no check will be done[0m
[32m20:14:02[0m 
[32m20:14:05[0m starting unstructured loop...
[32m20:14:05[0m executing - command=train


data\X_train.parquet: download from https:crunchdao--competition--production.s3-accelerate.amazonaws.com/data-releases/146/X_train.parquet (204327238 bytes)
data\X_train.parquet: already exists, file length match
data\X_test.reduced.parquet: download from https:crunchdao--competition--production.s3-accelerate.amazonaws.com/data-releases/146/X_test.reduced.parquet (2380918 bytes)
data\X_test.reduced.parquet: already exists, file length match
data\y_train.parquet: download from https:crunchdao--competition--production.s3-accelerate.amazonaws.com/data-releases/146/y_train.parquet (61003 bytes)
data\y_train.parquet: already exists, file length match
data\y_test.reduced.parquet: download from https:crunchdao--competition--production.s3-accelerate.amazonaws.com/data-releases/146/y_test.reduced.parquet (2655 bytes)
data\y_test.reduced.parquet: already exists, file length match


未指定特征文件，将尝试加载最新版本。
正在从 features_20250926_220622.parquet 加载特征...
加载字典格式特征文件成功，包含数据ID: ['0']
特征拼接成功，使用数据ID: ['0']，共 107 个特征，9901 行数据。




Running distribution_stats_features (parallel): 100%|██████████| 100/100 [00:04<00:00, 22.00it/s]
Running test_stats_features_first (parallel): 100%|██████████| 100/100 [00:00<00:00, 354.51it/s]
Running test_stats_features_second (parallel): 100%|██████████| 100/100 [00:00<00:00, 101.67it/s]
Running trend_features (parallel): 100%|██████████| 100/100 [00:00<00:00, 255.08it/s]
Running entropy_features_first (parallel): 100%|██████████| 100/100 [00:21<00:00,  4.69it/s]
Running entropy_features_second (parallel): 100%|██████████| 100/100 [00:00<00:00, 367.37it/s]
Running tsfresh_features_first (parallel): 100%|██████████| 100/100 [00:01<00:00, 52.81it/s]
Running rupture_cost_features (parallel): 100%|██████████| 100/100 [00:01<00:00, 66.12it/s]
Running distribution_stats_features (parallel): 100%|██████████| 100/100 [00:04<00:00, 22.93it/s]
Running test_stats_features_first (parallel): 100%|██████████| 100/100 [00:00<00:00, 293.04it/s]
Running test_stats_features_second (parallel): 100%|█



Saving models: 100%|██████████| 5/5 [00:02<00:00,  2.34it/s]








Saving models: 100%|██████████| 5/5 [00:00<00:00,  6.46it/s]








Saving models: 100%|██████████| 5/5 [00:00<00:00, 63.27it/s]




[32m20:27:53[0m executing - command=infer




Inference Progress: 101it [01:01,  1.65it/s]
[32m20:28:59[0m checking determinism by executing the inference again with 30% of the data (tolerance: 1e-08)
[32m20:28:59[0m executing - command=infer




Inference Progress: 30it [00:18,  1.60it/s]
[32m20:29:21[0m determinism check: passed
[32m20:29:21[0m [33msave prediction - path=data\prediction.parquet[0m
[32m20:29:21[0m ended
[32m20:29:21[0m [33mduration - time=00:15:19[0m
[32m20:29:21[0m [33mmemory - before="474.94 MB" after="512.97 MB" consumed="38.02 MB"[0m


In [20]:
prediction = pd.read_parquet("data/prediction.parquet")
prediction

Unnamed: 0_level_0,prediction
id,Unnamed: 1_level_1
10001,0.117964
10002,0.036871
10003,0.069379
10004,0.127808
10005,0.425484
...,...
10097,0.165459
10098,0.017593
10099,0.212351
10100,0.017096


In [21]:
# Load the targets
target = pd.read_parquet("data/y_test.reduced.parquet")["structural_breakpoint"]

# Call the scoring function
sklearn.metrics.roc_auc_score(
    target,
    prediction,
)

np.float64(0.9305164319248826)