In [1]:
import json
import os
from collections import Counter

import numpy as np

from alphagen.data.expression import *
from alphagen.models.alpha_pool import AlphaPool
from alphagen.utils.correlation import batch_pearsonr, batch_spearmanr
from alphagen.utils.pytorch_utils import normalize_by_day
from alphagen.utils.random import reseed_everything
from alphagen_generic.operators import funcs as generic_funcs
from alphagen_generic.features import *
from alphagen_qlib.calculator import QLibStockDataCalculator
from gplearn.fitness import make_fitness
from gplearn.functions import make_function
from gplearn.genetic import SymbolicRegressor


# funcs = [make_function(**func._asdict()) for func in generic_funcs]

instruments = '15min_symbols'
seed = 4
reseed_everything(seed)

cache = {}
# device = torch.device('cuda:0')
device = torch.device('cpu')
close = Feature(FeatureType.CLOSE)
target = Ref(close, -1) / close - 1
data_train = StockData(instrument=instruments,
                        start_time='2022-11-01 00:00:00',
                        end_time='2024-02-29 23:45:00')
data_valid = StockData(instrument=instruments,
                        start_time='2024-03-01 00:00:00',
                        end_time='2024-05-31 23:45:00')
data_test = StockData(instrument=instruments,
                        start_time='2024-06-01 00:00:00',
                        end_time='2024-08-31 23:45:00')
calculator_train = QLibStockDataCalculator(data_train, target)
calculator_valid = QLibStockDataCalculator(data_valid, target)
calculator_test = QLibStockDataCalculator(data_test, target)

pool = AlphaPool(capacity=10,
                 calculator=calculator_train,
                 ic_lower_bound=None,
                 l1_alpha=5e-3)


# def _metric(x, y, w):
#     key = y[0]

#     if key in cache:
#         return cache[key]
#     token_len = key.count('(') + key.count(')')
#     if token_len > 20:
#         return -1.

#     expr = eval(key)
#     try:
#         ic = calculator_train.calc_single_IC_ret(expr)
#     except OutOfDataRangeError:
#         ic = -1.
#     if np.isnan(ic):
#         ic = -1.
#     cache[key] = ic
#     return ic


# Metric = make_fitness(function=_metric, greater_is_better=True)


def try_single():
    top_key = Counter(cache).most_common(1)[0][0]
    expr = eval(top_key)
    ic_valid, ric_valid = calculator_valid.calc_single_all_ret(expr)
    ic_test, ric_test = calculator_test.calc_single_all_ret(expr)
    return {'ic_test': ic_test,
            'ic_valid': ic_valid,
            'ric_test': ric_test,
            'ric_valid': ric_valid}


def try_pool(capacity):
    pool = AlphaPool(capacity=capacity,
                     calculator=calculator_train,
                     ic_lower_bound=None)

    exprs = []
    for key in dict(Counter(cache).most_common(capacity)):
        exprs.append(eval(key))
    pool.force_load_exprs(exprs)
    pool._optimize(alpha=5e-3, lr=5e-4, n_iter=2000)

    ic_test, ric_test = pool.test_ensemble(calculator_test)
    ic_valid, ric_valid = pool.test_ensemble(calculator_valid)
    return {'ic_test': ic_test,
            'ic_valid': ic_valid,
            'ric_test': ric_test,
            'ric_valid': ric_valid}


generation = 0

def ev():
    global generation
    generation += 1
    res = (
        [{'pool': 0, 'res': try_single()}] +
        [{'pool': cap, 'res': try_pool(cap)} for cap in (10, 20, 50, 100)]
    )
    print(res)
    dir_ = './path/1110'
    os.makedirs(dir_, exist_ok=True)
    if generation % 2 == 0:
        with open(f'{dir_}/{generation}.json', 'w') as f:
            json.dump({'cache': cache, 'res': res}, f)

[527955:MainThread](2024-11-10 12:06:25,054) INFO - qlib.Initialization - [config.py:416] - default_conf: client.
[527955:MainThread](2024-11-10 12:06:25,362) INFO - qlib.Initialization - [__init__.py:74] - qlib successfully initialized based on client settings.
[527955:MainThread](2024-11-10 12:06:25,363) INFO - qlib.Initialization - [__init__.py:76] - data_path={'__DEFAULT_FREQ': PosixPath('/root/jupyter/CTA/alphagen/my_data/qlib')}


In [2]:
funcs = [make_function(**func._asdict()) for func in generic_funcs]
len(funcs)

78

In [3]:
import ast
import astor
import re

class FormulaTransformer(ast.NodeTransformer):
    def visit_Call(self, node):
        # 检查是否是函数调用
        if isinstance(node.func, ast.Name):
            # 提取函数名称
            func_name = node.func.id

            match = re.match(r'(Ref|Mean|Sum|Std|Var|Max|Min|Med|Mad|Delta|WMA|EMA|Cov|Corr)(\d+)', func_name)
            if match:
                base_name = match.group(1)  # 提取函数名称 'mean' 或 'std'
                window_size = int(match.group(2))  # 提取数字部分

                # 创建新的函数名称节点
                node.func.id = base_name

                # 添加第二个参数（窗口大小）到参数列表
                node.args.append(ast.Constant(value=window_size, kind=None))

        # 递归访问子节点
        self.generic_visit(node)
        return node

In [4]:
from deap import base, creator, tools, gp, algorithms
from functools import partial
import random


features = ['open_', 'high', 'low', 'close', 'volume', 
            'quote_asset_volume', 'number_of_trades', 
            'taker_buy_base_asset_volume', 'taker_buy_quote_asset_volume']
# features = [open_, high, low, close, volume, 
#             quote_asset_volume, number_of_trades, 
#             taker_buy_base_asset_volume, taker_buy_quote_asset_volume]
constants = [f'Constant({v})' for v in [-30., -10., -5., -2., -1., -0.5, -0.01, 0.01, 0.5, 1., 2., 5., 10., 30.]]
# constants = []

terminals = features + constants

# Define custom fitness function for DEAP
def custom_fitness(individual):
    expression_tree = ast.parse(str(individual), mode='eval')
    transformer = FormulaTransformer()
    modified_tree = transformer.visit(expression_tree)
    new_expr = astor.to_source(modified_tree)
    new_expr = new_expr.replace("$", "")
    # print(new_expr)
    tmp = 'calculator_train.calc_single_IC_ret({})'.format(new_expr)
    # ic = eval(tmp)
    try:
        ic = eval(tmp)
    except OutOfDataRangeError:
        # print(new_expr)
        ic = -0.
    if np.isnan(ic):
        ic = -0.
    cache[new_expr] = ic
    return [abs(ic)]

# Define DEAP creator
creator.create("FitnessMax", base.Fitness, weights=(1.0,))  # Minimize fitness for regression
creator.create("Individual", gp.PrimitiveTree, fitness=creator.FitnessMax)

# Initialize DEAP primitive set
pset = gp.PrimitiveSet("MAIN", arity=len(features))  # Set number of input variables based on your requirements
for i, feature in enumerate(terminals):
    pset.renameArguments(**{f'ARG{i}':feature})
# Add customized operation
for func in generic_funcs:
    pset.addPrimitive(func.function, func.arity, name=func.name)
    
# pset.addEphemeralConstant("randnum",  partial(random.choice, [-30., -10., -5., -2., -1., -0.5, -0.01, 0.01, 0.5, 1., 2., 5., 10., 30.]))

# Register necessary operators for DEAP's genetic programming
toolbox = base.Toolbox()
toolbox.register("expr", gp.genHalfAndHalf, pset=pset, min_=2, max_=6)
toolbox.register("individual", tools.initIterate, creator.Individual, toolbox.expr)
toolbox.register("population", tools.initRepeat, list, toolbox.individual)
toolbox.register("evaluate", custom_fitness)
toolbox.register("select", tools.selTournament, tournsize=600)
toolbox.register("mate", gp.cxOnePoint)
toolbox.register("expr_mut", gp.genFull, min_=2, max_=6)
toolbox.register("mutate", gp.mutUniform, expr=toolbox.expr_mut, pset=pset)

In [5]:
import random
import numpy


def main():
    # random.seed(328)
    pop = toolbox.population(n=1000)
    hof = tools.HallOfFame(20)

    stats = tools.Statistics(lambda ind: ind.fitness.values)
    # stats_size = tools.Statistics(len)
    # mstats = tools.MultiStatistics(fitness=stats_fit, size=stats_size)
    stats.register("avg", numpy.mean)
    stats.register("std", numpy.std)
    stats.register("min", numpy.min)
    stats.register("max", numpy.max)

    pop, log = algorithms.eaSimple(pop, toolbox, cxpb=0.6, mutpb=0.3, ngen=40, stats=stats,
                                   halloffame=hof, verbose=True)
    # print log
    return pop, log, hof

In [6]:
pop, log, hof = main()

gen	nevals	avg       	std       	min	max      
0  	1000  	0.00217513	0.00285279	0  	0.0138404
1  	709   	0.00712168	0.00577287	0  	0.0143444
2  	712   	0.0078907 	0.00631827	0  	0.0319867
3  	734   	0.0118437 	0.0113524 	0  	0.0512869
4  	722   	0.0294278 	0.0245362 	0  	0.051287 
5  	710   	0.0294683 	0.0243855 	0  	0.0512873
6  	699   	0.0296041 	0.0244541 	0  	0.0512873
7  	744   	0.0293958 	0.0243963 	0  	0.0512873
8  	725   	0.0284858 	0.0246659 	0  	0.0512873
9  	706   	0.0299309 	0.0243767 	0  	0.0512873
10 	727   	0.029364  	0.0244559 	0  	0.0512873
11 	745   	0.0292942 	0.0244621 	0  	0.0512873
12 	705   	0.0286998 	0.0247412 	0  	0.0512873
13 	744   	0.0273011 	0.0246208 	0  	0.0512873
14 	736   	0.0283814 	0.0245334 	0  	0.0512873
15 	734   	0.0288667 	0.0244585 	0  	0.0512873
16 	710   	0.0297667 	0.0244333 	0  	0.0512873
17 	707   	0.0291929 	0.0244308 	0  	0.0512873
18 	701   	0.0285306 	0.0245375 	0  	0.0512873
19 	707   	0.0299601 	0.0243961 	0  	0.0512873
20 	755   	0.

In [7]:
for i, individual in enumerate(hof):
    print(f"Expression {i + 1}: {str(individual)}")

Expression 1: Div(WMA20(Less(Less(WMA30(Add(taker_buy_base_asset_volume, close)), close), close)), Less(WMA30(WMA30(Add(Ref20(taker_buy_base_asset_volume), WMA30(Add(taker_buy_base_asset_volume, close))))), Less(WMA30(WMA30(Add(taker_buy_base_asset_volume, WMA30(Add(taker_buy_base_asset_volume, low))))), Less(WMA30(WMA30(Add(Ref20(taker_buy_base_asset_volume), WMA30(Add(taker_buy_base_asset_volume, close))))), Less(WMA30(Add(Std50(Ref20(low)), WMA30(Add(taker_buy_base_asset_volume, close)))), close)))))
Expression 2: Div(WMA20(Less(WMA30(Add(taker_buy_base_asset_volume, Add(taker_buy_base_asset_volume, WMA20(Less(quote_asset_volume, close))))), close)), Less(WMA30(WMA30(Add(Ref20(taker_buy_base_asset_volume), WMA30(Add(taker_buy_base_asset_volume, close))))), Less(WMA30(WMA30(Add(taker_buy_base_asset_volume, WMA30(Add(taker_buy_base_asset_volume, low))))), close)))
Expression 3: Div(WMA20(Less(WMA30(Add(taker_buy_base_asset_volume, close)), close)), Less(WMA30(WMA30(Add(Ref20(taker_buy

In [8]:
# data_df = data_test.make_dataframe(expr.evaluate(data_test))
# data_df.reset_index(inplace=True)
# data_df.rename(columns={'0':expr}, inplace=True)