In [21]:
import os
import sys

sys.dont_write_bytecode = True
os.environ["NUMBA_DISABLE_JIT"] = "1"
import pandas as pd
import numpy as np

from quantfreedom.class_practice.enums import *
from quantfreedom.class_practice.base import backtest_df_only
from quantfreedom.class_practice.helper_funcs import create_os_cart_product_nb


np.set_printoptions(formatter={"float_kind": "{:.2f}".format})
pd.options.display.float_format = "{:,.2f}".format

%load_ext autoreload
%autoreload 2


The autoreload extension is already loaded. To reload it, use:
  %reload_ext autoreload


In [2]:
# gonzalo : function definition
from plotly.subplots import make_subplots
from typing import List, Tuple
from scipy import stats
import numpy as np
import timeit
from datetime import datetime
import threading

def crossed_above_1d_nb(arr1, arr2, wait: int = 0):
    """Get the crossover of the first array going above the second array."""
    out = np.empty(arr1.shape, dtype=np.bool_)
    was_below = False
    crossed_ago = -1

    for i in range(arr1.shape[0]):
        if np.isnan(arr1[i]) or np.isnan(arr2[i]):
            crossed_ago = -1
            was_below = False
            out[i] = False
        elif arr1[i] > arr2[i]:
            if was_below:
                crossed_ago += 1
                out[i] = crossed_ago == wait
            else:
                out[i] = False
        elif arr1[i] == arr2[i]:
            crossed_ago = -1
            out[i] = False
        else:
            crossed_ago = -1
            was_below = True
            out[i] = False
    return out

def plot_candles_res_sup(data, rows, cols, support, resistance):
    fig = make_subplots(rows = rows, cols = cols, shared_xaxes=True, row_heights=[1000])
    fig.add_candlestick(
                    x=data.index.values,
                    open=data.open.values,
                    high=data.high.values,
                    low=data.low.values,
                    close=data.close.values,
                    name='BTC',
                    row=1,
                    col=1)
    fig.update_layout(xaxis_rangeslider_visible=False)      # FIXME : only updates the rangeslider for the first row, to do the same with others, use4 xaxis2_rangeslider, xaxis3_rangeslider and so on...
    fig.add_scatter(x=data.index.values, y=support, mode='lines', line=dict(color='red'), name='Support', row=1, col=1)
    fig.add_scatter(x=data.index.values, y=resistance, mode='lines', line=dict(color='blue'), name='Resistance', row=1, col=1)

    return fig

def get_long_entries(close_values, support):
    in_range = np.where(np.isnan(support), 0, 1)
    return in_range & crossed_above_1d_nb(close_values, support)                 

def get_long_exit(close_values, resistance):
    in_range = np.where(np.isnan(resistance), 0, 1)
    return in_range & crossed_above_1d_nb(resistance, close_values)              

def strat_trade_within_range(fig, data, support, resistance):
    close = data.Close.values
    dates = data.index.values

    long_entries = get_long_entries(close, support)
    long_exit = get_long_exit(close, resistance)
    long_entries_display = np.where(long_entries, data.Close.values, np.nan)
    long_exit_display = np.where(long_exit, data.Close.values, np.nan)

    fig.add_scatter(x=dates, y=long_entries_display, mode='markers', marker=dict(size=20, symbol="arrow-up", color="Green"), name='long entry', row=1, col=1, secondary_y=False)
    fig.add_scatter(x=dates, y=long_exit_display, mode='markers', marker=dict(size=20, symbol="arrow-down", color="Red"), name='long exit', row=1, col=1, secondary_y=False)


def find_sweet_ranges_optimized(total_price_candles) -> List[Tuple[int,int,int,int]]:
    print('Starting "find_sweet_ranges"')
    i = 1
    ci = 0.99
    min_chunk_size = 100
    max_sigma_pct_allowed = 0.10
    all_found_ranges : List[Tuple[int,int,int,int]] = []
    total_candles = len(total_price_candles)
    while i < total_candles:
        found_a_range = False

        current_mu = 0
        current_sigma = 0
        current_range_starting = i
        current_sweet_range = total_price_candles[-1*(i+min_chunk_size):-current_range_starting]

        mean = np.mean(current_sweet_range)
        var = np.var(current_sweet_range)
        std = np.std(current_sweet_range)
        while i < total_candles and var < (mean * max_sigma_pct_allowed):               # if N calculated have a variance lower than 'max_sigma_pct_allowed' from the mu calculated, then keep counting
            found_a_range = True
            current_mu = mean
            current_sigma = std
            i += 1

            # estimate parameters for the expanded range
            current_sweet_range = total_price_candles[-1*(i+min_chunk_size):-current_range_starting]
            mean = np.mean(current_sweet_range)
            var = np.var(current_sweet_range)
            std = np.std(current_sweet_range)

        if found_a_range:
            all_found_ranges.append((current_range_starting, i+min_chunk_size-1, current_mu, current_sigma))

        i = i + 1       # FIXME : in the case the internal while gets to True, I might be advancing 'i' one extra time and therefore loosing one candle. 
        
    return all_found_ranges

def find_sweet_ranges(total_price_candles) -> List[Tuple[int,int,int,int]]:
    print('Starting "find_sweet_ranges"')
    i = 1
    ci = 0.99
    min_chunk_size = 100
    max_sigma_pct_allowed = 0.10
    all_found_ranges : List[Tuple[int,int,int,int]] = []
    while i < len(total_price_candles):
        found_a_range = False

        current_mu = 0
        current_sigma = 0
        current_range_starting = i
        current_sweet_range = total_price_candles[-1*(i+min_chunk_size):-current_range_starting]
        try:
            res_mean, res_var, res_std = stats.bayes_mvs(current_sweet_range, alpha=ci)
        except ValueError as e:
            print(f'bayes_mvs error -> i={i}, current_range_starting={current_range_starting}, min_chunk_size={min_chunk_size}, len(current_sweet_range)={len(current_sweet_range)}')
        while i < len(total_price_candles) and res_var.statistic < (res_mean.statistic * max_sigma_pct_allowed):               # if N calculated have a variance lower than 'max_sigma_pct_allowed' from the mu calculated, then keep counting
            found_a_range = True
            current_mu = res_mean.statistic
            current_sigma = res_std.statistic
            i += 1

            # estimate parameters for the expanded range
            current_sweet_range = total_price_candles[-1*(i+min_chunk_size):-current_range_starting]
            try:
                #print(f'i={i}, current_range_starting={current_range_starting}, min_chunk_size={min_chunk_size}, len(current_sweet_range)={len(current_sweet_range)}')
                res_mean, res_var, res_std = stats.bayes_mvs(current_sweet_range, alpha=ci)
            except ValueError as e:
                print(f'bayes_mvs error -> i={i}, current_range_starting={current_range_starting}, min_chunk_size={min_chunk_size}, len(current_sweet_range)={len(current_sweet_range)}')
                found_a_range = False

        if found_a_range:
            all_found_ranges.append((current_range_starting, i+min_chunk_size-1, current_mu, current_sigma))

        i = i + 1       # FIXME : in the case the internal while gets to True, I might be advancing 'i' one extra time and therefore loosing one candle. 
        
    return all_found_ranges

def find_sourranding_bound(mu, sigma, confidence) -> Tuple[int,int]:
    return (stats.norm.ppf(confidence/100.0, mu, sigma), stats.norm.ppf(1-(confidence/100.0), mu, sigma))


def merge_ranges(price_candles : np.array, all_ranges : List[Tuple[int,int,int,int]]) -> Tuple[np.array, np.array]:
    lower_bound_values : np.array = np.array([np.nan] * len(price_candles))
    upper_bound_values : np.array = np.array([np.nan] * len(price_candles))
    for interval in all_ranges:
        bound_limits : Tuple[int,int] = find_sourranding_bound(interval[2], interval[3], 5)
        
        lower_bound_values[interval[0]:interval[1]] = bound_limits[0]
        upper_bound_values[interval[0]:interval[1]] = bound_limits[1]

    return (lower_bound_values[::-1], upper_bound_values[::-1])


In [3]:
import ccxt
exchange = ccxt.bitget(
    {
        'apiKey': 'bg_db47784d11a9a8e3d0e1ea2af8333d7f',
        'secret': '7b4b1e135abbb442210888603f520e607b2cb6476c4b9cfd466c61d735082b8d',
        'password': 'passphrasesnake1942'
    },
)
exchange.set_sandbox_mode(False)
#exchange.set_sandbox_mode(False)
exchange.options['defaultType'] = 'swap'
exchange.load_markets()
#symbol = "SBTC/SUSDT:SUSDT"
symbol = "BTC/USDT:USDT"

In [4]:
from datetime import datetime

def get_candles(exchange, symbol, start_date, end_date = None, timeframe = '5m'):
    if end_date is None:
       end_date =  datetime.now().timestamp() * 1000

    result_candles = []
    iterating_date = start_date
    while iterating_date < end_date:
        print(f'iterating_date={iterating_date}, end_date={end_date}')
        try:
            new_candles = exchange.fetch_ohlcv(symbol, since=iterating_date, limit=1000, timeframe=timeframe, params={'limit':1000})
        except Exception as e:
            print(f'Got exception -> {repr(e)}')
            break

        print(f'Got {len(new_candles)} new candles for since={iterating_date}')
        if len(new_candles) == 0:
            print(f'fetch_ohlcv for since={iterating_date} got 0 candles')
            break
        else:
            result_candles.extend(new_candles)
            iterating_date = result_candles[-1][0]

    return result_candles

In [5]:
data_candles = get_candles(exchange, symbol, start_date=1693567982000)

iterating_date=1693567982000, end_date=1695482280372.169
Got 999 new candles for since=1693567982000
iterating_date=1693867800000, end_date=1695482280372.169
Got 999 new candles for since=1693867800000
iterating_date=1694167800000, end_date=1695482280372.169
Got 999 new candles for since=1694167800000
iterating_date=1694467800000, end_date=1695482280372.169
Got 999 new candles for since=1694467800000
iterating_date=1694767800000, end_date=1695482280372.169
Got 999 new candles for since=1694767800000
iterating_date=1695067800000, end_date=1695482280372.169
Got 999 new candles for since=1695067800000
iterating_date=1695367800000, end_date=1695482280372.169
Got 381 new candles for since=1695367800000
iterating_date=1695482100000, end_date=1695482280372.169
Got exception -> ExchangeError('bitget {"code":"20001","msg":"startTime should be less than endTime","requestTime":1695482282178,"data":null}')


In [6]:
import pandas as pd
import numpy as np

data_index = pd.Index(
            data=pd.to_datetime(np.array(data_candles)[:,0], unit="ms"),
            name="open_time",
        )

data_cols = pd.MultiIndex.from_tuples(
    [
        (symbol, "open"),
        (symbol, "high"),
        (symbol, "low"),
        (symbol, "close"),
        (symbol, "volume"),
    ],
    names=["symbol", "candle_info"]
)
data = pd.DataFrame(
    np.array(data_candles)[:,1:],
    columns=data_cols,
    index=data_index,
)

data.drop(columns=(symbol,'volume'), inplace=True, axis=1)

In [7]:
data

symbol,BTC/USDT:USDT,BTC/USDT:USDT,BTC/USDT:USDT,BTC/USDT:USDT
candle_info,open,high,low,close
open_time,Unnamed: 1_level_2,Unnamed: 2_level_2,Unnamed: 3_level_2,Unnamed: 4_level_2
2023-09-01 11:40:00,26020.00,26039.50,26016.50,26017.50
2023-09-01 11:45:00,26017.50,26057.00,26017.00,26047.00
2023-09-01 11:50:00,26047.00,26048.00,26033.00,26035.50
2023-09-01 11:55:00,26035.50,26039.50,26024.00,26024.50
2023-09-01 12:00:00,26024.50,26044.50,26024.00,26044.50
...,...,...,...,...
2023-09-23 14:55:00,26560.30,26562.50,26560.10,26560.40
2023-09-23 15:00:00,26560.40,26562.40,26560.10,26561.20
2023-09-23 15:05:00,26561.20,26563.40,26560.00,26563.40
2023-09-23 15:10:00,26563.40,26564.50,26559.00,26561.20


In [8]:
data_index = pd.Index(
        data=pd.to_datetime(np.array(data_candles)[:,0], unit="ms"),
        name="open_time",
    )
data_index

DatetimeIndex(['2023-09-01 11:40:00', '2023-09-01 11:45:00',
               '2023-09-01 11:50:00', '2023-09-01 11:55:00',
               '2023-09-01 12:00:00', '2023-09-01 12:05:00',
               '2023-09-01 12:10:00', '2023-09-01 12:15:00',
               '2023-09-01 12:20:00', '2023-09-01 12:25:00',
               ...
               '2023-09-23 14:30:00', '2023-09-23 14:35:00',
               '2023-09-23 14:40:00', '2023-09-23 14:45:00',
               '2023-09-23 14:50:00', '2023-09-23 14:55:00',
               '2023-09-23 15:00:00', '2023-09-23 15:05:00',
               '2023-09-23 15:10:00', '2023-09-23 15:15:00'],
              dtype='datetime64[ns]', name='open_time', length=6375, freq=None)

In [9]:
data_cols = pd.MultiIndex.from_tuples(
        [
            (symbol, "open"),
            (symbol, "high"),
            (symbol, "low"),
            (symbol, "close"),
            (symbol, "volume"),
        ],
        names=["symbol", "candle_info"],
)
    

In [10]:
data = pd.DataFrame(
    np.array(data_candles)[:,1:],
    columns=data_cols,
    index=data_index,
)
data.drop(columns=(symbol,'volume'), inplace=True, axis=1)
data

symbol,BTC/USDT:USDT,BTC/USDT:USDT,BTC/USDT:USDT,BTC/USDT:USDT
candle_info,open,high,low,close
open_time,Unnamed: 1_level_2,Unnamed: 2_level_2,Unnamed: 3_level_2,Unnamed: 4_level_2
2023-09-01 11:40:00,26020.00,26039.50,26016.50,26017.50
2023-09-01 11:45:00,26017.50,26057.00,26017.00,26047.00
2023-09-01 11:50:00,26047.00,26048.00,26033.00,26035.50
2023-09-01 11:55:00,26035.50,26039.50,26024.00,26024.50
2023-09-01 12:00:00,26024.50,26044.50,26024.00,26044.50
...,...,...,...,...
2023-09-23 14:55:00,26560.30,26562.50,26560.10,26560.40
2023-09-23 15:00:00,26560.40,26562.40,26560.10,26561.20
2023-09-23 15:05:00,26561.20,26563.40,26560.00,26563.40
2023-09-23 15:10:00,26563.40,26564.50,26559.00,26561.20


In [11]:
data = pd.read_hdf("../../tests/data/400k5mcandles.hd5")
data = data.iloc[-2000:]
data

symbol,BTC/USDT:USDT,BTC/USDT:USDT,BTC/USDT:USDT,BTC/USDT:USDT
candle_info,open,high,low,close
open_time,Unnamed: 1_level_2,Unnamed: 2_level_2,Unnamed: 3_level_2,Unnamed: 4_level_2
2023-09-10 02:10:00,25847.50,25849.70,25841.70,25841.80
2023-09-10 02:15:00,25841.80,25849.60,25835.00,25847.80
2023-09-10 02:20:00,25847.90,25849.90,25840.20,25849.10
2023-09-10 02:25:00,25849.10,25849.10,25835.50,25837.00
2023-09-10 02:30:00,25837.00,25838.20,25835.40,25836.20
...,...,...,...,...
2023-09-17 00:15:00,26499.70,26505.50,26470.50,26503.40
2023-09-17 00:20:00,26503.40,26514.30,26497.80,26512.10
2023-09-17 00:25:00,26512.10,26514.30,26486.40,26489.30
2023-09-17 00:30:00,26489.50,26507.70,26489.50,26490.30


In [12]:
data[symbol]

candle_info,open,high,low,close
open_time,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1
2023-09-10 02:10:00,25847.50,25849.70,25841.70,25841.80
2023-09-10 02:15:00,25841.80,25849.60,25835.00,25847.80
2023-09-10 02:20:00,25847.90,25849.90,25840.20,25849.10
2023-09-10 02:25:00,25849.10,25849.10,25835.50,25837.00
2023-09-10 02:30:00,25837.00,25838.20,25835.40,25836.20
...,...,...,...,...
2023-09-17 00:15:00,26499.70,26505.50,26470.50,26503.40
2023-09-17 00:20:00,26503.40,26514.30,26497.80,26512.10
2023-09-17 00:25:00,26512.10,26514.30,26486.40,26489.30
2023-09-17 00:30:00,26489.50,26507.70,26489.50,26490.30


In [13]:
def plot_basic_entries_exists_with_range_opt(data):
    data_close = data[symbol, 'close'].values
    ranges = find_sweet_ranges_optimized(data_close)
    (support, resistance) = merge_ranges(data_close, ranges)
    fig = plot_candles_res_sup(data['BTC/USDT:USDT'], 1, 1, support, resistance)
    #strat_trade_within_range(fig, data, support, resistance)
    fig.show()

In [14]:
plot_basic_entries_exists_with_range_opt(data)

Starting "find_sweet_ranges"


In [15]:
def find_range(data, time_it=False):
    close_values = data[symbol, 'close'].values

    start = datetime.now()
    ranges = find_sweet_ranges_optimized(close_values)
    if time_it:
        print(f'"find_sweet_ranges" execution time = {(datetime.now() - start).total_seconds()}')
    
    start = datetime.now()
    result = merge_ranges(close_values, ranges)
    if time_it:
        print(f'"merge_ranges" execution time = {(datetime.now() - start).total_seconds()}')

    return result

In [16]:
def get_entries_exits_signals(data, time_it=False):
    close_values = data[symbol, 'close'].values

    start = datetime.now()
    (support, resistance) = find_range(data, time_it)
    if time_it:
        print(f'"find_range" execution time = {(datetime.now() - start).total_seconds()}')

    start = datetime.now()
    entries_array = get_long_entries(close_values, support)
    if time_it:
        print(f'"get_long_entries" execution time = {(datetime.now() - start).total_seconds()}')
    entries = pd.DataFrame(entries_array, index=data.index)

    start = datetime.now()
    exit_array = get_long_exit(close_values, resistance)
    if time_it:
        print(f'"get_long_exit" execution time = {(datetime.now() - start).total_seconds()}')
    exits = pd.DataFrame(exit_array, index=data.index)

    return (entries, exits)


In [17]:
def get_backtesting_config():
    account_state = AccountState()
    backtest_settings = BacktestSettings()
    exchange_settings = ExchangeSettings()
    order_settings_arrays = OrderSettingsArrays(
        risk_account_pct_size=np.array([1.0]) / 100,
        sl_based_on_add_pct=np.array([0.01]) / 100,
        sl_based_on_lookback=np.array([30]),
        risk_reward=np.array([1.0]),
        leverage_type=np.array([LeverageType.Dynamic]),
        sl_candle_body_type=np.array([CandleBodyType.Low]),
        increase_position_type=np.array([IncreasePositionType.RiskPctAccountEntrySize]),
        stop_loss_type=np.array([StopLossType.SLBasedOnCandleBody]),
        take_profit_type=np.array([TakeProfitType.Provided]),
        max_equity_risk_pct=np.array([3.0]) / 100,
        order_type=np.array([OrderType.Long]),
        sl_to_be_based_on_candle_body_type=np.array([CandleBodyType.High]),
        sl_to_be_when_pct_from_candle_body=np.array([1, 2]) / 100,
        sl_to_be_zero_or_entry_type=np.array([SLToBeZeroOrEntryType.ZeroLoss]),
        trail_sl_based_on_candle_body_type=np.array([CandleBodyType.Close]),
        trail_sl_when_pct_from_candle_body=np.array([2.0]) / 100,
        trail_sl_by_pct=np.array([1.0]) / 100,
        static_leverage=np.array([1.0]),
        tp_fee_type=np.array([TakeProfitFeeType.Limit]),
    )
    return (account_state, order_settings_arrays, backtest_settings, exchange_settings)

In [24]:
def run_backtesting(symbol, data : pd.DataFrame = None, time_it : bool = False) -> Tuple[pd.DataFrame, pd.DataFrame]:
    if data is None:
        data = pd.read_hdf("../../tests/data/400k5mcandles.hd5")
        data = data.iloc[20:]

    print(f'Starting backtesting execution for {len(data)} candles')
    account_state, order_settings_arrays, backtest_settings, exchange_settings = get_backtesting_config()
    os_cart_arrays = create_os_cart_product_nb(order_settings_arrays=order_settings_arrays)
    entries, exits = get_entries_exits_signals(data, time_it)
    start = datetime.now()
    backtest_result = backtest_df_only(
        account_state=account_state,
        os_cart_arrays=os_cart_arrays,
        backtest_settings=backtest_settings,
        exchange_settings=exchange_settings,
        price_data=data,
        entries=entries,
        exit_signals=exits,
    )
    if time_it:
        print(f'"backtest_df_only" execution time = {(datetime.now() - start).total_seconds()}')
    
    return backtest_result


In [26]:
strat_df = run_backtesting(symbol='BTC/USDT:USDT', time_it=True)

Starting backtesting execution for 423541 candles
Starting "find_sweet_ranges"
"find_sweet_ranges" execution time = 57.733791
"merge_ranges" execution time = 1.116863
"find_range" execution time = 58.850654
"get_long_entries" execution time = 2.004943
"get_long_exit" execution time = 1.381362
Starting the backtest now ... and also here are some stats for your backtest.

Total indicator settings to test: 1
Total order settings to test: 2
Total combinations of settings to test: 2

Total candles: 423,541
Total candles to test: 847,082
Long Order - check_move_stop_loss_to_be=true
Long Order - check_move_stop_loss_to_be=true
Long Order - check_move_stop_loss_to_be=true
Long Order - check_move_stop_loss_to_be=true
Long Order - check_move_stop_loss_to_be=true
Long Order - check_move_stop_loss_to_be=true
Long Order - check_move_stop_loss_to_be=true
Long Order - check_move_stop_loss_to_be=true
Long Order - check_move_stop_loss_to_be=true
Long Order - check_move_stop_loss_to_be=true
Long Order -

In [27]:
strat_df

Unnamed: 0,ind_set_idx,or_set_idx,total_trades,gains_pct,win_rate,to_the_upside,total_pnl,ending_eq
0,0,0,1433.0,1150.81,46.68,0.8,11508.05,12508.05
1,0,1,1407.0,1228.49,46.86,0.79,12284.85,13284.85
