In [8]:
import yaml

import numpy as np
import pandas as pd
import vectorbt as vbt

# Read the Alpaca credentials from a local YAML file so they never appear in the notebook.
with open("alpaca_api.yaml", 'r') as stream:
    alpaca_api = yaml.safe_load(stream)

# Data-provider credentials.
alpaca_settings = vbt.settings.data['alpaca']
alpaca_settings['key_id'] = alpaca_api["key"]
alpaca_settings['secret_key'] = alpaca_api["secret_key"]

# Default figure appearance.
plot_layout = vbt.settings.plotting['layout']
plot_layout['template'] = "vbt_dark"
plot_layout['width'] = 1200
plot_layout['height'] = 500

# Bars are 5 minutes apart.
vbt.settings.array_wrapper['freq'] = "5m"

# Portfolio defaults: whole-share sizing and starting cash.
portfolio_settings = vbt.settings.portfolio
portfolio_settings['size_granularity'] = 1
portfolio_settings['init_cash'] = 10000

# Download SPY 5-minute bars and keep only the closing prices.
data = vbt.AlpacaData.download(
    'SPY',
    start='2024-05-06',
    end='2024-07-27',
    timeframe='5m',
    limit=10000,
)
price = data.get('Close')

In [9]:
# Peek at the most recent bars; the displayed index is timezone-aware UTC (+00:00).
price.tail()

timestamp
2024-07-24 09:45:00+00:00    549.79
2024-07-24 09:50:00+00:00    549.79
2024-07-24 09:55:00+00:00    549.94
2024-07-24 10:00:00+00:00    549.95
2024-07-24 10:05:00+00:00    550.05
Name: Close, dtype: float64

In [None]:
# Plot the full closing-price series as a single trace labelled "Price".
figure = price.vbt.plot(trace_names=['Price'])
figure.show()

In [None]:
from scipy.stats import siegelslopes


def rmv_slope(prices, n):
    """Return the Siegel repeated-median slope of the last `n` prices.

    Args:
        prices: Sequence of prices supporting `len()` and slicing.
        n: Number of trailing observations to fit.

    Returns:
        The slope estimated by `scipy.stats.siegelslopes` over the last `n`
        values against x = 0..n-1, or NaN when fewer than `n` prices are given.
    """
    if len(prices) >= n:
        window = prices[-n:]
        fit = siegelslopes(window, np.arange(n))
        # The result is (slope, intercept); only the slope is needed.
        return fit[0]

    return np.nan

In [None]:
def rmv(close, n, xmult=4.00512):
    """Rolling Repeated Median Velocity of `close`, scaled by `sqrt(n) * xmult`.

    For every bar from index `n - 1` onward, the slope of the trailing
    `n`-bar window is computed with `rmv_slope`; earlier bars stay NaN.

    Args:
        close: 1-D array of prices, or 2-D array of shape (rows, columns).
            Each column is processed independently (the previous version
            failed on more than one column because `siegelslopes` flattens
            its `y` argument and then rejects the length mismatch).
        n: Window length in bars.
        xmult: Normalisation multiplier applied together with `sqrt(n)`.
            Defaults to the value that was previously hard-coded.

    Returns:
        Float array with the same shape as `close`.
    """
    close = np.asarray(close)
    signal = np.full(close.shape, np.nan)

    n_rows = len(close)
    if n_rows < n:
        return signal

    # View both arrays as (rows, columns) so 1-D and 2-D inputs share one code path.
    # `signal` is freshly allocated and contiguous, so the reshape is a view and
    # writes into `out` land in `signal`.
    columns = close.reshape(n_rows, -1)
    out = signal.reshape(n_rows, -1)

    for col in range(columns.shape[1]):
        for i in range(n - 1, n_rows):
            out[i, col] = rmv_slope(columns[i - n + 1:i + 1, col], n)

    # Multiply RMV by xmult*sqrt(N) to normalize stdev
    return signal * (np.sqrt(n) * xmult)

In [None]:
# Wrap `rmv` as a vectorbt indicator: one input (close), one parameter (n,
# default 20) and one output (signal).
my_indicator = (
    vbt.IndicatorFactory(
        class_name="RMV",
        short_name="rmv",
        input_names=["close"],
        param_names=["n"],
        output_names=["signal"],
    )
    .from_apply_func(rmv, n=20)
)


In [None]:
# Find xmult to normalize stdev for all values of n
def Average(lst):
    """Return the arithmetic mean of the items in `lst`."""
    total = sum(lst)
    count = len(lst)
    return total / count

# Collect 1/std of the indicator signal for window lengths 4..24 and average them.
# The result is stored under its own name so the `rmv` function defined earlier is
# not overwritten (re-running the indicator-factory cell would otherwise wrap the
# wrong object).
# NOTE(review): `rmv()` already multiplies its output by sqrt(n) and a fixed
# multiplier, so the value printed here is only the raw xmult if that multiplier
# is temporarily set to 1 — confirm how the constant was derived.
std_list = []
for n in range(4, 25, 1):
    rmv_run = my_indicator.run(price, n)
    std_list.append(1 / np.std(rmv_run.signal))

print(f"The xmult value is: {Average(std_list):.5f}")

In [None]:
# Run the RMV indicator with a 20-bar window; the bare last expression displays the signal.
results = my_indicator.run(price, 20)
results.signal

In [None]:
# Session window, expressed as exchange-local wall-clock time.
# The bar index is UTC (the price.tail() output shows +00:00), so comparing it
# directly against 10:00 / 15:55 would apply the window in UTC rather than in
# New York time. Convert before comparing. Set MARKET_TZ = "UTC" to restore the
# previous behaviour.
MARKET_TZ = "America/New_York"
LONG_ENTRY_THRESHOLD = 2.4
SHORT_ENTRY_THRESHOLD = -2.7

market_open = pd.Timestamp('10:00').time()
market_close = pd.Timestamp('15:55').time()

sig = results.signal

# A tz-naive index cannot be converted; it is then taken as already local.
bar_index = sig.index.tz_convert(MARKET_TZ) if sig.index.tz is not None else sig.index

# Compare minute-of-day integers. Both thresholds fall on whole minutes, so this
# matches a time-of-day comparison while staying fully vectorised.
minute_of_day = np.asarray(bar_index.hour * 60 + bar_index.minute)
open_minute = market_open.hour * 60 + market_open.minute
close_minute = market_close.hour * 60 + market_close.minute

in_session = (minute_of_day >= open_minute) & (minute_of_day < close_minute)
after_close = minute_of_day >= close_minute

# NaN signal values (the warm-up bars) compare False and never trigger an entry.
long_trigger = (sig > LONG_ENTRY_THRESHOLD).to_numpy()
short_trigger = (sig < SHORT_ENTRY_THRESHOLD).to_numpy()

# Long entries: strong positive signal inside the session.
entries = pd.Series(long_trigger & in_session, index=sig.index, name='Close')

# Long exits: every bar at or after the session close.
exits = pd.Series(after_close, index=sig.index, name='Close')

# Short entries: strong negative signal inside the session.
short_entry = pd.Series(short_trigger & in_session, index=sig.index, name='Close')

# Short exits use the same rule as long exits; copied so the two stay independent objects.
short_exit = exits.copy()

In [None]:
# Backtest the long/short signals on the closing prices with a fixed order
# size of 50 on 5-minute bars.
signal_kwargs = {
    "entries": entries,
    "exits": exits,
    "short_entries": short_entry,
    "short_exits": short_exit,
    "freq": "5m",
    "size": 50,
}
pf = vbt.Portfolio.from_signals(price, **signal_kwargs)

pf.stats()

In [None]:
# Render the portfolio's default plot for the backtest held in `pf`.
pf.plot()

In [None]:
# Indicator run (n=20) that is passed to plot_strategy() below.
# NOTE(review): this rebinds the name `rmv`, which earlier refers to the apply
# function wrapped by `my_indicator`; re-running the factory cell after this one
# would wrap the indicator result instead. It also repeats the run stored in
# `results`. Consider renaming here and at the plot_strategy() call together.
rmv = my_indicator.run(price, 20)

In [None]:
def plot_strategy(price: pd.Series, rmv, pf: vbt.portfolio.base.Portfolio, show_legend: bool = True,
                  long_threshold: float = 2.4, short_threshold: float = -2.7):
    """Plot price with trades (top) and the RMV signal with its thresholds (bottom).

    Args:
        price: Closing-price series; drawn in the first row.
        rmv: Indicator run result exposing a `.signal` attribute (not a plain
            Series); its signal is drawn in the second row.
        pf: Portfolio whose trades and positions are overlaid on the price row.
        show_legend: Whether the figure legend is shown.
        long_threshold: y-level of the upper horizontal guide line on the signal row.
        short_threshold: y-level of the lower horizontal guide line on the signal row.

    Returns:
        The two-row figure created by `vbt.make_subplots`.
    """
    layout_kwargs = {
        "title_text": "Closing Price and RMV",
        "title_font_size": 18,
        "height": 960,
        "legend": dict(yanchor="top", y=0.99, xanchor="left", x=0.1),
    }

    fig = vbt.make_subplots(rows=2, cols=1, shared_xaxes=True, vertical_spacing=0.1)

    # Row 1: price (the layout options are applied through this call).
    price.vbt.plot(add_trace_kwargs=dict(row=1, col=1), fig=fig, **layout_kwargs)
    # Row 2: indicator signal; connectgaps bridges NaN stretches in the line.
    rmv.signal.vbt.plot(add_trace_kwargs=dict(row=2, col=1), trace_kwargs=dict(connectgaps=True), fig=fig)

    # Entry-threshold guide lines on the signal row.
    for level in (long_threshold, short_threshold):
        fig = fig.add_hline(y=level,
                            line_color="#858480",
                            row=2,
                            col=1,
                            line_width=4)

    # Overlay long/short entries and exits, then the position spans, on the price row.
    pf.plot_trades(add_trace_kwargs=dict(row=1, col=1), fig=fig)
    pf.plot_positions(add_trace_kwargs=dict(row=1, col=1), fig=fig)

    fig.layout.showlegend = show_legend

    return fig

In [None]:
# Build the combined price / RMV / trades figure and display it.
fig = plot_strategy(price, rmv, pf, show_legend = True)

fig.show()

### Walk-Forward Optimization

In [None]:
# Visualise a rolling split of the price series: 5 windows of 5730 bars, each
# divided into two sets labelled "In Sample" and "Out of Sample".
# NOTE(review): 5730 / 955 bars look like roughly 30 / 5 days of 5-minute bars,
# and with left_to_right=False the 955-bar set is presumably the trailing one —
# confirm against the vectorbt rolling_split documentation.
price.vbt.rolling_split(
    n=5,
    window_len=5730,
    set_lens=(955,),
    left_to_right=False,
    plot=True,
    trace_names=['In Sample', 'Out of Sample'],
)

In [None]:
# Same split parameters as the plotted version, but returning the data:
# a (prices, indexes) pair for each of the two sets.
# NOTE(review): these four names are reassigned later by split_stock_data(),
# so this result is only used if that later cell is skipped.
(in_price, in_indexes), (out_price, out_indexes) = price.vbt.rolling_split(
    n=5,
    window_len=5730,
    set_lens=(955,),
    left_to_right=False,
)

In [None]:
import pandas as pd
import numpy as np
from datetime import timedelta

def _advance_weekdays(moment, n_weekdays):
    """Return `moment` moved forward to the `n_weekdays`-th Monday–Friday day after it."""
    counted = 0
    while counted < n_weekdays:
        moment += timedelta(days=1)
        if moment.weekday() < 5:
            counted += 1
    return moment


def split_stock_data(price_series, n_splits=5, train_days=30, test_days=5, step_days=5):
    """
    Split a price series into rolling train/test windows measured in weekdays.

    Each window starts at `start`, trains on [start, start + train_days weekdays)
    and tests on the following `test_days` weekdays. The start then moves forward
    by `step_days` weekdays. The first window starts at the first timestamp of the
    sorted series, so every boundary carries that timestamp's time of day.

    Only Saturdays and Sundays are skipped when counting days; market holidays are
    counted as ordinary weekdays. Windows that extend past the available data are
    simply shorter (possibly empty).

    Args:
    price_series (pd.Series): A pandas Series with DatetimeIndex and stock prices as values.
    n_splits (int): Number of splits to create.
    train_days (int): Weekdays of training data per split.
    test_days (int): Weekdays of testing data per split.
    step_days (int): Weekdays the window start advances between splits.

    Returns:
    tuple: ((in_price, in_indexes), (out_price, out_indexes))
        in_price, out_price: pandas DataFrames with one column per split
            (`split_0`, `split_1`, ...). Columns are aligned on the union of
            timestamps, so each column is NaN outside its own window.
        in_indexes, out_indexes: lists of DatetimeIndex for each split
    """
    # Boundary comparisons below assume chronological order.
    price_series = price_series.sort_index()
    stamps = price_series.index

    in_data, out_data = [], []
    in_indexes, out_indexes = [], []

    start_date = stamps[0]

    for _ in range(n_splits):
        train_end = _advance_weekdays(start_date, train_days)
        test_end = _advance_weekdays(train_end, test_days)

        # Half-open intervals keep train and test disjoint at the shared boundary.
        train_data = price_series[(stamps >= start_date) & (stamps < train_end)]
        test_data = price_series[(stamps >= train_end) & (stamps < test_end)]

        in_data.append(train_data)
        out_data.append(test_data)
        in_indexes.append(train_data.index)
        out_indexes.append(test_data.index)

        start_date = _advance_weekdays(start_date, step_days)

    labels = [f'split_{i}' for i in range(n_splits)]

    # Column-wise concat outer-joins the differing indexes.
    in_price = pd.concat(in_data, axis=1)
    in_price.columns = labels

    out_price = pd.concat(out_data, axis=1)
    out_price.columns = labels

    return (in_price, in_indexes), (out_price, out_indexes)

In [None]:
# Build 5 weekday-based train/test windows; this rebinds the in_/out_ names
# that were previously assigned from price.vbt.rolling_split().
(in_price, in_indexes), (out_price, out_indexes) = split_stock_data(price, 5)

In [None]:
import plotly.graph_objects as go
import pandas as pd
from datetime import datetime

# Prepare the data for plotting
def prepare_traces(indexes, color, name):
    """Build one Scatter trace that draws each window as a thick horizontal bar.

    Args:
        indexes: Sequence of DatetimeIndex objects, one per window.
        color: Line colour for the bars.
        name: Trace name; also used as the period label in the hover text.

    Returns:
        A `go.Scatter` with one line segment per non-empty window, placed at
        y = window position and separated by `None` gaps.
    """
    x = []
    y = []
    hovertext = []

    for i, idx in enumerate(indexes):
        # A window that fell outside the available data has no endpoints to draw.
        if len(idx) == 0:
            continue

        start = idx[0]
        end = idx[-1]
        # One label per window, so every bar reports its own dates. Previously a
        # single string built from the last window was shared by all bars, and it
        # always said "Training" regardless of the trace.
        label = (f'Window {i+1}<br>{name}: {start} to {end}'
                 f'<br>Duration: {(end - start).days + 1} days')

        x.extend([start, end, None])  # Start, end, and None for gaps between bars
        y.extend([i, i, None])        # Same y position to make the bars horizontal
        hovertext.extend([label, label, ""])  # Keep hover text aligned with x/y

    return go.Scatter(
        x=x,
        y=y,
        mode='lines',
        name=name,
        line=dict(color=color, width=15),  # Width of lines to resemble bars
        hoverinfo='text',
        hovertext=hovertext,
        connectgaps=False  # Ensures there are gaps between the periods
    )

# One bar trace per sample type
in_sample_trace = prepare_traces(in_indexes, 'royalblue', 'In Sample')
out_sample_trace = prepare_traces(out_indexes, 'orange', 'Out of Sample')

# Figure layout: one y tick per window, horizontal legend above the plot area
layout = go.Layout(
    title='In-Sample and Out-of-Sample Periods',
    xaxis={'title': 'Date'},
    yaxis={
        'title': 'Window Number',
        'tickvals': list(range(len(in_indexes))),
        'ticktext': [f'Window {i+1}' for i in range(len(in_indexes))],
    },
    legend={
        'orientation': "h",
        'yanchor': "bottom",
        'y': 1.02,
        'xanchor': "right",
        'x': 1,
    },
    plot_bgcolor='rgba(0,0,0,0)',
    template="plotly_dark",
)

# Assemble and display the figure
fig = go.Figure(data=[in_sample_trace, out_sample_trace], layout=layout)
fig.show()

In [None]:
import plotly.graph_objects as go
import pandas as pd
import numpy as np

# Combine all unique timestamps (sorted) across every in-sample and out-of-sample window
all_index = (
    in_indexes[0]
    .append(list(in_indexes[1:]) + list(out_indexes))
    .unique()
    .sort_values()
)
all_timestamps = list(all_index)

# Build the heatmap grids: one row per window, one column per timestamp.
# Membership is tested with vectorised Index.isin instead of a Python loop over
# every (window, timestamp) pair. Cell values are unchanged: 1 marks in-sample,
# 2 marks out-of-sample, NaN everywhere else.
heatmap_data_in_sample = []
heatmap_data_out_sample = []

for in_idx, out_idx in zip(in_indexes, out_indexes):
    in_mask = all_index.isin(in_idx)
    # In-sample takes precedence if a timestamp were ever in both sets.
    out_mask = all_index.isin(out_idx) & ~in_mask
    heatmap_data_in_sample.append(np.where(in_mask, 1.0, np.nan))
    heatmap_data_out_sample.append(np.where(out_mask, 2.0, np.nan))

# Prepare data for the heatmap
heatmap_data_in_sample = np.array(heatmap_data_in_sample)
heatmap_data_out_sample = np.array(heatmap_data_out_sample)

# In-sample heatmap: single-colour scale so every marked cell is royal blue
heatmap_in_sample = go.Heatmap(
    z=heatmap_data_in_sample,
    x=all_timestamps,
    y=[f'Window {i+1}' for i in range(len(in_indexes))],
    colorscale=[[0, 'royalblue'], [1, 'royalblue']],
    showscale=False,
    hovertemplate='Date: %{x}<br>Window: %{y}<br>In Sample<extra></extra>',
    showlegend=True,
    name='In Sample',
)

# Out-of-sample heatmap: same grid, single-colour orange scale
heatmap_out_sample = go.Heatmap(
    z=heatmap_data_out_sample,
    x=all_timestamps,
    y=[f'Window {i+1}' for i in range(len(out_indexes))],
    colorscale=[[0, 'orange'], [1, 'orange']],
    showscale=False,
    hovertemplate='Date: %{x}<br>Window: %{y}<br>Out of Sample<extra></extra>',
    showlegend=True,
    name="Out of Sample",
)

# Layout: date x-axis, dark template, horizontal legend placed above the plot
layout = go.Layout(
    title='In-Sample and Out-of-Sample Periods',
    xaxis={'title': 'Date', 'tickformat': '%Y-%m-%d', 'type': 'date'},
    yaxis={'title': 'Window Number'},
    plot_bgcolor='rgba(0,0,0,0)',
    template="plotly_dark",
    legend={
        'traceorder': "normal",
        'itemsizing': 'constant',
        'font': {'size': 12, 'color': "white"},
        'orientation': "h",
        'x': 0.79,
        'y': 1.2,
    },
)

# Assemble and display the figure
fig = go.Figure(data=[heatmap_in_sample, heatmap_out_sample], layout=layout)
fig.show()