In [1]:
import yfinance as yf
import pandas as pd
import numpy as np
from deap import base, creator, tools, gp
from scipy.stats import pearsonr

In [2]:
def download_data(tickers, start, end):
    data = {}
    for ticker in tickers:
        df = yf.download(ticker, start=start, end=end)
        if not df.empty:
            data[ticker] = df[['Open', 'High', 'Low', 'Close', 'Volume']]
    return data

def preprocess_data(data):
    processed_data = {}
    for ticker, df in data.items():
        df['CloseMomentum'] = ts_mean(df['Close'], 10) - df['Close']
        df['VolumeMomentum'] = ts_mean(df['Volume'], 10) - df['Volume']
        df['CloseStd'] = ts_std(df['Close'], 10)
        df['PriceVolumeCorr'] = ts_corr(df['Close'], df['Volume'], 10)
        df['Channel'] = ts_max(df['High'], 10) - ts_min(df['Low'], 10)
        df['Target'] = df['Close'].shift(-1) / df['Close'] - 1
        processed_data[ticker] = df.dropna()
    return processed_data

def ts_mean(series, window):
    return series.rolling(window).mean().fillna(0)

def ts_rank(series, window):
    return series.rolling(window).apply(lambda x: pd.Series(x).rank().iloc[-1]).fillna(0)

def ts_std(series, window):
    return series.rolling(window).std().fillna(0)

def ts_corr(series1, series2, window):
    return series1.rolling(window).corr(series2).fillna(0)

def ts_max(series, window):
    return series.rolling(window).max().fillna(0)

def ts_min(series, window):
    return series.rolling(window).min().fillna(0)

def safe_divide(left, right):
    return np.divide(left, right, out=np.zeros_like(left), where=right != 0)

def setup_gp():
    pset = gp.PrimitiveSet("MAIN", 5)
    pset.renameArguments(ARG0="CloseMomentum", ARG1="VolumeMomentum", ARG2="CloseStd", ARG3="PriceVolumeCorr", ARG4="Channel")
    
    pset.addPrimitive(np.add, 2, name="add")
    pset.addPrimitive(np.subtract, 2, name="subtract")
    pset.addPrimitive(np.multiply, 2, name="multiply")
    pset.addPrimitive(safe_divide, 2, name="div")
    pset.addPrimitive(np.maximum, 2, name="max")
    pset.addPrimitive(np.minimum, 2, name="min")
    pset.addPrimitive(np.negative, 1, name="neg")
    
    pset.addPrimitive(lambda x: ts_mean(x, 5), 1, name="ts_mean_5")
    pset.addPrimitive(lambda x: ts_std(x, 5), 1, name="ts_std_5")
    pset.addPrimitive(lambda x: ts_rank(x, 5), 1, name="ts_rank_5")
    pset.addPrimitive(lambda x, y: ts_corr(x, y, 5), 2, name="ts_corr_5")
    pset.addPrimitive(lambda x: ts_max(x, 5), 1, name="ts_max_5")
    pset.addPrimitive(lambda x: ts_min(x, 5), 1, name="ts_min_5")

    pset.addEphemeralConstant("rand", lambda: np.random.uniform(-1, 1))
    
    creator.create("FitnessMax", base.Fitness, weights=(1.0,))
    creator.create("Individual", gp.PrimitiveTree, fitness=creator.FitnessMax)
    toolbox = base.Toolbox()
    toolbox.register("expr", gp.genHalfAndHalf, pset=pset, min_=1, max_=3)
    toolbox.register("individual", tools.initIterate, creator.Individual, toolbox.expr)
    toolbox.register("population", tools.initRepeat, list, toolbox.individual)
    toolbox.register("compile", gp.compile, pset=pset)
    
    toolbox.register("mate", gp.cxOnePoint)
    toolbox.register("mutate", gp.mutUniform, expr=toolbox.expr, pset=pset)
    toolbox.register("select", tools.selTournament, tournsize=3)
    return toolbox, pset

def run_evolution(population, toolbox, cxpb, mutpb, ngen, stats=None, halloffame=None, verbose=True):
    logbook = tools.Logbook()
    logbook.header = ['gen', 'nevals'] + (stats.fields if stats else [])

    invalid_ind = [ind for ind in population if not ind.fitness.valid]
    fitnesses = map(toolbox.evaluate, invalid_ind)
    for ind, fit in zip(invalid_ind, fitnesses):
        ind.fitness.values = fit

    if halloffame is not None:
        halloffame.update(population)

    record = stats.compile(population) if stats else {}
    logbook.record(gen=0, nevals=len(invalid_ind), **record)
    if verbose:
        print(logbook.stream)

    for gen in range(1, ngen + 1):
        offspring = toolbox.select(population, len(population))
        offspring = list(map(toolbox.clone, offspring))

        for child1, child2 in zip(offspring[::2], offspring[1::2]):
            if np.random.random() < cxpb:
                toolbox.mate(child1, child2)
                del child1.fitness.values
                del child2.fitness.values

        for mutant in offspring:
            if np.random.random() < mutpb:
                toolbox.mutate(mutant)
                del mutant.fitness.values

        invalid_ind = [ind for ind in offspring if not ind.fitness.valid]
        fitnesses = map(toolbox.evaluate, invalid_ind)
        for ind, fit in zip(invalid_ind, fitnesses):
            ind.fitness.values = fit

        population[:] = offspring
        if halloffame is not None:
            halloffame.update(population)

        record = stats.compile(population) if stats else {}
        logbook.record(gen=gen, nevals=len(invalid_ind), **record)
        if verbose:
            print(logbook.stream)

    return population, logbook

def run():
    tickers = ["AAPL", "MSFT", "GOOGL", "AMZN", "TSLA"]
    start = "2023-10-01"
    end = "2023-12-01"
    raw_data = download_data(tickers, start, end)
    data = preprocess_data(raw_data)
    
    toolbox, pset = setup_gp()
    population = toolbox.population(n=50)
    generations = 10
    hof = tools.HallOfFame(1)

    def fitness(individual):
        compiled_expr = toolbox.compile(expr=individual)
        all_corr = []
        for ticker, df in data.items():
            try:
                feature_values = compiled_expr(df['CloseMomentum'], df['VolumeMomentum'], df['CloseStd'], df['PriceVolumeCorr'], df['Channel'])
                corr, _ = pearsonr(feature_values, df['Target'])
                all_corr.append(corr)
            except:
                all_corr.append(0)
        return np.mean(all_corr),
    
    toolbox.register("evaluate", fitness)
    
    run_evolution(population, toolbox, cxpb=0.5, mutpb=0.2, ngen=generations, stats=None, halloffame=hof, verbose=True)

    print("Best Expression:", hof[0])
    print("Best Fitness:", fitness(hof[0]))
    return hof

hof = run()

[*********************100%%**********************]  1 of 1 completed
[*********************100%%**********************]  1 of 1 completed
[*********************100%%**********************]  1 of 1 completed
[*********************100%%**********************]  1 of 1 completed
[*********************100%%**********************]  1 of 1 completed
  xm = x.astype(dtype) - xmean
  ret = umr_sum(arr, axis, dtype, out, keepdims, where=where)


gen	nevals
0  	50    
1  	23    
2  	42    
3  	33    
4  	26    
5  	32    
6  	30    
7  	31    
8  	28    
9  	28    
10 	32    
Best Expression: ts_mean_5(ts_mean_5(ts_min_5(CloseMomentum)))
Best Fitness: (0.30842405188284305,)
