# Importing some packages

In [1]:
import numpy as np
from numba import njit
import pandas as pd
from pathlib import Path
from collections import defaultdict
from tqdm import tqdm

# Limited testing

In [2]:
@njit(cache=True)
def get_signals(signal_list, exit_list):
    """Pair each entry with a single exit so that only one position is open at a time.

    Both arrays are modified IN PLACE and are also returned; pass copies if the
    caller needs the raw signals afterwards.

    Parameters
    ----------
    signal_list : 2 x N array
        Row 0 holds the entry direction per bar (0 = no entry; an exit matches
        an entry when it equals the negated entry value). Row 1 holds the entry
        price per bar.
    exit_list : 2 x N array
        Row 0 holds the exit direction per bar, row 1 the exit price per bar.

    Returns
    -------
    (signal_list, exit_list)
        The same two arrays after pruning: entries and exits that fall inside
        an already open position are zeroed, and an entry that never finds an
        exit is removed.
    """

    start_idx = 0
    exit_idx = 0
   
    for i in range(len(signal_list[0])):

        # Bars inside an open position are skipped; processing resumes at the
        # bar right after the last exit.
        if i == start_idx:

            if signal_list[0][i] == 0:

                start_idx += 1

                # No position is open here, so an exit on this bar is meaningless.
                exit_list[0][i] = 0

            else:

                # Fix: when the entry sits on the very last bar both scans below
                # are empty. Without this reset exit_idx kept the value from the
                # previous trade and the writes further down overwrote that
                # trade's exit (possibly flipping its direction).
                exit_idx = i

                # First exit in the opposite direction; falls back to the last bar.
                for j in range(i+1, len(exit_list[0])):
                    if exit_list[0][j] == -signal_list[0][i]:
                        exit_idx = j
                        break
                    else:
                        exit_idx = j

                # An opposite ENTRY signal before that exit closes the position
                # earlier, at that bar's entry price.
                for k in range(i+1, exit_idx+1):
                    if signal_list[0][k] == -signal_list[0][i]:
                        exit_idx = k
                        exit_list[1][k] = signal_list[1][k]
                        break
                    else:
                        exit_idx = k

                # Everything strictly between entry and exit is ignored.
                for p in range(i+1, exit_idx):
                    signal_list[0][p] = 0
                    exit_list[0][p] = 0
                
                if exit_idx == len(signal_list[0]) - 1 and exit_list[0][exit_idx] != -signal_list[0][i]:
                    # Ran to the last bar without a matching exit: nothing to
                    # close with. The dangling entry is removed after the loop.
                    exit_list[0][exit_idx] = 0
                    exit_list[0][i] = 0
                    signal_list[0][exit_idx] = 0
                else:
                    exit_list[0][exit_idx] = -signal_list[0][i]
                    exit_list[0][i] = 0
                    signal_list[0][exit_idx] = 0
                
                start_idx = exit_idx + 1

        else:

            continue

    # Equal absolute sums mean every remaining entry has an exit.
    if sum(np.abs(signal_list[0])) == sum(np.abs(exit_list[0])):

        return signal_list, exit_list
    
    else:

        # Otherwise drop the last remaining entry (the one left without an exit).
        for i in range(len(signal_list[0])):
            if signal_list[0][-(i+1)] != 0:
                signal_list[0][-(i+1)] = 0
                break

        return signal_list, exit_list
    
@njit(cache=True)
def create_position_open_prices(signal_list, exit_list):
    """Line up each exit bar with the price of the entry that opened the position.

    For every bar carrying a non-zero exit, the first non-zero entry found
    between the previous exit bar (inclusive) and this bar (exclusive) supplies
    the open price. If no entry is found in that range, the most recently
    found entry bar is reused.

    Returns
    -------
    (pos_open_prices, pos_exit_prices)
        Two 1-D arrays, zero everywhere except at exit bars.
    """
    n_bars = len(signal_list[0])
    pos_open_prices = np.zeros(n_bars)
    pos_exit_prices = np.zeros(len(exit_list[0]))

    search_from = 0
    entry_bar = 0

    for bar in range(n_bars):
        if exit_list[0][bar] == 0:
            continue

        # Locate the first entry in [search_from, bar).
        probe = search_from
        while probe < bar:
            if signal_list[0][probe] != 0:
                entry_bar = probe
                break
            probe += 1

        pos_open_prices[bar] = signal_list[1][entry_bar]
        pos_exit_prices[bar] = exit_list[1][bar]
        search_from = bar

    return pos_open_prices, pos_exit_prices

@njit(cache=True)
def get_pnl_testing(
    trade_close_prices,
    signal_list, 
    trade_open_prices,
    commission=0.015,
    slippage=0.05,
    init_inv=20000,
    trade_size=0.1
):
    """Compute the per-bar PnL of closed trades, compounding the capital.

    Parameters
    ----------
    trade_close_prices : 1-D array
        Exit price per bar.
    signal_list : 1-D array
        EXIT direction per bar: -1 closes a long, any other non-zero value
        closes a short, 0 means no trade closes on that bar.
    trade_open_prices : 1-D array
        Entry price of the position closed on that bar (0 = no trade).
    commission, slippage : float
        Fractions applied to the PnL and to the entry price respectively.
    init_inv : number
        Starting capital; grows/shrinks by each trade's PnL.
    trade_size : float
        Fraction of the current capital committed to each trade.

    Returns
    -------
    1-D array with the PnL at each exit bar and exactly 0 on every other bar.
    """

    pnl_list = np.zeros(len(trade_close_prices))

    for i in range(len(trade_close_prices)):

        # Fix: bars without a closed trade must stay at 0. The original wrote
        # the previous trade's PnL (or an undefined value before the first
        # trade) into every such bar, which distorted win/loss counts.
        if signal_list[i] == 0 or trade_open_prices[i] == 0:
            continue

        temp_n_assets = int(init_inv * trade_size / trade_open_prices[i])

        # signal_list contains the points where exit occurs
        if signal_list[i] == -1:
            # Closing a long: bought at open (worsened by slippage), sold at close.
            temp_pnl = temp_n_assets * (trade_close_prices[i] - trade_open_prices[i] * (1 + slippage))
        else:
            # Closing a short: sold at open (worsened by slippage), bought back at close.
            temp_pnl = temp_n_assets * (trade_open_prices[i] * (1 - slippage) - trade_close_prices[i])

        temp_pnl = temp_pnl * (1 - commission)
        init_inv += temp_pnl
        pnl_list[i] = temp_pnl

    return pnl_list


## Entry testing

### Fixed stop and target exit

In [3]:
@njit(cache=True)
def get_exit_entry_testing1(
    close_prices, 
    open_prices,  
    signal_list,
    stoploss_th,
    takeprofit_th, 
    commission, 
    slippage, 
    init_inv, 
    trade_size
):
    """Build stop-loss / take-profit exits for the given entries.

    For every bar (except the last) that carries a non-zero entry price, the
    PnL of entering at that price and closing at the same bar's close is
    computed. When the loss reaches ``stoploss_th`` or the gain reaches
    ``takeprofit_th``, an opposite-direction exit is placed on the next bar at
    its open price. The running capital is updated by every evaluated PnL.

    Returns
    -------
    2 x N array: row 0 exit direction, row 1 exit price.
    """
    n_bars = len(close_prices)
    exit_list = np.zeros((2, n_bars))

    for bar in range(n_bars - 1):

        entry_price = signal_list[1][bar]
        if entry_price == 0:
            continue

        direction = signal_list[0][bar]
        n_assets = int(init_inv * trade_size / entry_price)

        if direction == 1:
            bar_pnl = n_assets * (close_prices[bar] - entry_price * (1 + slippage))
        else:
            bar_pnl = -n_assets * (close_prices[bar] - entry_price * (1 - slippage))

        bar_pnl = bar_pnl * (1 - commission)
        init_inv += bar_pnl

        # Threshold hit -> close on the following bar's open.
        if -bar_pnl >= stoploss_th or bar_pnl >= takeprofit_th:
            exit_list[0][bar + 1] = -direction
            exit_list[1][bar + 1] = open_prices[bar + 1]

    return exit_list

### Fixed bar exit

In [4]:
@njit(cache=True)
def get_exit_entry_testing2( 
    open_prices,  
    signal_list,
    n_exit_bars
):
    """Build fixed-holding-period exits: close every entry ``n_exit_bars`` later.

    Each non-zero entry (the last bar is not considered) gets an
    opposite-direction exit ``n_exit_bars`` bars ahead, priced at that bar's
    open. Entries whose exit would fall beyond the data get no exit.

    Returns
    -------
    2 x N array: row 0 exit direction, row 1 exit price.
    """
    n_total = len(signal_list[0])
    exit_list = np.zeros((2, n_total))

    n_exit_bars = np.int64(n_exit_bars)

    for entry_bar in range(n_total - 1):

        direction = signal_list[0][entry_bar]
        if direction == 0:
            continue

        exit_bar = entry_bar + n_exit_bars
        if exit_bar < n_total:
            exit_list[0][exit_bar] = -direction
            exit_list[1][exit_bar] = open_prices[exit_bar]

    return exit_list


### Random exit

In [5]:
@njit(cache=True)
def get_exit_entry_testing3( 
    open_prices,  
    signal_list
):
    """Build random exits for the given entries.

    Starting on the bar after each entry (the last bar is never treated as an
    entry), every bar without a new entry has a 50% chance of becoming the exit
    bar. Reaching a bar that carries another entry places the exit on the bar
    just before it; reaching the end of the data places it on the last bar.
    The exit is priced at the exit bar's open.

    Returns
    -------
    2 x N array: row 0 exit direction, row 1 exit price.
    """
    n_total = len(signal_list[0])
    exit_list = np.zeros((2, n_total))

    for entry_bar in range(n_total - 1):

        direction = signal_list[0][entry_bar]
        if direction == 0:
            continue

        exit_bar = entry_bar + 1
        while True:
            if signal_list[0][exit_bar] != 0:
                # Next entry reached: step back one bar and stop.
                exit_bar = exit_bar - 1
                break
            if np.random.rand() > 0.5:
                break
            if exit_bar == n_total - 1:
                break
            exit_bar += 1

        exit_list[0][exit_bar] = -direction
        exit_list[1][exit_bar] = open_prices[exit_bar]

    return exit_list


### Winning percentage

In [6]:
def calculate_mean_win_perc_entry_testing(data, text_code):

    bars_per_5week = 7 * 60 * 24 * 5
    n_bars_per_year = 7 * 60 * 24 * 52

    n_not_worked = 0
    n_total_cases = 0

    entry_walk_forward_dict = defaultdict(list)

    for idx in range(0, n_bars_per_year, bars_per_5week):

        n_total_cases += 1

        df = data.iloc[idx:idx+bars_per_5week, :]
        df.reset_index(drop=True, inplace=True)
        
        try:
            exec_dict = {'data': df}
            exec(text_code, exec_dict)
            df = exec_dict['df']

            commission = exec_dict['COMMISSION']
            slippage = exec_dict['SLIPPAGE'] 
            init_inv = exec_dict['AVAILABLE_CAPITAL']
            trade_size = exec_dict['TRADE_SIZE'] 

            signal_idxs = exec_dict['buy_idxs'].copy()
            signal_idxs.extend(exec_dict['sell_idxs'])
            signal_idxs = sorted(signal_idxs)
            signal_idxs_true = [i - 1 for i in signal_idxs]

            df['new_signal'] = 0
            df.loc[df.index.isin(signal_idxs_true), 'new_signal'] = df.loc[df.index.isin(signal_idxs_true), 'signal'].values
            df['signal_prices'] = 0
            df.loc[df.index.isin(signal_idxs), 'signal_prices'] = df.loc[df.index.isin(signal_idxs), 'open'].values

            signal_list = np.zeros((2, df.shape[0]))
            signal_list[0][1:] = df['new_signal'].values[:-1]
            signal_list[1] = df['signal_prices'].values

            # fixed stop and target exit testing
            exit_list = get_exit_entry_testing1(
                close_prices=df['close'].values, 
                open_prices=df['open'].values,  
                signal_list=signal_list,
                stoploss_th=50,
                takeprofit_th=100,
                commission=commission, 
                slippage=slippage, 
                init_inv=init_inv, 
                trade_size=trade_size
            )

            signal_list, exit_list = get_signals(signal_list, exit_list)
            pos_open_prices, pos_exit_prices = create_position_open_prices(signal_list, exit_list)

            pnl_list = get_pnl_testing(
                trade_close_prices=pos_exit_prices,
                signal_list=exit_list[0], 
                trade_open_prices=pos_open_prices,
                commission=commission, 
                slippage=slippage, 
                init_inv=init_inv, 
                trade_size=trade_size
            )

            fixed_winning_percent = 100 * sum(pnl_list > 0) / np.sum(pnl_list != 0)
            entry_walk_forward_dict['fixed_sp_testing'].append(fixed_winning_percent)

            # fixed bar exit testing
            exit_list = get_exit_entry_testing2( 
                open_prices=df['open'].values,  
                signal_list=signal_list,
                n_exit_bars=5
            )

            signal_list, exit_list = get_signals(signal_list, exit_list)
            pos_open_prices, pos_exit_prices = create_position_open_prices(signal_list, exit_list)

            pnl_list = get_pnl_testing(
                trade_close_prices=pos_exit_prices,
                signal_list=exit_list[0], 
                trade_open_prices=pos_open_prices,
                commission=commission, 
                slippage=slippage, 
                init_inv=init_inv, 
                trade_size=trade_size
            )

            fixed_bar_winning_percent = 100 * sum(pnl_list > 0) / np.sum(pnl_list != 0)
            entry_walk_forward_dict['fixed_bar_testing'].append(fixed_bar_winning_percent)

            # random exit testing
            exit_list = get_exit_entry_testing3( 
                open_prices=df['open'].values,  
                signal_list=signal_list
            )

            signal_list, exit_list = get_signals(signal_list, exit_list)
            pos_open_prices, pos_exit_prices = create_position_open_prices(signal_list, exit_list)

            pnl_list = get_pnl_testing(
                trade_close_prices=pos_exit_prices,
                signal_list=exit_list[0], 
                trade_open_prices=pos_open_prices,
                commission=commission, 
                slippage=slippage, 
                init_inv=init_inv, 
                trade_size=trade_size
            )

            random_winning_percent = 100 * sum(pnl_list > 0) / np.sum(pnl_list != 0)
            entry_walk_forward_dict['random_exit_testing'].append(random_winning_percent)

        except:

            n_not_worked += 1

    mean_win_perc_dict = defaultdict(list)

    mean_win_perc_dict['Fixed_StopLoss_TakeProfit_testing'].\
        append(np.mean(entry_walk_forward_dict['fixed_sp_testing']))
    mean_win_perc_dict['Fixed_Bar_testing'].\
        append(np.mean(entry_walk_forward_dict['fixed_bar_testing']))
    mean_win_perc_dict['Random_Exit_testing'].\
        append(np.mean(entry_walk_forward_dict['random_exit_testing']))
    mean_win_perc_dict['Not_Working'].\
        append(100 * n_not_worked / n_total_cases)
    
    win_pc_df = pd.DataFrame(mean_win_perc_dict)

    return win_pc_df


## Exit testing

### Similar approach entry

In [7]:
@njit(cache=True)
def get_entry_exit_testing1(
    close_prices,
    open_prices,
    n_bars
):
    """Generate entries from the sign of the close-to-close move over a look-back.

    From bar ``n_bars`` onwards, the close ``n_bars`` bars ago is compared
    with the previous bar's close: if the older close is higher the bar gets a
    +1 entry, if it is lower a -1 entry, otherwise none. Entries are priced at
    the bar's open.

    NOTE(review): +1 is emitted after prices have FALLEN over the look-back;
    the caller labels this a trend-following entry - confirm the sign is the
    intended one.

    Returns
    -------
    2 x N array: row 0 entry direction, row 1 entry price.
    """
    n_total = len(close_prices)
    signal_list = np.zeros((2, n_total))

    for bar in range(n_bars, n_total):

        move = close_prices[bar - n_bars] - close_prices[bar - 1]

        if move > 0:
            signal_list[0][bar] = 1
            signal_list[1][bar] = open_prices[bar]
        elif move < 0:
            signal_list[0][bar] = -1
            signal_list[1][bar] = open_prices[bar]

    return signal_list

@njit(cache=True)
def get_rsi(close_prices, prev_close_prices, length=14):
    """Relative Strength Index from simple averages of gains and losses.

    Parameters
    ----------
    close_prices : 1-D array
        Close price per bar.
    prev_close_prices : 1-D array
        Close price of the preceding bar, aligned with ``close_prices``.
    length : int
        Number of bars averaged.

    Returns
    -------
    1-D array of RSI values. Bars ``0 .. length-1`` have no average available
    and are set to the sentinel ``-999``. The value at bar ``i`` averages the
    changes of bars ``i-length .. i-1`` (the bar's own change is not
    included). When the average loss is zero the RSI is 100.
    """
    n = len(close_prices)

    # Per-bar gain / loss magnitudes (bar 0 is left at zero).
    gains = np.zeros(n)
    losses = np.zeros(n)
    for i in range(1, n):
        change = close_prices[i] - prev_close_prices[i]
        if change > 0:
            gains[i] = change
        elif change < 0:
            losses[i] = abs(change)

    # Fix: every bar without a computed average is a warm-up bar. The original
    # only flagged bars below length-1, so bar length-1 (whose averages were
    # never computed and therefore zero) reported a bogus RSI of 100.
    rsi = np.full(n, -999.0)

    for i in range(length, n):
        avg_gain = np.mean(gains[i-length:i])
        avg_loss = np.mean(losses[i-length:i])

        if avg_loss == 0:
            rsi[i] = 100
        else:
            rs = avg_gain / avg_loss
            rsi[i] = 100 - (100 / (1 + rs))

    return rsi

@njit(cache=True)
def get_entry_exit_testing2(
    close_prices, open_prices, prev_close_prices, rsi_window_size, rsi_threshold
):
    """Generate RSI-based entries.

    An RSI below ``rsi_threshold`` on a bar places a +1 entry on the next bar;
    an RSI above ``100 - rsi_threshold`` places a -1 entry on the next bar.
    Entries are priced at that next bar's open. Bars before
    ``rsi_window_size - 1`` and bars whose RSI is the ``-999`` sentinel are
    ignored.

    Returns
    -------
    2 x N array: row 0 entry direction, row 1 entry price.
    """
    n_total = len(close_prices)
    signal_list = np.zeros((2, n_total))

    rsi = get_rsi(close_prices, prev_close_prices, length=rsi_window_size)

    first_usable_bar = rsi_window_size - 1
    upper_threshold = 100 - rsi_threshold

    for bar in range(n_total - 1):

        if bar >= first_usable_bar and rsi[bar] != -999:

            next_bar = bar + 1

            if rsi[bar] < rsi_threshold:
                signal_list[0][next_bar] = 1
                signal_list[1][next_bar] = open_prices[next_bar]
            elif rsi[bar] > upper_threshold:
                signal_list[0][next_bar] = -1
                signal_list[1][next_bar] = open_prices[next_bar]

    return signal_list


### Random entry

In [8]:
@njit(cache=True)
def get_entry_exit_testing3(
    open_prices
):
    """Generate random entries: 30% long, 30% short, 40% flat per bar.

    Every bar except the last gets one uniform draw in [0, 1): above 0.7 a +1
    entry is placed, below 0.3 a -1 entry, otherwise nothing. Entries are
    priced at the bar's open.

    Returns
    -------
    2 x N array: row 0 entry direction, row 1 entry price.
    """

    signal_list = np.zeros((2, len(open_prices)))

    for i in range(len(open_prices)-1):

        # Fix: one draw per bar. The original drew a second, independent number
        # for the short branch, which made shorts (0.7 * 0.3 = 21%) less likely
        # than longs (30%).
        draw = np.random.rand()

        if draw > 0.7:
            signal_list[0][i] = 1
            signal_list[1][i] = open_prices[i]
        elif draw < 0.3:
            signal_list[0][i] = -1
            signal_list[1][i] = open_prices[i]

    return signal_list


### Winning percentage

In [9]:
def calculate_mean_win_perc_exit_testing(data, text_code):

    bars_per_5week = 7 * 60 * 24 * 5
    n_bars_per_year = 7 * 60 * 24 * 52

    n_not_worked = 0
    n_total_cases = 0

    exit_walk_forward_dict = defaultdict(list)

    for idx in range(0, n_bars_per_year, bars_per_5week):

        n_total_cases += 1

        df = data.iloc[idx:idx+bars_per_5week, :]
        df.reset_index(drop=True, inplace=True)

        try:

            exec_dict = {'data': df}
            exec(text_code, exec_dict)
            df = exec_dict['df']

            commission = exec_dict['COMMISSION']
            slippage = exec_dict['SLIPPAGE'] 
            init_inv = exec_dict['AVAILABLE_CAPITAL']
            trade_size = exec_dict['TRADE_SIZE']       

            signal_idxs = exec_dict['buy_idxs'].copy()
            signal_idxs.extend(exec_dict['sell_idxs'])
            signal_idxs = sorted(signal_idxs)
            signal_idxs_true = [i - 1 for i in signal_idxs]

            df['exit_signal'] = 0
            df.loc[df.index.isin(signal_idxs_true[1:]), 'exit_signal'] = df.loc[df.index.isin(signal_idxs_true[1:]), 'signal'].values
            df['exit_prices'] = 0
            df.loc[df.index.isin(signal_idxs[1:]), 'exit_prices'] = df.loc[df.index.isin(signal_idxs[1:]), 'open'].values

            exit_list = np.zeros((2, df.shape[0]))
            exit_list[0][1:] = df['exit_signal'].values[:-1]
            exit_list[1] = df['exit_prices'].values

            # replacing entry with trend following entry
            signal_list = get_entry_exit_testing1(
                close_prices=df['close'].values,
                open_prices=df['open'].values,
                n_bars=5
            )

            signal_list, exit_list = get_signals(signal_list, exit_list)
            pos_open_prices, pos_exit_prices = create_position_open_prices(signal_list, exit_list)

            pnl_list = get_pnl_testing(
                trade_close_prices=pos_exit_prices,
                signal_list=exit_list[0], 
                trade_open_prices=pos_open_prices,
                commission=commission, 
                slippage=slippage, 
                init_inv=init_inv, 
                trade_size=trade_size
            )

            fixed_winning_percent = 100 * sum(pnl_list > 0) / np.sum(pnl_list != 0)
            exit_walk_forward_dict['trend_entry_testing'].append(fixed_winning_percent)

            # replacing entry with countertrend entry
            signal_list = get_entry_exit_testing2(
                close_prices=df['close'].values, 
                open_prices=df['open'].values, 
                prev_close_prices=df['close'].shift(1).fillna(method='bfill').values, 
                rsi_window_size=10, 
                rsi_threshold = 20
            )

            signal_list, exit_list = get_signals(signal_list, exit_list)
            pos_open_prices, pos_exit_prices = create_position_open_prices(signal_list, exit_list)

            pnl_list = get_pnl_testing(
                trade_close_prices=pos_exit_prices,
                signal_list=exit_list[0], 
                trade_open_prices=pos_open_prices,
                commission=commission, 
                slippage=slippage, 
                init_inv=init_inv, 
                trade_size=trade_size
            )

            fixed_bar_winning_percent = 100 * sum(pnl_list > 0) / np.sum(pnl_list != 0)
            exit_walk_forward_dict['countertrend_entry_testing'].append(fixed_bar_winning_percent)

            # random entry testing
            signal_list = get_entry_exit_testing3(
                open_prices=df['open'].values
            )

            signal_list, exit_list = get_signals(signal_list, exit_list)
            pos_open_prices, pos_exit_prices = create_position_open_prices(signal_list, exit_list)

            pnl_list = get_pnl_testing(
                trade_close_prices=pos_exit_prices,
                signal_list=exit_list[0], 
                trade_open_prices=pos_open_prices,
                commission=commission, 
                slippage=slippage, 
                init_inv=init_inv, 
                trade_size=trade_size
            )

            random_winning_percent = 100 * sum(pnl_list > 0) / np.sum(pnl_list != 0)
            exit_walk_forward_dict['random_entry_testing'].append(random_winning_percent)

        except:

            n_not_worked += 1

    mean_win_perc_dict = defaultdict(list)

    mean_win_perc_dict['Trend_testing'].\
        append(np.mean(exit_walk_forward_dict['trend_entry_testing']))
    mean_win_perc_dict['Countertrend_testing'].\
        append(np.mean(exit_walk_forward_dict['countertrend_entry_testing']))
    mean_win_perc_dict['Random_Entry_testing'].\
        append(np.mean(exit_walk_forward_dict['random_entry_testing']))
    mean_win_perc_dict['Not_Working'].\
        append(100 * n_not_worked / n_total_cases)
    
    win_pc_df = pd.DataFrame(mean_win_perc_dict)

    return win_pc_df


## Core testing

### Winning percentage

In [10]:
def calculate_mean_win_perc_core_testing(data, text_code):

    bars_per_5week = 7 * 60 * 24 * 5
    n_bars_per_year = 7 * 60 * 24 * 52

    n_not_worked = 0
    n_total_cases = 0

    core_walk_forward_dict = defaultdict(list)

    for idx in range(0, n_bars_per_year, bars_per_5week):

        n_total_cases += 1

        df = data.iloc[idx:idx+bars_per_5week, :]
        df.reset_index(drop=True, inplace=True)

        try:
            exec_dict = {'data': df}
            exec(text_code, exec_dict)
            df = exec_dict['df']
            pnl_list = exec_dict['all_arr']

            winning_percent = 100 * sum(pnl_list > 0) / np.sum(pnl_list != 0)
            core_walk_forward_dict['core_testing'].append(winning_percent)
        except:
            n_not_worked += 1

    mean_win_perc_dict = defaultdict(list)
    mean_win_perc_dict['Core_Testing'].\
        append(np.mean(core_walk_forward_dict['core_testing']))
    mean_win_perc_dict['Not_Working'].\
        append(100 * n_not_worked / n_total_cases)
    
    win_pc_df = pd.DataFrame(mean_win_perc_dict)

    return win_pc_df


### Performance

In [11]:
@njit(cache=True)
def get_drawdown(pnl_list):
    """Percentage decline from the running peak to the lowest cumulative PnL.

    The cumulative PnL and its running maximum are tracked bar by bar. The
    result is ``100 * (trough - peak) / peak``, where ``trough`` is the lowest
    cumulative PnL and ``peak`` the running maximum at that same point.

    Parameters
    ----------
    pnl_list : 1-D array
        PnL per trade/bar.

    Returns
    -------
    float
        The drawdown in percent; 0.0 for empty input or when the lowest point
        is itself the running peak. A peak of exactly zero is replaced by
        0.00001 to avoid dividing by zero.
    """
    n = len(pnl_list)

    # Empty input has no drawdown (argmin would raise on it).
    if n == 0:
        return 0.0

    running_total = np.zeros(n)
    max_running_total = np.zeros(n)

    running_total[0] = pnl_list[0]
    max_running_total[0] = pnl_list[0]

    for i in range(1, n):
        running_total[i] = running_total[i-1] + pnl_list[i]
        # Fix: the peak is the maximum of the CUMULATIVE total. The original
        # compared against the single-step pnl_list[i] instead.
        max_running_total[i] = max(max_running_total[i-1], running_total[i])

    trough_idx = np.argmin(running_total)
    min_total = running_total[trough_idx]
    max_total = max_running_total[trough_idx]

    # No decline from the peak. This also covers the all-zero case that the
    # original tried to catch with `max_drawdown == -1`, a check that could
    # not match once the value was scaled by 100.
    if min_total == max_total:
        return 0.0

    if max_total == 0:
        max_total = 0.00001

    return 100 * (min_total - max_total) / max_total

@njit(cache=True)
def get_sharpe_ratio(
    pnl_list, 
    risk_free_rate=0
):
    """Sharpe ratio of a PnL series: (mean - risk_free_rate) / standard deviation.

    The standard deviation divides by ``n - 1``, except for a single
    observation where it divides by ``n``. A zero deviation is replaced by
    0.0001 so the ratio stays finite. An empty series has mean 0.
    """
    n_obs = len(pnl_list)

    # Mean, accumulated explicitly.
    total = 0
    for value in pnl_list:
        total += value

    if n_obs == 0:
        mean_return = 0
    else:
        mean_return = total / n_obs

    # Sum of squared deviations from the mean.
    squared_dev = 0
    for value in pnl_list:
        squared_dev += (value - mean_return) ** 2

    divisor = n_obs if n_obs == 1 else n_obs - 1
    std_dev = np.sqrt(squared_dev / divisor)

    if std_dev == 0:
        std_dev += 0.0001

    return (mean_return - risk_free_rate) / std_dev


In [12]:
def calculate_mean_performance(data, text_code):

    bars_per_5week = 7 * 60 * 24 * 5
    n_bars_per_year = 7 * 60 * 24 * 52

    n_not_worked = 0
    n_total_cases = 0

    performance_walk_forward_dict = defaultdict(list)

    for idx in range(0, n_bars_per_year, bars_per_5week):

        n_total_cases += 1

        df = data.iloc[idx:idx+bars_per_5week, :]
        df.reset_index(drop=True, inplace=True)

        try:
            exec_dict = {'data': df}
            exec(text_code, exec_dict)
            df = exec_dict['df']
            pnl_list = exec_dict['all_arr']

            init_inv = exec_dict['AVAILABLE_CAPITAL']
            trade_size = exec_dict['TRADE_SIZE']       

            signal_idxs = exec_dict['buy_idxs'].copy()
            signal_idxs.extend(exec_dict['sell_idxs'])
            signal_idxs = sorted(signal_idxs)

            performance_walk_forward_dict['n_trades'].append(len(signal_idxs) - 1)
            performance_walk_forward_dict['pnl'].append(np.sum(pnl_list))
            performance_walk_forward_dict['roi'].append(100 * np.sum(pnl_list) / (trade_size * init_inv))
            performance_walk_forward_dict['avg_drawdown'].append(exec_dict['avg_drawdown'])
            performance_walk_forward_dict['drawdown'].append(get_drawdown(pnl_list))
            performance_walk_forward_dict['pnl_avgd_ratio'].append(exec_dict['fitness'])
            performance_walk_forward_dict['sharpe_ratio'].append(get_sharpe_ratio(pnl_list, risk_free_rate=0))
        except:
            n_not_worked += 1

    mean_perf_dict = defaultdict(list)
    mean_perf_dict['N_Trades'].\
        append(np.mean(performance_walk_forward_dict['n_trades']))
    mean_perf_dict['PNL'].\
        append(np.mean(performance_walk_forward_dict['pnl']))
    mean_perf_dict['ROI'].\
        append(np.mean(performance_walk_forward_dict['roi']))
    mean_perf_dict['AVG_Drawdown'].\
        append(np.mean(performance_walk_forward_dict['avg_drawdown']))
    mean_perf_dict['Drawdown'].\
        append(np.mean(performance_walk_forward_dict['drawdown']))
    mean_perf_dict['PNL_AVGD_Ratio'].\
        append(np.mean(performance_walk_forward_dict['pnl_avgd_ratio']))
    mean_perf_dict['Sharpe_Ratio'].\
        append(np.mean(performance_walk_forward_dict['sharpe_ratio']))
    mean_perf_dict['Not_Working'].\
        append(100 * n_not_worked / n_total_cases)
    
    perf_df = pd.DataFrame(mean_perf_dict)

    return perf_df


# Data loading function

In [13]:
def generate_52w_data(data_path, n_bars=7 * 60 * 24 * 52):
    """Load a bar CSV and return its most recent ``n_bars`` rows, oldest first.

    Parameters
    ----------
    data_path : str or path-like
        CSV file with a ``datetime`` column parseable by ``pd.to_datetime``.
    n_bars : int, optional
        Number of most recent rows to keep. The default is 52 weeks of
        1-minute bars (7 * 60 * 24 * 52).

    Returns
    -------
    pandas.DataFrame
        The selected rows sorted by ``datetime`` ascending with a fresh
        0-based index.
    """
    df = pd.read_csv(data_path)
    df['datetime'] = pd.to_datetime(df['datetime'])
    # Sort BEFORE cutting: taking the last rows of an unsorted file would not
    # select the most recent bars.
    df = df.sort_values('datetime', ascending=True).tail(n_bars)
    return df.reset_index(drop=True)

# Testing strategies constructed from combination with ETH

In [14]:
# Load the BTC/ETH 1-minute bars used by every walk-forward test below.
# NOTE(review): absolute, machine-specific path that mixes "/" and "\" separators;
# presumably only resolves on the author's Windows machine - consider a
# configurable data directory.
data_path = Path(r'C:/\Users/\vchar/\OneDrive/\Desktop/\ML Projects/\Upwork/\AlgoT_ML_Dev/\GrammarEvolution/\PonyGE2/\datasets/\BTC-ETH-1m.csv')
df_52w = generate_52w_data(data_path)
# Preview the first rows as the cell output.
df_52w.head()

Unnamed: 0,datetime,btc_open,btc_high,btc_low,btc_close,btc_volume,eth_open,eth_high,eth_low,eth_close,eth_volume
0,2024-09-07 14:22:00,54626.71,54626.71,54575.54,54587.68,3.02957,2295.48,2295.48,2292.22,2292.22,8.133025
1,2024-09-07 14:23:00,54579.67,54596.7,54557.25,54588.69,4.557107,2292.23,2293.17,2291.09,2292.84,18.279947
2,2024-09-07 14:24:00,54585.02,54605.49,54568.01,54605.49,3.003662,2292.75,2293.33,2291.55,2292.98,12.682892
3,2024-09-07 14:25:00,54605.49,54609.63,54581.65,54587.49,0.997784,2292.98,2293.07,2291.24,2292.0,29.546565
4,2024-09-07 14:26:00,54586.1,54658.16,54586.1,54658.16,2.026429,2291.81,2294.84,2291.8,2294.8,7.415414


In [16]:
# Evolved strategies: keep those with fitness below -2, best (most negative) first.
strategy_file_path = Path(r'C:/\Users/\vchar/\OneDrive/\Desktop/\ML Projects/\Upwork/\AlgoT_ML_Dev/\GrammarEvolution/\PonyGE2/\derived_strategies/\ge_results_eth_comb2.csv')

df_str = (
    pd.read_csv(strategy_file_path)
    .loc[lambda frame: (frame['fitness'] < 0) & (frame['fitness'] < -2)]
    .sort_values('fitness')
    .reset_index(drop=True)
)
df_str

Unnamed: 0,buy,sell,fitness
0,"df['eth_high_1'].values.reshape(-1, 1) != df['...","(np.cos(df['btc_close'].values.reshape(-1, 1))...",-2046.165250
1,"df['eth_high_1'].values.reshape(-1, 1) != df['...","(np.cos(df['btc_close'].values.reshape(-1, 1))...",-2036.255448
2,"df['eth_high_1'].values.reshape(-1, 1) != df['...","(np.cos(df['btc_close'].values.reshape(-1, 1))...",-2036.255448
3,"df['eth_high_1'].values.reshape(-1, 1) != df['...","(np.cos(df['btc_close'].values.reshape(-1, 1))...",-2036.255448
4,"df['eth_high_1'].values.reshape(-1, 1) != df['...","(df['btc_open_1'].values.reshape(-1, 1) % 906 ...",-1992.333536
...,...,...,...
3488,"np.sin(df['eth_low_1'].values.reshape(-1, 1)) ...","(df['btc_close'].values.reshape(-1, 1) % 33.80...",-2.309342
3489,"df['eth_low_1'].values.reshape(-1, 1) != df['e...","(np.sin(df['eth_open_2'].values.reshape(-1, 1)...",-2.266570
3490,"df['eth_low_1'].values.reshape(-1, 1) != df['e...","(np.cos(df['eth_close_2'].values.reshape(-1, 1...",-2.147366
3491,"df['eth_high_3'].values.reshape(-1, 1) != df['...","(df['eth_low_2'].values.reshape(-1, 1) % 604 <...",-2.018926


In [17]:
# Empty result frames, one per test family.
# NOTE(review): presumably filled per strategy inside the loop below - the
# accumulation itself is not visible in this part of the notebook.
final_entry_win_pc_df = pd.DataFrame()
final_exit_win_pc_df = pd.DataFrame()
final_core_win_pc_df = pd.DataFrame()
final_perf_df = pd.DataFrame()

# Literal "{i}" placeholder. It is interpolated into the generated strategy
# source (itself an f-string), so the generated code ends up containing
# f'btc_close_{i}' and resolves `i` only when that code is executed.
lag_txt = '{i}'

for row in tqdm(df_str.iloc[:10].itertuples()):

    buy_signal_txt = row.buy
    sell_signal_txt = row.sell

    txt_code_ind = f'''import pandas as pd
import numpy as np
from src.fitness.indicators import indicators
from numba import njit
COMMISSION = 0.015
SLIPPAGE = 0.00005
AVAILABLE_CAPITAL = 700000
TRADE_SIZE = 0.5
@njit
def merge_pnl(arr1, arr2):
    out = np.zeros((len(arr1) + len(arr2)))
    idx = 1
    for i in range(len(arr1) + len(arr2)):
        if i % 2 == 0:
            out[i] = arr1[int(i/2)]
        else:
            out[i] = arr2[i-idx]
        idx += 1
    return out
@njit
def get_drawdowns(arr):
    drawdowns = np.zeros((len(arr)))
    max = arr[0]
    for i in range(1, len(drawdowns)-1):
        if arr[i-1] > arr[i] and arr[i] < arr[i+1]:
            min = arr[i]
            drawdowns[i] = max - min
        elif arr[i-1] < arr[i] and arr[i] > arr[i+1]:
            max = arr[i]
    return drawdowns
@njit
def get_pnl(trade_close_prices, trade_open_prices, commission, slippage, init_inv, trade_size, is_buy):
    pnl_list = np.zeros(len(trade_close_prices))
    for i in range(len(trade_close_prices)):
        temp_n_assets = int(init_inv * trade_size / trade_open_prices[i])
        if is_buy == 1:
            temp_pnl = temp_n_assets * (trade_close_prices[i] - trade_open_prices[i] * (1 + slippage))
        else:
            temp_pnl = -temp_n_assets * (trade_close_prices[i] - trade_open_prices[i] * (1 - slippage))
        temp_pnl = temp_pnl * (1 - commission)
        init_inv += temp_pnl
        pnl_list[i] = temp_pnl
    return pnl_list
df = data.copy()
for i in range(1, 6):
    df[f'btc_close_{lag_txt}'] = df['btc_close'].shift(i)
    df[f'btc_open_{lag_txt}'] = df['btc_open'].shift(i)
    df[f'btc_high_{lag_txt}'] = df['btc_high'].shift(i)
    df[f'btc_low_{lag_txt}'] = df['btc_low'].shift(i)
    df[f'btc_volume_{lag_txt}'] = df['btc_volume'].shift(i)
    df[f'eth_close_{lag_txt}'] = df['eth_close'].shift(i)
    df[f'eth_open_{lag_txt}'] = df['eth_open'].shift(i)
    df[f'eth_high_{lag_txt}'] = df['eth_high'].shift(i)
    df[f'eth_low_{lag_txt}'] = df['eth_low'].shift(i)
    df[f'eth_volume_{lag_txt}'] = df['eth_volume'].shift(i)
df.dropna(inplace=True)
df.reset_index(drop=True, inplace=True)
df['buy'] = ({buy_signal_txt}).astype(int)
df['sell'] = ({sell_signal_txt}).astype(int)
df['signal'] = df['buy'] + df['sell']
df['signal'] = df['signal'].apply(lambda x: 1 if x==1 else 0)
df['sell'] = df['sell'] * (-1)
df['signal'] = df['signal'] * df['sell']
df['signal'] = df['signal'] + df['buy']
df.drop(columns=['buy', 'sell'], inplace=True)
buy_idxs = []
sell_idxs = []
is_buy = 0
is_sell = 0
for i, row in enumerate(df.itertuples()):
    if row.signal == 1 and is_buy == 0:
        buy_idxs.append(i+1)
        is_buy = 1
        is_sell = 0
    elif row.signal == -1 and is_sell == 0:
        sell_idxs.append(i+1)
        is_sell = 1
        is_buy = 0
if len(buy_idxs) > len(sell_idxs):
    buy_idxs = buy_idxs[:-(len(buy_idxs) - len(sell_idxs))]
elif len(buy_idxs) < len(sell_idxs):
    sell_idxs = sell_idxs[:-(len(sell_idxs) - len(buy_idxs))]
buy_prices = df[df.index.isin(buy_idxs)]['btc_open'].values
sell_prices = df[df.index.isin(sell_idxs)]['btc_open'].values
if buy_idxs[0] < sell_idxs[0]:
    buy_arr = get_pnl(sell_prices, buy_prices, COMMISSION, SLIPPAGE, AVAILABLE_CAPITAL, TRADE_SIZE, 1)
    buy_pnl = np.sum(buy_arr)
    sell_arr = get_pnl(buy_prices[1:], sell_prices[:-1], COMMISSION, SLIPPAGE, AVAILABLE_CAPITAL, TRADE_SIZE, 0)
    sell_pnl = np.sum(sell_arr)
    all_arr = merge_pnl(buy_arr, sell_arr)
else:
    sell_arr = get_pnl(buy_prices, sell_prices, COMMISSION, SLIPPAGE, AVAILABLE_CAPITAL, TRADE_SIZE, 0)
    sell_pnl = np.sum(sell_arr)
    buy_arr = get_pnl(sell_prices[1:], buy_prices[:-1], COMMISSION, SLIPPAGE, AVAILABLE_CAPITAL, TRADE_SIZE, 1)
    buy_pnl = np.sum(buy_arr)
    all_arr = merge_pnl(sell_arr, buy_arr)
total_pnl = buy_pnl + sell_pnl
equity_curve_arr = np.cumsum(all_arr)
drawdowns = get_drawdowns(equity_curve_arr)
avg_drawdown = np.sum(drawdowns[drawdowns!=0]) / len(drawdowns[drawdowns!=0])
fitness = total_pnl / avg_drawdown'''

    entry_win_pc_df = calculate_mean_win_perc_entry_testing(
        data=df_52w, 
        text_code=txt_code_ind
    )

    temp_signal_df = pd.DataFrame({'buy': [buy_signal_txt], 'sell': [sell_signal_txt]})
    entry_win_pc_df = pd.concat([temp_signal_df, entry_win_pc_df], axis=1)
    final_entry_win_pc_df = pd.concat([final_entry_win_pc_df, entry_win_pc_df])

    exit_win_pc_df = calculate_mean_win_perc_exit_testing(
        data=df_52w, 
        text_code=txt_code_ind
    )

    exit_win_pc_df = pd.concat([temp_signal_df, exit_win_pc_df], axis=1)
    final_exit_win_pc_df = pd.concat([final_exit_win_pc_df, exit_win_pc_df])

    core_win_pc_df = calculate_mean_win_perc_core_testing(
        data=df_52w, 
        text_code=txt_code_ind
    )

    core_win_pc_df = pd.concat([temp_signal_df, core_win_pc_df], axis=1)
    final_core_win_pc_df = pd.concat([final_core_win_pc_df, core_win_pc_df])

    perf_df = calculate_mean_performance(
        data=df_52w, 
        text_code=txt_code_ind
    )

    perf_df = pd.concat([temp_signal_df, perf_df], axis=1)
    final_perf_df = pd.concat([final_perf_df, perf_df])

0it [00:00, ?it/s]

10it [00:49,  4.99s/it]


In [18]:
# Display the stacked results returned by calculate_mean_win_perc_entry_testing,
# one row per evaluated (buy, sell) rule pair.
final_entry_win_pc_df

Unnamed: 0,buy,sell,Fixed_StopLoss_TakeProfit_testing,Fixed_Bar_testing,Random_Exit_testing,Not_Working
0,"df['eth_high_1'].values.reshape(-1, 1) != df['...","(np.cos(df['btc_close'].values.reshape(-1, 1))...",,,,100.0
0,"df['eth_high_1'].values.reshape(-1, 1) != df['...","(np.cos(df['btc_close'].values.reshape(-1, 1))...",,,,100.0
0,"df['eth_high_1'].values.reshape(-1, 1) != df['...","(np.cos(df['btc_close'].values.reshape(-1, 1))...",,,,100.0
0,"df['eth_high_1'].values.reshape(-1, 1) != df['...","(np.cos(df['btc_close'].values.reshape(-1, 1))...",,,,100.0
0,"df['eth_high_1'].values.reshape(-1, 1) != df['...","(df['btc_open_1'].values.reshape(-1, 1) % 906 ...",,,,100.0
0,"df['eth_high_1'].values.reshape(-1, 1) != df['...","(df['btc_open_1'].values.reshape(-1, 1) % 701 ...",,,,100.0
0,"df['eth_high_1'].values.reshape(-1, 1) != df['...","(df['btc_open_1'].values.reshape(-1, 1) % 906 ...",,,,100.0
0,"df['eth_high_1'].values.reshape(-1, 1) != df['...","(df['eth_close_5'].values.reshape(-1, 1) % 701...",,,,100.0
0,"df['eth_high_1'].values.reshape(-1, 1) != df['...","(df['btc_open_1'].values.reshape(-1, 1) % 701 ...",,,,100.0
0,"df['eth_high_1'].values.reshape(-1, 1) != df['...","(df['btc_open_1'].values.reshape(-1, 1) % 701 ...",,,,100.0


In [19]:
# Display the stacked results returned by calculate_mean_win_perc_exit_testing,
# one row per evaluated (buy, sell) rule pair.
final_exit_win_pc_df

Unnamed: 0,buy,sell,Trend_testing,Countertrend_testing,Random_Entry_testing,Not_Working
0,"df['eth_high_1'].values.reshape(-1, 1) != df['...","(np.cos(df['btc_close'].values.reshape(-1, 1))...",,,,100.0
0,"df['eth_high_1'].values.reshape(-1, 1) != df['...","(np.cos(df['btc_close'].values.reshape(-1, 1))...",,,,100.0
0,"df['eth_high_1'].values.reshape(-1, 1) != df['...","(np.cos(df['btc_close'].values.reshape(-1, 1))...",,,,100.0
0,"df['eth_high_1'].values.reshape(-1, 1) != df['...","(np.cos(df['btc_close'].values.reshape(-1, 1))...",,,,100.0
0,"df['eth_high_1'].values.reshape(-1, 1) != df['...","(df['btc_open_1'].values.reshape(-1, 1) % 906 ...",,,,100.0
0,"df['eth_high_1'].values.reshape(-1, 1) != df['...","(df['btc_open_1'].values.reshape(-1, 1) % 701 ...",,,,100.0
0,"df['eth_high_1'].values.reshape(-1, 1) != df['...","(df['btc_open_1'].values.reshape(-1, 1) % 906 ...",,,,100.0
0,"df['eth_high_1'].values.reshape(-1, 1) != df['...","(df['eth_close_5'].values.reshape(-1, 1) % 701...",,,,100.0
0,"df['eth_high_1'].values.reshape(-1, 1) != df['...","(df['btc_open_1'].values.reshape(-1, 1) % 701 ...",,,,100.0
0,"df['eth_high_1'].values.reshape(-1, 1) != df['...","(df['btc_open_1'].values.reshape(-1, 1) % 701 ...",,,,100.0


In [20]:
# Display the stacked results returned by calculate_mean_win_perc_core_testing,
# one row per evaluated (buy, sell) rule pair.
final_core_win_pc_df

Unnamed: 0,buy,sell,Core_Testing,Not_Working
0,"df['eth_high_1'].values.reshape(-1, 1) != df['...","(np.cos(df['btc_close'].values.reshape(-1, 1))...",33.333333,90.909091
0,"df['eth_high_1'].values.reshape(-1, 1) != df['...","(np.cos(df['btc_close'].values.reshape(-1, 1))...",40.0,90.909091
0,"df['eth_high_1'].values.reshape(-1, 1) != df['...","(np.cos(df['btc_close'].values.reshape(-1, 1))...",40.0,90.909091
0,"df['eth_high_1'].values.reshape(-1, 1) != df['...","(np.cos(df['btc_close'].values.reshape(-1, 1))...",40.0,90.909091
0,"df['eth_high_1'].values.reshape(-1, 1) != df['...","(df['btc_open_1'].values.reshape(-1, 1) % 906 ...",40.0,90.909091
0,"df['eth_high_1'].values.reshape(-1, 1) != df['...","(df['btc_open_1'].values.reshape(-1, 1) % 701 ...",40.0,90.909091
0,"df['eth_high_1'].values.reshape(-1, 1) != df['...","(df['btc_open_1'].values.reshape(-1, 1) % 906 ...",40.0,90.909091
0,"df['eth_high_1'].values.reshape(-1, 1) != df['...","(df['eth_close_5'].values.reshape(-1, 1) % 701...",40.0,90.909091
0,"df['eth_high_1'].values.reshape(-1, 1) != df['...","(df['btc_open_1'].values.reshape(-1, 1) % 701 ...",40.0,90.909091
0,"df['eth_high_1'].values.reshape(-1, 1) != df['...","(df['btc_open_1'].values.reshape(-1, 1) % 701 ...",40.0,90.909091


In [21]:
# Display the stacked results returned by calculate_mean_performance,
# one row per evaluated (buy, sell) rule pair.
final_perf_df

Unnamed: 0,buy,sell,N_Trades,PNL,ROI,AVG_Drawdown,Drawdown,PNL_AVGD_Ratio,Sharpe_Ratio,Not_Working
0,"df['eth_high_1'].values.reshape(-1, 1) != df['...","(np.cos(df['btc_close'].values.reshape(-1, 1))...",3.0,14765.226544,4.218636,7.216048,17949.524627,2046.16525,0.508753,90.909091
0,"df['eth_high_1'].values.reshape(-1, 1) != df['...","(np.cos(df['btc_close'].values.reshape(-1, 1))...",5.0,14722.2552,4.206359,7.216048,36280.558596,2036.255448,0.394057,90.909091
0,"df['eth_high_1'].values.reshape(-1, 1) != df['...","(np.cos(df['btc_close'].values.reshape(-1, 1))...",5.0,14722.2552,4.206359,7.216048,36280.558596,2036.255448,0.394057,90.909091
0,"df['eth_high_1'].values.reshape(-1, 1) != df['...","(np.cos(df['btc_close'].values.reshape(-1, 1))...",5.0,14722.2552,4.206359,7.216048,36280.558596,2036.255448,0.394057,90.909091
0,"df['eth_high_1'].values.reshape(-1, 1) != df['...","(df['btc_open_1'].values.reshape(-1, 1) % 906 ...",5.0,12040.797473,3.440228,5.980353,40200.632768,1992.333536,0.436608,90.909091
0,"df['eth_high_1'].values.reshape(-1, 1) != df['...","(df['btc_open_1'].values.reshape(-1, 1) % 701 ...",5.0,12040.797473,3.440228,5.980353,40200.632768,1992.333536,0.436608,90.909091
0,"df['eth_high_1'].values.reshape(-1, 1) != df['...","(df['btc_open_1'].values.reshape(-1, 1) % 906 ...",5.0,12040.797473,3.440228,5.980353,40200.632768,1992.333536,0.436608,90.909091
0,"df['eth_high_1'].values.reshape(-1, 1) != df['...","(df['eth_close_5'].values.reshape(-1, 1) % 701...",5.0,12040.797473,3.440228,5.980353,40200.632768,1992.333536,0.436608,90.909091
0,"df['eth_high_1'].values.reshape(-1, 1) != df['...","(df['btc_open_1'].values.reshape(-1, 1) % 701 ...",5.0,12040.797473,3.440228,5.980353,40200.632768,1992.333536,0.436608,90.909091
0,"df['eth_high_1'].values.reshape(-1, 1) != df['...","(df['btc_open_1'].values.reshape(-1, 1) % 701 ...",5.0,12040.797473,3.440228,5.980353,40200.632768,1992.333536,0.436608,90.909091


In [22]:
# Persist the four result tables as CSV files (row index omitted) under
# testing_results/, relative to the current working directory.
results_dir = Path('testing_results')
# DataFrame.to_csv does not create missing directories, so make sure the
# target folder exists before writing (no-op when it is already there).
results_dir.mkdir(parents=True, exist_ok=True)

for csv_name, result_df in {
    'entry_testing_eth_comb.csv': final_entry_win_pc_df,
    'exit_testing_eth_comb.csv': final_exit_win_pc_df,
    'core_testing_eth_comb.csv': final_core_win_pc_df,
    'perf_eth_comb.csv': final_perf_df,
}.items():
    result_df.to_csv(results_dir / csv_name, index=False)