In [None]:
from itertools import combinations
from math import atan, pi

import numpy as np
import pandas as pd
import polars as pl
import vectorbt as vbt

try:
    import my_config as config
except ImportError:
    import config

selected_asset = 'spy'
selected_timeframe = '60m'
nwindows = 5
insample = 75
selected_range = [6, 40]
selected_strategy = 'MACD'
selected_direction = 'longonly'
selected_metric = 'total_return'

# Query the data from the database. Polars' cast method is much faster than using CAST within the query.
query = f'''
    SELECT
        date, open, high, low, close
    FROM
        {selected_asset}
    WHERE
        date
    BETWEEN
        '2022-1-1' AND '2022-11-1'
    ORDER BY
        date ASC
'''
df = pl.read_database(query, config.connection)
df = df.with_columns([pl.col(['open', 'high', 'low', 'close']).cast(pl.Decimal(8, 3))]).set_sorted('date')

# Aggregate the data to the selected timeframe
df = df.groupby_dynamic('date', every=selected_timeframe).agg(
    [pl.first('open'), pl.max('high'), pl.min('low'), pl.last('close')])
print(df)

In [None]:
vbt.settings.stats_builder['metrics'] = [
    'sharpe_ratio', 'total_return', 'open_trade_pnl', 'total_trades', 'total_open_trades', 'win_rate', 'avg_winning_trade', 'avg_losing_trade',
    'expectancy', 'avg_winning_trade_duration', 'avg_losing_trade_duration', 'max_gross_exposure']

# Adjusts window length based on the number of windows, providing a 75% overlap. Also used in plotting.py.
def overlap_factor(nwindows):
    factors = [.375, .5, .56, .6, .625, .64]
    if nwindows < 8:
        return factors[nwindows - 2]
    else:
        return (13 / (9 * pi)) * atan(nwindows)

# For creating a numpy.arange array with a closed interval instead of half-open.
def closed_arange(start, stop, step, dtype=None):
    array = np.arange(start, stop, step, dtype=dtype)
    if array[-1] + step <= stop:
        end_value = np.array(stop, ndmin=1, dtype=dtype)
        array = np.concatenate([array, end_value])
    return array

close = df.select(pl.col(['date', 'close'])).to_pandas()
close = close.set_index('date')

# Split the data into walk-forward windows to be looped through.
window_kwargs = dict(n=nwindows, set_lens=(insample / 100,),
                        window_len=round(len(df) / ((1 - overlap_factor(nwindows)) * nwindows)))
(in_price, in_dates), (out_price, out_dates) = close.vbt.rolling_split(**window_kwargs, plot=False)

# The portfolio calculations use a 24 hour trading day. This can effectively be corrected by inflating the time interval.
trading_day_conversion = 24 / 6.5
if selected_timeframe == '1d':
    time_interval = '1d'
else:
    time_interval = "{}{}".format(round(int(selected_timeframe[:-1]) * trading_day_conversion, 4), 'm')

pf_kwargs = dict(direction=selected_direction, freq=time_interval, init_cash=100, fees=0.000, slippage=0.000)

if selected_strategy == 'SMA Crossover':
    columns_list = ["Fast SMA Period", "Slow SMA Period"]
    parameter_values = closed_arange(selected_range[0], selected_range[1], 10, np.int16)

    def backtest_windows(price, sma_periods, all_periods=True):
        if all_periods is True:
            fast_sma, slow_sma = vbt.IndicatorFactory.from_talib('SMA').run_combs(price, sma_periods)
        else:
            fast_sma = vbt.IndicatorFactory.from_talib('SMA').run(price, sma_periods[0], per_column=True)
            slow_sma = vbt.IndicatorFactory.from_talib('SMA').run(price, sma_periods[1], per_column=True)
        entries = fast_sma.real_crossed_above(slow_sma.real)
        exits = fast_sma.real_crossed_below(slow_sma.real)
        return vbt.Portfolio.from_signals(price, entries, exits, **pf_kwargs)

elif selected_strategy == 'EMA Crossover':
    columns_list = ["Fast EMA Period", "Slow EMA Period"]
    parameter_values = closed_arange(selected_range[0], selected_range[1], 10, np.int16)

    def backtest_windows(price, ema_periods, all_periods=True):
        if all_periods is True:
            fast_ema, slow_ema = vbt.IndicatorFactory.from_talib('EMA').run_combs(price, ema_periods)
        else:
            fast_ema = vbt.IndicatorFactory.from_talib('EMA').run(price, ema_periods[0], per_column=True)
            slow_ema = vbt.IndicatorFactory.from_talib('EMA').run(price, ema_periods[1], per_column=True)
        entries = fast_ema.real_crossed_above(slow_ema.real)
        exits = fast_ema.real_crossed_below(slow_ema.real)
        return vbt.Portfolio.from_signals(price, entries, exits, **pf_kwargs)

elif selected_strategy == 'MACD':
    columns_list = ["Fast EMA Period", "Slow EMA Period"]
    raw_parameter_values = closed_arange(selected_range[0], selected_range[1], 2, np.int16)

    # Generate all entry and exit combinations with entry value < exit value by default and splitting them into seperate lists.
    # For the crossover strategies this was already done for us by the .run_combs function for the period parameters.
    parameter_combinations = list(combinations(raw_parameter_values, 2))
    parameter_values_entries = [parameter_combinations[i][0] for i in range(len(parameter_combinations))]
    parameter_values_exits = [parameter_combinations[i][1] for i in range(len(parameter_combinations))]
    parameter_values = [parameter_values_entries, parameter_values_exits]

    def backtest_windows(price, parameter_values, all_periods=True):
        if all_periods is True:
            macd = vbt.IndicatorFactory.from_talib('MACD').run(price, parameter_values[0], parameter_values[1])
        else:
            macd = vbt.IndicatorFactory.from_talib('MACD').run(price, parameter_values[0], parameter_values[1], per_column=True)
        entries = macd.macd_crossed_above(macd.macdsignal)
        exits = macd.macd_crossed_below(macd.macdsignal)
        return vbt.Portfolio.from_signals(price, entries, exits, **pf_kwargs)

elif selected_strategy == 'RSI':
    columns_list = ["RSI Entry Value", "RSI Exit Value"]
    raw_parameter_values = closed_arange(selected_range[0], selected_range[1], 2, np.int16)

    # Generate all entry and exit combinations with entry value < exit value by default and splitting them into seperate lists.
    # For the crossover strategies this was already done for us by the .run_combs function for the period parameters.
    parameter_combinations = list(combinations(raw_parameter_values, 2))
    parameter_values_entries = [parameter_combinations[i][0] for i in range(len(parameter_combinations))]
    parameter_values_exits = [parameter_combinations[i][1] for i in range(len(parameter_combinations))]
    parameter_values = [parameter_values_entries, parameter_values_exits]

    def backtest_windows(price, entry_exit_values, dummy_variable=None):
        rsi = vbt.IndicatorFactory.from_talib('RSI').run(price, 14)
        entries = rsi.real_crossed_below(entry_exit_values[0])
        exits = rsi.real_crossed_above(entry_exit_values[1])
        return vbt.Portfolio.from_signals(price, entries, exits, **pf_kwargs)

In [None]:
# A function to group by window index so we can properly access the various metrics for each window.
def get_optimal_parameters(accessed_metric):
    if selected_metric == 'max_drawdown':
        indexed_parameters = accessed_metric[accessed_metric.groupby("split_idx").idxmin()].index
    else:
        indexed_parameters = accessed_metric[accessed_metric.groupby("split_idx").idxmax()].index

    first_parameters = indexed_parameters.get_level_values(indexed_parameters.names[0]).to_numpy()
    second_parameters = indexed_parameters.get_level_values(indexed_parameters.names[1]).to_numpy()
    return np.array([first_parameters, second_parameters])

# Testing all parameter combinations on the in-sample windows and getting optimal parameters.
# The deep.getattr function is used to provide the portfolio metric to optimize on in a variable way.
pf_insample = backtest_windows(in_price, parameter_values)
outsample_parameters = get_optimal_parameters(pf_insample.deep_getattr(selected_metric))

# Testing the extracted parameters on the out-of-sample windows.
pf_outsample = backtest_windows(out_price, outsample_parameters, False)

# Testing all parameter combinations on the out-of-sample windows for comparison purposes.
pf_outsample_optimized = backtest_windows(out_price, parameter_values)
optimal_parameters = get_optimal_parameters(pf_outsample_optimized.deep_getattr(selected_metric))

# Creating DataFrames using portfolio.stats() to show the backtest results and formatting them using Pandas methods.
# The stats DataFrames can also be created with the desired format directly by passing dictionaries to portfolio.stats().
window_number = pd.DataFrame(np.arange(1, nwindows + 1), columns=["Window"], dtype=np.int16)

outsample_parameters_df = pd.DataFrame(outsample_parameters.transpose(), columns=columns_list, dtype=np.int16)
outsample_metrics = ['total_return', 'sharpe_ratio', 'max_dd', 'benchmark_return', 'open_trade_pnl', 'total_trades',
                        'total_open_trades', 'win_rate', 'avg_winning_trade', 'avg_losing_trade', 'expectancy', 'max_gross_exposure']
outsample_df = pf_outsample.stats(metrics=outsample_metrics, agg_func=None).reset_index(drop=True)
outsample_df = pd.concat([window_number, outsample_parameters_df, outsample_df], axis=1).round(2)
outsample_df = outsample_df.rename(columns={"Total Return [%]": "Return", "Total Open Trades": "Open Trades",
                                            "Max Gross Exposure [%]": "Exposure", "Max Drawdown [%]": "Max Drawdown",
                                            "Win Rate [%]": "Win Rate", "Avg Winning Trade [%]": "Avg Winning Trade",
                                            "Avg Losing Trade [%]": "Avg Losing Trade", "Benchmark Return [%]": "Benchmark Return"})

optimal_parameters_df = pd.DataFrame(optimal_parameters.transpose(), columns=columns_list, dtype=np.int16)
optimal_returns = (pf_outsample_optimized.total_return() * 100).groupby("split_idx").max().rename("Return")
optimal_sharpe_ratio = (pf_outsample_optimized.sharpe_ratio()).groupby("split_idx").max().rename("Sharpe Ratio")
optimal_maxdrawdowns = (pf_outsample_optimized.max_drawdown() * -100).groupby("split_idx").min().rename("Max Drawdown")
average_returns = (pf_outsample_optimized.total_return() * 100).groupby("split_idx").mean().rename("Average Returns")
optimal_df = pd.concat([window_number, optimal_parameters_df, optimal_returns, optimal_sharpe_ratio, optimal_maxdrawdowns, average_returns], axis=1).round(2)

display(outsample_df)
display(optimal_df)

In [None]:
display(pf_outsample.stats(agg_func=None))

In [None]:
import talib

macd, signal, histogram = talib.MACD(in_price[1])
macd_plot = signal.vbt.plot(trace_kwargs=dict(name="MACD", line=dict(color='#8332c6'))).show()

macd, signal, histogram = talib.MACD(in_price[1])

macd.plot(backend='plotly').show()
signal.plot(backend='plotly').show()
histogram.plot(backend='plotly').show()

help(macd.run)

In [None]:
plottable_parameters = list(zip(outsample_parameters[0],outsample_parameters[1]))
parameters_df = pd.DataFrame(plottable_parameters, columns=columns_list)

parameters_plot = parameters_df.plot(backend='plotly', kind='line')
parameters_plot.update_layout(title='Optimized Parameters', xaxis_title='Window Number', yaxis_title='Period',
                         height=300, width=None, xaxis=dict(gridcolor='#191919', tickmode='array', tickvals=parameters_df.index.tolist()), 
                         yaxis=dict(linecolor='#191919', gridcolor='#191919'), plot_bgcolor='#2b2b2b', paper_bgcolor='#2b2b2b', 
                         font_color='#cbcbcb', margin=dict(t=50, b=30, l=65, r=30), legend_title_text=None, 
                         legend=dict(orientation='h', yanchor='bottom', y=1.1, xanchor='right', x=1, bgcolor='#2b2b2b'))

In [None]:
import talib

dashboard = pf_outsample.plots(
    subplots=['trades', 'net_exposure', 'cum_returns', 'drawdowns'],
    subplot_settings=dict(
        trades=dict(
            yaxis_kwargs=dict(
                title="Asset Price"
            )
        ),
        cum_returns=dict(
            title="Strategy Return vs S&P 500",
            yaxis_kwargs=dict(
                title="Return (%)"
            )
        ),
        drawdowns=dict(
            top_n=2,
            yaxis_kwargs=dict(
                title="Portfolio Value"
            )
        ),
        net_exposure=dict(
            title="RSI",
            yaxis_kwargs=dict(
                title="RSI Value"
            ),
            trace_kwargs=dict(
                opacity=0
            )
        )
    )
)

dashboard.update_layout(plot_bgcolor='#2b2b2b', paper_bgcolor='#2b2b2b', height=1200, width=None,
                        xaxis=dict(gridcolor='#191919'), yaxis=dict(gridcolor='#191919'))

rsi = talib.RSI(out_price[i], 14)
rsi_plot = rsi.vbt.plot(trace_kwargs=dict(name="RSI", line=dict(color='#8332c6')),
                        add_trace_kwargs=dict(row=2, col=1), fig=dashboard)

dashboard.show()

In [None]:
my_metrics = list(pf_insample.metrics.items())
print(my_metrics)

In [None]:
# outsample_portfolios[0].save('my_pf')
# pf = vbt.Portfolio.load('my_pf')

# pickled_pf = outsample_portfolios[3].dumps()
# unpickled_pf = vbt.Portfolio.loads(pickled_pf)

In [None]:
# accessor_returns = pf_insample.returns().vbt.returns(freq=time_interval)
# grouped_returns = accessor_returns.total()[accessor_returns.total().groupby('split_idx').idxmax()].index
# print(grouped_returns)
# print(accessor_returns.total()[accessor_returns.total().groupby('split_idx').idxmax()].index)
# print(returnsss.total().idxmax())
# print(returnsss.sharpe_ratio().idxmin())

In [None]:
# print(pf_insample.orders.records)
# print(pf_insample.trades.records)
# print(type(pf_insample.total_return().idxmax()))
# print(pf_insample.stats())
# print(pf_insample.total_return().idxmax())