### Gp learning in alpha research

### Step 1: import numpy, pandas and gplearn

In [None]:
import numpy as np
import pandas as pd
from gplearn.functions import make_function

# Define custom functions
def rank(x):
    return pd.Series(x).rank().values

def delay(x, d):
    """ Return the array x delayed by d time steps. Handles array inputs for d."""
    # Ensure d is a scalar even if passed as an array by gplearn
    d = int(d[0]) if isinstance(d, np.ndarray) and d.size > 0 else int(d)
    
    # If delay is zero or negative, return x as is
    if d <= 0:
        return x
    
    # If delay is greater than or equal to the length of x, return an array of zeros
    if d >= len(x):
        return np.zeros_like(x)
    
    # Otherwise, prepend zeros and truncate the last d elements
    return np.concatenate([np.zeros(d), x[:-d]])

def correlation(x, y, d):
    """Computes rolling correlation with protection against non-positive window size."""
    d = int(np.round(d[0])) if isinstance(d, np.ndarray) and d.size > 0 else int(d)
    if d <= 0:
        return np.zeros_like(x)  # Return zero correlation if window size is non-positive
    else:
        return pd.Series(x).rolling(max(d, 1)).corr(pd.Series(y)).fillna(0).values


def covariance(x, y, d):
    """Computes rolling covariance, ensuring non-negative window size."""
    d = max(int(np.round(d[0])) if isinstance(d, np.ndarray) else int(d), 1)
    return pd.Series(x).rolling(d).cov(pd.Series(y)).fillna(0).values

def scale(x, a):
    """Scales the input x by a factor a, normalized by the sum of the absolute values of x."""
    if np.sum(np.abs(x)) == 0:
        return np.zeros_like(x)
    return a * x / np.sum(np.abs(x))

def delta(x, d):
    """Computes the difference of x and its delayed version by d steps."""
    d = max(int(np.round(d[0])) if isinstance(d, np.ndarray) else int(d), 1)
    return x - delay(x, d)

def signedpower(x, a):
    """Applies a signed power transformation to x with power a."""
    return np.sign(x) * (np.abs(x) ** a)

def decay_linear(x, d):
    """Applies a linear decay function over a rolling window of size d."""
    d = max(int(np.round(d[0])) if isinstance(d, np.ndarray) else int(d), 1)
    weights = np.arange(1, d + 1)[::-1]
    if len(x) < d:
        return np.zeros_like(x)
    return pd.Series(x).rolling(window=d).apply(lambda x: np.dot(x, weights) / weights.sum(), raw=True).values


### Step 2: make function

In [None]:
from gplearn.functions import _Function, make_function

# Wrap custom functions for gplearn
rank_func = make_function(function=rank, name='rank', arity=1)
delay_func = make_function(function=delay, name='delay', arity=2, wrap=False)
correlation_func = make_function(function=correlation, name='correlation', arity=3)
covariance_func = make_function(function=covariance, name='covariance', arity=3)
scale_func = make_function(function=scale, name='scale', arity=2)
delta_func = make_function(function=delta, name='delta', arity=2)
signedpower_func = make_function(function=signedpower, name='signedpower', arity=2)
decay_linear_func = make_function(function=decay_linear, name='decay_linear', arity=2)
# Create a function set
function_set = [
    rank_func,
    delay_func,
    correlation_func,
    covariance_func,
    scale_func,
    delta_func,
    signedpower_func,
    decay_linear_func,
    # Add other custom functions as needed
]


### Test of the Functions

In [None]:
# 测试数据
x = np.random.rand(100)
y = np.random.rand(100)
d = -5  # 非法的 d 值

# 测试 covariance 函数
print("Covariance with d = -5:", covariance(x, y, d))

d = 5  # 合法的 d 值
print("Covariance with d = 5:", covariance(x, y, d))


### Step 3: preprocess of data

In [None]:
# Process of dataset
def load_data(filepath):
    data = pd.read_csv(filepath, parse_dates=True, index_col='Date')
    return data

def filter_st_pt(data, stock_type_column='StockType'):
    return data[~data[stock_type_column].isin(['ST', 'PT'])]

def handle_missing_values(data, method='drop'):
    if method == 'drop':
        return data.dropna()
    elif method == 'fill':
        return data.fillna(method='ffill')  # 前向填充
    return data

def restrict_date_range(data, start_date, end_date):
    return data.loc[start_date:end_date]

def prepare_stock_data(filepath, start_date, end_date):
    data = load_data(filepath)
    data = filter_st_pt(data)
    data = handle_missing_values(data)
    data = restrict_date_range(data, start_date, end_date)
    return data

# 用VWAP计算目标向量y，此处日频数据考虑1日vwap收益 @27/6/2024
""" 
data['1d_return_vwap'] = data['vwap'].pct_change(1).shift(-1)
data.dropna(inplace=True)
print(data[['vwap', '1d_return_vwap']].head())
 """



### Step 4: Fitness Function (RankIC)

In [32]:
def winsorize_series(series, limits=[0.05, 0.95]):
    """中位数去极值化，通常使用5%和95%的分位数去极值。"""
    quantiles = series.quantile(limits)
    return series.clip(quantiles.iloc[0], quantiles.iloc[1])

def neutralize_series(series, by):
    """中性化处理"""

def standardize_series(series):
    """将序列标准化，使均值为0，标准差为1。"""
    return (series - series.mean()) / series.std()

def compute_rank_ic(factor, returns):
    """计算因子和收益率的 RankIC"""
    return factor.corr(returns, method='spearman') # 用spearman系数，比Pearson效果好

def rank_ic_fitness(estimator, X, y, sample_weight):
    """
    自定义适应度函数，计算 RankIC 作为适应度值。
    X 是因子矩阵，y 是收益率矩阵。
    """
    factor_df = pd.DataFrame(X)
    returns_df = pd.DataFrame(y)
    
    # 处理因子
    factor_winsorized = factor_df.apply(winsorize_series)
    factor_standardized = factor_winsorized.apply(standardize_series)
    
    rank_ic_values = []
    for i in range(factor_standardized.shape[1]):
        rank_ic = compute_rank_ic(factor_standardized.iloc[:, i], returns_df.iloc[:, i])
        rank_ic_values.append(rank_ic)
    
    # 取平均 RankIC 作为适应度值
    average_rank_ic = np.mean(rank_ic_values)
    return average_rank_ic



In [33]:
from gplearn.genetic import SymbolicTransformer, _fitness_map
from gplearn.functions import make_function

# 注册自定义适应度函数
_fitness_map['rank_ic'] = rank_ic_fitness

# 定义自定义 SymbolicTransformer 类
class CustomSymbolicTransformer(SymbolicTransformer):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

    def _evaluate(self, program, X, y, sample_weight):
        """Evaluate the program on the fitness criterion."""
        y_pred = self._program.execute(X)
        return self._metric(y_pred, y, sample_weight)


### Step 5:Build the Model

In [None]:
from gplearn.genetic import SymbolicRegressor

data = pd.read_csv()
# 准备模型输入输出
features = data[['open', 'close', 'high', 'low', 'vwap']].dropna()
target = data['1d_return_vwap'].dropna()

# 确保特征和目标行对齐
features, target = features.align(target, join='inner')
# Parameters
gp = SymbolicRegressor(population_size=1000,
                       generations=3,
                       tournament_size=20,
                       stopping_criteria=0.01,
                       function_set=function_set,
                       metric='rank_ic',  
                       p_crossover=0.4,
                       p_subtree_mutation=0.01,
                       p_hoist_mutation=0,
                       p_point_mutation=0.01,
                       p_point_replace=0.4,
                       max_samples=0.9,
                       verbose=1,
                       random_state=0,
                       n_jobs=-1)




### Train


In [None]:
gp.fit(features.values, target.values)

### Result

In [None]:
# 提取最佳公式
best_programs = gp._best_programs
for program in best_programs:
    print(program)