gplearn官方源代码：  https://github.com/trevorstephens/gplearn/tree/main/gplearn  
gplearn官方文档说明： https://gplearn.readthedocs.io/

In [81]:
import numpy as np
import pandas as pd
from scipy.stats import rankdata
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import mean_squared_error
from gplearn import genetic
from gplearn.functions import make_function
from gplearn.genetic import SymbolicTransformer, SymbolicRegressor
from gplearn.fitness import make_fitness
from sklearn.utils import check_random_state
from sklearn.model_selection import train_test_split
import warnings

warnings.filterwarnings("ignore")

In [82]:
# 提取数据
stock = pd.read_csv("./play_data.csv")
stock = stock[stock[ "InnerCode" ].isin( [3]) ].drop(columns=['InnerCode',"Unnamed: 0","Ifsuspend" ], axis=1)
stock['TradingDay'] = pd.to_datetime( stock['TradingDay'] )

In [83]:
# 取对数
for i in [ 'TurnoverVolume', 'TurnoverValue', 'TotalMV', 'NegotiableMV' ]:
    stock[i] = stock[i].apply(lambda x: np.log(x) if x > 0 else x)
    
# 计算回报率return
stock['Return'] = np.log(stock['ClosePrice'] / stock['ClosePrice'].shift(1))
stock['Return'].fillna(0, inplace=True)

# 确定特征列和目标列
Xcols = [ 'TurnoverVolume','TurnoverValue','TotalMV','NegotiableMV','OpenPrice','HighPrice','LowPrice','ClosePrice','AvgPrice']
ycols = ['Return']

# 数据标准化
transfer = StandardScaler()
for var in Xcols:
    stock[var] = transfer.fit_transform(stock[var].values.reshape(-1, 1))
    
# 解释变量滞后一期
for var in Xcols:
    stock[var] = stock[var].shift(1)
    
# 删除缺失值行
stock.dropna(inplace=True)

# 重置索引
stock.reset_index(drop=True, inplace=True)

stock.head()

Unnamed: 0,TradingDay,OpenPrice,HighPrice,LowPrice,ClosePrice,TurnoverVolume,TurnoverValue,AvgPrice,TotalMV,NegotiableMV,Return
0,2015-01-06,0.704338,0.70912,0.67001,0.707771,2.014822,1.930008,0.696756,-0.511968,-0.693998,-0.015095
1,2015-01-07,0.668012,0.736948,0.656677,0.64557,1.548267,1.551071,0.691633,-0.557227,-0.733502,-0.019194
2,2015-01-08,0.592766,0.59528,0.590012,0.567819,1.141346,1.18376,0.576401,-0.614779,-0.783734,-0.034169
3,2015-01-09,0.577197,0.529505,0.483348,0.433051,0.824489,0.893755,0.477318,-0.71723,-0.873155,0.007989
4,2015-01-12,0.421514,0.605399,0.432682,0.464151,1.794404,1.693472,0.522257,-0.693275,-0.852247,-0.020771


In [84]:
X = stock[Xcols].values
y = stock[ycols].values.ravel()

X.shape, y.shape

((1641, 9), (1641,))

# 自定义算子

In [85]:
def _ts_max(data): # 历史滚动最大
    with np.errstate(divide ='ignore', invalid='ignore', over='ignore'):
        df = pd.DataFrame({'0':data})
        value = df['0'].rolling(10).max()
        return np.nan_to_num(value.values)
    
def _ts_min(data): # 历史滚动最小
    with np.errstate(divide ='ignore', invalid='ignore', over='ignore'):
        df = pd.DataFrame({'0':data})
        value = df['0'].rolling(10).min()
        return np.nan_to_num(value.values)

def _ts_std(data): # 历史滚动标准差
    with np.errstate(divide ='ignore', invalid='ignore', over='ignore'):
        df = pd.DataFrame({'0':data})
        value = df['0'].rolling(10).std()
        return np.nan_to_num(value.values)

def _ts_rank(data): # 历史滚动排名
    with np.errstate(divide ='ignore', invalid='ignore', over='ignore'):
        df = pd.DataFrame({'0':data})
        value = df['0'].rolling(10).apply(lambda x: rankdata(x)[-1],raw=True)    
        return np.nan_to_num(value.values)

def _ts_argmax(data): # 历史滚动最大值距离
    with np.errstate(divide ='ignore', invalid='ignore', over='ignore'):
        df = pd.DataFrame({'0':data})
        value = df['0'].rolling(10).apply(lambda x: 10-np.argmax(x)-1,raw=True)
        return np.nan_to_num(value.values)

def _ts_argmin(data): # 历史滚动最小值距离
    with np.errstate(divide ='ignore', invalid='ignore', over='ignore'):
        df = pd.DataFrame({'0':data})
        value = df['0'].rolling(10).apply(lambda x: 10-np.argmin(x)-1,raw=True)
        return np.nan_to_num(value.values)

def _ts_corr(df1,df2): # 历史滚动相关系数
    with np.errstate(divide ='ignore', invalid='ignore', over='ignore'):
        df = pd.DataFrame({'0':df1,'1':df2})
        value = df[['0','1']].rolling(10).corr()
        value.index.names = ['index','key']
        value = value.query('key==\'0\'')['1']
        return np.nan_to_num(value.values)

def _ts_cov(df1,df2): # 历史滚动协方差
    with np.errstate(divide ='ignore', invalid='ignore', over='ignore'):
        df = pd.DataFrame({'0':df1,'1':df2})
        value = df[['0','1']].rolling(10).cov()
        value.index.names = ['index','key']
        value = value.query('key==\'0\'')['1']
        return np.nan_to_num(value.values)

def _ts_sum(data): # 历史滚动求和
    with np.errstate(divide ='ignore', invalid='ignore', over='ignore'):
        df = pd.DataFrame({'0':data})
        value = df['0'].rolling(10).sum()
        return np.nan_to_num(value.values)

def _ts_prod(data): # 历史滚动累乘
    with np.errstate(divide ='ignore', invalid='ignore', over='ignore'):
        df = pd.DataFrame({'0':data})
        value = df['0'].rolling(10).apply(lambda x: x.prod(),raw=True)
        return np.nan_to_num(value.values)

def _ts_delay(data): # 滞后
    with np.errstate(divide ='ignore', invalid='ignore', over='ignore'):
        df = pd.DataFrame({'0':data})
        value = df['0'].shift(10)
        return np.nan_to_num(value.values)

def _ts_delta(data): # 滞后差值
    with np.errstate(divide ='ignore', invalid='ignore', over='ignore'):
        df = pd.DataFrame({'0':data})
        delay = df['0'].shift(10).values
        value = df['0'].values-delay    
        return np.nan_to_num(value)

def _ts_wma(data): # 历史滚动加权平均
    with np.errstate(divide ='ignore', invalid='ignore', over='ignore'):
        df = pd.DataFrame({'0':data})
        day = np.arange(1,10+1,1)
        value = df['0'].rolling(10).apply(lambda x: (x*day/day.sum()).sum(),raw=True)
        return np.nan_to_num(value.values)

In [86]:
ts_max = make_function(function=_ts_max, name='ts_max', arity=1)
ts_min = make_function(function=_ts_min, name='ts_min', arity=1)
ts_wma = make_function(function=_ts_wma, name='ts_wma', arity=1)
ts_std = make_function(function=_ts_std, name='ts_std', arity=1)
ts_rank = make_function(function=_ts_rank, name='ts_rank', arity=1)
ts_argmax = make_function(function=_ts_argmax, name='ts_argmax', arity=1)
ts_argmin = make_function(function=_ts_argmin, name='ts_argmin', arity=1)
ts_corr = make_function(function=_ts_corr, name='ts_corr', arity=2)
ts_cov = make_function(function=_ts_cov, name='ts_cov', arity=2)
ts_sum = make_function(function=_ts_sum, name='ts_sum', arity=1)
ts_prod = make_function(function=_ts_prod, name='ts_prod', arity=1)
ts_delay = make_function(function=_ts_delay, name='ts_delay', arity=1)
ts_delta = make_function(function=_ts_delta, name='ts_delta', arity=1)

custom_function = [ ts_max, ts_min, ts_wma, ts_std, ts_rank, ts_argmax, ts_argmin, ts_corr, \
                   ts_cov, ts_sum, ts_prod, ts_delay, ts_delta ]

In [87]:
init_function = ['add', 'sub', 'mul', 'div', 'sqrt', 'log', 'abs', 'neg', 'inv', 'max', 'min', 'sin', 'cos', 'tan' ]

# Symbolic Regressor

In [121]:
%%time

predictions = []

for i in range(len(stock) - 1600):                 
    # 获取训练数据
    X_train = X[i:i+1600]
    y_train = y[i:i+1600]
    
    # 初始化模型
    generations = 3 # 进化世代数
    population_size = 1000 # 每一代中的公式数量
    tournament_size = 20 # 每一代中被随机选中计算适应度的公式数
    const_range = (-10.0,10.0) 
    function_set = init_function + custom_function # 函数算子
    random_state = 1 # 设置随机种子
    factor_gp = SymbolicRegressor(feature_names=Xcols, 
                                    function_set=function_set, 
                                    generations=generations, 
                                    population_size=population_size, 
                                    tournament_size=tournament_size, 
                                    const_range=const_range, 
                                    random_state=random_state,
                                    metric='mse',             # 设定适应度fitness
                                    init_depth=(1,4),
                                    p_crossover=0.4,          # 交叉概率
                                    p_subtree_mutation=0.01,  # 子树变异概率
                                    p_hoist_mutation=0,       # 提升变异概率
                                    p_point_mutation=0.01,    # 点变异概率
                                    p_point_replace=0.4 )     # 点变异中父代每个节点变异概率 
    
    # 模型训练和预测
    factor_gp.fit(X_train, y_train)
    prediction=factor_gp.predict(X[i+1600].reshape(1, -1))
    
    predictions.append(prediction.item())    

CPU times: total: 9min 49s
Wall time: 29min 47s


In [122]:
# 计算预测误差
true_values = y[1600:]
msfe = mean_squared_error(true_values, predictions)

print(f"Mean Squared Forecasting Error (MSFE): {msfe}")

Mean Squared Forecasting Error (MSFE): 0.0007230980094704057


# SymbolicTransformer

In [88]:
X_train = stock[Xcols].iloc[:1441,:].values
y_train = stock[ycols].iloc[:1441,:].values.ravel()
X_test = stock[Xcols].iloc[1441:1642,:].values
y_test = stock[ycols].iloc[1441:1642,:].values.ravel()

X_train.shape, y_train.shape, X_test.shape, y_test.shape

((1441, 9), (1441,), (200, 9), (200,))

## 单独使用Ridge回归

In [114]:
%%time

from sklearn.linear_model import Ridge

# ridge
ridge_model = Ridge(alpha=0.1)
ridge_model.fit(X_train, y_train)
pred = ridge_model.predict(X_test)

# 计算预测误差
msfe = mean_squared_error(y_test, pred)

print(f"Ridge's MSFE: {msfe}")

Ridge's MSFE: 0.0006759091018275882
CPU times: total: 0 ns
Wall time: 3.01 ms


## SymbolicTransformer因子挖掘＋Ridge

In [94]:
%%time

generations = 3 # 进化世代数
population_size = 1000 # 每一代中的公式数量
tournament_size = 20 # 每一代中被随机选中计算适应度的公式数
const_range = (-10.0,10.0) 
function_set = init_function + custom_function # 函数算子
random_state = 1 # 设置随机种子
factor_gp2 = SymbolicTransformer(feature_names=Xcols, 
                                function_set=function_set, 
                                generations=generations, 
                                population_size=population_size, 
                                tournament_size=tournament_size, 
                                const_range=const_range, 
                                random_state=random_state,
                                parsimony_coefficient=0.0005,  # 惩罚项
                                metric='spearman',        # 设定适应度fitness
                                n_components=5,           # 因子挖掘个数
                                init_depth=(1,4),
                                verbose=1,
                                p_crossover=0.4,          # 交叉概率
                                p_subtree_mutation=0.01,  # 子树变异概率
                                p_hoist_mutation=0,       # 提升变异概率
                                p_point_mutation=0.01,    # 点变异概率
                                p_point_replace=0.4 )     # 点变异中父代每个节点变异概率 
factor_gp2.fit(X_train, y_train)

    |   Population Average    |             Best Individual              |
---- ------------------------- ------------------------------------------ ----------
 Gen   Length          Fitness   Length          Fitness      OOB Fitness  Time Left
   0     4.91        0.0226607        4        0.0887113              N/A     43.44s
   1     4.57        0.0449912        4        0.0887113              N/A     24.32s
   2     4.43        0.0674172        6         0.102192              N/A      0.00s
CPU times: total: 14.3 s
Wall time: 52.2 s


In [126]:
best_programs = factor_gp2._best_programs
best_programs_dict = {}
for p in best_programs:
    factor_name = 'alpha_'+str(best_programs.index(p)+1)
    best_programs_dict[factor_name] = {'fitness':p.fitness_, 'expression':str(p), 'depth':p.depth_, 'length':p.length_}
best_programs_dict = pd.DataFrame(best_programs_dict).T
best_programs_dict = best_programs_dict.sort_values(by='fitness',ascending=False)
best_programs_dict

Unnamed: 0,fitness,expression,depth,length
alpha_1,0.099192,"add(ts_delta(AvgPrice), div(HighPrice, LowPrice))",2,6
alpha_2,0.096116,ts_rank(log(sin(ts_argmax(AvgPrice)))),4,5
alpha_3,0.090575,"div(HighPrice, LowPrice)",1,3
alpha_4,0.087471,"log(sin(mul(ts_argmin(ClosePrice), ts_argmax(H...",4,7
alpha_5,0.086711,log(sin(ts_argmax(AvgPrice))),3,4


In [96]:
gp_features = factor_gp2.transform(X)          # 生成新特征
gp_X = np.hstack( (X, gp_features))            # 合并
gp_X_train = gp_X[:1441,:]
gp_X_test = gp_X[1441:1642,:]

gp_X_train.shape, gp_X_test.shape

((1441, 14), (200, 14))

In [115]:
# ridge
ridge_model = Ridge(alpha=0.1)
ridge_model.fit(gp_X_train, y_train)
pred = ridge_model.predict(gp_X_test)

# 计算预测误差
msfe = mean_squared_error(y_test, pred)

print(f"Ridge's MSFE: {msfe}")

Ridge's MSFE: 0.0006733253033652069


## 实证应用-样本内

In [123]:
# ridge
ridge_model = Ridge(alpha=0.1)
ridge_model.fit(X, y)
r2 = ridge_model.score(X, y)
n = len(y)  # 样本数量
k = X.shape[1]  # 特征数量
adjusted_r2 = 1 - ((1 - r2) * (n - 1) / (n - k - 1))
print("adjusted_r2:",adjusted_r2)

adjusted_r2: 0.008934392318252438


In [124]:
# ridge
ridge_model = Ridge(alpha=0.1)
ridge_model.fit(gp_X, y)
r2 = ridge_model.score(gp_X, y)
n = len(y)  # 样本数量
k = gp_X.shape[1]  # 特征数量
adjusted_r2 = 1 - ((1 - r2) * (n - 1) / (n - k - 1))
print("adjusted_r2:",adjusted_r2)

adjusted_r2: 0.021167078671585737
