![QuantConnect Logo](https://cdn.quantconnect.com/web/i/icon.png)
<hr>

In [None]:
# Research session: one QuantBook with SPY added. `qb`, `symbol` and
# `num_years` are reused by the cells that follow.
qb = QuantBook()
symbol = qb.add_equity("SPY").symbol
num_years = 20

# Daily price history, sized as 360 bars per "year".
# NOTE(review): the second argument looks like a bar count, so 360 * num_years
# daily bars would reach back further than `num_years` calendar years --
# confirm this is intended.
history_df = (
    qb.history(symbol, 360*num_years, Resolution.DAILY)
    .reset_index(level=0)        # index level 0 becomes a regular column...
    .drop(columns=['symbol'])    # ...and the 'symbol' column is discarded
)


In [None]:
# FractalAdaptiveMovingAverage(30, 180) history over the same daily lookback
# that the price history uses.
frama_df = qb.indicator(
    FractalAdaptiveMovingAverage(30, 180),
    symbol,
    360*num_years,
    Resolution.DAILY,
)
# The result is not assigned. NOTE(review): as this is not the final expression
# of the cell, a notebook will most likely not display it.
frama_df.tail()

# MomentumPercent(20) history over the same lookback.
momp_df = qb.indicator(
    MomentumPercent(20),
    symbol,
    360*num_years,
    Resolution.DAILY,
)

In [None]:
# Manually build indicators that are derived from VWAP.
history_tradebars = qb.history[TradeBar](symbol, 360*num_years, Resolution.DAILY)

vwap = VolumeWeightedAveragePriceIndicator(period=14)

#### Momentum of VWAP ####
# `momp` is chained onto `vwap`; the update handler below is attached to the
# object returned by IndicatorExtensions.of.
momp = MomentumPercent(20)
momp_of_vwap = IndicatorExtensions.of(momp, vwap)

# Rolling windows holding the 20 most recent timestamps and values.
momp_of_vwap_window = {}
momp_of_vwap_window['time'] = RollingWindow[DateTime](20)
momp_of_vwap_window['mom_of_vwap'] = RollingWindow[float](20)

# end_time -> value buffer filled by the handler. Keyed by timestamp so that a
# repeated timestamp overwrites the earlier value, exactly as a label-based
# row assignment on a DataFrame would.
momp_of_vwap_values = {}


def update_vwap_vwap_window(sender: object, updated: IndicatorDataPoint) -> None:
    """Record one new momentum-of-VWAP data point.

    Pushes the point's end time and value into the rolling windows and stores
    the value in the timestamp-keyed buffer used to build `momp_of_vwap_df`.

    Args:
        sender: The object that raised the event (unused).
        updated: The new data point; `end_time` and `value` are read.
    """
    momp_of_vwap_window['time'].add(updated.end_time)
    momp_of_vwap_window["mom_of_vwap"].add(updated.value)
    momp_of_vwap_values[updated.end_time] = updated.value


momp_of_vwap.updated += update_vwap_vwap_window

# Only `vwap` is updated here; the handler above is attached to `momp_of_vwap`.
for bar in history_tradebars:
    vwap.update(bar)

# Build the frame once from the buffer. Growing a DataFrame one row at a time
# inside the handler re-copies the data on every update.
momp_of_vwap_df = pd.Series(momp_of_vwap_values, name="mom_of_vwap").to_frame()


In [None]:
vwap1 = VolumeWeightedAveragePriceIndicator(21)

#### Fractal adaptive moving average of VWAP ####
# `frama` is chained onto `vwap1`; the update handler below is attached to the
# object returned by IndicatorExtensions.of.
frama = FractalAdaptiveMovingAverage(30, 180)
frama_of_vwap = IndicatorExtensions.of(frama, vwap1)

# Rolling windows holding the 30 most recent timestamps and values.
frama_of_vwap_window = {}
frama_of_vwap_window['time'] = RollingWindow[DateTime](30)
frama_of_vwap_window['frama_of_vwap'] = RollingWindow[float](30)

# end_time -> value buffer filled by the handler. Keyed by timestamp so that a
# repeated timestamp overwrites the earlier value, exactly as a label-based
# row assignment on a DataFrame would.
frama_of_vwap_values = {}


def update_frama_vwap_window(sender: object, updated: IndicatorDataPoint) -> None:
    """Record one new FRAMA data point.

    Pushes the point's end time and value into the rolling windows and stores
    the value in the timestamp-keyed buffer used to build `frama_of_vwap_df`.

    Args:
        sender: The object that raised the event (unused).
        updated: The new data point; `end_time` and `value` are read.
    """
    frama_of_vwap_window['time'].add(updated.end_time)
    frama_of_vwap_window["frama_of_vwap"].add(updated.value)
    frama_of_vwap_values[updated.end_time] = updated.value


frama_of_vwap.updated += update_frama_vwap_window

# NOTE(review): the bars are fed to `frama` directly and `vwap1` is never
# updated in this cell, so the recorded series is driven by the raw trade bars
# rather than by VWAP. Confirm whether `vwap1.update(bar)` was intended (and
# whether FRAMA accepts the data points VWAP would forward).
for bar in history_tradebars:
    frama.update(bar)

# Build the frame once from the buffer. Growing a DataFrame one row at a time
# inside the handler re-copies the data on every update.
frama_of_vwap_df = pd.Series(frama_of_vwap_values, name="frama_of_vwap").to_frame()

In [None]:
import plotly.graph_objects as go

# Line chart comparing MomentumPercent from `momp_df` with the values recorded
# in `momp_of_vwap_df`. Both traces are handed to the Figure constructor in
# the order they should appear.
fig = go.Figure(data=[
    go.Scatter(
        name='Momentum Percent',
        mode='lines',
        x=momp_df.index,
        y=momp_df['current'],
    ),
    go.Scatter(
        name='Momentum of VWAP',
        mode='lines',
        x=momp_of_vwap_df.index,
        y=momp_of_vwap_df['mom_of_vwap'],
    ),
])

# Titles, legend position and theme.
fig.update_layout(
    template="plotly_white",
    legend={'x': 0, 'y': 1},
    title="Momentum Percent and Momentum of VWAP",
    xaxis_title="Time",
    yaxis_title="Value",
)

fig.show()

In [None]:
# ExponentialMovingAverage(14) history over the same daily lookback.
# NOTE(review): the variable is called `sma` and is plotted under the label
# 'SMA' in other cells, although the indicator is an exponential moving
# average -- confirm which one is intended.
sma = qb.indicator(
    ExponentialMovingAverage(14),
    symbol,
    360*num_years,
    Resolution.DAILY,
)

In [None]:
import plotly.graph_objects as go

# Price candlesticks with three indicator overlays. All traces are handed to
# the Figure constructor in drawing order: candles first, then the lines.
fig = go.Figure(data=[
    go.Candlestick(
        name='Candlesticks',
        x=history_df.index,
        open=history_df['open'],
        high=history_df['high'],
        low=history_df['low'],
        close=history_df['close'],
    ),
    go.Scatter(
        name='FRAMA',
        mode='lines',
        x=frama_df.index,
        y=frama_df['current'],
    ),
    go.Scatter(
        name='FRAMA of VWAP',
        mode='lines',
        x=frama_of_vwap_df.index,
        y=frama_of_vwap_df['frama_of_vwap'],
    ),
    go.Scatter(
        name='SMA',
        mode='lines',
        x=sma.index,
        y=sma['current'],
    ),
])

# Box zoom instead of the range slider, on a fixed 1200x800 canvas.
fig.update_layout(
    width=1200,
    height=800,
    dragmode="zoom",
    xaxis={'rangeslider': {'visible': False}},
    template="plotly_white",
    title="Candlesticks and FRAMA",
    xaxis_title="Time",
    yaxis_title="Price",
)

fig.show()

In [None]:
# Combine the price history with every indicator frame on their shared index.
# Inner joins keep only the rows present in all five frames. When a column
# name already exists on the left side, the right-hand column receives the
# given suffix. Assuming `history_df` has no 'current' column, the first
# indicator's 'current' keeps its name and the MomentumPercent one becomes
# 'current_momp'.
df = (
    history_df.copy()
    .join(frama_df, how='inner', rsuffix='_frama')
    .join(momp_df, how='inner', rsuffix='_momp')
    .join(frama_of_vwap_df, how='inner', rsuffix='_frama_of_vwap')
    .join(momp_of_vwap_df, how='inner', rsuffix='_momp_of_vwap')
)

In [None]:
# Boolean signal columns derived from the joined frame.
#
# After the joins, 'current' holds the FRAMA value (first frame to contribute
# that column name) and 'current_momp' holds MomentumPercent (suffixed because
# the name was already taken). The momp_* signals therefore have to read
# 'current_momp'; reading 'current' would silently re-use FRAMA.
frama_line = df['current']
momp_line = df['current_momp']

# Shared FRAMA conditions: price above the average, and the bar-to-bar rate of
# change itself increasing.
frama_change = frama_line.pct_change()
price_above_frama = df['close'] > frama_line
frama_change_rising = frama_change.pct_change() > 0

df['frama_roc1'] = (frama_change > 0) & price_above_frama & frama_change_rising
df['frama_roc2'] = (frama_line.pct_change(3) > 0) & price_above_frama & frama_change_rising

# MomentumPercent rising over 1 and 2 bars.
df['momp_roc1'] = momp_line.pct_change() > 0
df['momp_roc2'] = momp_line.pct_change(2) > 0

# Momentum-of-VWAP series rising over 1 and 7 bars.
df['momp_of_vwap_roc1'] = df['mom_of_vwap'].pct_change() > 0
df['momp_of_vwap_roc2'] = df['mom_of_vwap'].pct_change(7) > 0

# Combined signals: FRAMA condition AND momentum-of-VWAP condition.
df['frama_momp_vwap1'] = df['frama_roc1'] & df['momp_of_vwap_roc1']
df['frama_momp_vwap2'] = df['frama_roc2'] & df['momp_of_vwap_roc2']

In [None]:
import interactive_charting as chart

# Candlestick chart with one marker series per combined signal column; the
# colours are listed in the same order as the signal columns.
signal_columns = ["frama_momp_vwap1", "frama_momp_vwap2"]
fig = chart.equity_chart_with_signals(
    df,
    start_date='2004-01-01',
    end_date='2025-05-02',
    chart_type="candlestick",
    num_signals=len(signal_columns),
    signal_column_names=signal_columns,
    marker_colors=["blue", "yellow"],
)

# Overlay the moving-average line (`go` comes from an earlier cell's import).
fig.add_trace(go.Scatter(
    name='SMA',
    mode='lines',
    x=sma.index,
    y=sma['current'],
))

# Anchor the y-axis scale to the x-axis at a 1:1 ratio.
fig.update_layout(yaxis={'scaleanchor': 'x', 'scaleratio': 1})

# Collapse gaps on the time axis.
fig.update_xaxes(rangebreaks=[
    {'bounds': ["sat", "mon"]},                 # hide weekends
    {'bounds': [17, 9], 'pattern': "hour"},     # hide non-trading hours
])


fig.show()

In [None]:
"""RSI, BB, ADX"""
# ----- 1. Inputs -----
# Uses the joined frame `df` built in the earlier cells: it must contain a
# 'close' column and one boolean column per entry of `signal_types` below.

# ----- 2. Forward returns for horizons 14-29 bars -----
horizons = range(14, 30)
return_columns = [f'return_{h}d' for h in horizons]

# Future closes: close_t+14 ... close_t+29
for h in horizons:
    df[f'close_t+{h}'] = df['close'].shift(-h)

# Forward returns: (close_{t+h} / close_t) - 1. The last h rows are NaN because
# no future close exists for them.
for h in horizons:
    df[f'return_{h}d'] = df[f'close_t+{h}'] / df['close'] - 1

# ----- 3. Signal columns and their display labels -----
signal_types = [
    ('frama_roc1', 'frama roc 1;'),
    ('frama_roc2', 'frama roc 2;'),
    ('momp_roc1', 'momp roc 1;'),
    ("momp_roc2", 'momp roc 2;'),
    ('momp_of_vwap_roc1', 'momp of vwap roc 1;'),
    ('momp_of_vwap_roc2', 'momp of vwap roc 2;'),
    ('frama_momp_vwap1', 'frama momp vwap 1;'),
    ('frama_momp_vwap2', 'frama momp vwap 2;'),
]

# ----- 4. Aggregate per signal -----
returns_series = {}    # label -> mean forward return per horizon
win_rate_dfs = {}      # label -> DataFrame with 'Win Rate' and 'Sample Size'
win_rate_series = {}   # '<label> Win Rate' -> win rate per horizon

for signal_column, label in signal_types:
    # Forward returns on the rows where the signal fired.
    signal_returns = df.loc[df[signal_column], return_columns]

    # Mean return per horizon (NaN returns are skipped by mean()).
    returns_series[label] = signal_returns.mean().rename(label)

    # Win rate per horizon. The denominator counts only signals that have a
    # forward return at that horizon, so signals too close to the end of the
    # sample are not counted as losses and the win rate is measured on the
    # same rows as the mean return above.
    sample_sizes = signal_returns.notna().sum()
    wins = (signal_returns > 0).sum()
    win_rates = (wins / sample_sizes).where(sample_sizes > 0, 0.0)

    win_rate_dfs[label] = pd.DataFrame({
        'Win Rate': win_rates,
        'Sample Size': sample_sizes,
    })

    # Only the win rate goes into the summary table.
    win_rate_series[f'{label} Win Rate'] = win_rate_dfs[label]['Win Rate'].rename(f'{label} Win Rate')

# ----- 5. Combine into one summary table -----
# Columns alternate: mean return, then win rate, for each signal in turn.
summary_data = []
for signal_column, label in signal_types:
    summary_data.append(returns_series[label])
    summary_data.append(win_rate_series[f'{label} Win Rate'])

summary = pd.concat(summary_data, axis=1)
summary.index.name = 'Days After Signal'
summary.columns.name = 'Signal Type'

print(summary)

# Detailed per-signal tables, including the sample sizes left out of the summary.
for signal_column, label in signal_types:
    print(f"\n{label} Statistics:")
    print(win_rate_dfs[label])