In [1]:
import operator
import random
import pickle
from functools import partial
from collections import Counter

import numpy as np
import pandas as pd
import tqdm

from deap import base, creator, tools, gp

In [4]:
# Numerical tolerance shared by the primitives in this cell: denominators with
# |b| <= EPS are treated as zero in safe_truediv, and EPS is added as an offset
# inside gp_log, gp_signedpower and the gp_wma weights.
EPS = 1e-12

# ---------- 基础工具 ----------
def _to_arr(x):
    """Coerce ``x`` (scalar, sequence, Series, ...) into a float ndarray."""
    as_float = np.asarray(x, dtype=float)
    return as_float

def _win_from(x, default=10, lo=2, hi=60):
    """Turn a window argument (scalar or array) into a scalar window length.

    The window is the median of ``|x|`` with NaNs ignored, truncated to an int
    and clamped to ``[lo, hi]``. When no finite median exists (empty input,
    all-NaN input, or an infinite median) ``default`` is used instead.

    :param x: scalar or array-like window specification.
    :param default: window used when ``x`` carries no finite median.
    :param lo: smallest window that may be returned.
    :param hi: largest window that may be returned.
    :return: window length clamped to ``[lo, hi]``.
    """
    magnitudes = np.abs(np.asarray(x, dtype=float)).ravel()
    magnitudes = magnitudes[~np.isnan(magnitudes)]

    n = default
    if magnitudes.size:
        median = np.median(magnitudes)
        # The finiteness check must happen BEFORE int(): int(nan) / int(inf)
        # raise, so checking afterwards can never select the default.
        if np.isfinite(median):
            n = int(median)
    return min(max(n, lo), hi)

def safe_truediv(a, b):
    """Element-wise ``a / b`` that yields 0 wherever ``|b| <= EPS`` or ``b`` is NaN.

    The operands are broadcast against each other, so array/array,
    array/scalar and scalar/array (e.g. a GP constant divided by a factor
    series) are all supported.

    :param a: numerator, scalar or array-like.
    :param b: denominator, scalar or array-like.
    :return: float ndarray with the broadcast shape of ``a`` and ``b``.
    """
    a, b = _to_arr(a), _to_arr(b)
    # Start from zeros; np.divide only writes the positions selected by `where`,
    # so near-zero / NaN denominators keep the 0 fill value.
    out = np.zeros(np.broadcast(a, b).shape, dtype=float)
    np.divide(a, b, out=out, where=np.abs(b) > EPS)
    return out

# ---------- 时序延迟/差分 ----------
def _delay(x, k):
    """Lag ``x`` by ``k`` steps, filling the first ``k`` positions with NaN.

    :param x: array-like series.
    :param k: non-negative number of steps to shift by.
    :return: float ndarray of the same length as ``x``.
    :raises ValueError: if ``k`` is negative.
    """
    if k < 0:
        raise ValueError("_delay: k must be non-negative")
    x = _to_arr(x)
    if x.size == 0:
        return x
    if k == 0:
        # x[:-0] is empty, so the generic slice assignment below would fail.
        return x.copy()
    y = np.empty_like(x)
    y[:k] = np.nan
    y[k:] = x[:-k]
    return y

def gp_delay_1(x):
    """Lag ``x`` by 1 step."""
    return _delay(x, 1)


def gp_delay_2(x):
    """Lag ``x`` by 2 steps."""
    return _delay(x, 2)


def gp_delay_3(x):
    """Lag ``x`` by 3 steps."""
    return _delay(x, 3)


def gp_delay_4(x):
    """Lag ``x`` by 4 steps."""
    return _delay(x, 4)


def gp_delay_5(x):
    """Lag ``x`` by 5 steps."""
    return _delay(x, 5)


def gp_delay_6(x):
    """Lag ``x`` by 6 steps."""
    return _delay(x, 6)


def gp_delay_7(x):
    """Lag ``x`` by 7 steps."""
    return _delay(x, 7)


def gp_delay_8(x):
    """Lag ``x`` by 8 steps."""
    return _delay(x, 8)

def gp_delta(x):
    """First difference ``x[t] - x[t-1]``; the first element is NaN."""
    values = _to_arr(x)
    diff = np.empty_like(values)
    diff[:] = np.nan
    # Only positions 1.. have a predecessor to subtract.
    diff[1:] = values[1:] - values[:-1]
    return diff

def gp_signedpower(x):
    """Sign-preserving power: ``sign(x) * (|x| + EPS) ** 0.5``."""
    values = _to_arr(x)
    # The exponent is fixed at 0.5 here; change it if another power is wanted.
    magnitude = np.power(np.abs(values) + EPS, 0.5)
    return np.sign(values) * magnitude

# ---------- 平滑/衰减 ----------
def _ema(x, span):
    """Exponentially weighted mean of ``x`` with the given span (adjust=False, min_periods=1)."""
    series = pd.Series(_to_arr(x))
    smoothed = series.ewm(span=int(span), min_periods=1, adjust=False).mean()
    return smoothed.values

def gp_decay_05(x):
    """Exponential smoothing of ``x`` with span 5."""
    return _ema(x, 5)


def gp_decay_10(x):
    """Exponential smoothing of ``x`` with span 10."""
    return _ema(x, 10)


def gp_decay_20(x):
    """Exponential smoothing of ``x`` with span 20."""
    return _ema(x, 20)

# ---------- 逻辑/比较 ----------
def gp_and(a, b):
    """1.0 where both ``a`` and ``b`` are strictly positive, else 0.0."""
    a_pos = _to_arr(a) > 0
    b_pos = _to_arr(b) > 0
    return (a_pos & b_pos).astype(float)


def gp_or(a, b):
    """1.0 where ``a`` or ``b`` is strictly positive, else 0.0."""
    a_pos = _to_arr(a) > 0
    b_pos = _to_arr(b) > 0
    return (a_pos | b_pos).astype(float)


def gp_lt(a, b):
    """1.0 where ``a < b``, else 0.0."""
    left, right = _to_arr(a), _to_arr(b)
    return (left < right).astype(float)


def gp_gt(a, b):
    """1.0 where ``a > b``, else 0.0."""
    left, right = _to_arr(a), _to_arr(b)
    return (left > right).astype(float)

def gp_if(cond, x, y):
    """Element-wise select: ``x`` where ``cond > 0``, otherwise ``y``."""
    mask = _to_arr(cond) > 0
    when_true = _to_arr(x)
    when_false = _to_arr(y)
    return np.where(mask, when_true, when_false)

def gp_if_then_else(a, b, x, y):
    """Element-wise select: ``x`` where ``a > b``, otherwise ``y``."""
    mask = _to_arr(a) > _to_arr(b)
    when_true = _to_arr(x)
    when_false = _to_arr(y)
    return np.where(mask, when_true, when_false)

# ---------- 聚合/统计 ----------
def gp_max(a, b):
    """Element-wise maximum of ``a`` and ``b``."""
    left, right = _to_arr(a), _to_arr(b)
    return np.maximum(left, right)


def gp_min(a, b):
    """Element-wise minimum of ``a`` and ``b``."""
    left, right = _to_arr(a), _to_arr(b)
    return np.minimum(left, right)


def gp_log(x):
    """Natural log of ``|x| + EPS`` (defined for zero and negative inputs)."""
    magnitude = np.abs(_to_arr(x))
    return np.log(magnitude + EPS)


def gp_abs(x):
    """Element-wise absolute value."""
    values = _to_arr(x)
    return np.abs(values)


def gp_vneg(x):
    """Element-wise negation."""
    values = _to_arr(x)
    return -values


def gp_sign(x):
    """Element-wise sign (-1, 0 or 1)."""
    values = _to_arr(x)
    return np.sign(values)

def _rolling(x, win, func):
    """Apply ``func`` to each trailing window of length ``win`` (min_periods=1, raw ndarray windows)."""
    windows = pd.Series(_to_arr(x)).rolling(win, min_periods=1)
    applied = windows.apply(func, raw=True)
    return applied.values

def gp_stddev_05(x):
    """Rolling standard deviation over 5 steps (min_periods=1)."""
    series = pd.Series(_to_arr(x))
    return series.rolling(window=5, min_periods=1).std().values


def gp_stddev_10(x):
    """Rolling standard deviation over 10 steps (min_periods=1)."""
    series = pd.Series(_to_arr(x))
    return series.rolling(window=10, min_periods=1).std().values


def gp_stddev_15(x):
    """Rolling standard deviation over 15 steps (min_periods=1)."""
    series = pd.Series(_to_arr(x))
    return series.rolling(window=15, min_periods=1).std().values

def gp_prod_05(x):
    """Rolling product over 5 steps (min_periods=1)."""
    return _rolling(x, 5, lambda window: np.prod(window))


def gp_prod_10(x):
    """Rolling product over 10 steps (min_periods=1)."""
    return _rolling(x, 10, lambda window: np.prod(window))


def gp_prod_20(x):
    """Rolling product over 20 steps (min_periods=1)."""
    return _rolling(x, 20, lambda window: np.prod(window))

def gp_wma(x, w, n):
    """Rolling weighted mean of ``x`` using ``|w| + EPS`` as weights.

    The window length is derived from ``n`` through ``_win_from`` (default 10).
    """
    values, raw_weights = _to_arr(x), _to_arr(w)
    window = _win_from(n, default=10)
    value_series = pd.Series(values)
    weight_series = pd.Series(np.abs(raw_weights) + EPS)
    weighted_total = (value_series * weight_series).rolling(window, min_periods=1).sum()
    weight_total = weight_series.rolling(window, min_periods=1).sum()
    return (weighted_total / weight_total).values

def gp_cov_05(a, b):
    """Rolling covariance of ``a`` and ``b`` over 5 steps (min_periods=1)."""
    left = pd.Series(_to_arr(a))
    right = pd.Series(_to_arr(b))
    return left.rolling(window=5, min_periods=1).cov(right).values


def gp_cov_10(a, b):
    """Rolling covariance of ``a`` and ``b`` over 10 steps (min_periods=1)."""
    left = pd.Series(_to_arr(a))
    right = pd.Series(_to_arr(b))
    return left.rolling(window=10, min_periods=1).cov(right).values


def gp_cov_20(a, b):
    """Rolling covariance of ``a`` and ``b`` over 20 steps (min_periods=1)."""
    left = pd.Series(_to_arr(a))
    right = pd.Series(_to_arr(b))
    return left.rolling(window=20, min_periods=1).cov(right).values

def gp_corr_05(a, b):
    """Rolling correlation of ``a`` and ``b`` over 5 steps (min_periods=1)."""
    left = pd.Series(_to_arr(a))
    right = pd.Series(_to_arr(b))
    return left.rolling(window=5, min_periods=1).corr(right).values


def gp_corr_10(a, b):
    """Rolling correlation of ``a`` and ``b`` over 10 steps (min_periods=1)."""
    left = pd.Series(_to_arr(a))
    right = pd.Series(_to_arr(b))
    return left.rolling(window=10, min_periods=1).corr(right).values


def gp_corr_20(a, b):
    """Rolling correlation of ``a`` and ``b`` over 20 steps (min_periods=1)."""
    left = pd.Series(_to_arr(a))
    right = pd.Series(_to_arr(b))
    return left.rolling(window=20, min_periods=1).corr(right).values

def gp_mean2(a, b):
    """Element-wise NaN-ignoring mean of 2 inputs."""
    stacked = np.vstack([_to_arr(v) for v in (a, b)])
    return np.nanmean(stacked, axis=0)


def gp_mean3(a, b, c):
    """Element-wise NaN-ignoring mean of 3 inputs."""
    stacked = np.vstack([_to_arr(v) for v in (a, b, c)])
    return np.nanmean(stacked, axis=0)


def gp_mean4(a, b, c, d):
    """Element-wise NaN-ignoring mean of 4 inputs."""
    stacked = np.vstack([_to_arr(v) for v in (a, b, c, d)])
    return np.nanmean(stacked, axis=0)


def gp_mean5(a, b, c, d, e):
    """Element-wise NaN-ignoring mean of 5 inputs."""
    stacked = np.vstack([_to_arr(v) for v in (a, b, c, d, e)])
    return np.nanmean(stacked, axis=0)


def gp_mean6(a, b, c, d, e, f):
    """Element-wise NaN-ignoring mean of 6 inputs."""
    stacked = np.vstack([_to_arr(v) for v in (a, b, c, d, e, f)])
    return np.nanmean(stacked, axis=0)


def gp_mean7(a, b, c, d, e, f, g):
    """Element-wise NaN-ignoring mean of 7 inputs."""
    stacked = np.vstack([_to_arr(v) for v in (a, b, c, d, e, f, g)])
    return np.nanmean(stacked, axis=0)

def gp_clear_by_cond(x, y, cond):
    """Conditional replace: take ``y`` where ``cond > 0``, otherwise keep ``x``."""
    mask = _to_arr(cond) > 0
    replacement = _to_arr(y)
    original = _to_arr(x)
    return np.where(mask, replacement, original)

def rank_x1(x, w1, w2, w3):
    """Rolling percentile rank of ``x``; the window length is derived from ``w1``.

    ``w2`` and ``w3`` are accepted but not used. This is a generic
    implementation and may differ slightly from a private/original version.
    """
    window = _win_from(w1, default=10)

    def _last_pct_rank(chunk):
        # Percentile-rank every value inside the window, keep the newest one.
        ranked = pd.Series(chunk).rank(pct=True)
        return ranked.iloc[-1]

    rolled = pd.Series(_to_arr(x)).rolling(window, min_periods=1)
    return rolled.apply(lambda v: _last_pct_rank(v), raw=False).values

In [6]:
# =========================
# Global configuration
# =========================
# Full-size run (disabled):
# # """
# N_GENERATIONS = 7
# POP_SIZE = 2000
# MUT_PB, MUT_Point, CXPB = 0.05, 0.4, 0.5  # subtree-mutation rate / point-mutation rate / crossover rate
# NUM_HOF = 200  # hall of fame: number of elite individuals kept
# hall_of_fame = tools.HallOfFame(NUM_HOF)
# NUM_ELITE = 200  # elitism: number of elite individuals that bypass selection
# tournament_size = 5  # tournament pressure
# # """

# Trial-run settings (small values so the whole notebook executes quickly)

N_GENERATIONS = 2
POP_SIZE = 10
MUT_PB, MUT_Point, CXPB = 0.05, 0.4, 0.5
NUM_HOF = 1
hall_of_fame = tools.HallOfFame(NUM_HOF)
NUM_ELITE = 1
tournament_size = 5


# Input / output paths
Ver = 'V6'
path01 = 'Trees/Factors_selected.csv'  # where the selected factor codes are read from
path02 = 'GP/V6/'                     # directory the per-generation populations are pickled into
path03 = 'Trees/alphas191_data_V2.csv'  # where the factor data is read from
path04 = 'GP/V6/HOF.csv'              # where the hall-of-fame results are stored

# =========================
# Factor definitions
# =========================
# factors = pd.read_csv(path01)
# new_factors = factors['Alpha_Index'].iloc[20:28].to_list()  # newly introduced factors

# Strong factors carried over (high-importance factors from earlier runs / price-volume data).
# NOTE(review): the name `exellent` is misspelled; it is left unchanged here because
# other cells may reference it.
exellent = [
    'open_interest', 'alpha054', 'alpha013', 'amount', 'alpha002', 'alpha191',
    'alpha135', 'alpha022', 'volume', 'alpha096', 'alpha162', 'alpha038',
    'alpha165', 'alpha049', 'alpha166', 'alpha066', 'alpha161', 'open', 'alpha183'
]
# factors_selected is the same list object as `exellent` while new_factors is disabled.
factors_selected = exellent #+ new_factors
FACTOR_COUNT = len(factors_selected)

print(f"共计{FACTOR_COUNT}个因子")
print(f"保留{len(exellent)}个优秀因子：{exellent}")
#print(f"新引入{len(new_factors)}个新因子：{new_factors}")

# =========================
# DEAP base structures
# =========================
# Four objectives: maximise |mean IC|, minimise IC dispersion, minimise the
# number of NaN ICs, minimise tree complexity.
creator.create("FitnessMulti", base.Fitness, weights=(1.0, -1.0, -1.0, -1.0))
creator.create("Individual", gp.PrimitiveTree, fitness=creator.FitnessMulti)   # an individual is an expression tree

# Toolbox that the genetic operators are registered on
toolbox = base.Toolbox()

# Primitive set: operators (primitives) and input variables (terminals)
pset = gp.PrimitiveSet("MAIN", FACTOR_COUNT)
# Rename ARG<i> to the i-th factor name. enumerate() supplies the position
# directly instead of searching the list again for every factor, which was
# quadratic and resolved to the wrong position for a repeated name.
for position, factor in enumerate(factors_selected):
    pset.renameArguments(**{f'ARG{position}': factor})

# -- Basic operators and constants (custom primitives must already be defined) --
for _primitive, _arity in (
    (operator.add, 2),
    (operator.sub, 2),
    (operator.mul, 2),
    (safe_truediv, 2),  # division, using the custom zero-safe implementation
    (np.sin, 1),
    (np.cos, 1),
    (np.tan, 1),
):
    pset.addPrimitive(_primitive, _arity)

pset.addEphemeralConstant("rand", partial(np.random.uniform, 0, 20.0))  # random constant
# pset.addEphemeralConstant("rand_int", partial(random.randint, 1, 10))  # random integer

# -- Time-series and statistical primitives (must already be implemented) --
for _primitive, _arity in (
    (gp_delay_1, 1),
    (gp_delay_2, 1),
    (gp_delay_3, 1),
    (gp_delay_4, 1),
    (gp_delay_5, 1),
    (gp_delay_6, 1),
    (gp_delay_7, 1),
    (gp_delay_8, 1),
    (gp_delta, 1),
    (gp_signedpower, 1),
    (gp_decay_05, 1),
    (gp_decay_10, 1),
    (gp_decay_20, 1),
    (gp_and, 2),
    (gp_or, 2),
    (gp_lt, 2),
    (gp_gt, 2),
    (gp_if, 3),
    (gp_max, 2),
    (gp_min, 2),
    (gp_stddev_05, 1),
    (gp_stddev_10, 1),
    (gp_stddev_15, 1),
    (gp_log, 1),
    (gp_abs, 1),
    (gp_prod_05, 1),
    (gp_prod_10, 1),
    (gp_prod_20, 1),
    (gp_wma, 3),
    (gp_vneg, 1),
    (gp_sign, 1),
    (gp_cov_05, 2),
    (gp_cov_10, 2),
    (gp_cov_20, 2),
    (gp_corr_05, 2),
    (gp_corr_10, 2),
    (gp_corr_20, 2),
    (gp_mean2, 2),
    (gp_mean3, 3),
    (gp_mean4, 4),
    (gp_mean5, 5),
    (gp_mean6, 6),
    (gp_mean7, 7),
    (gp_clear_by_cond, 3),
    (gp_if_then_else, 4),
    (rank_x1, 4),
):
    pset.addPrimitive(_primitive, _arity)

# =========================
# 适应度评估（总适应度，多合约合并）
# =========================
def evaluate_individual(individual, func, datasets):
    """
    Score one GP expression over the data of all contracts.

    The compiled expression is applied to every contract's inputs; for each
    index label the correlation (IC) between the new factor values and the
    returns across contracts is computed.

    :param individual: the GP expression tree being scored.
    :param func: callable that compiles ``individual`` into an executable function.
    :param datasets: iterable of ``(name, X, y)`` triples, one per contract.
        ``X`` must expose ``.values`` and ``.index``; its columns are passed
        positionally to the compiled expression. ``y`` must expose ``.values``
        and ``.index``.
    :return: ``(|mean IC|, std of IC, number of NaN ICs, tree size)``, or the
        penalty tuple ``(-inf, inf, inf, inf)`` when the expression uses no
        selected factor, has too many NaN ICs, has |mean IC| < 0.001, or fails.
    """
    penalty = (float("-inf"), float("inf"), float("inf"), float("inf"))
    try:
        # Compile the expression tree into an executable function.
        compiled_func = func(individual)

        # Penalise expressions that reference none of the selected factors.
        tree_str = str(individual)
        if not any(factor in tree_str for factor in factors_selected):
            return penalty

        # New factor values and returns per contract, keyed by contract name.
        new_factors_dict = {}
        returns_dict = {}
        for name, X_train, Y_train in datasets:
            # One row per input column, so that unpacking passes each column as one array.
            X_sample = X_train.values.T
            predictions = compiled_func(*X_sample)
            new_factors_dict[name] = pd.Series(predictions, index=X_train.index)
            returns_dict[name] = pd.Series(Y_train.values, index=Y_train.index)

        new_factors_df = pd.DataFrame(new_factors_dict)
        returns_df = pd.DataFrame(returns_dict)

        ic_values = []
        nan_count = 0  # number of index labels whose IC is undefined
        for time in new_factors_df.index:
            ic = new_factors_df.loc[time].corr(returns_df.loc[time])
            ic = pd.to_numeric(ic, errors='coerce')
            if np.isnan(ic):
                nan_count += 1
            else:
                ic_values.append(ic)

        # Penalise expressions whose share of undefined ICs reaches 1%.
        threshold = 0.01
        total_rows = len(new_factors_df)
        if total_rows - len(ic_values) >= threshold * total_rows:
            return penalty

        avg_ic = np.nanmean(ic_values)
        abs_avr_ic = np.abs(avg_ic)
        if abs_avr_ic < 0.001:  # penalise a negligible |mean IC|
            return penalty

        # Standard deviation, not variance: this objective is reported as
        # "IC_std" and used as the ICIR denominator (mean IC / std IC).
        std_ic = np.nanstd(ic_values)
        complexity = len(individual)  # number of nodes in the tree
        return (abs_avr_ic, std_ic, nan_count, complexity)

    except Exception:
        # Any failure while compiling/evaluating is treated as an invalid individual.
        return penalty

# =========================
# 无效个体处理 & 多样性/表现监控
# =========================
def is_valid(ind):
    """Return whether ``ind`` is free of the ``-inf`` penalty on its first objective."""
    penalty_marker = float("-inf")
    first_objective = ind.fitness.values[0]
    return first_objective != penalty_marker

def is_valid_and_unique(new_ind, population_set):
    """True when ``new_ind`` is valid and its string form is not already in ``population_set``."""
    if is_valid(new_ind):
        return str(new_ind) not in population_set
    return False

def ineffective_processing(population, max_attempts=10000):
    """
    Replace every invalid individual in ``population`` (in place) with a freshly
    generated valid one.

    A replacement must also differ (by string form) from every valid individual
    already in the population, to preserve diversity.

    :param population: list of individuals whose fitness has been assigned.
    :param max_attempts: maximum number of candidates generated per invalid
        slot; ``None`` means unlimited.
    :return: the same ``population`` list, now containing only valid individuals.
    :raises RuntimeError: if no valid, unique replacement is found within
        ``max_attempts`` tries. Without this bound the loop would spin forever
        when every evaluation fails, since evaluation errors are reported only
        through the penalty fitness.
    """
    population_set = set(str(ind) for ind in population if is_valid(ind))
    for i, ind in enumerate(population):
        if is_valid(ind):
            continue
        attempts = 0
        while True:
            if max_attempts is not None and attempts >= max_attempts:
                raise RuntimeError(
                    f"ineffective_processing: no valid unique replacement found "
                    f"after {max_attempts} attempts for individual {i}"
                )
            attempts += 1
            new_ind = toolbox.individual()
            new_ind.fitness.values = toolbox.evaluate(new_ind)
            if is_valid_and_unique(new_ind, population_set):
                population[i] = new_ind
                population_set.add(str(new_ind))
                break
    return population

def effective_ind(population):
    """Fraction of individuals whose first objective is not the ``-inf`` penalty.

    :param population: sequence of individuals with assigned fitness.
    :return: share of valid individuals in ``[0, 1]``; 0 for an empty population.
    """
    if len(population) == 0:
        # Avoid ZeroDivisionError, consistent with the other diversity metrics.
        return 0
    penalty = float("-inf")
    valid_count = sum(1 for ind in population if ind.fitness.values[0] != penalty)
    return valid_count / len(population)

def genotype_diversity(population):
    """Share of distinct expressions (by string form) among the valid individuals; 0 if none are valid."""
    penalty = float("-inf")
    valid_exprs = [str(ind) for ind in population if ind.fitness.values[0] != penalty]
    if len(valid_exprs) > 0:
        return len(set(valid_exprs)) / len(valid_exprs)
    return 0

def phenotype_diversity(population):
    """Standard deviation of the first objective over the valid individuals.

    :param population: sequence of individuals with assigned fitness.
    :return: ``np.std`` of the valid first-objective values, or ``0.0`` when
        fewer than two individuals are valid. The fallback is a float (not the
        int ``0``) so callers can format it with a precision spec such as
        ``:.4``, which raises ``ValueError`` for integers.
    """
    penalty = float("-inf")
    scores = [ind.fitness.values[0] for ind in population if ind.fitness.values[0] != penalty]
    if len(scores) <= 1:
        return 0.0
    return np.std(scores)

def hamming_distance(ind1, ind2):
    """Character-level distance between the string forms of two individuals.

    Positions present in both strings are compared one by one; every position
    that exists in only one string (the length difference) counts as a
    mismatch. Without that term a string and any longer string it prefixes
    would be at distance 0.

    :param ind1: first individual (anything with a string form).
    :param ind2: second individual.
    :return: non-negative integer distance.
    """
    text1, text2 = str(ind1), str(ind2)
    mismatches = sum(c1 != c2 for c1, c2 in zip(text1, text2))
    return mismatches + abs(len(text1) - len(text2))

def average_hamming_distance(population):
    """Mean pairwise ``hamming_distance`` over all pairs of valid individuals; 0 when there is no pair."""
    penalty = float("-inf")
    valid = [ind for ind in population if ind.fitness.values[0] != penalty]

    distance_sum = 0
    pair_count = 0
    for position, left in enumerate(valid):
        for right in valid[position + 1:]:
            distance_sum += hamming_distance(left, right)
            pair_count += 1

    if pair_count > 0:
        return distance_sum / pair_count
    return 0

def _valid_objective_mean(population, position):
    """Mean of objective ``position`` over individuals whose first objective is not ``-inf``; 0.0 if none."""
    penalty = float("-inf")
    kept = [ind.fitness.values[position] for ind in population if ind.fitness.values[0] != penalty]
    if not len(kept):
        return 0.0
    return float(np.mean(kept))


def population_avg_IC(population):
    """Average first objective (|mean IC|) over the valid individuals."""
    return _valid_objective_mean(population, 0)


def population_avg_IC_std(population):
    """Average second objective (IC dispersion) over the valid individuals."""
    return _valid_objective_mean(population, 1)


def population_avg_nan_count(population):
    """Average third objective (NaN count) over the valid individuals."""
    return _valid_objective_mean(population, 2)


def population_avg_complexity(population):
    """Average fourth objective (tree complexity) over the valid individuals."""
    return _valid_objective_mean(population, 3)

# =========================
# 0-1 标准化 / 反归一化
# =========================
def normalizing_population(population):
    """Min-max scale the four objectives of every individual to ``[0, 1]`` in place.

    The min/max of each objective are taken over the valid individuals (first
    objective not ``-inf``). When an objective is constant, the first objective
    is set to 1 and the others to 0.

    :param population: list of individuals with four-objective fitness.
    :return: ``(population, Recover_Pac)`` where ``Recover_Pac`` is a list of
        ``[min, max]`` pairs, one per objective, for ``denormalizing_population``.
    :raises ValueError: if the population contains no valid individual.
    """
    penalty = float("-inf")
    valid = [ind for ind in population if ind.fitness.values[0] != penalty]
    if not valid:
        # min()/max() on an empty sequence would raise a far less helpful error.
        raise ValueError("normalizing_population: population contains no valid individual")

    Recover_Pac = []
    for position in range(4):
        column = [ind.fitness.values[position] for ind in valid]
        Recover_Pac.append([min(column), max(column)])

    # Value used when an objective has zero range: best possible score for the
    # maximised objective (1), best possible score for the minimised ones (0).
    degenerate = (1, 0, 0, 0)
    for ind in population:
        raw = ind.fitness.values
        ind.fitness.values = tuple(
            (raw[position] - low) / (high - low) if high != low else degenerate[position]
            for position, (low, high) in enumerate(Recover_Pac)
        )

    return population, Recover_Pac

def denormalizing_population(population, Recover_Pac):
    """Undo min-max scaling in place using the ``[min, max]`` pairs in ``Recover_Pac``.

    Returns a new list holding the same (mutated) individuals in the same order.
    """
    restored = []
    for ind in population:
        scaled = ind.fitness.values
        ind.fitness.values = tuple(
            scaled[position] * (Recover_Pac[position][1] - Recover_Pac[position][0])
            + Recover_Pac[position][0]
            for position in range(4)
        )
        restored.append(ind)
    return restored

# =========================
# 选择函数（非支配排序 + 锦标赛）
# =========================
def tournament_selection(population, tournament_size):
    """Select parents by rank-weighted tournaments.

    The population is split into non-dominated fronts; an individual in front
    ``i`` (0-based) enters tournaments with weight ``1 / (i + 1)``. Each
    tournament draws ``tournament_size`` competitors (with replacement) and
    keeps the NSGA-II best. ``POP_SIZE - NUM_ELITE`` tournaments are held.

    :param population: list of individuals with assigned fitness.
    :param tournament_size: number of competitors per tournament.
    :return: list of ``POP_SIZE - NUM_ELITE`` selected individuals (not cloned).
    """
    # Non-dominated sorting
    fronts = tools.sortNondominated(population, len(population), first_front_only=False)

    # Look individuals up by identity. list.index() compares trees by value, so
    # for duplicate expressions it always hit the first copy and left the other
    # copies with weight 0; it also made this step quadratic.
    front_rank = {id(ind): rank for rank, front in enumerate(fronts) for ind in front}
    probabilities = np.array(
        [
            1.0 / (front_rank[id(ind)] + 1) if id(ind) in front_rank else 0.0
            for ind in population
        ],
        dtype=float,
    )
    probabilities /= probabilities.sum()  # normalise to a distribution

    selected_individuals = []
    for _ in range(POP_SIZE - NUM_ELITE):  # population size minus the elite slots
        competitors = random.choices(population, weights=probabilities, k=tournament_size)
        best_individual = tools.selNSGA2(competitors, 1)[0]
        selected_individuals.append(best_individual)
    return selected_individuals

# =========================
# 因子重要性监控
# =========================
def factors_importance(offspring, pset):
    """
    Factor importance: for each factor, the share of individuals in ``offspring``
    whose tree contains it (individuals containing the factor / total individuals).

    Prints the factors in descending order of share and returns the mapping
    ``{factor name: share}``.
    """
    usage = Counter()
    for individual in offspring:
        # Only input variables (named ARG<i>) count; constants are skipped.
        used = [
            pset.arguments[int(node.name[3:])]
            for node in individual
            if isinstance(node, gp.Terminal)
            and not isinstance(node, (float, int))
            and node.name[:3] == 'ARG'
        ]
        # A factor counts once per individual, however often it occurs.
        usage.update(set(used))

    n_individuals = len(offspring)
    factor_probabilities = {name: hits / n_individuals for name, hits in dict(usage).items()}
    ranked = sorted(factor_probabilities.items(), key=lambda item: item[1], reverse=True)  # descending

    print("因子重要性:")
    for name, share in ranked:
        print(f"{name}: {share:.2%}")

    return factor_probabilities

# =========================
# 点变异
# =========================
def point_mutation(individual, pset):
    """
    Point mutation: pick one random node and replace it in place with a node of
    the same kind (a terminal instance, or a primitive of equal arity).
    Returns the mutated individual.
    """
    position = random.randint(0, len(individual) - 1)
    target = individual[position]

    if isinstance(target, gp.Terminal):
        # terminal -> another terminal instance from the primitive set
        candidates = [t for t in pset.terminals[pset.ret] if isinstance(t, gp.Terminal)]
        individual[position] = random.choice(candidates)
    elif isinstance(target, gp.Primitive):
        # primitive -> another primitive taking the same number of arguments
        candidates = [p for p in pset.primitives[pset.ret] if p.arity == target.arity]
        individual[position] = random.choice(candidates)

    return individual

# =========================
# 名人堂结果整理
# =========================
def df_HOF(hall_of_fame):
    """Tabulate the hall of fame: expression, the four objectives and ICIR (= objective 0 / objective 1, NaN when the denominator is 0)."""
    entries = [(str(ind), ind.fitness.values) for ind in hall_of_fame]

    f_stats = pd.DataFrame({
        '个体': [text for text, _ in entries],
        'IC绝对值': [fit[0] for _, fit in entries],
        'IC_std': [fit[1] for _, fit in entries],
        'ICIR': [fit[0] / fit[1] if fit[1] != 0 else np.nan for _, fit in entries],
        '空值个数': [fit[2] for _, fit in entries],
        '复杂度': [fit[3] for _, fit in entries]
    })
    return f_stats

# =========================
# 主训练流程
# =========================
def _collect_front_leaders(fronts, limit):
    """Take individuals front by front until ``limit`` have been collected."""
    leaders = []
    for front in fronts:
        if len(leaders) + len(front) > limit:
            leaders.extend(front[:limit - len(leaders)])
            break
        leaders.extend(front)
    return leaders


def _refresh_hall_of_fame(population):
    """Drop invalid individuals, normalise, sort into fronts and update the hall of fame.

    :return: ``(population, best_individuals, Recover_Pac)`` where ``population``
        holds only valid individuals with normalised fitness, and
        ``best_individuals`` are independent copies carrying raw fitness.
    """
    # Remove invalid individuals
    population = [ind for ind in population if ind.fitness.values[0] != float("-inf")]

    # Normalise the objectives
    population, Recover_Pac = normalizing_population(population)

    # Split the population into non-dominated fronts
    fronts = tools.sortNondominated(population, len(population), first_front_only=False)
    print(f"有效个体可以被分为{len(fronts)}个非支配前沿")
    for rank, front in enumerate(fronts):
        print(f"第{rank + 1}层非支配层有{len(front)}个个体")

    # Clone before denormalising: denormalisation mutates fitness in place, and
    # doing that on the population members themselves would leave the population
    # with a mix of raw and normalised fitness for the selection step.
    leaders = [toolbox.clone(ind) for ind in _collect_front_leaders(fronts, NUM_HOF)]
    best_individuals = denormalizing_population(leaders, Recover_Pac)
    hall_of_fame.update(best_individuals)
    return population, best_individuals, Recover_Pac


def _report_generation(population, gen):
    """Print diversity and performance statistics for 0-based generation ``gen``."""
    initial = gen == 0
    label = "初始种群" if initial else f"第{gen + 1}代种群"
    # The initial report and the later reports use slightly different wording
    # and punctuation; both are kept as they were.
    phenotype_label = "表型(abs_IC_mean)多样性" if initial else "表型多样性"
    colon = "：" if initial else ":"
    icir_nan = "：NaN" if initial else ": NaN"

    effective_ind_pct = effective_ind(population)
    genotype_diversity_value = genotype_diversity(population)
    # float(): the metric may be the int 0, which a ":.4" spec cannot format.
    phenotype_diversity_value = float(phenotype_diversity(population))
    average_hamming_distance_value = average_hamming_distance(population)
    avg_ic = population_avg_IC(population)
    avg_ic_std = population_avg_IC_std(population)
    avg_nan = population_avg_nan_count(population)
    avg_complexity = population_avg_complexity(population)
    best_individual = tools.selNSGA2(population, 1)[0]

    print("种群多样性检测：")
    print(f"{label}有效个体占比：{effective_ind_pct:.2%}")
    print(f"{label}基因型多样性：{genotype_diversity_value:.2%}")
    print(f"{label}{phenotype_label}：{phenotype_diversity_value:.4}")
    print(f"{label}平均汉明距离：{average_hamming_distance_value}")
    print("种群表现：")
    print(f"{label}平均IC{colon}{avg_ic:.2%}")
    print(f"{label}平均IC_std{colon}{avg_ic_std:.2%}")
    if avg_ic_std != 0:
        print(f"{label}平均ICIR{colon}{avg_ic / avg_ic_std:.4}")
    else:
        print(f"{label}平均ICIR{icir_nan}")
    print(f"{label}平均空值个数：{avg_nan}")
    print(f"{label}平均复杂度：{avg_complexity:.2f}")

    # Build the ICIR fragment separately. Written inline, the conditional
    # expression swallowed the neighbouring string pieces, so only half of the
    # line was ever printed.
    best_fit = best_individual.fitness.values
    if best_fit[1] != 0:
        icir_text = f"ICIR: {best_fit[0] / best_fit[1]:.4f} "
    else:
        icir_text = "ICIR: NaN"
    print(
        f"{label}最优个体 : {best_individual}, "
        f"IC绝对值: {best_fit[0]:.2%}, "
        f"IC_std: {best_fit[1]:.2%}, "
        + icir_text
        + f", 空值个数: {best_fit[2]}, "
        f"复杂度: {best_fit[3]:.2f},"
    )


def run_global_training(datasets):
    """
    Evolve expression trees on the data of all contracts.

    :param datasets: per-contract training data, passed unchanged to
        ``evaluate_individual`` (which unpacks ``(name, X, y)`` triples).
    :return: ``(best_individuals, their fitness values, final population,
        hall_of_fame, hall-of-fame factor importance)``. The final population
        contains only valid individuals, with raw (denormalised) fitness.
    """
    # How individuals and populations are created
    toolbox.register("expr", gp.genHalfAndHalf, pset=pset, min_=1, max_=6)
    toolbox.register("individual", tools.initIterate, creator.Individual, toolbox.expr)
    toolbox.register("population", tools.initRepeat, list, toolbox.individual)

    # Genetic operators
    toolbox.register("compile", gp.compile, pset=pset)
    toolbox.register("evaluate", partial(evaluate_individual, func=toolbox.compile, datasets=datasets))
    toolbox.register("mate", gp.cxOnePoint)
    toolbox.register("mutate", gp.mutUniform, expr=partial(gp.genHalfAndHalf, min_=1, max_=4), pset=pset)  # subtree mutation
    toolbox.register("mutate_point", point_mutation, pset=pset)  # point mutation
    toolbox.register("select", tools.selTournament, tournsize=3)  # tournament selection

    # Initial population
    population = toolbox.population(n=POP_SIZE)

    # Evolution loop
    for gen in tqdm.tqdm(range(N_GENERATIONS)):
        print(f"Generation {gen + 1}:")

        # Evaluate fitness (raw values) for the whole population
        fitnesses = list(map(toolbox.evaluate, population))
        for ind, fit in zip(population, fitnesses):
            ind.fitness.values = fit

        if gen == 0:
            # Only the initial population has its invalid individuals replaced
            print("初始种群无效个体处理中...")
            population = ineffective_processing(population)

        _report_generation(population, gen)

        with open(path02 + f"{Ver}_gen{gen + 1}.pkl", 'wb') as file:
            pickle.dump(population, file)

        # Filter, normalise, rank and update the hall of fame
        population, best_individuals, _ = _refresh_hall_of_fame(population)

        print("进化阶段开始...")
        # Parent selection: non-dominated sorting + tournaments
        offspring = tournament_selection(population, tournament_size)

        # Work on copies so the parents are not modified
        offspring = list(map(toolbox.clone, offspring))

        # Factor-importance monitoring
        _ = factors_importance(offspring, pset=pset)

        # Crossover on consecutive pairs
        for child1, child2 in zip(offspring[::2], offspring[1::2]):
            if np.random.rand() < CXPB:  # crossover probability
                toolbox.mate(child1, child2)
                if hasattr(child1.fitness, "values"):
                    del child1.fitness.values
                if hasattr(child2.fitness, "values"):
                    del child2.fitness.values

        # Mutation: subtree mutation, otherwise possibly point mutation
        for mutant in offspring:
            if np.random.rand() < MUT_PB:  # subtree-mutation probability
                toolbox.mutate(mutant)
                if hasattr(mutant.fitness, "values"):
                    del mutant.fitness.values
            elif np.random.rand() < MUT_Point:
                toolbox.mutate_point(mutant)
                if hasattr(mutant.fitness, "values"):
                    del mutant.fitness.values

        # Elitism: the best individuals bypass selection
        elites = best_individuals[:NUM_ELITE]
        population[:] = offspring + elites  # next generation

        print(f"第{gen + 1}代种群进化完毕，正在计算第{gen + 2}代种群统计值...")

    # =========================
    # Final generation: evaluate and report
    # =========================
    print(f"第{N_GENERATIONS + 1}代（最终代）适应度计算中...")

    fitnesses = list(map(toolbox.evaluate, population))
    for ind, fit in zip(population, fitnesses):
        ind.fitness.values = fit

    population, _, Recover_Pac = _refresh_hall_of_fame(population)
    # No further selection happens, so give the returned population its raw
    # fitness back instead of leaving normalised values in it.
    population = denormalizing_population(population, Recover_Pac)

    best_individuals = hall_of_fame[:NUM_HOF]
    # How often each factor appears in the hall of fame
    hof_importance = factors_importance(best_individuals, pset=pset)

    # Print the top results
    for i, ind in enumerate(best_individuals):
        print(
            f"Top {i + 1} 个体 : {ind}, "
            f"IC绝对值: {ind.fitness.values[0]}, "
            f"IC_std: {ind.fitness.values[1]}, "
            f"ICIR: {ind.fitness.values[0] / ind.fitness.values[1] if ind.fitness.values[1] != 0 else np.nan}, "
            f"空值个数: {ind.fitness.values[2]}, "
            f"复杂度: {ind.fitness.values[3]},"
        )

    return best_individuals, [ind.fitness.values for ind in best_individuals], population, hall_of_fame, hof_importance







共计19个因子
保留19个优秀因子：['open_interest', 'alpha054', 'alpha013', 'amount', 'alpha002', 'alpha191', 'alpha135', 'alpha022', 'volume', 'alpha096', 'alpha162', 'alpha038', 'alpha165', 'alpha049', 'alpha166', 'alpha066', 'alpha161', 'open', 'alpha183']


