In [1]:
import vectorbt as vbt
import math
import os
import sys
import numpy as np
import pandas as pd
from numba import njit
from plotly.subplots import make_subplots
from vectorbt.generic import nb as generic_nb
# Put the directory two levels above the working directory on sys.path so that
# the `lib.utils` import below can be resolved (presumably the repository root).
module_path = os.path.abspath(os.path.join('../..'))
if module_path not in sys.path:
    sys.path.append(module_path)

from lib.utils import file_to_data_frame, LR, ExtendedPortfolio, where_true_set_series, shift_np

In [None]:
# Run configuration and data loading.
#vbt.settings.caching['enabled'] = False
symbol = "ADA"
# `candles` keeps its leading underscore for the file name; `candles[1:]`
# drops it to build the directory name ("3000").
candles = "_3000"
# NOTE(review): absolute, machine-specific path — the notebook only runs where
# this directory exists; consider a configurable data directory.
file = f"/Users/pilo/development/itba/pf/Binance_Minute_OHLC_CSVs/{candles[1:]}/Binance_{symbol}USDT_minute{candles}.csv"
# file_to_data_frame returns a 2-tuple; only the second element (OHLCV frame) is used.
_, ohlcv = file_to_data_frame(file)

fee = 0.001       # passed as `fees` to the portfolio simulation
min_trades = 10   # passed as `min_trades` to the trade metrics further down
min_lr = 0.0      # passed as `min_lr` to the filtered expected-log-return metric
save_html = True  # NOTE(review): not read by any cell visible in this notebook
close = ohlcv["Close"]
volume = ohlcv["Volume"]
# LR is a project indicator from lib.utils; its `.lr` output is used as the
# log-return series throughout (the self-test cell checks it against
# differences of log prices).
lr_ind = LR.run(close)
print(lr_ind.lr.shape)

In [3]:
@njit
def signal_calculations(lr, lr_ma, lr_mstd, vol, vol_mstd, lr_thld, vol_thld):
    """Build the log-return bands and the raw boolean entry/exit signals.

    Parameters
    ----------
    lr, vol : arrays with the current log returns and volumes.
    lr_ma, lr_mstd : rolling mean / rolling std used to build the log-return bands.
    vol_mstd : rolling std used to build the volume threshold.
    lr_thld : multiplier applied to ``lr_mstd``; callers pass a value <= 0.
    vol_thld : multiplier applied to ``vol_mstd``.

    Returns
    -------
    tuple ``(ups, downs, lr_entries, lr_exits, vol_thld_std, vol_entries)``:
    the upper band, the lower band, ``lr < downs``, ``lr > ups``, the volume
    threshold and ``vol > vol_thld_std``.
    """
    band_offset = lr_thld * lr_mstd
    # With lr_thld < 0 the offset is negative: adding it yields the lower band,
    # subtracting it yields the upper band.
    downs = lr_ma + band_offset
    ups = lr_ma - band_offset
    vol_thld_std = vol_thld * vol_mstd

    # Comparisons against NaN evaluate to False, so warm-up rows never signal.
    lr_entries = lr < downs
    lr_exits = lr > ups
    vol_entries = vol > vol_thld_std
    return ups, downs, lr_entries, lr_exits, vol_thld_std, vol_entries

@njit
def ma_mstd(shifted_lr, shifted_volume, lag):
    """Rolling statistics over a window of ``lag`` rows.

    Returns a 3-tuple ``(lr_ma, lr_mstd, vol_mstd)``: the rolling mean and
    rolling std of ``shifted_lr`` and the rolling std of ``shifted_volume``.
    Callers in this notebook pass series already shifted by one row.
    """
    return (
        generic_nb.rolling_mean_nb(shifted_lr, lag),
        generic_nb.rolling_std_nb(shifted_lr, lag),
        generic_nb.rolling_std_nb(shifted_volume, lag),
    )

@njit
def signals_nb(lr, shifted_lr, vol, shifted_vol, lr_thld, vol_thld, lag):
    """Apply function for the signals indicator: returns ``(entries, exits)``.

    Entries require both the log-return entry condition and the volume entry
    condition; exits are the log-return exit condition alone.
    """
    rolling_mean, rolling_std, volume_std = ma_mstd(shifted_lr, shifted_vol, lag)
    _ups, _downs, entries_by_lr, exits_by_lr, _vol_band, entries_by_vol = signal_calculations(
        lr, rolling_mean, rolling_std, vol, volume_std, lr_thld, vol_thld
    )
    return entries_by_lr & entries_by_vol, exits_by_lr

In [None]:
# Wrap signals_nb as a vectorbt indicator: four input arrays, three parameters,
# two boolean outputs (entries, exits).
# NOTE(review): use_ray=True presumably distributes the parameter runs via Ray — confirm
# it is installed/initialised in the target environment.
ENTRY_SIGNALS = vbt.IndicatorFactory(
    input_names=['lr', 'shifted_lr', 'vol', 'shifted_vol'],
    param_names=['lr_thld', 'vol_thld', 'lag'],
    output_names=['entries','exits']
).from_apply_func(signals_nb, use_ray=True)
# Parameter grid: 30 log-return thresholds in [-3, 0], 30 volume thresholds in
# [0, 4] and 47 even lags (6, 8, ..., 98). With param_product=True that is
# 30 * 30 * 47 = 42,300 combinations.
lr_thld = -np.linspace(0,3, 30, endpoint=True)
vol_thld = np.linspace(0,4, 30, endpoint=True)
lag = list(range(6,100, 2))
# The shifted inputs are the same series shifted by one row via shift_np; they
# feed the rolling statistics while the unshifted series are compared against them.
signals = ENTRY_SIGNALS.run(lr=lr_ind.lr, shifted_lr=shift_np(lr_ind.lr.to_numpy(), 1),
                            vol=volume, shifted_vol=shift_np(volume.to_numpy(), 1),
                            lr_thld=lr_thld, vol_thld=vol_thld, lag=lag,
                            param_product=True, short_name="signals")
signals.entries.head()

In [None]:
# Simulate one long-only portfolio per signal column.
portfolio_kwargs = dict(
    direction='longonly',
    freq='m',       # presumably minute frequency, matching the minute candles — confirm
    size=np.inf,    # infinite requested size; presumably "use all available cash" — confirm
    fees=fee,
)
# ExtendedPortfolio is a project class from lib.utils; max_logs=0 is passed on top of the shared kwargs.
port = ExtendedPortfolio.from_signals(close, signals.entries, signals.exits, **portfolio_kwargs, max_logs=0)

In [None]:
# Accumulator of {"data", "title"} dicts filled by the metric cells below and
# rendered afterwards. Re-run this cell before re-running those cells, otherwise
# entries are appended twice.
vol_plots = []

In [None]:
# Expected (average) log return per parameter combination, computed by the
# project method expected_log_returns with the min_trades setting.
elr = port.trades.expected_log_returns(min_trades=min_trades)
# Only strictly positive values are kept for plotting.
vol_plots.append({
    "data": elr[elr>0],
    "title": f"AVG(log(p_exit/p_entry * (1-fee)**2)), AVG > 0, #Trades > {min_trades}"
})

In [None]:
# Same metric as the previous cell but additionally passing min_lr; per the
# plot title this restricts the average to trades with a positive log return.
# NOTE: this rebinds `elr`, replacing the unfiltered result from the previous cell.
elr = port.trades.expected_log_returns(min_trades=min_trades, min_lr=min_lr)
# Only strictly positive values are kept for plotting.
vol_plots.append({
    "data": elr[elr>0],
    "title": f"AVG(log(p_exit/p_entry * (1-fee)**2)), log(...) > 0, AVG > 0, #Trades > {min_trades}"
})

In [None]:
# Median log return per parameter combination, computed by the project method
# median_log_returns with the min_trades setting.
mlr = port.trades.median_log_returns(min_trades=min_trades)
# Only strictly positive values are kept for plotting.
vol_plots.append({
    "data": mlr[mlr>0],
    "title": f"Median(log(p_exit/p_entry * (1-fee)**2)), Median > 0, #Trades > {min_trades}"
})

In [None]:
# Render every collected metric as a vectorbt volume plot; metrics whose
# filtered data is empty are skipped.
for plot_spec in vol_plots:
    if plot_spec["data"].size:
        plot_spec["data"].vbt.volume(title=plot_spec["title"]).show()

In [None]:
# NOTE(review): `entry_exit_prob_avg` is not defined in any cell visible in this
# notebook; it must be bound to the metric Series to rank (indexed by the
# (lr_thld, vol_thld, lag) parameter levels) before this cell can run.
best_indexes = entry_exit_prob_avg.nlargest(20, keep="all")

# Plot using the best result of the optimisation.
# nlargest returns the rows sorted in descending order, so the first index label
# is the parameter tuple of the best combination. Indexing the Series itself
# (as in `best_indexes[-1]`) would select a metric value instead of the tuple.
best_idx = best_indexes.index[0]
best_lr_thld, best_vol_thld, best_lag = best_idx

In [None]:
# Recompute the intermediate series for the best parameter set so they can be
# plotted and inspected.
# `ma_mstd` is the njit function defined above (it has no `lr_ma` / `lr_mstd` /
# `vol_mstd` attributes), so the rolling statistics are recomputed by calling it.
# The vectorbt rolling_*_nb helpers it uses operate on 2-dim arrays, hence the
# reshape to a single column.
_lr_2d = lr_ind.lr.to_numpy().reshape(len(lr_ind.lr), -1)
_vol_2d = volume.to_numpy().reshape(len(volume), -1)
_best_lr_ma, _best_lr_mstd, _best_vol_mstd = ma_mstd(
    shift_np(_lr_2d, 1),
    shift_np(_vol_2d, 1),
    int(best_lag))
# Take the first column of every output to get back 1-dim arrays for plotting.
ups, downs, lr_entries, lr_exits, vol_thld_std, vol_entries = (
    out[:, 0] for out in signal_calculations(
        _lr_2d,
        _best_lr_ma,
        _best_lr_mstd,
        _vol_2d,
        _best_vol_mstd,
        best_lr_thld,
        best_vol_thld)
)

In [None]:
def plot_series_vs_scatters(series_list: list, booleans_list):
    """Overlay several line series and boolean-mask scatters on one figure.

    Parameters
    ----------
    series_list : non-empty list whose first element is a pandas object (it
        creates the figure and provides the shared index). Later elements may
        be pandas Series or raw arrays; raw arrays are wrapped in a Series
        aligned to the first element's index.
    booleans_list : boolean masks (Series or raw arrays). Each mask is turned
        into a scatter trace through ``where_true_set_series`` using the last
        line series that was plotted.

    Returns
    -------
    The figure holding all traces.

    Raises
    ------
    IndexError
        If ``series_list`` is empty.
    """
    # Read the list without popping from it so the caller's list is left intact.
    series = series_list[0]
    # Take the index from the first series so raw arrays line up with it
    # instead of falling back to a default RangeIndex.
    index = series.index
    fig = series.vbt.plot()
    for series in series_list[1:]:
        if not isinstance(series, pd.Series):
            series = pd.Series(series, index=index, copy=True)
        series.vbt.plot(fig=fig)

    # Scatter traces are named 1, 2, ... in the order given.
    for trace_number, scatter in enumerate(booleans_list, start=1):
        if not isinstance(scatter, pd.Series):
            scatter = pd.Series(scatter, index=index, copy=True)
        # `series` is the last line series plotted above.
        scatter = where_true_set_series(series, scatter)
        scatter.name = trace_number
        fig = scatter.vbt.scatterplot(fig=fig)
    return fig

# Entry signals of the best parameter combination (best_idx is used as the column key).
entries = signals.entries[best_idx]
# Quick look: log returns against the lower (entry) band.
plot_series_vs_scatters([lr_ind.lr, downs], []).show()

In [None]:
# Figures to stack as subplot rows in the next cell. Only the first one is
# active; the remaining candidates are left commented out.
plots = [
    plot_series_vs_scatters([lr_ind.lr, downs.T], []) #, downs, ups], [lr_entries, lr_exits]),
    #plot_series_vs_scatters([volume, vol_thld_std], [vol_entries]),
    #(entries.where(entries == True, np.nan)).vbt.scatterplot()
]

In [None]:
def add_all_subplots(fig, row, col, list):
    """Add every trace in ``list`` to ``fig`` at the subplot cell (``row``, ``col``).

    Traces are added one by one, in order, through ``fig.add_trace``.
    Returns ``None``.
    """
    # The parameter name shadows the builtin `list`; it is kept because it is
    # part of the function's signature.
    for trace in list:
        fig.add_trace(trace, row=row, col=col)

# Stack the figures from `plots` as rows of one figure sharing the x axis.
# NOTE(review): rows=3 is fixed while `plots` currently holds a single active
# figure, so the remaining rows stay empty.
fig = make_subplots(rows=3, cols=1, shared_xaxes=True,
                    vertical_spacing=0.02)
# Copy the traces of the i-th figure into row i+1 (subplot rows are 1-based).
for i in range(len(plots)):
    add_all_subplots(fig, i+1, 1, plots[i].data)

# Horizontal legend anchored just above the top-right corner of the plot area.
fig.update_layout(height=700, legend=dict(
    orientation="h",
    yanchor="bottom",
    y=1.02,
    xanchor="right",
    x=1
))
fig.show()

In [None]:
@njit
def double_multiplier_nb(values, x, y):
    """Scale ``values`` by two factors; returns ``(values * x, values * y)``."""
    scaled_by_x = values * x
    scaled_by_y = values * y
    return scaled_by_x, scaled_by_y

# Indicator producing two scaled copies of its input: one per `x` factor and
# one per `y` factor.
DOUBLE_MULTIPLIER = vbt.IndicatorFactory(
    input_names=['values'],
    param_names=['x', 'y'],
    output_names=['x_mu','y_mu']
).from_apply_func(double_multiplier_nb)
# Five non-negative factors in [0, 2] and five non-positive factors in [-2, 0];
# with param_product=True every (x, y) pair is evaluated (25 combinations).
x = np.linspace(0,2,5, endpoint=True)
y = -np.linspace(0,2,5, endpoint=True)
# NOTE(review): `mstd_ind` is not defined in any cell visible in this notebook,
# so this cell fails on a fresh kernel unless it is created elsewhere.
tp_sl = DOUBLE_MULTIPLIER.run(mstd_ind.mstd, x, y, param_product=True, short_name="tp_sl")

In [None]:
# Exit signals from the scaled bands built in the previous cell: `lr_above` /
# `lr_below` are presumably the comparison helpers generated for the `lr`
# output of the LR indicator — confirm against lib.utils.
# NOTE(review): depends on `tp_sl`, which in turn needs the undefined `mstd_ind`.
tp_exits = lr_ind.lr_above(tp_sl.x_mu)
sl_exits = lr_ind.lr_below(tp_sl.y_mu)
# Combine both exit conditions with OR through the vectorbt accessor.
final_exits = tp_exits.vbt | sl_exits.vbt
# Rename the innermost column level to "lag".
final_exits.columns = final_exits.columns.rename("lag", level=-1)
final_exits.head()


In [22]:
# Self-test of the signal pipeline on a tiny hand-computed example.
# NOTE: this cell rebinds the module-level names `lag` and `port` (and `lr`,
# `lr_ma`, ...), so re-run the optimisation cells before reusing them.
lag = 2
# Prices are e**k, so log prices are the integers k and log returns are their differences.
p = math.e ** np.array([[5],[6],[6],[3],[2],[4],[7],[8],[6],[5],[4]])
v = np.array([[5.],[6.],[6.],[3.],[2.],[4.],[7.],[8.],[6.],[5.],[4.]])
e_ln = np.log(p)
# Expected values (prefix `e_`), computed by hand for lag == 2.
e_lr = np.array([[np.nan],[1],[0],[-3],[-1],[2],[3],[1],[-2],[-1],[-1]])
e_shifted_lr = np.array([[np.nan],[np.nan],[1],[0],[-3],[-1],[2],[3],[1],[-2], [-1]])
e_lr_ma = np.array([[np.nan], [np.nan],[np.nan],[ 1/2], [-3/2], [-2],[ 1/2],[ 5/2], [2], [-1/2], [-3/2]])
e_lr_mstd = np.array([[np.nan], [np.nan],[np.nan], [0.5], [1.5], [1. ], [1.5], [0.5], [1. ], [1.5], [0.5]])
lr = LR.run(p).lr.to_numpy()
shifted_lr = shift_np(lr, 1)
shifted_vol = shift_np(v, 1)
e_vol_mstd = np.array([[np.nan], [np.nan], [0.5], [0. ], [1.5], [0.5], [1. ], [1.5], [0.5], [1. ], [0.5]])
# Use `lag` rather than a literal so the rolling stats and signals_nb share one window.
lr_ma, lr_mstd, vol_mstd = ma_mstd(shifted_lr, shifted_vol, lag)
assert (np.isclose(lr, e_lr, equal_nan=True).all())
assert (np.isclose(shifted_lr, e_shifted_lr, equal_nan=True).all())
assert (np.isclose(lr_ma, e_lr_ma, equal_nan=True).all())
assert (np.isclose(lr_mstd, e_lr_mstd, equal_nan=True).all())
assert (np.isclose(vol_mstd, e_vol_mstd, equal_nan=True).all())

# Log-return threshold is negative; the volume threshold is its positive counterpart.
thld = -2
e_lr_entries = e_lr < e_lr_ma + thld * e_lr_mstd
e_lr_exits = e_lr > e_lr_ma - thld * e_lr_mstd
e_vol_entries = v > -thld * vol_mstd
e_final_entries = e_lr_entries & e_vol_entries
_, _, lr_entries, lr_exits, _, vol_entries = signal_calculations(lr, lr_ma, lr_mstd, v, vol_mstd, thld, -thld)
# Pass -thld as the volume threshold, matching e_vol_entries and the
# signal_calculations call above (passing thld made the volume filter
# trivially true on this positive-volume data).
final_entries, _ = signals_nb(lr, shifted_lr, v, shifted_vol, thld, -thld, lag)
assert (np.array_equal(lr_entries, e_lr_entries))
assert (np.array_equal(lr_exits, e_lr_exits))
assert (np.array_equal(vol_entries, e_vol_entries))
assert (np.array_equal(final_entries, e_final_entries))
_portfolio_kwargs = dict(
    direction='longonly',
    size=np.inf,
    freq='m',
    fees=0.001,
)
port = ExtendedPortfolio.from_signals(p, e_final_entries, lr_exits, **_portfolio_kwargs)
# Mean of log(1 + r) over two hard-coded values (presumably the returns of the
# two simulated trades — confirm).
e_elr = (np.log(1.714564 +1 ) + np.log(-0.982684 +1 )) / 2
assert (np.isclose(e_elr, port.trades.expected_log_returns()[0], equal_nan=True))
# With two values the median equals the mean.
e_mlr = e_elr
assert (np.isclose(e_mlr, port.trades.median_log_returns()[0], equal_nan=True))
# With min_lr=0 only the first (positive) term is expected to remain.
e_filtered_elr = np.log(1.714564 +1 )
assert (np.isclose(e_filtered_elr, port.trades.expected_log_returns(min_lr=0)[0], equal_nan=True))