### Trying the `gplearn` module

In [1]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
import gplearn as gpl
from gplearn.genetic import SymbolicRegressor
from gplearn.functions import make_function, _Function
from sklearn.model_selection import train_test_split
from joblib import wrap_non_picklable_objects

In [2]:
%matplotlib inline

In [3]:
def rolling_window(arr, window):
    shape = arr.shape[:-1] + (arr.shape[-1] - window + 1, window)
    strides = arr.strides + (arr.strides[-1],)
    return np.lib.stride_tricks.as_strided(arr, shape=shape, strides=strides)

In [4]:
# Define the function set

@wrap_non_picklable_objects
def rank_by_stock_func(x):
    sorted_idx = np.argsort(x)
    ranks = np.empty_like(sorted_idx, dtype=float)
    ranks[sorted_idx] = np.arange(1, len(x) + 1) / len(x)
    return ranks

@wrap_non_picklable_objects
def ts_rank_func(x):
    window = 2  # Fixed window size
    ranks = np.zeros_like(x)
    for i in range(len(x)):
        if i >= window - 1:
            sub_arr = x[i - window + 1:i + 1]
            ranks[i] = np.argsort(np.argsort(sub_arr))[-1] / len(sub_arr)
    return ranks

@wrap_non_picklable_objects
def ts_sum_func(x):
    window = 10  # Fixed window size
    return np.convolve(x, np.ones(window), mode='same')

@wrap_non_picklable_objects
def ts_min_func(x):
    window = 10  # Fixed window size
    if len(x) < window:
        # Handle case where array is shorter than the window size
        return np.full_like(x, np.nan)

    # Calculate the rolling minimum
    result = np.full_like(x, np.inf)
    for i in range(len(x)):
        if i >= window - 1:
            result[i] = np.min(x[i - window + 1:i + 1])
    return np.where(np.isfinite(result), result, 0)  # Replace inf with 0 for closure

@wrap_non_picklable_objects
def ts_max_func(x):
    window = 10  # Fixed window size
    if len(x) < window:
        # Handle case where array is shorter than the window size
        return np.full_like(x, np.nan)

    # Calculate the rolling maximum
    result = np.full_like(x, -np.inf)
    for i in range(len(x)):
        if i >= window - 1:
            result[i] = np.max(x[i - window + 1:i + 1])
    return np.where(np.isfinite(result), result, 0)  # Replace -inf with 0 for closure

@wrap_non_picklable_objects
def ts_delay_func(x):
    delay = 10  # Fixed delay
    return np.roll(x, delay)

@wrap_non_picklable_objects
def sma_func(x):
    window = 10  # Fixed window size
    return np.convolve(x, np.ones(window) / window, mode='same')

@wrap_non_picklable_objects
def stddev_func(x):
    window = 10  # Fixed rolling window size
    if len(x) < window:
        # Handle case where the array length is shorter than the window
        return np.full_like(x, np.nan)

    # Calculate the rolling standard deviation
    result = np.full_like(x, np.nan)
    for i in range(len(x)):
        if i >= window - 1:
            sub_arr = x[i - window + 1:i + 1]
            result[i] = np.std(sub_arr)

    # Replace NaN with 0 to ensure closure against invalid inputs
    return np.where(np.isnan(result), 0, result)

@wrap_non_picklable_objects
def correlation_func(x, y):
    window = 10  # Fixed rolling window size
    if len(x) < window or len(y) < window:
        # Handle cases where arrays are shorter than the window size
        return np.full_like(x, np.nan)

    result = np.full_like(x, np.nan)
    for i in range(len(x)):
        if i >= window - 1:
            x_window = x[i - window + 1:i + 1]
            y_window = y[i - window + 1:i + 1]
            if np.std(x_window) == 0 or np.std(y_window) == 0:
                # Correlation is undefined if one window has zero standard deviation
                result[i] = 0
            else:
                result[i] = np.corrcoef(x_window, y_window)[0, 1]

    # Replace NaN values with 0 for closure
    return np.where(np.isnan(result), 0, result)

@wrap_non_picklable_objects
def covariance_func(x, y):
    window = 10  # Fixed rolling window size
    if len(x) < window or len(y) < window:
        # Handle cases where arrays are shorter than the window size
        return np.full_like(x, np.nan)

    result = np.full_like(x, np.nan)
    for i in range(len(x)):
        if i >= window - 1:
            x_window = x[i - window + 1:i + 1]
            y_window = y[i - window + 1:i + 1]
            if np.all(x_window == x_window[0]) or np.all(y_window == y_window[0]):
                # Covariance is 0 if either window is constant
                result[i] = 0
            else:
                result[i] = np.cov(x_window, y_window)[0, 1]

    # Replace NaN values with 0 for closure
    return np.where(np.isnan(result), 0, result)

In [5]:
rank_by_stock_gp = make_function(function=rank_by_stock_func, name="rank_by_stock", arity=1)
ts_rank_gp = make_function(function=ts_rank_func, name="ts_rank", arity=1)
ts_sum_gp = make_function(function=ts_sum_func, name="ts_sum", arity=1)
ts_min_gp = make_function(function=ts_min_func, name="ts_min", arity=1)
ts_max_gp = make_function(function=ts_max_func, name="ts_max", arity=1)
ts_delay_gp = make_function(function=ts_delay_func, name="ts_delay", arity=1)
sma_gp = make_function(function=sma_func, name="sma", arity=1)
stddev_gp = make_function(function=stddev_func, name="stddev", arity=1)
correlation_gp = make_function(function=correlation_func, name="correlation", arity=2)
covariance_gp = make_function(function=covariance_func, name="covariance", arity=2)

In [6]:
def _protected_division(x1, x2):
    """Closure of division (x1/x2) for zero denominator."""
    with np.errstate(divide='ignore', invalid='ignore'):
        return np.where(np.abs(x2) > 0.001, np.divide(x1, x2), 1.)

def _protected_sqrt(x1):
    """Closure of square root for negative arguments."""
    return np.sqrt(np.abs(x1))

def _protected_log(x1):
    """Closure of log for zero and negative arguments."""
    with np.errstate(divide='ignore', invalid='ignore'):
        return np.where(np.abs(x1) > 0.001, np.log(np.abs(x1)), 0.)

def _protected_inverse(x1):
    """Closure of inverse for zero arguments."""
    with np.errstate(divide='ignore', invalid='ignore'):
        return np.where(np.abs(x1) > 0.001, 1. / x1, 0.)

def _sigmoid(x1):
    """Special case of logistic function to transform to probabilities."""
    with np.errstate(over='ignore', under='ignore'):
        return 1 / (1 + np.exp(-x1))

In [7]:
add2 = _Function(function=np.add, name='add', arity=2)
sub2 = _Function(function=np.subtract, name='sub', arity=2)
mul2 = _Function(function=np.multiply, name='mul', arity=2)
div2 = _Function(function=_protected_division, name='div', arity=2)
sqrt1 = _Function(function=_protected_sqrt, name='sqrt', arity=1)
log1 = _Function(function=_protected_log, name='log', arity=1)
neg1 = _Function(function=np.negative, name='neg', arity=1)
inv1 = _Function(function=_protected_inverse, name='inv', arity=1)
abs1 = _Function(function=np.abs, name='abs', arity=1)
max2 = _Function(function=np.maximum, name='max', arity=2)
min2 = _Function(function=np.minimum, name='min', arity=2)
sin1 = _Function(function=np.sin, name='sin', arity=1)
cos1 = _Function(function=np.cos, name='cos', arity=1)
tan1 = _Function(function=np.tan, name='tan', arity=1)
sig1 = _Function(function=_sigmoid, name='sig', arity=1)

In [8]:
custom_function_set = [
    # Default functions
    add2, sub2, mul2, div2, sqrt1, log1, abs1, neg1, inv1, max2, min2, sin1, cos1, tan1, sig1,
    
    # Custom functions
    rank_by_stock_gp, ts_rank_gp, ts_sum_gp, ts_min_gp, ts_max_gp,
    ts_delay_gp, sma_gp, stddev_gp, correlation_gp, covariance_gp
]

In [9]:
df = pd.read_parquet("./alpha_mine_data.parquet")
df["Intra_diff"] = df["Intra_diff"].shift(-1)
df_filtered = df[["Open", "High", "Low", "Close", "Volume", "Intra_diff"]].dropna()
df_filtered

Unnamed: 0,Open,High,Low,Close,Volume,Intra_diff
6,54.80,55.95,54.80,55.95,16950,-4.850746
36,53.60,53.60,51.00,52.00,15195,0.775194
43,51.60,52.20,51.60,52.00,1860,-2.681992
154,54.00,54.00,50.00,51.00,24045,2.654110
167,58.00,63.25,58.00,59.95,76415,1.311475
...,...,...,...,...,...,...
1036,575.00,610.40,564.30,607.95,1336879,1.185517
1037,610.00,646.50,607.80,625.05,1281453,-1.543821
1038,629.05,636.55,619.70,625.00,450561,-5.677321
1042,640.00,666.35,613.65,632.00,728091,-1.434379


In [10]:
df_filtered.describe()

Unnamed: 0,Open,High,Low,Close,Volume,Intra_diff
count,343820.0,343820.0,343820.0,343820.0,343820.0,343820.0
mean,502.637462,512.247824,492.828136,501.583748,2133318.0,-0.173648
std,752.963019,767.686468,737.988421,751.802312,8337993.0,2.404046
min,0.3,0.35,0.0,0.3,80.0,-31.416743
25%,97.3,99.45,95.2375,97.0,108158.2,-1.395349
50%,248.9,254.3,243.3,248.15,347600.5,-0.349846
75%,598.2,609.5125,586.5,596.6125,1260230.0,0.790193
max,16249.95,16524.95,15355.1,16406.35,887102600.0,43.113772


In [11]:
df_filtered.to_csv("alpha_mine_data.csv", index=False)

In [12]:
data = pd.read_csv("./alpha_mine_data.csv")
features = data.drop(columns=["Intra_diff"])
target = data["Intra_diff"]

In [13]:
# Split your data into training and testing sets
X_train, X_test, y_train, y_test = train_test_split(features, target, test_size=0.2, shuffle=False)

In [19]:
# Create and train the Symbolic Regressor (Genetic Programming model)
est_gp = SymbolicRegressor(population_size=100, 
                           generations=3, 
                           tournament_size=20,
                           stopping_criteria=0.01, 
                           function_set=custom_function_set, 
                           metric='mean absolute error', 
                           init_depth=(3, 5),
                           verbose=1,
                           n_jobs=8,
                           )

In [20]:
# Fit the model to the training data
est_gp.fit(X_train, y_train)

    |   Population Average    |             Best Individual              |
---- ------------------------- ------------------------------------------ ----------
 Gen   Length          Fitness   Length          Fitness      OOB Fitness  Time Left
   0     8.07        4.861e+12       19          1.64338              N/A      9.72m
   1     4.88           141270        5          1.61942              N/A      1.99m
   2     3.35          23293.5        5          1.61803              N/A      0.00s


In [21]:
print("Best Program (Final Formula):")
print(est_gp._program)

Best Program (Final Formula):
log(log(ts_rank(log(-0.521))))


In [37]:
for i, program in enumerate(est_gp._programs):
    print(f"Program {i}:")
    for node in program:
        if node!=None:
            print(node)
    print()

Program 0:
ts_delay(cos(ts_sum(sin(-0.499))))
sma(stddev(tan(inv(X0))))
correlation(ts_max(X4), ts_max(sin(sig(X4))))
inv(inv(log(sig(X1))))
ts_min(inv(div(X3, 0.261)))
ts_max(tan(sig(neg(X2))))
ts_min(sin(inv(X3)))
log(ts_rank(sig(X2)))
stddev(-0.521)
sin(neg(cos(correlation(X3, X0))))

Program 1:
log(sig(X1))
log(-0.521)
log(sig(X4))
log(ts_rank(inv(div(X3, 0.261))))
sig(neg(X2))
stddev(-0.521)
ts_max(neg(cos(correlation(X3, X0))))
stddev(-0.521)
log(sig(X1))
stddev(-0.521)
log(ts_rank(inv(log(sig(X1)))))
log(sig(X1))
inv(ts_max(X4))
inv(inv(log(stddev(-0.521))))
log(log(ts_rank(sig(X2))))
stddev(-0.521)
stddev(-0.521)
log(ts_rank(stddev(-0.521)))
log(ts_rank(stddev(-0.521)))
ts_min(inv(stddev(-0.521)))
stddev(-0.521)

Program 2:
log(ts_rank(inv(log(-0.521))))
stddev(-0.521)
div(X3, 0.261)
log(log(log(-0.521)))
stddev(-0.521)
abs(X2)
ts_rank(sig(X2))
stddev(-0.521)
log(ts_rank(log(-0.521)))
stddev(-0.521)
X1
log(ts_rank(neg(cos(correlation(X3, X0)))))
ts_max(ts_rank(stddev(-0.521)))
