# Notebook used to transform the raw financial data and to add columns such as MA and lagged values to the data.

In [None]:
import yfinance as yf
import pandas as pd
import random
import numpy as np
import pendulum
import importlib
from fitness_functions import *
from utils.units import Volume, Dollar

In [None]:
directory_path = r"C:\Users\khann\Documents\Data Science and Financial Technology\Final project\GP_trading_strategies"

In [None]:
#Import the data.
btc_ts = yf.Ticker("BTC-USD").history(start='2016-01-01', end='2023-06-30')


In [None]:
base_columns_list = ['Open', 'High', 'Low', 'Close', 'Volume']

In [None]:
def create_ma_columns(df, column, lag):
    """Append a simple moving average of ``column`` to ``df``.

    The new column is named ``ma_<column lower-cased>_<lag>`` and holds the
    rolling mean over a window of ``lag`` rows. ``df`` is modified in place
    and also returned.
    """
    target_name = "ma_{}_{}".format(column.lower(), lag)
    rolling_mean = df[column].rolling(window=lag).mean()
    df[target_name] = rolling_mean
    return df

def create_lagged_columns(df, column, lag):
    """Append a copy of ``column`` shifted down by ``lag`` rows to ``df``.

    The new column is named ``lag_<column lower-cased>_<lag>``. ``df`` is
    modified in place and also returned.
    """
    shifted = df[column].shift(lag)
    df["lag_" + column.lower() + "_" + str(lag)] = shifted
    return df

def create_percentage_of_value(df, column, percentage: float = None):
    """Append ``column`` scaled by ``percentage`` to ``df``.

    The new column is named
    ``percentage_<column lower-cased>_<round(percentage * 100)>``. ``df`` is
    modified in place and also returned.

    :param df: frame to extend.
    :param column: name of the source column.
    :param percentage: scaling factor. When omitted, a fresh value is drawn
        from ``random.random()`` on every call. Pass it explicitly (or seed
        ``random``) when the resulting column names must be reproducible.
    :returns: ``df``.
    """
    # The draw happens at call time. A ``random.random()`` default in the
    # signature is evaluated once, so repeated calls would reuse one value.
    if percentage is None:
        percentage = random.random()
    df[f"percentage_{column.lower()}_{round(percentage * 100)}"] = df[column] * percentage
    return df

def create_difference(df, column1:str = "low", column2:str = "high"):
    """Append ``column2 - column1`` to ``df``.

    The new column is named ``difference_<column1>_<column2>`` with both
    names lower-cased. The source columns are looked up exactly as given.
    ``df`` is modified in place and also returned.
    """
    label = "_".join(("difference", column1.lower(), column2.lower()))
    df[label] = df[column2] - df[column1]
    return df

In [None]:
# Add columns to df:
# Feature engineering on btc_ts, repeated for every base column:
#   * moving averages with windows 2..49,
#   * lagged copies shifted by 1..49 rows,
#   * ten calls to create_percentage_of_value without a `percentage`
#     argument, so each call relies on that function's default.
for column in base_columns_list:
    for lag in range(2,50):
        create_ma_columns(df = btc_ts,column=column, lag = lag)
    for lag in range(1,50):
        create_lagged_columns(df = btc_ts,column=column, lag = lag)
    for lag in range(10):  # `lag` is only a repeat counter in this loop
        create_percentage_of_value(df = btc_ts,column=column)

In [None]:
def pre_process(df):
    """Drop incomplete rows and two unused columns from ``df`` in place.

    Every row containing at least one null value is removed, then the
    ``'Dividends'`` and ``'Stock Splits'`` columns are dropped.

    :param df: frame to clean; it is modified in place.
    :returns: the same ``df`` object.
    :raises KeyError: if either of the two columns is missing.
    """
    # dropna filters rows by position. Dropping by index label would also
    # remove complete rows that happen to share a label with a null row.
    df.dropna(axis=0, how="any", inplace=True)
    df.drop(columns=['Dividends', 'Stock Splits'], inplace=True)
    return df

In [None]:
btc_ts = pre_process(btc_ts)

In [None]:
btc_ts.head()

In [None]:
# Partition the column names by unit: names containing "volume"
# (case-insensitive) versus all remaining names.
volume_columns = [col  for col in btc_ts.columns if "volume" in col.lower()]
dollar_columns = [col  for col in btc_ts.columns if "volume" not in col.lower()]

In [None]:
# Chronological train/test split. Label-based slices include both endpoints,
# so the test window starts the day after the training window ends.
# Slicing both from "2021-06-30" would put that day in both sets.
btc_train = btc_ts.loc[:"2021-06-30"]
btc_test = btc_ts.loc["2021-07-01":]

# Genetic Program

In [None]:
from deap import gp, creator, base, tools
from deap.gp import Terminal
import operator
from operator import or_, and_, gt
from fitness_functions import *
import fitness_functions
from utils.plot_decision_trees import plot_tree



In [None]:
# Every training column becomes one input argument of the GP primitive set.
arg_names = list(btc_train.columns)
# Split the arguments by unit so each can be given the matching type
# (names containing "volume", case-insensitive, versus the rest).
vol_args = [arg for arg in arg_names if "volume" in arg.lower()]
dol_args = [arg for arg in arg_names if "volume" not in arg.lower()]

In [None]:
# --- CREATE PRIMITIVE SETS AND TOOLS -----

# Total number of input columns (not referenced again in this cell).
n_args=len(arg_names)
# Strongly typed primitive set: the Volume-typed inputs come first, followed
# by the Dollar-typed inputs; every evolved program returns a bool.
pset = gp.PrimitiveSetTyped("main",[Volume]*len(vol_args) + [Dollar]*len(dol_args),bool)
#Rename the arguments:
# ARG0..ARG{len(vol_args)-1} map to the volume columns ...
arg_vol_mapping = {f"ARG{ind}": val for ind,val in enumerate(vol_args)}
pset.renameArguments(**arg_vol_mapping)
# ... and the following indices map to the dollar columns, in the same order
# as the type list passed to the constructor above.
arg_dol_mapping = {f"ARG{len(vol_args)+ind}": val for ind,val in enumerate(dol_args)}
pset.renameArguments(**arg_dol_mapping)
#Check that all arguments were renamed:
unnamed_args=[i for i in pset.arguments if "ARG" in i]
if  unnamed_args:
    print(f"Some arguments were not renamed: {unnamed_args}")
# Comparison between two Dollar values, plus an identity primitive on Dollar.
# generate() later filters out nodes named "dollar placeholder".
pset.addPrimitive(gt, [Dollar,Dollar],bool)
pset.addPrimitive(lambda x:x ,[Dollar],Dollar, name="dollar placeholder")

# Same pair for Volume values; generate() also filters "volume placeholder".
pset.addPrimitive(gt, [Volume,Volume],bool)
pset.addPrimitive(lambda x:x ,[Volume],Volume, name="volume placeholder")


#Boolean operators:
pset.addPrimitive(and_, [bool,bool],bool)
pset.addPrimitive(or_,[bool,bool],bool)

# Register every column name as a terminal of the matching type.
for v_arg in vol_args:
        pset.addTerminal(v_arg,Volume)
for d_arg in dol_args:
        pset.addTerminal(d_arg,Dollar)

# --- Remove all the ARG terminals ---
# Keep only terminals whose `name` does not contain "ARG".
# NOTE(review): presumably the auto-generated argument terminals keep their
# ARGn `name` after renameArguments, so this leaves only the terminals added
# in the two loops above - confirm against the installed DEAP version.
pset.terminals[Volume] = [i for i in pset.terminals[Volume] if "ARG" not in i.name]
pset.terminals[Dollar] = [i for i in pset.terminals[Dollar] if "ARG" not in i.name]

### Run the GP:

In [None]:
def generate(pset):
    """Draw individuals until one has more than three non-placeholder nodes.

    Each candidate comes from the module-level ``toolbox.individual()``.
    Nodes named "dollar placeholder" or "volume placeholder" are filtered out
    before the length check. A draw that raises ``IndexError`` is retried.

    :param pset: accepted for the toolbox registration; not used in the body.
    :returns: a ``creator.Individual`` built from the filtered node list.
    """
    while True:
        try:
            candidate = toolbox.individual()
            #Remove all the Lambda functions:
            nodes = [node for node in candidate if node.name != "dollar placeholder"]
            nodes = [node for node in nodes if node.name != "volume placeholder"]
        except IndexError:
            continue
        if len(nodes) > 3:
            return creator.Individual(nodes)

In [None]:
# --- GP OPERATORS ----

# Single-objective fitness with weight 1 (DEAP maximises objectives that
# carry a positive weight), attached to a PrimitiveTree-based individual.
creator.create("fitness", base.Fitness, weights=(1,))
creator.create("Individual", gp.PrimitiveTree, fitness= creator.fitness)

toolbox = base.Toolbox()
# Raw expression generator: half-and-half trees with depth between 1 and 5.
toolbox.register("expr", gp.genHalfAndHalf, pset=pset, min_=1, max_=5)
# Unfiltered individual built directly from `expr`.
toolbox.register("individual", tools.initIterate, creator.Individual, toolbox.expr)
# Filtered individual: generate() strips placeholder nodes and retries until
# the tree has more than three nodes.
toolbox.register("custom_individual",generate, pset)
# Populations are plain lists of filtered individuals.
toolbox.register("population", tools.initRepeat, list, toolbox.custom_individual)

# Smoke test: build one individual, display it, and plot it.
expr1 = toolbox.custom_individual()
expr1

plot_tree(expr1)

In [None]:
pop = toolbox.population(n=20)
pop

In [None]:
def cxSubTree(ind1,ind2):
    """Swap one randomly chosen terminal sub-tree between two individuals.

    A terminal sub-tree is a run of three consecutive nodes whose arities are
    ``[2, 0, 0]``: a binary primitive directly followed by its two terminal
    arguments. Both individuals are modified in place.

    :param ind1: first individual, a mutable sequence of nodes with ``arity``.
    :param ind2: second individual, same requirements.
    :returns: the tuple ``(ind1, ind2)``. Both are returned unchanged when
        either individual has no terminal sub-tree.
    """
    def get_sub_trees(ind):
        """Return the terminal sub-trees of *ind* with their start indices."""
        nodes = list(ind)
        arities = [elem.arity for elem in nodes]
        return [
            {"start_index": i, "primitive": nodes[i:i+3]}
            for i in range(len(nodes))
            if arities[i:i+3] == [2, 0, 0]
        ]

    # Scan each parent once, before any modification, so the swapped-in
    # material always comes from the original trees.
    subs1 = get_sub_trees(ind1)
    subs2 = get_sub_trees(ind2)
    if not subs1 or not subs2:
        # Nothing to exchange; randrange(0) would raise ValueError.
        return ind1, ind2

    pick1 = subs1[random.randrange(len(subs1))]
    pick2 = subs2[random.randrange(len(subs2))]

    i1 = pick1["start_index"]
    i2 = pick2["start_index"]
    ind1[i1:i1+3] = pick2["primitive"]
    ind2[i2:i2+3] = pick1["primitive"]

    return ind1, ind2

## Building out the trading strategy:

Assume purchases are executed at the "Open" price of the following day:

In [None]:
signal_df = pd.DataFrame(index = btc_train.index)
signal_df["Open"] = btc_train['Open']

## Calculate the MDD of a trade:

In [None]:
# Exploratory dump of two individuals: full tree, a tail slice, node names,
# node types and tail arities, followed by a plot of each tree.
expr=pop[2]
expr1=pop[3]

ind=len(expr)-6
print(str(gp.PrimitiveTree(expr)))
print(str(gp.PrimitiveTree(expr[ind:])))
print([i.name for i in expr])
print([type(i) for i in expr])
print([i.arity for i in expr[ind:]])
plot_tree(expr,name="tree")
s=len(expr)

print("\n",'--'*30)

ind1=4
print(str(gp.PrimitiveTree(expr1)))
print(str(gp.PrimitiveTree(expr1[ind1:])))
print([i.name for i in expr1])
print([type(i) for i in expr1])
# Arities of the second individual's tail (this line previously read `expr`).
print([i.arity for i in expr1[ind1:]])
plot_tree(expr1,name="tree1")

s1=len(expr1)

In [None]:
cross = len(expr)-3
print([type(i) for i in expr[cross:]][:2])

In [None]:
expr=pop[1]
expr1=pop[3]
sl = 1
cross = s-sl
cross1= s1-sl
print([i.name for i in expr[-sl:]])

print([i.name for i in expr1[-sl:]])


print([type(i) for i in expr[-sl:]] == [type(i) for i in expr1[-sl:]])
expr2 = expr.copy()
expr2[-sl:]=expr1[cross1:]
print(str(gp.PrimitiveTree(expr2)))

type(expr[2]) == type(expr1[3])
plot_tree(expr2, name="offspring")

In [None]:
import deap
ind =5
print(expr[ind:] )
print([i for i in expr[ind:] if type(i)==deap.gp.Primitive])
print(str(gp.PrimitiveTree(expr[ind:])))
print('-'*40)
print(expr1[ind:] )
print([i for i in expr1[ind:] if type(i)==deap.gp.Primitive])
print(str(gp.PrimitiveTree(expr1[ind:])))

if not [i for i in expr[ind:] if type(i)==deap.gp.Primitive] and not [i for i in expr1[ind:] if type(i)==deap.gp.Primitive]:
    print(True)
else:
    print(False)


## Maximum theoretical profit:

In [None]:

mtv = maximum_theoretical_value(df = btc_train)
mtv

In [None]:
from deap import base, creator, tools, gp, algorithms

In [None]:
toolbox.register("evaluate", fitness_function, df=btc_train, tc=0.01, pset=pset)

In [None]:
#Mutate entire branch:
def mutBranch(individual, pset, max_per_mutate=50):
    """Re-draw every node in a randomly chosen tail of *individual*.

    A start index is sampled so that the tail ``individual[start:]`` holds at
    most ``max_per_mutate`` percent of the nodes. Each node in that tail is
    then replaced in place: a terminal by a random terminal of the same
    return type, a primitive by a random primitive with the same return type
    and argument types.

    :param individual: the tree to mutate (a sequence of nodes).
    :param pset: primitive set providing ``terminals`` and ``primitives``.
    :param max_per_mutate: upper bound, in percent of the node count, on the
        size of the mutated tail. If that bound allows at most one node,
        nothing is mutated.
    :returns: A tuple of one tree.
    """
    size = len(individual)
    if size < 2:
        return individual,

    budget = max_per_mutate*size/100
    if budget > 1:
        # Rejection sampling: redraw the start until the tail fits the budget.
        # When the budget covers the whole tree the start stays at 0.
        start = 0
        while (size - start) > budget:
            start = random.randrange(1, size)
    else:
        start = size

    for pos in range(start, size):
        current = individual[pos]
        if current.arity == 0:  # Terminal
            replacement = random.choice(pset.terminals[current.ret])
        else:  # Primitive
            candidates = [p for p in pset.primitives[current.ret] if p.args == current.args]
            replacement = random.choice(candidates)
        individual[pos] = replacement

    return individual,

def mutation_half(individual,mut_per, pset):
    """Apply one of two mutation operators, chosen by a fair coin flip.

    With probability 0.5 the individual goes through
    ``gp.mutNodeReplacement``; otherwise it goes through ``mutBranch`` with
    ``mut_per`` as the maximum percentage of nodes to mutate.

    :returns: whatever the chosen operator returns.
    """
    use_node_replacement = random.random() < 0.5
    if not use_node_replacement:
        return mutBranch(individual, max_per_mutate=mut_per, pset=pset)
    return gp.mutNodeReplacement(individual, pset=pset)

In [None]:
# Variation and selection operators used by the evolutionary loop.
# Crossover swaps one terminal sub-tree between two individuals.
toolbox.register("mate",       cxSubTree)
# NOTE(review): `selRanked` is taken from `deap.tools` here; confirm that the
# installed DEAP build actually provides it.
toolbox.register("select",     tools.selRanked) 
# Mutation picks between node replacement and tail re-draw. The caller must
# supply `mut_per` on every call.
toolbox.register("mutate",     mutation_half, pset=pset)

In [None]:
# Hall of fame capped at 50 individuals.
hof   = tools.HallOfFame(maxsize=50)
# Statistics are computed over each individual's fitness tuple. axis=0
# aggregates across individuals, giving one value per objective.
stats = tools.Statistics(lambda ind: ind.fitness.values)
stats.register("avg", np.mean, axis=0) 
stats.register("std", np.std, axis=0)
stats.register("min", np.min, axis=0)
stats.register("max", np.max, axis=0)

# pop, logbook = algorithms.eaSimple(pop, toolbox,cxpb=0.7, mutpb=0.7, ngen=ngen, stats = stats, halloffame =hof)

In [None]:
from operator import attrgetter
def GPAlgo(population, 
           toolbox, cxpb, 
           mutpb, ngen, 
           elite_pop_size, 
           stats=None,
            halloffame=None, 
            verbose=__debug__
            ):
    """Run a generational GP loop and keep a per-generation history.

    Each generation selects ``len(population) - elite_pop_size`` individuals,
    passes them to ``cross_mut`` (defined outside this block), evaluates the
    individuals whose fitness is invalid, and replaces the population with
    the result.
    NOTE(review): ``cross_mut`` is presumably responsible for adding the
    elite individuals back; confirm against its definition.

    :param population: list of individuals; replaced in place each generation.
    :param toolbox: toolbox providing ``map``, ``evaluate``, ``select`` and
        ``clone``.
    :param cxpb: crossover probability, forwarded to ``cross_mut`` as ``cxp``.
    :param mutpb: mutation probability, forwarded to ``cross_mut``.
    :param ngen: number of generations to run.
    :param elite_pop_size: number of slots left out of the selection step.
    :param stats: optional ``tools.Statistics`` compiled every generation.
    :param halloffame: optional hall of fame, updated every generation.
    :param verbose: print the logbook stream after every generation.
    :returns: ``(population, logbook, store_generations)``, where
        ``store_generations["gen<k>"]`` holds independent copies of the
        parent population (``"pop"``) and of the evaluated offspring
        (``"offspring"``) of generation ``k``.
    """
    def evaluate_invalid(individuals):
        """Evaluate individuals with an invalid fitness; return how many."""
        invalid_ind = [ind for ind in individuals if not ind.fitness.valid]
        fitnesses = toolbox.map(toolbox.evaluate, invalid_ind)
        for ind, fit in zip(invalid_ind, fitnesses):
            ind.fitness.values = fit
        return len(invalid_ind)

    logbook = tools.Logbook()
    logbook.header = ['gen', 'nevals'] + (stats.fields if stats else [])

    nevals = evaluate_invalid(population)

    if halloffame is not None:
        halloffame.update(population)

    record = stats.compile(population) if stats else {}
    logbook.record(gen=0, nevals=nevals, **record)
    if verbose:
        print(logbook.stream)

    store_generations = {}
    # Begin the generational process
    for gen in range(1, ngen + 1):
        # Snapshot the parents before variation. Storing `population` itself
        # would alias the list that `population[:] = offspring` overwrites
        # below, making every stored "pop" identical to its offspring.
        parents_snapshot = [toolbox.clone(ind) for ind in population]

        # Select the next generation individuals
        offspring = toolbox.select(population, len(population)-elite_pop_size)

        offspring = cross_mut(population=offspring, toolbox=toolbox, cxp = cxpb, mutpb=mutpb)

        # Evaluate the individuals with an invalid fitness
        nevals = evaluate_invalid(offspring)

        # Update the hall of fame with the generated individuals
        if halloffame is not None:
            halloffame.update(offspring)

        # Record copies so later in-place variation cannot alter the history.
        store_generations[f"gen{gen}"]={
            "pop":parents_snapshot,
            "offspring":[toolbox.clone(ind) for ind in offspring]
        }
        # Replace the current population by the offspring
        population[:] = offspring

        # Append the current generation statistics to the logbook
        record = stats.compile(population) if stats else {}
        logbook.record(gen=gen, nevals=nevals, **record)
        if verbose:
            print(logbook.stream)

    return population, logbook, store_generations

In [None]:
from operator import attrgetter
len(sorted(pop, key=attrgetter("fitness"), reverse=True)[:10])

In [None]:
population, logbook, store_generations = GPAlgo(
    pop, 
    toolbox,
    cxpb=0.7, 
    mutpb=0.6, 
    ngen=3, 
    elite_pop_size= 10,
    stats = stats, 
    halloffame =hof
    )

In [None]:
function = gp.compile(expr=gp.PrimitiveTree(pop[0]),pset=pset)
function(df = btc_ts)

In [None]:
# For every stored generation, count how many offspring are identical to the
# parent at the same position, and how many parents survive anywhere in the
# offspring list.
sum_same = []

for gen_record in store_generations.values():
    parents = gen_record["pop"]
    children = gen_record["offspring"]
    # Position-wise comparison of individual against individual (the previous
    # version compared each offspring with the whole parent list).
    same = [child == parent for child, parent in zip(children, parents)]
    pop_in_off = [ind in children for ind in parents]
    print('',pop_in_off)
    print(sum(pop_in_off))
    print(sum(same))
    print("----"*10)
    sum_same.append(sum(same))
print(sum_same)

In [None]:
import matplotlib.pyplot as plt

# Plot the best fitness recorded in the logbook for each generation and save
# the figure under <directory_path>\graphs\max_fitness.
y = [i['max'] for i in logbook]
plt.plot(range(len(y)), y,'r')
plt.title("Maximum fitness of each generation.")
plt.xlabel("Generation number")
plt.ylabel("Maximum fitness")
# 24-hour "HH" token: the 12-hour "hh" gives the same file name for a
# morning and an afternoon run at the same clock time.
date = pendulum.now().format('YYYY-MM-DD_HH-mm')
plt.savefig(rf'{directory_path}\graphs\max_fitness\max_fitness_{date}.png')

In [None]:
# Compare the first hall-of-fame individual against two baselines on the
# training and on the test window.
best_solution = hof.items[0]
# Proportional transaction cost; also read as a global by random_trading.
tc = 0.01
print("TRAIN")
# Buy and hold: invest 1000 at the first Open, value it at the last Open,
# and apply the transaction cost twice (one buy, one sell).
print("Buy and hold:",(1000/btc_train.iloc[0]['Open'])*btc_train.iloc[-1]['Open']*(1-tc)**2)
print("mtv         :",maximum_theoretical_value(btc_train))
# Only the first element of trading_strat's return value is printed.
print("strat value :",trading_strat(individual = best_solution, df=btc_train,pset=pset)[0],'\n')

print("TEST")
print("Buy and hold:",(1000/btc_test.iloc[0]['Open'])*btc_test.iloc[-1]['Open']*(1-tc)**2)
print("mtv         :",maximum_theoretical_value(btc_test))
print("strat value :",trading_strat(individual = best_solution, df=btc_test,pset=pset)[0])

In [None]:
plot_tree(best_solution)

In [None]:
#Check best solution:
print(str(best_solution))
# Single quotes inside the f-string: reusing the outer double quotes is only
# valid syntax from Python 3.12 onwards.
plot_tree(expr = best_solution,
          name=f"tree_best_solution_{pendulum.now().format('MM-DD_HH-mm-ss')}")
# Compile the best tree and evaluate it on the full series to get a boolean
# signal per row.
function = gp.compile(expr=gp.PrimitiveTree(best_solution),pset=pset)
signal_df  = pd.DataFrame(index=btc_ts.index)
signal_df['Signal'] = function(df = btc_ts)
signal_df["Open"] = btc_ts['Open']
plt.figure(figsize=(15,7))
# btc_ts['Open'].plot()
x_btc = list(btc_ts.index)
y_btc = list(btc_ts['Open'])

# Rows where the signal is true, overlaid in yellow on the price in black.
x_sig = list(signal_df[signal_df['Signal']].index)
y_sig= list(signal_df[signal_df['Signal']]['Open'])
plt.plot(x_btc,y_btc,'k')
plt.plot(x_sig,y_sig,'y')

plt.figure(figsize=(15,7))
# plt.plot(x_btc[-500:],y_btc[-500:],'k')
# plt.plot(x_sig[-500:],y_sig[-500:],'y')

In [None]:
# Get random Profit
def random_trading(df, val = 1000, tc = 0.01):
    """Simulate a random buy/sell/hold strategy on the ``Open`` prices of *df*.

    One uniform random number is drawn per row: below 1/3 the whole balance
    is used to buy (if flat), between 1/3 and 2/3 the whole position is sold
    (if long), otherwise nothing happens. A position still open on the last
    row is valued at that row's price, without a transaction cost.

    :param df: frame with an ``Open`` column, processed in row order.
    :param val: starting cash balance.
    :param tc: proportional transaction cost charged on every buy and sell.
    :returns: ``(val, ts_val, ts_df)``: the final value, the list of values
        starting with the initial balance, and a one-column frame
        (``"value"``) indexed like *df*.
    """
    ts_val = [val]
    long = False
    shares = 0.0
    last_row = len(df) - 1

    for cnt, (_, price) in enumerate(df['Open'].items()):
        gd = random.random()
        if gd < 1/3:
            # Try to buy for 33% of the time
            if not long:
                shares = ((1-tc)*val)/price
                long = True
        elif gd < 2/3:
            # Try to sell for 33% of the time
            if long:
                val = (1-tc)*shares*price
                long = False
        # Otherwise hold: an open position is kept. The previous version
        # cleared the flag here, which discarded the position without a sale.

        # enumerate is zero-based, so the last row is len(df) - 1.
        if cnt == last_row and long:
            val = shares*price
        ts_val.append(val)

    # Build the frame once instead of growing it row by row.
    ts_df = pd.DataFrame({"value": ts_val[1:]}, index=df.index)
    return val, ts_val, ts_df


In [None]:
val_r, ts_val_r, ts_df_r = random_trading(btc_ts)
print("Random trading strategy value: ",val_r)
ts_df_r.plot()

In [None]:
btc_train_open = btc_train['Open']
x = btc_train_open.reset_index()['Open']

# Get peaks and troughs
# NOTE(review): find_peaks is not imported in this notebook's visible cells;
# presumably it arrives through `from fitness_functions import *` - confirm.
peaks, _ = find_peaks(x)
troughs, _ = find_peaks(-x)

peaks_troughs = list(peaks)+list(troughs)
# Add the endpoints to the local maxima and minima.
if 0 not in peaks_troughs:
    peaks_troughs.append(0)
if len(btc_train_open)-1 not in peaks_troughs:
    peaks_troughs.append(len(btc_train_open)-1)

peaks_troughs = sorted(peaks_troughs)

# Work out the maximum theoretical value using an initial investment of $1000.
val = 1000
no_tc_val = 1000
# Prices at the turning points, computed once instead of on every iteration.
extrema_prices = btc_train_open.iloc[peaks_troughs]
# Walk consecutive turning points, including the first segment.
for prev_price, price in zip(extrema_prices.iloc[:-1], extrema_prices.iloc[1:]):
    if price > prev_price:
        # Take the rising segment only when it still pays after paying the
        # transaction cost on both the buy and the sell.
        pot_val = (val*price/prev_price)*(1-tc)**2
        if pot_val > val:
            val = pot_val
        # Cost-free benchmark: compound every rising segment.
        no_tc_val = no_tc_val*price/prev_price

val

In [None]:
# Out-of-sample check: evaluate the best BTC strategy on ETH-USD over the
# same date range.
eth_ts = yf.Ticker("ETH-USD").history(start='2016-01-01', end='2023-06-30')

base_columns_list = ['Open', 'High', 'Low', 'Close', 'Volume']

# Same feature construction as for btc_ts: moving averages (windows 2..49),
# lags (1..49) and ten default-percentage columns per base column.
for column in base_columns_list:
    for lag in range(2,50):
        create_ma_columns(df = eth_ts,column=column, lag = lag)
    for lag in range(1,50):
        create_lagged_columns(df = eth_ts,column=column, lag = lag)
    for lag in range(10):
        create_percentage_of_value(df = eth_ts,column=column)

# NOTE(review): unlike btc_ts, eth_ts is not passed through pre_process in
# this cell, so rows with null feature values and the 'Dividends' and
# 'Stock Splits' columns are presumably still present - confirm that the
# functions below cope with that.
print("TEST ETH")
print("Buy and hold:",(1000/eth_ts.iloc[0]['Open'])*eth_ts.iloc[-1]['Open']*(1-tc)**2)
print("mtv         :",maximum_theoretical_value(eth_ts))
print("strat value :",trading_strat(individual = best_solution, df=eth_ts, pset=pset)[0])

In [None]:
importlib.reload(fitness_functions)

In [None]:
pd.DataFrame({"Population":[[i.name for i in ind] for ind in hof.__dict__['items']]}).to_csv(rf"{directory_path}\hall_of_fame\hof1.csv")

In [None]:
pset.primitives

In [None]:
deap.tools.support.HallOfFame

In [None]:
logbook

In [None]:
import pickle

# Raw string: "\h" is not a valid escape sequence, and a non-raw literal
# triggers a warning on recent Python versions. The path is unchanged.
with open(r".\hall_of_fame\hof.pkl", 'wb') as file:
    # Serialize the object and write it to the file
    pickle.dump(hof, file)


In [None]:
hof.items

In [None]:
import os
# 24-hour "HH" token so morning and afternoon runs get distinct folders.
date = pendulum.now().format('YYYY-MM-DD_HH-mm')
folder_name = rf'.\results\run_{date}'
# exist_ok: re-running the cell within the same minute must not fail.
os.makedirs(folder_name, exist_ok=True)

In [None]:
t1 = pendulum.now()

In [None]:
t2=pendulum.now()

In [None]:
(t2-t1).seconds

In [None]:
str(hof.items[1])

In [None]:
pendulum.now().format("YYYYMMDD_hh-mm")


In [None]:
te=None
f"jdjhfjajh{te}152454"

In [None]:
from operator import attrgetter

In [None]:
# Prototype of rank-based roulette selection over `pop`.
# Sort best-first, then weight each individual by len - position, so the
# best individual gets the largest weight and the worst gets weight 1.
s_inds = sorted(pop, key=attrgetter('fitness'), reverse=True)
s_inds_ranked=[]
for val, indi in enumerate(s_inds):
    s_inds_ranked.append((len(s_inds)-val,indi))

sum_s_inds = sum([i[0] for i in s_inds_ranked])
chosen = []
for i in range(len(pop)):
    # Spin the wheel: pick the first individual whose cumulative weight
    # exceeds a uniform draw in [0, total weight).
    u = random.random() * sum_s_inds
    sum_ = 0
    for ind in s_inds_ranked:
        sum_ += ind[0]
        if sum_ > u:
            # `chosen` stores the (weight, individual) pair, not just the individual.
            chosen.append(ind)
            break
# Mean and list of the selected weights, to eyeball the selection pressure.
print(np.mean([i[0] for i in chosen]))
print([i[0] for i in chosen])

In [None]:
np.mean([1,2,34])

In [None]:
[i[0] for i in s_inds_ranked]

In [None]:
[1,2,4,5]+[89,100]

In [None]:
from operator import attrgetter

In [None]:
# toolbox = base.Toolbox()
# toolbox.register("expr", gp.genHalfAndHalf, pset=pset, min_=1, max_=5)
# toolbox.register("individual", tools.initIterate, creator.Individual, toolbox.expr)
# toolbox.register("custom_individual",generate, pset)
# toolbox.register("population", tools.initRepeat, list, toolbox.custom_individual)

# Prototype of one generation of elitist crossover + mutation.
population = toolbox.population(n=50)
# Plain scalars: the trailing commas copied from a call site turned each of
# these into a one-element tuple, which breaks the comparisons and the
# arithmetic below.
cxpb = 0.7
mutpb = 0.6
ngen = 2
elite_pop_size = 10
# NOTE(review): `gen` was never defined at notebook level; the first
# generation is assumed here. The ratio below is 1 for any valid `gen`.
gen = 1

offspring = toolbox.select(population, len(population)-elite_pop_size)
# NOTE(review): the elite is taken from `pop`, not from the fresh
# `population` created above - confirm this is intended.
elite_pop = sorted(pop, key=attrgetter("fitness"), reverse=True)[:elite_pop_size]
# offspring = [toolbox.clone(ind) for ind in population]
offspring = elite_pop + offspring

# Apply crossover and mutation on the offspring
# NOTE(review): the elite occupies indices 0..elite_pop_size-1, so starting
# at elite_pop_size+1 also leaves the first non-elite individual untouched.
for i in range(elite_pop_size+1, len(offspring)):
    if random.random() < cxpb:
        bi = random.randint(elite_pop_size+1, len(offspring)-1)
        t1,t2 = toolbox.mate(offspring[bi],offspring[i])
        print(t1 ==offspring[bi], t2 ==offspring[i])
        del offspring[bi].fitness.values, offspring[i].fitness.values

for i in range(elite_pop_size+1, len(offspring)):
    if random.random() < mutpb:
        mut_per = ((ngen+1-gen)/(ngen+1-gen))*100
        # print("Before Mut:",str(gp.PrimitiveTree(offspring[i])))
        offspring[i], = toolbox.mutate(offspring[i], mut_per=mut_per)
        # print("After Mut:",str(gp.PrimitiveTree(offspring[i])))
        del offspring[i].fitness.values

# Count positions where the new list still holds the same individual as the
# starting population.
elits_check = []
for ind,val in enumerate(population):
    elits_check.append(val == offspring[ind])
# print(elits_check)
print(sum(elits_check))

In [None]:
offspring = toolbox.select(population, len(population)-elite_pop_size)
toolbox.mate(offspring[10],offspring[15])