In [1]:
import numpy as np
import matplotlib.pyplot as plt
from tqdm import tqdm
import numpy as np
import pandas as pd
from bp import data_gather_from_files,run_strategy_optimised,run_strategy_eval
import time
from datetime import datetime
from dateutil.relativedelta import relativedelta

In [2]:
import multiprocessing
from multiprocessing import Pool
num_cores = multiprocessing.cpu_count()


In [3]:
num_cores

16

In [4]:
def error_check(params,n):
    if len(params) != n:
        raise IndexError('The number of parameters is not correct')

def generate_date_ranges_for_walk_forward(start_month_year, end_month_year, day=15, n_months = 1):
    # Initialize an empty list to store the date ranges
    date_ranges = []
    if day not in range(1, 29):
        raise ValueError('Day must be between 1 and 28')
    # Convert the input strings to datetime objects, using the given day
    start_date = datetime.strptime(f"{day} {start_month_year}", '%d %b %Y')
    end_date = datetime.strptime(f"{day} {end_month_year}", '%d %b %Y')
    
    # Generate the date ranges
    current_date = start_date
    while current_date <= end_date:
        next_date = current_date + relativedelta(months=n_months)
        date_range = [current_date.strftime('%d %b %Y'), (next_date - relativedelta(days=1)).strftime('%d %b %Y')]
        date_ranges.append(date_range)
        current_date = next_date
    
    return date_ranges[:-1]

def get_previous_n_months(end_date_str, n_months):
    # Convert the input string to a datetime object
    end_date = datetime.strptime(end_date_str, '%d %b %Y')
    
    # Calculate the start date
    start_date = end_date - relativedelta(months=n_months)
    
    # Create the date range
    date_range = [start_date.strftime('%d %b %Y'), (end_date - relativedelta(days=1)).strftime('%d %b %Y')]
    
    return date_range

In [5]:
def walk_forward_analysis(evaluation_start, evaluation_end, evaluation_day,parameters,optimization_function = None, optimizer_params =[],  lookback_in_months = 6,evaluation_period = 3):
    generated_date_ranges = generate_date_ranges_for_walk_forward(evaluation_start, evaluation_end,evaluation_day,n_months = evaluation_period)
    df = {}
    for dates in generated_date_ranges:
        train_period = get_previous_n_months(dates[0], lookback_in_months)
        train_data = data_gather_from_files(train_period[0],train_period[1])['EURUSD.mid']
        print('Data gathered for training period: ',train_period[0],train_period[1])
        test_data = data_gather_from_files(dates[0],dates[1])['EURUSD.mid']
        print('Data gathered for testing period: ',dates[0],dates[1])
        max_loss, U_PNL, max_position, R_PNL,profit,optimal_params = optimization_function(train_data,test_data,parameters,optimizer_params)
        print('Optimal parameters are: ',optimal_params)
        #add optimization phrase evaluation G,n,d
        a, b, c, d, e = run_strategy_optimised(train_data, optimal_params[0],optimal_params[1],optimal_params[2])
        print('Optimization phrase; Max loss, U_PNL, max_position, R_PNL,profit are: ',a, b, c, d, e)
        ##
        print('Training phrase; Max loss, U_PNL, max_position, R_PNL,profit are: ',max_loss, U_PNL, max_position, R_PNL,profit)
        df[dates[0] +'-'+ dates[1]] = [max_loss, U_PNL, max_position, R_PNL,profit]
    df = pd.DataFrame(df).T
    df.columns = ['max_loss', 'min_U_PNL', 'max_position', 'R_PNL','profit']
    return df

In [6]:
import warnings
warnings.simplefilter("ignore", category=RuntimeWarning)

In [7]:
def parallel_map(evaluate_function, items, num_processes):
    with Pool(processes=num_processes) as pool:
        results = pool.map(evaluate_function, items)
    return results


In [8]:
import random
from deap import base, creator, tools

def objective(individual, train_data, grid_params, position_params):
    G, n, d = individual[0]*grid_params[2], individual[1]*position_params[2], individual[2]
    max_loss, U_PNL, max_position, R_PNL, profit = run_strategy_optimised(train_data, G, n, d)
    
    constraints = [
        max_position > 10e6,
        U_PNL < -150e3,
        max_loss < -500e3
        # scaling_factor * n * d > 10e6  # Uncomment if you have scaling_factor in your genes
    ]

    if any(constraints):
        return float('-inf'),  # Return large negative value when constraints are not satisfied
    return profit,

def deap_optimiser_g_n_d(train_data, test_data, parameters, optimization_params):
    ngen = optimization_params[0]  # number of generations
    npop = optimization_params[1]  # number of population
    error_check(parameters,3)
    
    grid_params = parameters[0]
    position_params = parameters[1]
    depth_params = parameters[2]

    creator.create("FitnessMax", base.Fitness, weights=(1.0,)) #maximizing
    creator.create("Individual", list, fitness=creator.FitnessMax)

    toolbox = base.Toolbox()

    # Define the genes for our individual
    toolbox.register("G_gene", random.randint, grid_params[0]//grid_params[2], grid_params[1]//grid_params[2])
    toolbox.register("n_gene", random.randint, position_params[0]//position_params[2], position_params[1]//position_params[2])
    toolbox.register("d_gene", random.randint, depth_params[0], depth_params[1])

    # Create an individual with the genes
    toolbox.register("individual", tools.initCycle, creator.Individual, (toolbox.G_gene, toolbox.n_gene, toolbox.d_gene), n=1)
    toolbox.register("population", tools.initRepeat, list, toolbox.individual)

    toolbox.register("mate", tools.cxTwoPoint)
    toolbox.register("mutate", tools.mutUniformInt, low=[grid_params[0]//grid_params[2], position_params[0]//position_params[2], depth_params[0]], 
                     up=[grid_params[1]//grid_params[2], position_params[1]//position_params[2], depth_params[1]], indpb=0.2)
    toolbox.register("select", tools.selTournament, tournsize=3)
    toolbox.register("evaluate", objective, train_data=train_data, grid_params=grid_params, position_params=position_params)

    population = toolbox.population(n=npop)
    CXPB, MUTPB = 0.5, 0.2

    # Evaluate the entire population
    #num_cores = 4  # Or however many cores you want to use or have available
    fitnesses = parallel_map(toolbox.evaluate, population, num_cores)

    #fitnesses = list(map(toolbox.evaluate, population))
    for ind, fit in zip(population, fitnesses):
        ind.fitness.values = fit

    #initiate early stopping
    stagnant_generations = 0  # Counter for generations without improvement
    MAX_STAGNANT_GEN = 20  # Early stopping criterion: stop if no improvement over x generations
    best_fitness_so_far = float('-inf')  # since we're maximizing
    ##
    for gen in range(ngen):
        offspring = toolbox.select(population, len(population))
        offspring = list(map(toolbox.clone, offspring))
        
        # Crossover
        for child1, child2 in zip(offspring[::2], offspring[1::2]):
            if random.random() < CXPB:
                toolbox.mate(child1, child2)
                del child1.fitness.values
                del child2.fitness.values
        
        # Mutation
        for mutant in offspring:
            if random.random() < MUTPB:
                toolbox.mutate(mutant)
                del mutant.fitness.values

        #fitnesses = list(map(toolbox.evaluate, offspring))
        fitnesses = parallel_map(toolbox.evaluate, offspring, num_cores)

        for ind, fit in zip(offspring, fitnesses):
            ind.fitness.values = fit

        #early stopping
        current_best_fitness = max(ind.fitness.values[0] for ind in population)

        if current_best_fitness > best_fitness_so_far:
            best_fitness_so_far = current_best_fitness
            stagnant_generations = 0  # Reset counter
        else:
            stagnant_generations += 1

        if stagnant_generations >= MAX_STAGNANT_GEN:
            print(f"Early stopping on generation {gen} due to no improvement.")
            break
        ##
        population[:] = offspring

    best_ind = tools.selBest(population, 1)[0]
    optimal_g = best_ind[0]*grid_params[2]
    optimal_n = best_ind[1]*position_params[2]
    optimal_d = best_ind[2]
    print("optimisation completed")
    max_loss, U_PNL, max_position, R_PNL,profit = run_strategy_optimised(test_data, optimal_g,optimal_n,optimal_d)
    return max_loss, U_PNL, max_position, R_PNL,profit,[optimal_g,optimal_n,optimal_d]


In [9]:
grid_params = [0.001,0.01,0.0005]
lot_params = [100000,2000000,100000]
depth_params = [3,12,1]

n_grid_params = ((grid_params[1]-grid_params[0])/grid_params[2])
n_lot_params = ((lot_params[1]-lot_params[0])/lot_params[2])
n_depth_params = ((depth_params[1]-depth_params[0])/depth_params[2])

print('number of grid params:-',(n_grid_params))
print('number of lot params:-',(n_lot_params))
print('number of depth params:-',(n_depth_params))
print('total_number_of_combinations:-',(n_grid_params*n_lot_params*n_depth_params))

# Adjust these parameter according to number of iterations
n_trials = 50 #NGEN 50-100 
npop = 300 # around 10% of search space
optimizer_param = [n_trials, npop]

parameters = [grid_params,lot_params,depth_params]
results = walk_forward_analysis('jan 2021','jan 2022',1,parameters,optimization_function=deap_optimiser_g_n_d,optimizer_params=[n_trials, npop],lookback_in_months=1,evaluation_period=1)
results

number of grid params:- 18.0
number of lot params:- 19.0
number of depth params:- 9.0
total_number_of_combinations:- 3078.0
Data gathered for training period:  01 Dec 2020 31 Dec 2020
Data gathered for testing period:  01 Jan 2021 31 Jan 2021
