In [None]:
import numpy as np
import pandas as pd
from numba import njit
import vectorbtpro as vbt
vbt.settings.set_theme("dark")
vbt.settings.plotting["layout"]["width"] = 800
vbt.settings.plotting['layout']['height'] = 200
import warnings
warnings.filterwarnings("ignore")

import pandas_ta as ta

In [None]:
# Load the pickled Binance dataset and carve out the two windows used later in the notebook.
btc_90M_db_vbt = vbt.BinanceData.load('data/btc_90M_db_vbt.pkl')

# NOTE(review): both slices name '2023-01-01' as a boundary — confirm the windows do not overlap.
data = btc_90M_db_vbt['2021-01-01':'2023-01-01']
outofsample_data = btc_90M_db_vbt['2023-01-01':'2023-06-03']
print(data.shape)
print(outofsample_data.shape)
# Wherever you saved the CSV file
# NOTE(review): absolute, machine-specific path — point this at your own copy before running.
data_path = '/Users/ericervin/Documents/Coding/data-repository/data/fixed_BTCUSDT.csv'
min_data = vbt.BinanceData.from_csv(data_path)
print(min_data.shape)

# Example of building a simple portfolio sim in VBT
In the following cell we create an `entries` array and a `short_entries` array, then pass them to the simulation together with the OHLC data and some stop parameters.

In [None]:
date_range = slice('2019', '2023-09-30')  # Simple date range; can use YYYY-MM-DD, YYYY-MM or YYYY

# Time of day of every bar, used to build clock-based signals.
bar_times = min_data.index.to_series().dt.time
midnight = pd.to_datetime('00:00').time()
noon = pd.to_datetime('12:00').time()

short_entries = bar_times == midnight  # Simple short entry signal at midnight every day
# `midnight or noon` evaluates to `midnight` alone (a time object is truthy), so comparing
# against that expression never matched noon. `isin` tests against both times.
entries = bar_times.isin([midnight, noon])  # Simple entry signal at midnight and noon every day

test_pf = vbt.Portfolio.from_signals(
    close               =min_data.loc[date_range].close,
    open                =min_data.loc[date_range].open,
    high                =min_data.loc[date_range].high,
    low                 =min_data.loc[date_range].low,
    # entries             =entries.loc[date_range], # Can comment this out if you want short only and vice versa or run them both
    short_entries       =short_entries.loc[date_range],
    tsl_th              =0.005, # Take-profit threshold at which the trailing stop loss starts
    tsl_stop            =0.0015, # Trailing stop loss, active once tsl_th is reached
    sl_stop             =0.03, # Good old fashioned stop loss
    td_stop             =1440*0.25, # Time delta stop, counted in rows (see time_delta_format)
    time_delta_format        ='rows', # Tells it what format, rows or index
    # fees                =0.00035,
)
print(test_pf.stats())

# Add some features

In [None]:
# Show vectorbt's help for the `run` method of each indicator we plan to use.
for indicator in (vbt.SUPERTREND, vbt.MACD, vbt.ATR, vbt.RSI):
    vbt.phelp(indicator.run)
# vbt.phelp(vbt.ADX.run)

In [None]:
# MACD on the close series. Windows are written as multiples of 1440 bars
# (one day per 1440 bars if the data is sampled every minute — TODO confirm).
macd = vbt.MACD.run(min_data.close, fast_window=12*1440, slow_window=26*1440, signal_window=9*1440)

In [None]:
# Plot January 2020 of the MACD computed above.
date_range = slice('2020-01-01', '2020-01-31')

macd[date_range].plot().show()


In [None]:
date_range = slice('2019', '2023-09-30')  # Simple date range; can use YYYY-MM-DD, YYYY-MM or YYYY

# MACD look-back lengths, in bars
fast_window = 12 * 1440
slow_window = 26 * 1440
signal_window = 12 * 1440

macd = vbt.MACD.run(
    min_data.close,
    fast_window=fast_window,
    slow_window=slow_window,
    signal_window=signal_window,
)

# Slice once and compare the two lines; the signals are handed over as plain boolean arrays.
macd_line = macd[date_range].macd
signal_line = macd[date_range].signal
short_entries = (macd_line < signal_line).to_numpy()  # short while macd is below signal
entries = (macd_line > signal_line).to_numpy()  # long while macd is above signal

prices = min_data.loc[date_range]
test_pf = vbt.Portfolio.from_signals(
    close=prices.close,
    open=prices.open,
    high=prices.high,
    low=prices.low,
    entries=entries,  # Comment out for short only (or the next line for long only), or run both
    short_entries=short_entries,
    tsl_th=0.005,  # Take-profit threshold at which the trailing stop loss starts
    tsl_stop=0.0015,  # Trailing stop loss, active once tsl_th is reached
    sl_stop=0.10,  # Plain stop loss
    # td_stop=1440*0.25,  # Time delta stop
    # time_delta_format='rows',  # Tells it what format, rows or index
    fees=0.00035,
)
print(test_pf.stats())

In [None]:

# Run MACD for several window settings at once; param_product=True asks vectorbt to
# combine the three lists rather than pair them element by element.
executor_options = dict(engine="threadpool", chunk_len="auto", show_progress=True)
macd_combinations = vbt.MACD.run(  # `run` is the same classmethod that `macd.run` resolves to
    min_data.close,
    fast_window=[12 * 1440, 26 * 1440],
    slow_window=[26 * 1440, 52 * 1440],
    signal_window=[9 * 1440, 12 * 1440],
    param_product=True,
    execute_kwargs=executor_options,
)
   

In [None]:
# Show the help for MACD.run again to look up its parameter and output names.
vbt.phelp(vbt.MACD.run)

In [None]:
# You can see from the phelp output that the indicator exposes `macd` and `signal`
macd_combinations.signal

In [None]:
# Create an entire table of all the combinations that you can create entry and exit signals from
macd_combinations = macd.run(
    min_data.close,
    fast_window=np.arange(770, 4*1440, 770),
    slow_window=np.arange(6*1440, 26*1440, 10*1440),
    signal_window=np.arange(360, 1440, 360),
    param_product=True,
        execute_kwargs=dict(
        engine="threadpool",
        chunk_len="auto",
        show_progress=True,
    )
)

print("Creating Buy and Sell Signals")
short_entries   = macd_combinations[date_range].macd < macd_combinations[date_range].signal # short when macd is below signal
entries         = macd_combinations[date_range].macd > macd_combinations[date_range].signal # enter when macd is above signal
clean_entries, clean_short_entries = entries.vbt.signals.clean(short_entries) # Clean the signals to make sure they are not overlapping this just grabs the first signal and ignores the rest until there is another crossover
print(f"No. of entries: {clean_entries.sum()}, No. of short entries: {clean_short_entries.sum()}") # Can comment this out if you don't care. It just prints out the number of entries and short entries for each combination.
print("Simulating Portfolio Backtests")

test_pf = vbt.Portfolio.from_signals(
    close               =min_data.loc[date_range].close,
    open                =min_data.loc[date_range].open,
    high                =min_data.loc[date_range].high,
    low                 =min_data.loc[date_range].low,
    entries             =clean_entries, # Can comment this out if you want short only and vice versa or run them both
    short_entries       =clean_short_entries,
    tsl_th              =0.005, # Take Profit Treshold to start a trailing stop loss
    tsl_stop            =0.0015, # Trailing Stop Loss if tsl_th is reached
    # tp_stop             =0.005, # Take Profit Stop
    sl_stop             =0.02, # Good old fashioned stop loss
    # td_stop             =1440*0.25, # Time delta stop in minutes
    # time_delta_format        ='rows', # Tells it what format, rows or index
    fees                =0.00035,
    leverage            =4,
)
# Print out a summary of the stats 
output = pd.concat([test_pf.sharpe_ratio, test_pf.total_return, test_pf.trades.win_rate, test_pf.trades.profit_factor, test_pf.trades.count()], axis=1)
output.columns = ['Sharpe Ratio', 'Total Return', 'Win Rate', 'Profit Factor', 'Trade Count']
output.sort_values(by='Profit Factor', ascending=False)
# print(test_pf.total_return)

In [None]:
# Select the parameter combination with the highest total return and inspect it
best = test_pf.total_return.idxmax()
best_pf = test_pf[best]
print(best_pf.stats())
best_pf.resample('1d').plot().show()

In [None]:
# Cumulative returns of the best combination on a daily resample, without the benchmark line.
test_pf[best].resample('1d').plot_cum_returns(bm_returns=False).show()

In [None]:
# Heatmap of the 'Total Return' column of the summary table built above.
output['Total Return'].vbt.plot_heatmap().show()

## Now work on RSI

In [None]:
# Show the help for RSI.run to look up its parameter and output names.
vbt.phelp(vbt.RSI.run)

In [None]:
# RSI with a 100-bar window, plotted for the first quarter of 2020.
rsi = vbt.RSI.run(min_data.close, window=100)

rsi['2020-01-01':'2020-03-31'].plot().show()

In [None]:
# RSI thresholds used to build the signals below
overbought = 60
oversold = 40

# Run RSI for every window length in np.arange(100, 300, 10)
rsi_combinations = vbt.RSI.run(
    min_data.close,
    window=np.arange(100, 300, 10),
    param_product=True,
        execute_kwargs=dict(
        engine="threadpool",
        chunk_len="auto",
        show_progress=True,
    )
)

print("Creating Buy and Sell Signals")
# `date_range` is whatever an earlier cell last assigned.
# Short while RSI is above `overbought`, long while RSI is below `oversold`.
short_entries   = rsi_combinations[date_range].rsi.vbt > overbought 
entries         = rsi_combinations[date_range].rsi.vbt < oversold 
clean_entries, clean_short_entries   = entries.vbt.signals.clean(short_entries)
# NOTE(review): the cleaned signals are computed above, but the simulation below is given the
# raw `entries` / `short_entries` — confirm which pair is intended.

# print("Number of short entries: ", short_entries.sum())
# print("Number of long entries: ", entries.sum())
# print("Number of clean long entries: ", clean_entries.sum())
# print("Number of clean short entries: ", clean_short_entries.sum())
# print("Simulating Portfolio Backtests")

test_pf = vbt.Portfolio.from_signals(
    close               =min_data.loc[date_range].close,
    open                =min_data.loc[date_range].open,
    high                =min_data.loc[date_range].high,
    low                 =min_data.loc[date_range].low,
    entries             =entries, # Can comment this out if you want short only and vice versa or run them both
    short_entries       =short_entries,
    tsl_th              =0.005, # Take-profit threshold at which the trailing stop loss starts
    tsl_stop            =0.0015, # Trailing stop loss, active once tsl_th is reached
    sl_stop             =0.02, # Good old fashioned stop loss
    # td_stop             =1440*0.25, # Time delta stop
    # time_delta_format        ='rows', # Tells it what format, rows or index
    fees                =0.00035,
)
# print(test_pf.stats())
# print(test_pf.total_return)
# print(f"RSI parameters : {rsi_combinations.rsi.columns}")
# print(test_pf.sharpe_ratio)
# print(test_pf.total_return)
# Summary table: one row per RSI window, one column per metric
rsi_output = pd.concat([test_pf.sharpe_ratio, test_pf.total_return, test_pf.trades.win_rate, test_pf.trades.profit_factor, test_pf.trades.count()], axis=1)
rsi_output.columns = ['Sharpe Ratio', 'Total Return', 'Win Rate', 'Profit Factor', 'Trade Count']
rsi_output.sort_values(by='Profit Factor', ascending=False)
# best_return_idx = test_pf.total_return.idxmax()
# print(test_pf[best_return_idx].stats())
# test_pf[best_return_idx].resample('1d').plot().show()

In [None]:
# Summary statistics of the raw (uncleaned) short-entry signals built in the previous cell.
short_entries.vbt.signals.stats()

# Build a PSAR function
This function can also be used to upsample and downsample the data

In [None]:
def get_psar_signal(high, low, close, af0=0.02, step=0.02, max_=0.2, resample_period=None):
    """
    Compute Parabolic SAR levels and reversal signals, optionally on a coarser timeframe.

    Args:
    - high, low, close: price Series sharing one index.
    - af0, step, max_: forwarded positionally, in that order, to `DataFrame.ta.psar`.
    - resample_period (str, optional): if given, prices are aggregated to this period
      (right-closed, right-labelled) and shifted forward by one period before the
      indicator is computed; the result is then forward-filled back onto the original index.

    Returns a DataFrame with the following columns:
    - 'Close': close of the (possibly resampled) bars the indicator was computed on
    - 'signal': 1 where a reversal is flagged and the previous bar had a long-side SAR value,
      -1 where a reversal is flagged and the previous bar had a short-side SAR value, else 0
    - 'close_long_price': price at which a long position should be closed
    - 'close_short_price': price at which a short position should be closed

    With `resample_period` set, every column (including 'signal') is forward-filled, so a
    non-zero signal repeats on each original bar until the next resampled bar.
    """
    data = pd.concat([high, low, close], axis=1)
    data.columns = ['High', 'Low', 'Close']

    # Resample data if resample_period is provided
    if resample_period:
        data_resampled = data.resample(resample_period, closed='right', label='right' ).agg({'High': 'max', 'Low': 'min', 'Close': 'last'})
        # Shift the resampled data forward by one to avoid look ahead bias
        data_resampled = data_resampled.shift(1).dropna()
    else:
        data_resampled = data

    psar = data_resampled.ta.psar(af0, step, max_)

    # Column names as produced by the psar call for these parameters
    close_long_price = f"PSARl_{af0}_{max_}"   # or 'floor'
    close_short_price = f"PSARs_{af0}_{max_}"  # or 'ceiling'
    psar_reversal_col = f"PSARr_{af0}_{max_}"

    # NOTE(review): a reversal whose previous bar carried a long-side SAR is labelled 1 and
    # one whose previous bar carried a short-side SAR is labelled -1. Confirm against
    # pandas_ta's reversal-bar convention that this is the intended direction.
    reversed_now = psar[psar_reversal_col] == 1
    signal = np.zeros(len(psar))
    signal = np.where(reversed_now & (psar[close_long_price].shift(1).notna()), 1, signal)
    signal = np.where(reversed_now & (psar[close_short_price].shift(1).notna()), -1, signal)

    result = pd.DataFrame({
        'Close': data_resampled.Close,
        'signal': signal,
        'close_long_price': psar[close_long_price],
        'close_short_price': psar[close_short_price]
    }, index=data_resampled.index)

    # Reindex to the original timeframe and forward fill if resampling was done
    if resample_period:
        result = result.reindex(data.index).ffill()

        # Replace NaN values in the signal column with 0 (after resampling).
        # Assign the result back: an in-place fillna on `result['signal']` acts on a
        # temporary under pandas Copy-on-Write and leaves `result` unchanged.
        result['signal'] = result['signal'].fillna(0)

    return result


# Hourly PSAR computed from the minute bars
psar_signal = get_psar_signal(
    min_data.high,
    min_data.low,
    min_data.close,
    resample_period='1h')

# Plot a small sample of the result: closes with both PSAR levels overlaid.
# The price trace and the overlay use the same month; the overlay previously ran to 2020-01-31.
sample_range = slice('2019-01-01', '2019-01-31')
plot_columns = ['close_long_price', 'close_short_price']
fig = min_data.loc[sample_range].close.vbt.plot()
psar_signal[plot_columns].loc[sample_range].vbt.plot(fig=fig).show()


In [None]:
# Hourly PSAR signal over the full minute dataset (same call as in the previous cell).
psar_signal = get_psar_signal(min_data.high, min_data.low, min_data.close, resample_period='1h')

In [None]:
# Display the two PSAR level columns.
psar_signal[plot_columns]

In [None]:
# print a small sample of the result
# Plot a small sample of the result: closes with both PSAR levels overlaid
date_range = slice('2019-1-01', '2019-01-31')
plot_columns = ['close_long_price', 'close_short_price']
psar_levels = psar_signal.loc[date_range, plot_columns]
fig = min_data.loc[date_range].close.vbt.plot()
psar_levels.vbt.plot(fig=fig).show()

If `close_long_price` is not NaN, a bullish trend is in place.

In [None]:
# signal == 1 opens longs and closes shorts; signal == -1 opens shorts and closes longs
psar_flag = psar_signal['signal']
is_plus_one = psar_flag == 1
is_minus_one = psar_flag == -1

psar_pf = vbt.Portfolio.from_signals(
    close=min_data.close,
    high=min_data.high,
    low=min_data.low,
    open=min_data.open,
    entries=is_plus_one,
    exits=is_minus_one,
    short_entries=is_minus_one,
    short_exits=is_plus_one,
    tsl_th=0.0050,
    tsl_stop=0.0015,
    # fees=0.00075,
)
print(psar_pf.stats())

# Mean Reversion
Reverse the signals: if PSAR crosses up, go short; if it crosses down, go long.


In [None]:
# Reversed mapping: signal == -1 opens longs and closes shorts; signal == 1 opens shorts and closes longs
psar_flag = psar_signal['signal']
is_plus_one = psar_flag == 1
is_minus_one = psar_flag == -1

psar_pf = vbt.Portfolio.from_signals(
    close=min_data.close,
    high=min_data.high,
    low=min_data.low,
    open=min_data.open,
    entries=is_minus_one,
    exits=is_plus_one,
    short_entries=is_plus_one,
    short_exits=is_minus_one,
    fees=0.00075,
    # tsl_th=0.003,
    # tsl_stop=0.0015,
)
print(psar_pf.stats())
# psar_pf.plot().show()

In [None]:
# Plot the portfolio simulated in the previous cell.
psar_pf.plot().show()

## Now let's compare these same versions on minutely data

In [None]:
# Two-hour PSAR signal restricted to the [start, end] window
start = '2019-01-01'
end = '2023-09-30'
window = min_data.loc[start:end]
psar_signal = get_psar_signal(window.high, window.low, window.close, resample_period='2h')

In [None]:
# psar_signal.vbt.plot().show()

In [None]:
# signal == 1 opens longs and closes shorts; signal == -1 opens shorts and closes longs
window = min_data.loc[start:end]
psar_flag = psar_signal['signal']
is_plus_one = psar_flag == 1
is_minus_one = psar_flag == -1

psar_pf = vbt.Portfolio.from_signals(
    close=window.close,
    high=window.high,
    low=window.low,
    open=window.open,
    entries=is_plus_one,
    exits=is_minus_one,
    short_entries=is_minus_one,
    short_exits=is_plus_one,
    # tsl_th=0.003,
    # tsl_stop=0.0015,
    freq='1m',
    fees=0.0005,
    # sl_stop=0.01,
    leverage=1,
)
print(psar_pf.stats())
# psar_pf.resample('1d').plot().show()

In [None]:
# How many bars carry each signal value.
psar_signal['signal'].value_counts()

In [None]:
# Plot the close and both PSAR levels from `start` through 2019-01-31.
psar_signal.loc[start:'2019-01-31'][['Close','close_long_price', 'close_short_price']].vbt.plot().show()

# Reverse the signals
mean reversion version

In [None]:
# Reversed mapping: signal == -1 opens longs and closes shorts; signal == 1 opens shorts and closes longs
window = min_data.loc[start:end]
psar_flag = psar_signal['signal']
is_plus_one = psar_flag == 1
is_minus_one = psar_flag == -1

psar_pf = vbt.Portfolio.from_signals(
    close=window.close,
    high=window.high,
    low=window.low,
    open=window.open,
    entries=is_minus_one,
    exits=is_plus_one,
    short_entries=is_plus_one,
    short_exits=is_minus_one,
    # tsl_th=0.003,
    # tsl_stop=0.0015,
    freq='1m',
    fees=0.0014,
    # sl_stop=0.01,
    leverage=1,
)
print(psar_pf.stats())
# psar_pf.resample('1d').plot().show()

In [None]:
# Plot the portfolio from the previous cell on a 6-hour resample.
psar_pf.resample('6h').plot().show()

In [None]:
# Plot OHLC from 2019-10-26 through 2019-10-27 00:40.
min_data['2019-10-26':'2019-10-27 00:40:00'][['Open', 'High', 'Low', 'Close']].plot().show()

In [None]:
# Plot the close for 2020-03-12.
min_data.loc['2020-03-12'].close.vbt.plot().show()

In [None]:
# Ten trades with the highest 'Return'
psar_pf.trades.records_readable.sort_values('Return', ascending=False).head(10)

In [None]:
# Readable trade records of the last simulated portfolio (the notebook truncates the display).
psar_pf.trades.records_readable

# Now let's set it up for hyperparameter optimization

In [None]:
def get_psar_signal(high, low, close, af0=0.02, step=0.02, max_=0.2, resample_period=None):
    """
    Compute PSAR signals with optional resampling.

    Args:
    - high, low, close: price data; each is wrapped in `pd.Series`, so plain arrays are accepted.
    - af0, step, max_: forwarded positionally, in that order, to `DataFrame.ta.psar`.
    - resample_period (str, optional): If provided, the data will be resampled to this period
      (right-closed, right-labelled) and shifted forward by one period before the indicator
      is computed. E.g. '2H' for 2 hours. The inputs must then carry a datetime index.

    Returns:
    - Tuple of three Series (signal, close_long_price, close_short_price):
      * signal: 1 where a reversal is flagged and the previous bar had a long-side SAR value,
        -1 where a reversal is flagged and the previous bar had a short-side SAR value, else 0.
      * close_long_price: Level at which a long position should be closed or reversed to short.
      * close_short_price: Level at which a short position should be closed or reversed to long.

    With `resample_period` set, all three outputs are forward-filled onto the original index.
    """
    # The next 3 lines help to work with numpy arrays because vbt converts them to numpy arrays
    high = pd.Series(high)
    low = pd.Series(low)
    close = pd.Series(close)

    data = pd.concat([high, low, close], axis=1)
    data.columns = ['High', 'Low', 'Close']

    # Resample data if resample_period is provided
    if resample_period:
        data_resampled = data.resample(resample_period, closed='right', label='right').agg(
            {'High': 'max', 'Low': 'min', 'Close': 'last'}
        )
        # Shift the resampled data forward by one to avoid look ahead bias: without this,
        # a bar's aggregate would be forward-filled onto original rows inside that same bar.
        data_resampled = data_resampled.shift(1).dropna()
    else:
        data_resampled = data

    psar = data_resampled.ta.psar(af0, step, max_)

    # Column names as produced by the psar call for these parameters
    close_long_price = f"PSARl_{af0}_{max_}"   # or 'floor'
    close_short_price = f"PSARs_{af0}_{max_}"  # or 'ceiling'
    psar_reversal_col = f"PSARr_{af0}_{max_}"

    # NOTE(review): a reversal whose previous bar carried a long-side SAR is labelled 1 and
    # one whose previous bar carried a short-side SAR is labelled -1. Confirm against
    # pandas_ta's reversal-bar convention that this is the intended direction.
    reversed_now = psar[psar_reversal_col] == 1
    signal = np.zeros(len(psar))
    signal = np.where(reversed_now & (psar[close_long_price].shift(1).notna()), 1, signal)
    signal = np.where(reversed_now & (psar[close_short_price].shift(1).notna()), -1, signal)

    result = pd.DataFrame({
        'signal': signal,
        'close_long_price': psar[close_long_price],
        'close_short_price': psar[close_short_price]
    }, index=data_resampled.index)

    # Reindex to the original timeframe and forward fill if resampling was done
    if resample_period:
        result = result.reindex(data.index).ffill()

        # Replace NaN values in the signal column with 0 (after resampling).
        # Assign the result back: an in-place fillna on `result['signal']` acts on a
        # temporary under pandas Copy-on-Write and leaves `result` unchanged.
        result['signal'] = result['signal'].fillna(0)

    return result["signal"], result["close_long_price"], result["close_short_price"]

# Wrap get_psar_signal as a vectorbt indicator so it can be run over a parameter grid
psar_factory = vbt.IndicatorFactory(
    class_name='ParabolicSAR',
    short_name='psar',
    input_names=['high', 'low', 'close'],
    param_names=['af0', 'step', 'max_'],
    output_names=['signal', 'close_long_price', 'close_short_price'],
)
psar_indiator = psar_factory.with_apply_func(
    get_psar_signal,
    takes_1d=True,
    af0=0.02,
    step=0.02,
    max_=0.2,
    resample_period=None,
)

# Run the indicator over a grid of af0 and max_ values; step is held at 0.02
executor_options = dict(engine="threadpool", chunk_len="auto", show_progress=True)
psar_combinations = psar_indiator.run(
    data.high,
    data.low,
    data.close,
    af0=np.arange(0.018,   0.022,   0.001),
    step=0.02,  # np.arange(0.02,   0.04,   0.01),
    max_=np.arange(0.2,   0.25,  0.01),
    param_product=True,
    execute_kwargs=executor_options,
)
   


In [None]:
# Backtest every PSAR parameter combination with the reversed mapping:
# signal == -1 opens longs and closes shorts; signal == 1 opens shorts and closes longs.
pf = vbt.Portfolio.from_signals(
    close=  data.close,  # was data.high, which passed the highs as the close series
    high=   data.high,
    low=    data.low,
    entries =       psar_combinations.signal==-1,
    exits =         psar_combinations.signal==1,
    short_entries = psar_combinations.signal==1,
    short_exits =   psar_combinations.signal==-1,
    freq = '10m',  # NOTE(review): confirm this matches the bar spacing of `data`
    # tp_stop=0.003,
    tsl_th = 0.003,
    tsl_stop=0.0015,
    # sl_stop=0.02,

)
print(pf.stats())


In [None]:
# print the total returns for all the combinations
# Report the best value of several metrics across all parameter combinations
total_return_by_combo = pf.total_return
print(f'The best total return is {total_return_by_combo.max()}')

# Best Sharpe ratio and the combination that produced it
sharpe_by_combo = pf.sharpe_ratio
best_sharpe = sharpe_by_combo.max()
best_sharpe_combination = sharpe_by_combo.idxmax()
print(f'The best Sharpe ratio of all the combinations is {best_sharpe:.2f}')
print(f'The best combination is {best_sharpe_combination}')

# Best Sortino ratio and the combination that produced it
sortino_by_combo = pf.sortino_ratio
best_sortino = sortino_by_combo.max()
best_sortino_combination = sortino_by_combo.idxmax()
print(f'The best Sortino ratio of all the combinations is {best_sortino:.2f}')
print(f'The best combination is {best_sortino_combination}')

# Best win rate; note this lives on the portfolio.trades object, not on the portfolio itself
win_rate_by_combo = pf.trades.win_rate
best_win_rate = win_rate_by_combo.max()
best_win_rate_combination = win_rate_by_combo.idxmax()
print(f'The best Win rate of all the combinations is {best_win_rate:.2f}')
print(f'The best combination is {best_win_rate_combination}')

# Largest max_drawdown value.
# NOTE(review): this is the shallowest drawdown only if max_drawdown is reported as a
# negative fraction — confirm the sign convention.
drawdown_by_combo = pf.max_drawdown
best_max_drawdown = drawdown_by_combo.max()
best_max_drawdown_combination = drawdown_by_combo.idxmax()
print(f'The best max drawdown of all the combinations is {best_max_drawdown:.2%}')
print(f'The best combination is {best_max_drawdown_combination}')

# The same pattern works for any of the other metrics in the stats output.

# Show the backtest of the best-Sharpe combination. Indexing `pf` with a column label,
# e.g. pf[13,9], selects the backtest of that specific combination.
pf[best_sharpe_combination].plot().show()

# Show the portfolio backtest simulation

In [None]:
# Render the per-combination Sharpe ratios with vectorbt's `volume` plot.
pf.sharpe_ratio.vbt.volume().show()