In [None]:

import numpy as np
import vectorbtpro as vbt
import pandas as pd
from numba import njit
# Notebook-wide display defaults: dark plot theme and a compact 800x200 layout.
vbt.settings.set_theme("dark")
layout_defaults = vbt.settings.plotting["layout"]
layout_defaults["width"] = 800
layout_defaults["height"] = 200
vbt.settings.plotting.use_resampler = True
# Never truncate the column list when a DataFrame is displayed.
pd.set_option("display.max_columns", None)


Import the data for BTC and ETH and put them in a single data object

In [None]:

# Directory holding the minute-bar CSV exports. Edit this single constant to
# point the notebook at a different copy of the data.
DATA_DIR = '/Users/ericervin/Documents/Coding/data-repository/data'
btc_data_path = f'{DATA_DIR}/fixed_BTCUSDT.csv'
eth_data_path = f'{DATA_DIR}/fixed_ETHUSDT.csv'

eth_min_data = vbt.BinanceData.from_csv(eth_data_path)
btc_min_data = vbt.BinanceData.from_csv(btc_data_path)
print(eth_min_data.shape)
print(btc_min_data.shape)

# Create a combined data object with both ETH and BTC. The symbols are renamed
# from the CSV file stems to short tickers, and timestamps that are missing in
# either series are dropped.
min_data = vbt.BinanceData.merge(
    eth_min_data.rename({'fixed_ETHUSDT': 'ETH'}),
    btc_min_data.rename({'fixed_BTCUSDT': 'BTC'}),
    missing_index='drop',
)
print(min_data.shape)

In [None]:
import numpy as np
from numba import njit

@njit(nogil=True)
def psar_nb_with_next(high, low, close, af0=0.02, af_increment=0.02, max_af=0.2):
    """Compute the Parabolic SAR and the SAR projected for the following bar.

    The calculation starts in an uptrend: the initial SAR is the first low
    (below price) and the initial extreme point is the first high.

    Args:
        high: 1-D array of bar highs.
        low: 1-D array of bar lows, same length as ``high``.
        close: Not used by the calculation; accepted so the signature lines up
            with the ``high, low, close`` inputs the function is called with.
        af0: Acceleration factor at the start and right after every reversal.
        af_increment: Amount added to the acceleration factor each time a new
            extreme point is set.
        max_af: Upper bound for the acceleration factor.

    Returns:
        Tuple ``(long, short, af, reversal, next_long, next_short)`` of arrays
        with the same length as ``high``:

        * ``long`` / ``short`` - SAR of the current bar while the trend is
          rising / falling, NaN otherwise.
        * ``af`` - acceleration factor after processing the bar.
        * ``reversal`` - 1 on bars where the trend flipped, else 0.
        * ``next_long`` / ``next_short`` - SAR projected for the next bar,
          written on the side of the current trend, NaN otherwise and on the
          last bar.

        Index 0 is never written by the loop, so it keeps the NaN / 0 fill.
    """
    length = len(high)
    long = np.full(length, np.nan)  # Equivalent to PSARl
    short = np.full(length, np.nan)  # Equivalent to PSARs
    af = np.full(length, np.nan)  # Equivalent to PSARaf
    reversal = np.zeros(length, dtype=np.int_)  # Equivalent to PSARr
    next_long = np.full(length, np.nan)  # Next bar's PSAR for long
    next_short = np.full(length, np.nan)  # Next bar's PSAR for short

    # Nothing to seed the state from; also avoids an unchecked high[0] read.
    if length == 0:
        return long, short, af, reversal, next_long, next_short

    # Start in an uptrend. For a rising trend the SAR sits below price and the
    # extreme point is the highest high seen so far.
    falling = False
    acceleration_factor = af0
    extreme_point = high[0]
    sar = low[0]

    for i in range(1, length):
        # Advance the SAR towards the extreme point, then clamp it so it never
        # penetrates the previous two bars' range.
        candidate = sar + acceleration_factor * (extreme_point - sar)
        if falling:
            prior_high = high[i - 2] if i > 1 else high[i - 1]
            sar = max(candidate, high[i - 1], prior_high)
            if high[i] > sar:
                # Price pierced the SAR from below: flip to an uptrend.
                falling = False
                reversal[i] = 1
                sar = extreme_point
                extreme_point = high[i]
                acceleration_factor = af0
        else:
            prior_low = low[i - 2] if i > 1 else low[i - 1]
            sar = min(candidate, low[i - 1], prior_low)
            if low[i] < sar:
                # Price pierced the SAR from above: flip to a downtrend.
                falling = True
                reversal[i] = 1
                sar = extreme_point
                extreme_point = low[i]
                acceleration_factor = af0

        # Record the SAR on the side of the (possibly just flipped) trend and
        # accelerate when the bar sets a new extreme.
        if falling:
            if low[i] < extreme_point:
                extreme_point = low[i]
                acceleration_factor = min(acceleration_factor + af_increment, max_af)
            short[i] = sar
        else:
            if high[i] > extreme_point:
                extreme_point = high[i]
                acceleration_factor = min(acceleration_factor + af_increment, max_af)
            long[i] = sar

        af[i] = acceleration_factor

        # Project the SAR for the next bar. Since i >= 1 here, bar i-1 always
        # exists and can be used in the clamp directly.
        if i < length - 1:
            next_sar = sar + acceleration_factor * (extreme_point - sar)
            if falling:
                next_short[i] = max(next_sar, high[i], high[i - 1])
            else:
                next_long[i] = min(next_sar, low[i], low[i - 1])

    return long, short, af, reversal, next_long, next_short

# Usage example: run the kernel directly on the raw ETH minute bars.
eth_ohlc = eth_min_data.get()
high_prices = eth_ohlc['High'].values
low_prices = eth_ohlc['Low'].values
close_prices = eth_ohlc['Close'].values

psarl, psars, psaraf, psarr, next_psarl, next_psars = psar_nb_with_next(high_prices, low_prices, close_prices)

# Attach the PSAR values with assign(), which builds a new DataFrame. Writing
# columns into the frame returned by get() could leak them into every later
# eth_min_data.get() call if that frame is shared with the data object.
eth_min = eth_ohlc.assign(
    psarl=psarl,
    psars=psars,
    next_psarl=next_psarl,
    next_psars=next_psars,
)

date_range = slice('2021-01-03', '2021-01-03 00:30:00')
eth_min.loc[date_range]


In [None]:
# Describe the indicator's inputs, parameters and outputs, then bind the
# Numba kernel to obtain the runnable PSAR indicator class.
psar_factory = vbt.IF(
    class_name='PSAR',
    short_name='psar',
    input_names=['high', 'low', 'close'],
    param_names=['af_0', 'af', 'max_af'],
    output_names=['psarl', 'psars', 'psaraf', 'psarr', 'next_psarl', 'next_psars'],
)
PSAR = psar_factory.with_apply_func(
    psar_nb_with_next,
    takes_1d=True,
    param_product=True,
    # Default parameter values
    af_0=0.02,
    af=0.02,
    max_af=0.2,
)

In [None]:
# Show vectorbt's help for PSAR.run to check which arguments it accepts.
vbt.phelp(PSAR.run)

Use the indicator

In [None]:

# Parameters for this run; each may be a single value or a list of values.
af_0 = 0.01
af = 0.001
max_af = 0.15

psar = PSAR.run(eth_min_data.high, eth_min_data.low, eth_min_data.close, af_0=af_0, af=af, max_af=max_af)

# Put the price columns and the selected indicator outputs side by side.
psar_outputs = [psar.psarl, psar.psars, psar.psarr, psar.next_psarl, psar.next_psars]
psar_df = pd.concat([eth_min_data.get(), *psar_outputs], axis=1)
psar_df.columns = ['Open', 'High', 'Low', 'Close', 'psarl', 'psars', 'psarr', 'next_psarl', 'next_psars']

# Count reversal bars (1) against non-reversal bars (0).
psar.psarr.value_counts()

In [None]:
# Subclass the generated indicator to add a plotting helper.

class PSAR(PSAR):
    def plot(self,
             column=None,
             close_kwargs=None,
             psarl_kwargs=None,
             psars_kwargs=None,
             next_psarl_kwargs=None,
             next_psars_kwargs=None,
             fig=None,
             **layout_kwargs):
        """Plot the close price with the current and next-bar PSAR series on top.

        Args:
            column: Column to select when the indicator holds several.
            close_kwargs: Keyword arguments for the close-price plot.
            psarl_kwargs: Keyword arguments for the long SAR plot.
            psars_kwargs: Keyword arguments for the short SAR plot.
            next_psarl_kwargs: Keyword arguments for the next-bar long SAR plot.
            next_psars_kwargs: Keyword arguments for the next-bar short SAR plot.
            fig: Existing figure to draw on; passed to the close-price plot.
            **layout_kwargs: Forwarded to the close-price plot call.

        Returns:
            The figure returned by the close-price plot, with all series added.
        """
        def pick(obj, label):
            # Select the requested column and give the trace a readable name.
            return self.select_col_from_obj(obj, column).rename(label)

        close = pick(self.close, 'Close')
        overlays = [
            (pick(self.psarl, 'Long'), psarl_kwargs or {}),
            (pick(self.psars, 'Short'), psars_kwargs or {}),
            (pick(self.next_psarl, 'Next_Long'), next_psarl_kwargs or {}),
            (pick(self.next_psars, 'Next_Short'), next_psars_kwargs or {}),
        ]

        fig = close.vbt.plot(fig=fig, **(close_kwargs or {}), **layout_kwargs)
        for series, plot_kwargs in overlays:
            series.vbt.plot(fig=fig, **plot_kwargs)

        return fig

In [None]:
date_range = slice('2021-01-03', '2021-01-03 00:30:00')

psar = PSAR.run(eth_min_data.high, eth_min_data.low, eth_min_data.close, af_0=0.02, af=0.02, max_af=0.2)

# Marker styling per series: current SAR as plain markers, next-bar SAR as crosses.
marker_styles = {
    'psarl_kwargs': {'trace_kwargs': {'line_color': 'limegreen', 'mode': 'markers'}},
    'psars_kwargs': {'trace_kwargs': {'line_color': 'red', 'mode': 'markers'}},
    'next_psarl_kwargs': {
        'trace_kwargs': {'line_color': 'lightgreen', 'mode': 'markers', 'marker': {'symbol': 'cross'}},
    },
    'next_psars_kwargs': {
        'trace_kwargs': {'line_color': 'pink', 'mode': 'markers', 'marker': {'symbol': 'cross'}},
    },
}
psar.loc[date_range].plot(**marker_styles).show()

# Now let's upsample and downsample and align our PSAR

In [None]:
# Compute the PSAR on 2-hour candles and align it with the minute bars.
resample_period = '2h'
resampled_eth = eth_min_data.resample(resample_period)
psar = PSAR.run(resampled_eth.high, resampled_eth.low, resampled_eth.close, af_0=0.02, af=0.02, max_af=0.2)
nb_psar_df = pd.concat([psar.next_psarl, psar.next_psars], axis=1)
nb_psar_df.columns = ['psarl', 'psars']

# Forward-fill the 2-hour values onto the original minute index, then join
# them to the original ETH data.
nb_psar_on_minutes = nb_psar_df.reindex(eth_min_data.get().index, method='ffill')
nb_final_df = eth_min_data.get().join(nb_psar_on_minutes)

# Show the aligned frame around a crossover point. Only the last expression of
# a cell is rendered automatically, so this one needs an explicit display().
display(nb_final_df.iloc[230:250])
nb_psar_df.head(50)


In [None]:
# Build 2-hour candles from the ETH minute data and run the PSAR on them.
resample_period = '2h'
resampled_eth = eth_min_data.resample(resample_period)
psar = PSAR.run(
    resampled_eth.high,
    resampled_eth.low,
    resampled_eth.close,
    af_0=0.02,
    af=0.02,
    max_af=0.2,
)

# Keep only the next-bar SAR outputs, under the shorter column names.
psar_df = pd.concat([psar.next_psarl, psar.next_psars], axis=1)
psar_df.columns = ['psarl', 'psars']

# Forward-fill onto the minute index and attach to the original ETH data.
minute_index = eth_min_data.get().index
final_df = eth_min_data.get().join(psar_df.reindex(minute_index, method='ffill'))

# Create function to map the PSAR values to the lower resolution data
@njit(nogil=True)
def map_and_refine_psar_to_lower_res_nb(high, low, psarl, psars):
    """Carry the last known long/short SAR level forward until price breaches it.

    Walks the bars once. The most recent non-NaN ``psarl`` / ``psars`` value is
    remembered as the "active" level for that side. On bars where a side's
    input is NaN but an active level exists, the level is written into the
    output for that side; once the bar's low (long side) or high (short side)
    reaches the level, it is written one last time and then cleared.

    NOTE(review): despite the name, the notebook calls this with minute-level
    highs/lows and SAR values forward-filled from a coarser timeframe - confirm
    the intended naming.

    Args:
        high: Array of bar highs; its length drives the loop.
        low: Array of bar lows, indexed alongside ``high``.
        psarl: Long SAR values, NaN where no long SAR is given.
        psars: Short SAR values, NaN where no short SAR is given.

    Returns:
        ``(updated_psarl, updated_psars)``: copies of the inputs with the
        adjustments above applied. The inputs themselves are not modified.
    """
    # Work on copies so the caller's arrays stay untouched
    updated_psarl = np.copy(psarl)
    updated_psars = np.copy(psars)
    
    # Last known SAR level per side; None means "no active level"
    active_psarl, active_psars = None, None
    # Iterate through the entire series
    for i in range(len(high)):
        
        # A fresh long SAR value becomes the active long level
        if not np.isnan(psarl[i]):
            active_psarl = psarl[i]
            updated_psarl[i] = active_psarl
        # A fresh short SAR value becomes the active short level
        if not np.isnan(psars[i]):
            active_psars = psars[i]
            updated_psars[i] = active_psars
            
        # Long side, input is NaN and the low is still above the active level (not breached):
        # keep reporting the active long level and blank the short side for this bar
        if np.isnan(psarl[i]) and (active_psarl is not None) and (low[i] > active_psarl):
            updated_psarl[i] = active_psarl
            # Overwrites whatever updated_psars[i] held, including a value copied from psars[i]
            updated_psars[i] = np.nan
        # Short side mirror of the check above. It runs after the long-side check,
        # so on the same bar it can overwrite what that check just wrote
        if np.isnan(psars[i]) and (active_psars is not None) and (high[i] < active_psars):
            updated_psars[i] = active_psars
            # Blank the long side for this bar
            updated_psarl[i] = np.nan
        # Long side breached (low at or below the active level):
        # report the level on the breach bar, then drop it
        if np.isnan(psarl[i]) and (active_psarl is not None) and (low[i] <= active_psarl):
            # psarl_breach_index = i
            updated_psarl[i] = active_psarl
            active_psarl = None # Reset the active PSAR long value
        # Short side breached (high at or above the active level):
        # report the level on the breach bar, then drop it
        if np.isnan(psars[i]) and (active_psars is not None) and (high[i] >= active_psars):
            # psars_breach_index = i
            updated_psars[i] = active_psars
            active_psars = None    

    return updated_psarl, updated_psars

# Pull the raw arrays the refinement function works on.
high_prices, low_prices, psarl, psars = (
    final_df[col].values for col in ('High', 'Low', 'psarl', 'psars')
)

updated_psarl, updated_psars = map_and_refine_psar_to_lower_res_nb(high_prices, low_prices, psarl, psars)

# Put the refined values next to the forward-filled ones for comparison.
refined_psar_df = pd.DataFrame(
    {'updated_psarl': updated_psarl, 'updated_psars': updated_psars},
    index=eth_min_data.get().index,
)
new_full_df = pd.concat([final_df, refined_psar_df], axis=1)

# Inspect a window of minute bars.
comparison_window = slice('2019-01-03 07:53:00+00:00', '2019-01-03 08:20:00+00:00')
display(new_full_df.loc[comparison_window])

In [None]:
# Go long while a refined long SAR is present, short while a refined short SAR is present.
new_psar_entries = new_full_df.updated_psarl.notnull()
new_psar_exits = new_full_df.updated_psars.notnull()
clean_entries, clean_exits = new_psar_entries.vbt.signals.clean(new_psar_exits)

pf = vbt.Portfolio.from_signals(
    close=new_full_df.Close,
    entries=clean_entries,
    short_entries=clean_exits,
    freq='1min',
)
pf.stats()

In [None]:
# Plot the portfolio built from the refined minute-level signals.
pf.plot().show()

In [None]:
# Show vectorbt's help for PSAR.run again before setting up the parameter sweep.
vbt.phelp(PSAR.run)

# Now let's create a pipeline and optimize

In [None]:
# Rebuild the indicator class from the factory. This rebinds the name PSAR, so
# the plot() helper added by the subclass defined earlier is no longer
# available on it.
psar_default_params = dict(af_0=0.02, af=0.02, max_af=0.2)
PSAR = vbt.IF(
    class_name='PSAR',
    short_name='psar',
    input_names=['high', 'low', 'close'],
    param_names=['af_0', 'af', 'max_af'],
    output_names=['psarl', 'psars', 'psaraf', 'psarr', 'next_psarl', 'next_psars'],
).with_apply_func(
    psar_nb_with_next,
    takes_1d=True,
    param_product=True,
    **psar_default_params,
)

In [None]:
resample_period = '1h'
resampled_eth = eth_min_data.resample(resample_period)

# Parameter grids; param_product=True makes PSAR.run test every combination.
af0 = np.arange(0.01, 0.025, 0.001)
af = np.arange(0.01, 0.02, 0.001)
max_af = np.arange(0.05, 0.15, 0.02)
psar = PSAR.run(
    resampled_eth.high,
    resampled_eth.low,
    resampled_eth.close,
    af_0=af0,
    af=af,
    max_af=max_af,
)

# Long where a next-bar long SAR exists, short where a next-bar short SAR exists.
entries = psar.next_psarl.notnull()
exits = psar.next_psars.notnull()
clean_entries, clean_exits = entries.vbt.signals.clean(exits)

price_kwargs = dict(
    close=resampled_eth.close,
    open=resampled_eth.open,
    high=resampled_eth.high,
    low=resampled_eth.low,
)
pf = vbt.Portfolio.from_signals(
    **price_kwargs,
    entries=clean_entries,
    short_entries=clean_exits,
    freq=resample_period,
)

# Ten best parameter combinations by total return.
pf.total_return.sort_values(ascending=False).head(10)


In [None]:
# Volume plot of total return across the swept parameter combinations.
pf.total_return.vbt.volume().show()

In [None]:
# Select the parameter combination with the highest total return once, then plot and summarise it.
best_pf = pf[pf.total_return.idxmax()]
best_pf.plot().show()
print(best_pf.stats())

In [None]:
resample_period = '30min'
resampled_data = min_data.resample(resample_period)

# Parameter grids; param_product=True makes PSAR.run test every combination.
af0 = np.arange(0.005, 0.03, 0.002)
af = np.arange(0.005, 0.03, 0.002)
max_af = np.arange(0.1, 0.3, 0.01)
psar = PSAR.run(
    resampled_data.high,
    resampled_data.low,
    resampled_data.close,
    af_0=af0,
    af=af,
    max_af=max_af,
)

entries = psar.next_psarl.notnull()
exits = psar.next_psars.notnull()
clean_entries, clean_exits = entries.vbt.signals.clean(exits)

# Price inputs and frequency shared by the long-only and short-only backtests.
shared_kwargs = dict(
    close=resampled_data.close,
    open=resampled_data.open,
    high=resampled_data.high,
    low=resampled_data.low,
    freq=resample_period,
)
long_pf = vbt.Portfolio.from_signals(
    entries=clean_entries,
    exits=clean_exits,
    **shared_kwargs,
)
short_pf = vbt.Portfolio.from_signals(
    short_entries=clean_exits,
    short_exits=clean_entries,
    **shared_kwargs,
)

# Ten best combinations for each side.
display(long_pf.total_return.sort_values(ascending=False).head(10))
print("Short Portfolio")
display(short_pf.total_return.sort_values(ascending=False).head(10))

If we want to filter for only BTC, we need to select on level 3 (zero-based) of the column MultiIndex, which is `symbol`. Call `pf.wrapper.columns` to see how the MultiIndex is organized:
`names=['psar_af_0', 'psar_af', 'psar_max_af', 'symbol']`

In [None]:
# Slice total returns per symbol (index level 3) once and reuse the slices.
btc_returns = long_pf.total_return.xs('BTC', level=3)
eth_returns = long_pf.total_return.xs('ETH', level=3)
display(btc_returns.sort_values(ascending=False).head(5))
display(eth_returns.sort_values(ascending=False).head(5))
# xs() drops the symbol level, so append it again to get a full column key.
best_btc = btc_returns.idxmax() + ('BTC',)
best_eth = eth_returns.idxmax() + ('ETH',)

In [None]:
# Stats for the best BTC and the best ETH parameter combination, in that order.
for best_key in (best_btc, best_eth):
    display(long_pf[best_key].stats())

In [None]:
# NOTE(review): `pf` here is the portfolio from the earlier 1h ETH-only sweep, not `long_pf`.
# If the aim is to inspect the column levels used for the symbol filtering on `long_pf`,
# this should probably be `long_pf.wrapper.columns` - confirm.
pf.wrapper.columns

In [None]:
# Volume plot of the Sharpe ratio over the three PSAR parameters, with a slider to switch symbol.
sharpe_volume = long_pf.sharpe_ratio.vbt.volume(
    x_level='psar_af_0',
    y_level='psar_af',
    z_level='psar_max_af',
    slider_level='symbol',
)
sharpe_volume.show()

In [None]:
# Plot and summarise the best long-only ETH combination.
best_eth_pf = long_pf[best_eth]
best_eth_pf.plot().show()
best_eth_pf.stats()

In [None]:
# resample_period = '30min'
# resampled_data = min_data.resample(resample_period)
# # Create a vectorbt indicator factory
# af0 = np.arange(0.005, 0.03, 0.002)
# af = np.arange(0.005, 0.03, 0.002)
# max_af = np.arange(0.2, 0.3, 0.01)
# psar = PSAR.run(
#     resampled_data.high, 
#     resampled_data.low, 
#     resampled_data.close, af_0=af0, af=af, max_af=max_af)




# psar_df = pd.concat([psar.psarl, psar.psars], axis=1)

# # Reindex to match the original ETH data
# final_df = psar_df.reindex(eth_min_data.get().index, method='ffill')
# # Join the PSAR values to the original ETH data




In [None]:
# PSAR on 2-hour ETH candles with the default parameters.
resample_period = '2h'
resampled_eth = eth_min_data.resample(resample_period)
psar = PSAR.run(
    resampled_eth.high,
    resampled_eth.low,
    resampled_eth.close,
    af_0=0.02,
    af=0.02,
    max_af=0.2,
)

In [None]:
# Alias the minute-level ETH data under the name the pipeline function below uses for its argument.
high_res_data = eth_min_data
# psar.psarl.reindex(high_res_data.wrapper.index, method='ffill')

In [None]:
def resample_psar_pipeline(high_res_data, resample_period, af_0=0.02, af=0.02, max_af=0.2):
    """Run the PSAR on resampled bars and refine it on the original bars.

    Args:
        high_res_data: vectorbt data object holding the original bars.
        resample_period: Period passed to ``high_res_data.resample``.
        af_0: ``af_0`` parameter passed to ``PSAR.run``.
        af: ``af`` parameter passed to ``PSAR.run``.
        max_af: ``max_af`` parameter passed to ``PSAR.run``.

    Returns:
        DataFrame on the original index holding the original data columns plus
        ``psarl`` / ``psars`` (forward-filled next-bar SAR) and
        ``updated_psarl`` / ``updated_psars`` (refined values).
    """
    coarse_data = high_res_data.resample(resample_period)
    psar = PSAR.run(
        coarse_data.high,
        coarse_data.low,
        coarse_data.close,
        af_0=af_0,
        af=af,
        max_af=max_af,
    )

    # Bring the next-bar SAR values back to the original resolution.
    target_index = high_res_data.wrapper.index
    psarl = psar.next_psarl.reindex(target_index, method='ffill').values
    psars = psar.next_psars.reindex(target_index, method='ffill').values

    # Refine the forward-filled levels against the original highs and lows.
    updated_psarl, updated_psars = map_and_refine_psar_to_lower_res_nb(
        high_res_data.high.values,
        high_res_data.low.values,
        psarl,
        psars,
    )

    # Forward-filled and refined values side by side, for comparison.
    refined_psar_df = pd.DataFrame(
        {
            'psarl': psarl,
            'psars': psars,
            'updated_psarl': updated_psarl,
            'updated_psars': updated_psars,
        },
        index=target_index,
    )
    return pd.concat([high_res_data.get(), refined_psar_df], axis=1)


resample_period = '2h'
new_full_df = resample_psar_pipeline(eth_min_data, resample_period, af_0=0.019, af=0.023, max_af=0.2)

# Long while a refined long SAR is present, short while a refined short SAR is present.
entries = (~new_full_df.updated_psarl.isnull()) #.vbt.signals.bshift()
exits = (~new_full_df.updated_psars.isnull())
clean_entries, clean_exits = entries.vbt.signals.clean(exits)

# ATR on the resampled candles, forward-filled to the minute index. Take the
# smoothed `.atr` output: `.tr` is the raw per-bar true range and does not
# depend on `window`.
resampled_eth = eth_min_data.resample(resample_period)
resampled_atr = vbt.ATR.run(resampled_eth.high, resampled_eth.low, resampled_eth.close, window=15).atr
atr = resampled_atr.reindex(new_full_df.index, method='ffill')

# Slice the backtest window once instead of once per price column.
date_range = slice('2019', '2023')
backtest_df = new_full_df.loc[date_range]
pf = vbt.Portfolio.from_signals(
    close=backtest_df.Close,
    open=backtest_df.Open,
    high=backtest_df.High,
    low=backtest_df.Low,
    entries=clean_entries.loc[date_range],
    short_entries=clean_exits.loc[date_range],
    freq='1min',
    init_cash=1000,
    # tp_stop = atr.loc[date_range] * 5/new_full_df.loc[date_range].Close,
    # sl_stop = atr.loc[date_range] * 3/new_full_df.loc[date_range].Close,
    leverage=1,
)

pf.resample('1d').plot(
    # settings=dict(bm_returns=False)
    ).show()
pf.stats()

In [None]:
# Close price for the backtest window, with the refined SAR levels drawn as markers on top.
fig = new_full_df.loc[date_range, ['Close']].vbt.plot()
refined_cols = ['updated_psarl', 'updated_psars']
new_full_df.loc[date_range, refined_cols].vbt.plot(
    fig=fig,
    trace_kwargs=dict(mode='markers'),
).show()

In [None]:
# Show every column, then list the ten trades with the highest return.
pd.set_option('display.max_columns', None)
trades_by_return = pf.trades.records_readable.sort_values('Return', ascending=False)
trades_by_return.head(10)