In [7]:
import pandas as pd
import numpy as np
import random
import copy

from scipy.stats import spearmanr, pearsonr

from collections import deque

import matplotlib.pyplot as plt
import matplotlib.gridspec as gridspec

from typing import Optional

In [8]:
# Panel data loaded once from the working directory. Code in this notebook reads
# the "stockid", "date" and "return" columns in addition to the FEATURES columns.
ENTIRE_DF = pd.read_csv('dataset.csv')

# Column names that a leaf node of an alpha tree may reference.
FEATURES = ["return", "turnoverrate"]  # currently disabled candidates: "open", "close", "high", "low", "volume", "amount"

# Operator names that random tree generation draws from; each name has a
# same-named function defined in this cell.
OPERATIONS = ["add", "sub", "mul", "div", "rolling_mean", "rolling_std", "cum_ret"]

def add(a: pd.Series, b: pd.Series):
    """Return ``a + b`` using the operands' own ``+`` semantics."""
    total = a + b
    return total

def sub(a: pd.Series, b: pd.Series):
    """Return ``a - b`` using the operands' own ``-`` semantics."""
    difference = a - b
    return difference

def mul(a: pd.Series, b: pd.Series):
    """Return ``a * b`` using the operands' own ``*`` semantics."""
    product = a * b
    return product

def div(a: pd.Series, b: pd.Series):
    """Divide ``a`` by ``b`` element-wise, leaving NaN wherever ``b`` equals zero."""
    nonzero_mask = b != 0
    # Pre-filled with NaN so that positions skipped by ``where`` stay NaN.
    quotient = np.full_like(a, np.nan, dtype=float)
    return np.divide(a, b, out=quotient, where=nonzero_mask)

def by_stock(x: pd.Series, df: pd.DataFrame, func):
    """Apply ``func`` to the values of ``x`` separately for every stock in ``df``.

    Parameters
    ----------
    x : pd.Series or array-like
        One value per row of ``df``, in the same row order as ``df``. Plain
        numpy arrays are accepted so that the output of a previous
        ``by_stock`` call (or of ``div``) can be fed back in.
    df : pd.DataFrame
        Frame providing the ``"stockid"`` column used for grouping.
    func : callable
        Receives one stock's values as a float ``pd.Series`` (forward-filled,
        remaining NaNs replaced by 0) and must return one value per input row.

    Returns
    -------
    np.ndarray
        Float array with one entry per row of ``df``. Rows whose ``stockid``
        is missing belong to no group and are left as NaN.

    Raises
    ------
    ValueError
        If ``x`` and ``df`` do not have the same number of rows.
    """
    # Work positionally on a float copy: this accepts ndarray inputs, avoids
    # truncating results into an integer buffer, and does not depend on the
    # frame carrying a default RangeIndex.
    values = pd.Series(np.asarray(x, dtype=float))
    if len(values) != len(df):
        raise ValueError(
            f"x has {len(values)} rows but df has {len(df)} rows"
        )

    result = np.full(len(values), np.nan, dtype=float)
    for positions in df.groupby("stockid").indices.values():
        series = values.iloc[positions].ffill().fillna(0)
        result[positions] = np.asarray(func(series), dtype=float)
    return result

def rolling_mean(x: pd.Series, df: pd.DataFrame, w=20):
    """Per-stock rolling mean of ``x`` over a window of ``w`` rows.

    Grouping and write-back are delegated to ``by_stock``.
    """
    def _window_mean(group):
        return group.rolling(w).mean()

    return by_stock(x, df, _window_mean)

def rolling_std(x: pd.Series, df, w=20):
    """Per-stock rolling sample standard deviation (``ddof=1``) of ``x`` over ``w`` rows.

    Grouping and write-back are delegated to ``by_stock``.
    """
    def _window_std(group):
        return group.rolling(w).std(ddof=1)

    return by_stock(x, df, _window_std)

def cum_ret(x: pd.Series, df, w=20):
    """Per-stock compounded value of ``x`` over a rolling window of ``w`` rows.

    Within each stock the window product of ``1 + x`` is taken (missing values
    count as 0), and 1 is subtracted from the result.
    """
    def _window_growth(group):
        growth = 1 + group.fillna(0)
        return growth.rolling(w).apply(np.prod, raw=True)

    compounded = by_stock(x, df, _window_growth)
    return compounded - 1

In [9]:
def winsorize_series(s, lower_q=0.01, upper_q=0.99):
    """Clip ``s`` to the range spanned by its ``lower_q`` and ``upper_q`` quantiles.

    A series without any non-missing value is returned unchanged (same object).
    """
    has_observations = s.notna().any()
    if not has_observations:
        return s

    floor = s.quantile(lower_q)
    ceiling = s.quantile(upper_q)
    clipped = s.clip(lower=floor, upper=ceiling)
    return clipped

class AlphaNode:
    """Node of an expression tree that describes an alpha-factor formula.

    A node is one of:
    * a leaf whose ``val`` is a column name listed in ``FEATURES``;
    * a windowed operator (``rolling_mean``, ``rolling_std``, ``cum_ret``) that
      uses only its left child plus an optional window length ``param``;
    * any other operator from ``NAMESPACE``, which uses both children.
    """

    # Callables exposed to ``eval`` when a rendered formula is executed.
    NAMESPACE = {
        "add": add,
        "sub": sub,
        "mul": mul,
        "div": div,
        "rolling_mean": rolling_mean,
        "rolling_std": rolling_std,
        "cum_ret": cum_ret,
    }

    def __init__(self, val: str, left_child: Optional['AlphaNode'], right_child: Optional['AlphaNode'], param=None):
        self.val = val
        self.left = left_child
        self.right = right_child
        self.param = param

    def get_formula(self):
        """Render the subtree rooted at this node as a Python expression string.

        Raises ``ValueError`` when ``val`` is neither a feature nor a known operator.
        """
        name = self.val

        # Leaf: a plain column lookup.
        if name in FEATURES:
            return f"df['{name}']"

        # Windowed operator: single operand plus window (20 when no param is set).
        if name in ["rolling_mean", "rolling_std", "cum_ret"]:
            operand = self.left.get_formula()
            window = self.param or 20
            return f"{name}({operand}, df, {window})"

        # Remaining known operators take two operands.
        if name in self.NAMESPACE:
            lhs = self.left.get_formula()
            rhs = self.right.get_formula()
            return f"{name}({lhs}, {rhs})"

        raise ValueError(f"Unknown operation {self.val}")

    def evaluate(self, df: pd.DataFrame):
        """Evaluate the formula against ``df`` and return the winsorized values as an array."""
        # The evaluated string comes from get_formula(), i.e. it is assembled
        # from this tree's own node values rather than from external text.
        scope = {**self.NAMESPACE, "df": df}
        raw = eval(self.get_formula(), scope)

        clipped = winsorize_series(pd.Series(raw))
        return clipped.values

    def get_height(self):
        """Return the number of nodes on the longest root-to-leaf path (a leaf has height 1)."""
        child_heights = [
            child.get_height() for child in (self.left, self.right) if child
        ]
        return 1 + max(child_heights, default=0)

In [10]:
def calculate_IC(df: pd.DataFrame, indicators: list[str]):
    """Compute per-date cross-sectional information coefficients (IC).

    For every date, each indicator's values are winsorized and correlated
    (Pearson and Spearman) with the *next-row return of the same stock*.

    Parameters
    ----------
    df : pd.DataFrame
        Must contain ``"date"``, ``"stockid"``, ``"return"`` and every column
        named in ``indicators``.
        NOTE(review): the forward return is taken as the next row of the same
        ``stockid``, which assumes each stock's rows appear in chronological
        order -- the same assumption the per-stock rolling helpers make.
    indicators : list[str]
        Column names to score.

    Returns
    -------
    tuple of four DataFrames indexed by date, one column per indicator:
    Pearson IC, Spearman IC, cumulative Pearson IC, cumulative Spearman IC.
    A date contributes NaN when fewer than 5 usable pairs remain or when either
    side is constant.

    Side effects
    ------------
    Overwrites ``ic_pearson.csv``, ``ic_spearman.csv``, ``cum_ic_pearson.csv``
    and ``cum_ic_spearman.csv`` in the working directory on every call.
    """
    # The target has to follow each stock through time. Shifting inside a
    # per-date group (as was done before) pairs a stock's factor with the
    # same-day return of whichever stock happens to be on the next row.
    forward_return = (
        df.groupby("stockid", sort=False)["return"].shift(-1).to_numpy()
    )
    date_values = df["date"].to_numpy()

    ic_pearson = {}
    ic_spearman = {}
    for indicator in indicators:
        pearsons, spearmans = [], []

        # Positional assembly keeps this independent of df's index labels.
        panel = pd.DataFrame({
            "date": date_values,
            "factor": df[indicator].to_numpy(),
            "forward": forward_return,
        })

        for _, g in panel.groupby("date", sort=False):
            data = g[["factor", "forward"]].dropna()
            x = winsorize_series(data["factor"])
            y = data["forward"]

            # Sample-size check is done on the pairs that are actually correlated.
            if len(x) < 5 or x.std(ddof=0) == 0 or y.std(ddof=0) == 0:
                pearsons.append(np.nan)
                spearmans.append(np.nan)
                continue

            pearsons.append(np.corrcoef(x, y)[0, 1])

            spearman_corr, _ = spearmanr(x, y)
            spearmans.append(spearman_corr)

        ic_pearson[indicator], ic_spearman[indicator] = pearsons, spearmans

    # groupby drops missing dates, so the index must drop them too in order to
    # stay the same length as the per-date result lists.
    dates = df["date"].dropna().drop_duplicates().reset_index(drop=True)

    ic_pearson_df = pd.DataFrame(ic_pearson, index=dates).astype("float32").round(4)
    ic_spearman_df = pd.DataFrame(ic_spearman, index=dates).astype("float32").round(4)

    cum_ic_pearson_df = ic_pearson_df.cumsum().round(4)
    cum_ic_spearman_df = ic_spearman_df.cumsum().round(4)

    ic_pearson_df.to_csv("ic_pearson.csv")
    ic_spearman_df.to_csv("ic_spearman.csv")
    cum_ic_pearson_df.to_csv("cum_ic_pearson.csv")
    cum_ic_spearman_df.to_csv("cum_ic_spearman.csv")

    return ic_pearson_df, ic_spearman_df, cum_ic_pearson_df, cum_ic_spearman_df

In [11]:
def random_alpha_tree(max_depth=3, leaf_threshold = 0.2):
    """Build a random alpha expression tree.

    Parameters
    ----------
    max_depth : int
        Number of operator levels still allowed. At 0 (or below) a leaf is
        always produced; at exactly 1 an operator is always produced.
    leaf_threshold : float
        Probability of stopping early with a leaf while ``max_depth > 1``.
        It now applies at every level of the recursion, not only at the root.

    Returns
    -------
    AlphaNode
        Root of the generated tree. Windowed operators get a single child and a
        window drawn from ``[10, 20, 60, 80]``; other operators get two children.
    """
    # ``<= 0`` rather than ``== 0`` so a negative depth cannot recurse forever.
    if max_depth <= 0 or (max_depth > 1 and random.random() < leaf_threshold):
        return AlphaNode(random.choice(FEATURES), None, None)

    func = random.choice(OPERATIONS)

    # "rank" is kept for compatibility; it is never drawn unless it is added to OPERATIONS.
    if func in ["rolling_mean", "rolling_std", "cum_ret", "rank"]:
        left = random_alpha_tree(max_depth - 1, leaf_threshold)
        return AlphaNode(func, left, None, param=random.choice([10,20,60, 80]))

    left = random_alpha_tree(max_depth - 1, leaf_threshold)
    right = random_alpha_tree(max_depth - 1, leaf_threshold)
    return AlphaNode(func, left, right)

def mutate(node: AlphaNode, mutation_rate=0.2, max_depth=3):
    """Randomly replace subtrees of ``node`` with freshly generated ones.

    At each visited node, with probability ``mutation_rate`` the whole subtree
    is swapped for ``random_alpha_tree(max_depth)``; otherwise the existing
    children are visited with one less unit of depth. The tree is modified in
    place and its (possibly new) root is returned. Once ``max_depth`` reaches
    0 the node is returned untouched.
    """
    depth_exhausted = max_depth <= 0
    if depth_exhausted:
        return node

    replace_here = random.random() < mutation_rate
    if replace_here:
        return random_alpha_tree(max_depth)

    remaining_depth = max_depth - 1
    for side in ("left", "right"):
        child = getattr(node, side)
        if child:
            setattr(node, side, mutate(child, mutation_rate, remaining_depth))

    return node

def crossover(p1: AlphaNode, p2: AlphaNode):
    """Graft a random subtree of ``p2`` into ``p1`` and return the result.

    A depth is drawn for each parent. When the depth drawn for ``p1`` is 0
    (its root), ``p2`` itself is returned. Otherwise a random node of ``p1`` at
    that depth is replaced, in place, by a deep copy of a random node of ``p2``
    at the other depth, and ``p1`` is returned.
    """
    def nodes_at_depth(root: AlphaNode, target_depth: int):
        # Expand one level at a time, left to right, remembering each node's parent.
        if not root:
            return []

        level = [(root, None)]
        for _ in range(target_depth):
            level = [
                (child, node)
                for node, _parent in level
                for child in (node.left, node.right)
                if child
            ]
        return level

    depth1 = random.randint(0, p1.get_height() - 1)
    depth2 = random.randint(0, p2.get_height() - 1)

    if depth1 == 0:
        return p2

    target, target_parent = random.choice(nodes_at_depth(p1, depth1))
    donor, _ = random.choice(nodes_at_depth(p2, depth2))

    graft = copy.deepcopy(donor)
    if target_parent.left is target:
        target_parent.left = graft
    else:
        target_parent.right = graft

    return p1
    

def fitness(alpha: "AlphaNode", df: pd.DataFrame):
    """Score an alpha tree by the information coefficients of its factor.

    The factor is written to ``df["_factor"]`` (the caller's frame is modified)
    and passed to ``calculate_IC``. The score blends the absolute mean Pearson
    and Spearman IC with their mean/std ratios.

    Returns
    -------
    tuple
        Always a 3-tuple ``(score, mean_spearman_ic, 0)`` so callers can index
        the result uniformly. When the factor cannot be evaluated, an IC series
        has zero spread, or the score is NaN, the tuple is
        ``(-inf, nan, 0)``.
    """
    # Single failure value with the same shape as a successful result; the
    # previous bare ``-np.inf`` broke callers that index into the result.
    failure = (-np.inf, np.nan, 0)

    try:
        df["_factor"] = alpha.evaluate(df)

        ic_pearson_df, ic_spearman_df, _, _ = calculate_IC(df, indicators=["_factor"])

        pearson_ic = ic_pearson_df["_factor"]
        spearman_ic = ic_spearman_df["_factor"]
        mean_icp, std_icp = pearson_ic.mean(skipna=True), pearson_ic.std(skipna=True, ddof=0)
        mean_ics, std_ics = spearman_ic.mean(skipna=True), spearman_ic.std(skipna=True, ddof=0)

        if std_icp == 0 or std_ics == 0:
            return failure

        score = 0.3 * abs(mean_icp) + 0.6 * abs(mean_ics) + 0.05 * abs(mean_icp / std_icp) + 0.05* abs(mean_ics / std_ics)

        # A NaN score must not reach sorting/max, where it compares unpredictably.
        if np.isnan(score):
            return failure
        return score, mean_ics, 0

    except Exception:
        # Best effort: randomly generated formulas may be invalid for the data.
        return failure

def evolve(population: list["AlphaNode"], df: pd.DataFrame, generations=10, retain=0.3, mutation_rate=0.3):
    """Run a simple genetic search over alpha trees.

    Each generation scores every tree with ``fitness``, keeps the best
    ``retain`` fraction (at least one tree), and refills the population with
    children produced by ``crossover`` or ``mutate`` on deep copies of the
    survivors.

    Parameters
    ----------
    population : list of AlphaNode
        Starting trees. An empty list is returned unchanged.
    df : pd.DataFrame
        Data passed through to ``fitness``.
    generations : int
        Number of generations to run.
    retain : float
        Fraction of the scored population kept as survivors.
    mutation_rate : float
        Forwarded to ``mutate``.

    Returns
    -------
    list of AlphaNode
        Survivors followed by offspring of the last generation (not re-scored).
    """
    def _summarize(raw):
        # fitness may hand back either a tuple or a bare float; reduce both to
        # (score, spearman_ic) and demote NaN scores so sorting stays well-defined.
        if isinstance(raw, tuple):
            score, spearman_ic = raw[0], raw[1]
        else:
            score, spearman_ic = raw, np.nan
        if np.isnan(score):
            score = -np.inf
        return score, spearman_ic

    if not population:
        return population

    for g in range(generations):
        scored = [(_summarize(fitness(alpha, df)), alpha) for alpha in population]
        scored.sort(reverse=True, key=lambda entry: entry[0][0])
        best_score, best_ic = scored[0][0]
        print(f"Gen {g}: Best fitness = {best_score:.4f}, Spearman IC = {best_ic:.4f}")

        # Keep at least one survivor, otherwise there is nothing to breed from.
        retain_length = min(len(scored), max(1, int(len(scored) * retain)))
        survivors = [alpha for _, alpha in scored[:retain_length]]

        # Generate offspring via crossover + mutation
        offspring = []
        while len(offspring) < len(population) - retain_length:
            if random.random() < 0.5 and len(survivors) >= 2:
                p1, p2 = random.sample(survivors, 2)
                child = crossover(copy.deepcopy(p1), copy.deepcopy(p2))
            else:
                p = random.choice(survivors)
                child = mutate(copy.deepcopy(p), mutation_rate)

            offspring.append(child)

        population = survivors + offspring

    return population

In [None]:
# Fix the seed so that Restart & Run All reproduces the same search.
random.seed(42)
population = [random_alpha_tree() for _ in range(15)]

evolved_population = evolve(population, ENTIRE_DF, generations=10, retain=0.3, mutation_rate=0.3)
final_scores = [(fitness(alpha, ENTIRE_DF), alpha) for alpha in evolved_population]


def _headline_score(entry):
    """Return the comparable score of a ``(fitness_result, tree)`` pair."""
    # fitness may yield a tuple or a bare float; NaN is ranked last.
    result = entry[0]
    score = result[0] if isinstance(result, tuple) else result
    return -np.inf if np.isnan(score) else score


best_tree = max(final_scores, key=_headline_score)
print(best_tree[0])
# Show which formula achieved the score above.
print(best_tree[1].get_formula())

  spearman_corr, _ = spearmanr(x, y)
  spearman_corr, _ = spearmanr(x, y)
