<a href="https://colab.research.google.com/github/Sam-ai904/Huatai-Model/blob/main/%E9%81%97%E4%BC%A0%E8%A7%84%E5%88%92.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [28]:
# !pip install Tushare
# !pip install deap
import tushare as ts
import pandas as pd
import numpy as np
from deap import base, creator, tools, algorithms,gp
import operator
import random
import datetime
from scipy.stats import pearsonr


In [29]:
ts.set_token('2876ea85cb005fb5fa17c809a98174f2d5aae8b1f830110a5ead6211')  # 替换为你的 Tushare Token
pro = ts.pro_api()

In [30]:
def get_stock_data(stock_code, start_date, end_date):
    """获取 A 股日线数据"""
    df = pro.daily(ts_code=stock_code, start_date=start_date, end_date=end_date)
    df = df.sort_values('trade_date')  # 按日期升序
    df['return'] = df['close'].pct_change().shift(-1)  # 未来1日收益率
    df = df.dropna()  # 移除缺失值
    return df[['trade_date', 'open', 'high', 'low', 'close', 'vol', 'amount', 'return']]

In [31]:
def get_hs300_data(start_date, end_date):
    """获取沪深300成分股数据"""
    hs300 = pro.index_weight(index_code='399300.SZ', start_date=start_date, end_date=end_date)
    stock_codes = hs300['con_code'].unique()[:50]  # 取前50只股票（可调整）
    data = {}
    for code in stock_codes:
        try:
            df = get_stock_data(code, start_date, end_date)
            if not df.empty:
                data[code] = df
        except:
            continue
    return data

In [32]:
# ------------------- 遗传规划设置 -------------------
# 定义操作符
def safe_div(x, y):
    """安全除法，避免除零"""
    return x / y if y != 0 else 1.0

pset = gp.PrimitiveSet("MAIN", 4)  # 输入4个变量：open, close, vol, amount
pset.addPrimitive(operator.add, 2) #定义加法 传输两个值
pset.addPrimitive(operator.sub, 2)
pset.addPrimitive(operator.mul, 2)
pset.addPrimitive(safe_div, 2)
pset.addPrimitive(np.log1p, 1)  # log(1+x)
pset.addPrimitive(np.sin, 1)
pset.addEphemeralConstant("rand", lambda: random.uniform(-1, 1))  # 随机常数 这里用了匿名函数，但其实用正常函数也行
pset.renameArguments(ARG0='open', ARG1='close', ARG2='vol', ARG3='amount') #重命名变量

# 定义适应度函数和个体
creator.create("FitnessMax", base.Fitness, weights=(1.0,))  # 最大化IC create函数参数为 名称 继承基类 属性 这里的属性weight主要是给Fitness用的
creator.create("Individual", gp.PrimitiveTree, fitness=creator.FitnessMax) #create函数参数为 名称 继承基类 属性 这里定义了舒适度fitness 使用了上面定义的FitnessMax

toolbox = base.Toolbox()
#toolbox的register函数 需要传递名称，函数 关键参数 非关键参数
#其目的是注册一个新函数 简化调用 但其实不注册直接调里面涉及的genGrow，initIterate initRepeat compile函数也是可以的，可能这样写好看

# toolbox.register("expr", gp.genGrow, pset=pset, min_= 1, max_= 3 )

# 初始种群用 genHalfAndHalf，混合 genGrow 和 genFull
toolbox.register("expr", gp.genHalfAndHalf, pset=pset, min_=1, max_=3)
# 变异仍用 genFull 确保完整性
toolbox.register("expr_mut", gp.genFull, pset=pset, min_=1, max_=3)

toolbox.register("individual", tools.initIterate, creator.Individual, toolbox.expr)
toolbox.register("population", tools.initRepeat, list, toolbox.individual)
toolbox.register("compile", gp.compile, pset=pset)
toolbox.register("select", tools.selTournament, tournsize=3) #选三个个体打比赛
toolbox.register("mate", gp.cxOnePoint)  # 使用 gp.cxOnePoint实现交叉操作
toolbox.register("expr_mut", gp.genFull, pset=pset, min_=1, max_=3)  # 使用 genFull实现变异表达式 其专注于生成新子树 是构成变异操作的一环 但不是变异操作！
toolbox.register("mutate", gp.mutUniform, expr=toolbox.expr_mut, pset=pset) #使用mutUniform实现变异操作

# 添加深度限制
toolbox.decorate("mate", gp.staticLimit(key=operator.attrgetter("height"), max_value=3))
toolbox.decorate("mutate", gp.staticLimit(key=operator.attrgetter("height"), max_value=3))



In [33]:
# ------------------- 因子评估 -------------------
def evaluate_factor(individual, data):
    """计算因子的 IC（信息系数）"""
    func = toolbox.compile(expr=individual)
    ic_scores = []

    for stock, df in data.items():
        try:
            # 计算因子值
            factor = [func(row.open, row.close, row.vol, row.amount)
                     for _, row in df.iterrows()]
            factor = np.array(factor)
            returns = df['return'].values

            # 计算 IC（皮尔逊相关系数）
            if len(factor) == len(returns) and np.std(factor) > 0:
                ic, _ = pearsonr(factor, returns)
                ic_scores.append(ic)
        except:
            continue

    # 返回平均 IC（忽略 NaN）
    return np.nanmean(ic_scores) if ic_scores else -1.0,


In [None]:
def main():
    # 设置时间范围
    start_date = '20230101'
    end_date = '20241231'

    # 获取数据
    print("获取数据...")
    data = get_hs300_data(start_date, end_date)
    if not data:
        print("无可用数据！")
        return

    # 定义包装函数
    def evaluate(individual):
        return evaluate_factor(individual, data)

    # 注册 evaluate
    toolbox.register("evaluate", evaluate)

    # 初始化种群
    pop = toolbox.population(n=100)
    hof = tools.HallOfFame(1)  # 保存最佳个体

    # 遗传规划参数
    stats = tools.Statistics(lambda ind: ind.fitness.values)
    stats.register("avg", np.mean)
    stats.register("max", np.max)

    # 运行遗传规划
    print("运行遗传规划...")
    pop, log = algorithms.eaSimple(pop, toolbox,
                                  cxpb=0.7, mutpb=0.2,
                                  ngen=20, stats=stats,
                                  halloffame=hof, verbose=True)

    # 输出最佳因子
    best_individual = hof[0]
    print("\n最佳因子表达式:", str(best_individual))
    print("最佳因子 IC:", best_individual.fitness.values[0])

if __name__ == "__main__":
    main()

获取数据...
运行遗传规划...


  ic, _ = pearsonr(factor, returns)
  return np.nanmean(ic_scores) if ic_scores else -1.0,


gen	nevals	avg	max
0  	100   	nan	nan
1  	81    	nan	nan
2  	72    	nan	nan
3  	77    	nan	nan
