## 🚀 Momentum Stock Screener Framework

### 📥 Setup

Installation and import of required packages

In [2008]:
# !pip install -r requirements.txt

In [2009]:
import yfinance as yf
import backtrader as bt

import pandas as pd
import numpy as np
from scipy.stats import ttest_ind, gaussian_kde

import os

import plotly.graph_objects as go
from plotly.subplots import make_subplots

Initialization of analysis parameters

In [2010]:
# Wikipedia page name of the index to screen; fetch_stock_data appends it to
# https://en.wikipedia.org/wiki/ to locate the constituents table.
TICKER: str = 'Nasdaq-100'
TABLE_NUM: int = 4  # For webscraping data: position of the constituents table in the list returned by pd.read_html
TABLE_COL: str = 'Ticker' # For webscraping data: column of that table holding the ticker symbols

# Date range handed to yf.download as start/end ('YYYY-MM-DD')
START_DATE: str = '2019-01-01'
END_DATE: str = '2025-08-08'

# Maximum score is 12: 1 (breakout) + 6 (uptrend) + 3 (momentum) + 2 (volume)
ENTRY_THRESHOLD: int = 10  # Score threshold for entering long, scaled from 0 to 12

### 🏗️ Data Acquisition

In [2011]:
def fetch_stock_data(index: str=TICKER, table_num: int=TABLE_NUM, table_col: str=TABLE_COL, start_date: str=START_DATE, end_date: str=END_DATE) -> dict[str, pd.DataFrame]:
    """
    Fetch historical stock data for all tickers in the index, both saved as .csv and returned as a dictionary of DataFrames.

    The ticker list is scraped from the index's Wikipedia page, prices are downloaded
    in one yfinance call, and each ticker with data is written to data/<ticker>.csv.

    Parameters:
        index (str): Wikipedia page name for the index (Default to TICKER).
        table_num (int): Wikipedia table number for the index (Default to TABLE_NUM).
        table_col (str): Wikipedia table column for the index (Default to TABLE_COL).
        start_date (str): Start date in 'YYYY-MM-DD' format (Default to START_DATE).
        end_date (str): End date in 'YYYY-MM-DD' format (Default to END_DATE).

    Returns:
        dict[str, pd.DataFrame]: Mapping of ticker -> OHLCV frame with columns
        'Close', 'High', 'Low', 'Open', 'Volume'. Tickers without data are omitted.

    Raises:
        ConnectionError: If the Wikipedia tables or the price data cannot be downloaded.
        ValueError: If the requested table or ticker column does not exist on the page.
    """
    url = f"https://en.wikipedia.org/wiki/{index}"

    try:
        ttable = pd.read_html(url)
    except Exception as e:
        raise ConnectionError(f"Failed to fetch tables from {url}: {e}") from e

    if table_num >= len(ttable) or table_col not in ttable[table_num].columns:
        raise ValueError(f"No table with recognizable ticker column found in {url}.")

    tickers = ttable[table_num][table_col].tolist()

    os.makedirs("data", exist_ok=True)
    data_dict = {}

    try:
        data = yf.download(tickers, start=start_date, end=end_date, auto_adjust=True)
    except Exception as e:
        # Without price data nothing below can work, so fail loudly instead of
        # continuing into a NameError on `data`.
        raise ConnectionError(f"Failed to download price data for {index}: {e}") from e

    ohlcv_columns = ['Close', 'High', 'Low', 'Open', 'Volume']
    # Tickers are expected on the second column level; a flat column index means
    # no per-ticker slices are available.
    if data.columns.nlevels > 1:
        downloaded = set(data.columns.get_level_values(1))
    else:
        downloaded = set()

    for ticker in tickers:
        if ticker not in downloaded:
            print(f"Warning: No data for {ticker}")
            continue

        # Select columns by name (no positional renaming) and drop dates on which
        # this ticker has no values at all, so a failed ticker ends up empty.
        df = data.xs(ticker, axis=1, level=1).reindex(columns=ohlcv_columns).dropna(how='all')

        if df.empty:
            print(f"Warning: No data for {ticker}")
            continue

        data_dict[ticker] = df

        file_path = os.path.join("data", f"{ticker}.csv")
        df.to_csv(file_path)

    return data_dict

### 📊 Stock Analysis

In [2012]:
def check_breakout(price: float, bb_upper: float) -> int:
    """
    Score the Bollinger-band breakout condition.

    Parameters:
        price (float): Current price.
        bb_upper (float): Current upper Bollinger band.

    Returns:
        int: 1 when price is at or above bb_upper, otherwise 0.
    """
    return 1 if price >= bb_upper else 0

def check_uptrend(price: float, short_ema: float, long_ema: float, short_ema_prev: float,  adx: float, adx_prev: float, adx_entry_threshold: float=23) -> int:
    """
    Score the trend conditions, one point per satisfied condition.

    Parameters:
        price (float): Current price.
        short_ema (float): Current short EMA.
        long_ema (float): Current long EMA.
        short_ema_prev (float): Short EMA of the previous bar.
        adx (float): Current ADX.
        adx_prev (float): ADX of the previous bar.
        adx_entry_threshold (float): Minimum ADX that earns a point.

    Returns:
        int: Number of satisfied conditions, from 0 to 6.
    """
    conditions = (
        price >= short_ema,               # price above the short EMA
        price >= long_ema,                # price above the long EMA
        short_ema >= long_ema,            # short EMA above the long EMA
        short_ema > short_ema_prev,       # short EMA rising
        adx >= adx_entry_threshold,       # trend strength above threshold
        adx >= adx_prev,                  # trend strength not falling
    )
    return sum(1 for satisfied in conditions if satisfied)

def check_momentum(rsi: float, macd: float, macd_hist: float, rsi_entry_threshold: int=55) -> int:
    """
    Score the momentum conditions, one point per satisfied condition.

    Parameters:
        rsi (float): Current RSI.
        macd (float): Current MACD line value.
        macd_hist (float): Current MACD histogram value.
        rsi_entry_threshold (int): Minimum RSI that earns a point.

    Returns:
        int: Number of satisfied conditions, from 0 to 3.
    """
    score = 0
    if rsi >= rsi_entry_threshold:
        score += 1
    if macd_hist >= 0:
        score += 1
    if macd > 0:
        score += 1
    return score

def check_volume(vroc: float, vroc_prev: float, entry_threshold: float=0.38) -> int:
    """
    Score the volume rate-of-change conditions.

    Parameters:
        vroc (float): Current volume rate of change.
        vroc_prev (float): Volume rate of change of the previous bar.
        entry_threshold (float): Minimum rate of change that earns a point.

    Returns:
        int: One point if vroc increased, plus one if it reaches entry_threshold (0 to 2).
    """
    rising = 1 if vroc > vroc_prev else 0
    strong = 1 if vroc >= entry_threshold else 0
    return rising + strong

def check_late(
    rsi: float, rsi_prev: float, rsi_lookback: list[float],
    macd_hist: float, macd_hist_prev: float, macd_lookback: list[float],
    rsi_late_threshold: int = 73, flat_rsi_std_threshold: float = 0.8, macd_slope_threshold: float = 0.08) -> bool:
    """
    Decide whether a move looks too mature to enter.

    Parameters:
        rsi (float): Current RSI.
        rsi_prev (float): RSI of the previous bar.
        rsi_lookback (list[float]): Recent RSI values used for plateau detection.
        macd_hist (float): Current MACD histogram value.
        macd_hist_prev (float): MACD histogram value of the previous bar.
        macd_lookback (list[float]): Recent MACD values, oldest first, used for the slope fit.
        rsi_late_threshold (int): RSI level at or above which the entry counts as late.
        flat_rsi_std_threshold (float): RSI standard deviation below which RSI counts as flat.
        macd_slope_threshold (float): Absolute MACD slope below which MACD counts as flat.

    Returns:
        bool: True if any of these holds: RSI is falling, RSI is at or above
        rsi_late_threshold, the MACD histogram is falling, RSI is flat, or the
        MACD slope is flat. Always a plain Python bool.
    """
    # RSI plateau detection via standard deviation
    rsi_std = np.std(rsi_lookback)
    rsi_flat = rsi_std < flat_rsi_std_threshold

    # MACD slope flattening: slope of a straight-line fit over the lookback window
    macd_slope = np.polyfit(np.arange(len(macd_lookback)), macd_lookback, 1)[0]
    macd_flat = abs(macd_slope) < macd_slope_threshold

    # bool() normalises numpy.bool_ results so the declared return type holds
    return bool(
        rsi < rsi_prev or
        rsi >= rsi_late_threshold or
        macd_hist < macd_hist_prev or
        rsi_flat or macd_flat
    )

In [2013]:
def check_ma_bull(price: float, short_ema: float, long_ema: float, short_ema_lookback: list[float], long_ema_lookback: list[float]):
    """
    Classify the moving-average setup.

    Parameters:
        price (float): Current price.
        short_ema (float): Current short EMA.
        long_ema (float): Current long EMA.
        short_ema_lookback (list[float]): Recent short EMA values.
        long_ema_lookback (list[float]): Recent long EMA values, aligned with short_ema_lookback.

    Returns:
        int: -1 if price is below either EMA; 1 if price is at or above both and
        the short EMA was below the long EMA somewhere in the lookback; 0 otherwise.
    """
    price_above_both = price >= short_ema and price >= long_ema
    if not price_above_both:
        return -1

    short_was_below_long = any(
        short_ema_lookback[k] < long_ema_lookback[k]
        for k in range(len(short_ema_lookback))
    )
    return 1 if short_was_below_long else 0

def check_rsi_bull(rsi_lookback: list[float]):
    """
    Check whether RSI dipped below 35 anywhere in the lookback window.

    Parameters:
        rsi_lookback (list[float]): Recent RSI values.

    Returns:
        int: 0 if any value is below 35, otherwise -1.
    """
    dipped = any(value < 35 for value in rsi_lookback)
    return 0 if dipped else -1

def check_macd_bull(macd_hist: float, macd: float):
    """
    Classify the MACD setup.

    Parameters:
        macd_hist (float): Current MACD histogram value.
        macd (float): Current MACD line value.

    Returns:
        int: -1 if the histogram is negative; 1 if the histogram is non-negative
        while the MACD line is still below zero; 0 otherwise.
    """
    if macd_hist < 0:
        return -1
    return 1 if macd < 0 else 0

In [2014]:
class MomentumStrategy(bt.Strategy):
    """
    Signal-logging strategy: on every bar it evaluates the entry rules and appends
    the indicator values plus the resulting signal to self.log. It does not place orders.

    The entry score adds up check_breakout (0-1), check_uptrend (0-6),
    check_momentum (0-3) and check_volume (0-2), so it ranges from 0 to 12.

    Parameters:
        entry_score_threshold: Minimum entry score that, when the move is not
            flagged as late, produces a long signal.
    """
    def __init__(self, entry_score_threshold):
        self.bbands = bt.ind.BollingerBands(self.data.close, period=20, devfactor=1.5)

        self.ema_20 = bt.ind.EMA(self.data, period=20)
        self.ema_50 = bt.ind.EMA(self.data, period=50)
        self.macd = bt.ind.MACD(self.data.close)
        self.adx = bt.ind.ADX(self.data)

        self.rsi = bt.ind.RSI(period=14)

        self.vroc = bt.indicators.RateOfChange(self.data.volume, period=14)

        self.entry_score_threshold = entry_score_threshold

        # Collect indicator values (one dict per bar, appended in next())
        self.log = []

    def next(self):
        """Score the current bar and append its indicators and signal to self.log."""
        signal = 0  # +1 for long, 0 for hold
        price = self.datas[0].close[0]

        # MACD values reused by several checks and by the log record
        macd_line = self.macd.macd[0]
        macd_hist = macd_line - self.macd.signal[0]
        macd_hist_prev = self.macd.macd[-1] - self.macd.signal[-1]

        # Entry indicators only
        breakout = check_breakout(price, self.bbands.top[0])
        uptrend = check_uptrend(price, self.ema_20[0], self.ema_50[0], self.ema_20[-1], self.adx[0], self.adx[-1])
        # Arguments follow check_momentum(rsi, macd, macd_hist); passing the previous
        # RSI here would shift every value into the wrong parameter.
        momentum = check_momentum(self.rsi[0], macd_line, macd_hist)
        volume = check_volume(self.vroc[0], self.vroc[-1])

        entry_score = breakout + uptrend + momentum + volume

        # 16-bar windows, oldest first, ending at the current bar
        rsi_window = [self.rsi[-i] for i in range(15, -1, -1)]
        macd_window = [self.macd.macd[-i] for i in range(15, -1, -1)]
        is_late = check_late(self.rsi[0], self.rsi[-1], rsi_window,
                             macd_hist, macd_hist_prev,
                             macd_window)

        # The EMA lookback covers the 8 bars before the current one; the RSI
        # lookback covers 9 bars including the current one.
        ma_signal = check_ma_bull(price, self.ema_20[0], self.ema_50[0], [self.ema_20[-i] for i in range(8, 0, -1)], [self.ema_50[-i] for i in range(8, 0, -1)])
        rsi_signal = check_rsi_bull([self.rsi[-i] for i in range(8, -1, -1)])
        macd_signal = check_macd_bull(macd_hist, macd_line)

        # Alternative entry rule kept for reference (the only user of rsi_signal):
        # if ma_signal == 1 or macd_signal == 1 or ((ma_signal == 0 or macd_signal == 0) and rsi_signal == 0):
        #     signal = 1

        # Entry only
        if ma_signal == 1 or macd_signal == 1 or (not is_late and entry_score >= self.entry_score_threshold):
            signal = 1

        # Store indicators and signal only
        self.log.append({
            'date': self.datas[0].datetime.date(0),
            'close': price,

            'bb_upper': self.bbands.top[0],
            'bb_middle': self.bbands.mid[0],
            'bb_lower': self.bbands.bot[0],

            'ema_20': self.ema_20[0],
            'ema_50': self.ema_50[0],
            'macd': macd_line,
            'macd_signal': self.macd.signal[0],
            'macd_hist': macd_hist,
            'adx': self.adx[0],

            'rsi': self.rsi[0],
            'vroc': self.vroc[0],

            'signal': signal
        })

In [2015]:
def get_signal_log(data_path: str, bt_engine: bt.Cerebro) -> pd.DataFrame:
    """
    Run the backtest engine on one price CSV and return the strategy's per-bar log.

    Column positions are resolved from the CSV header instead of relying on
    GenericCSVData's default open/high/low/close order, so files whose columns
    are ordered differently (e.g. Close, High, Low, Open, Volume) are read correctly.

    Parameters:
        data_path (str): Path to a CSV with a header row, the date in the first
            column ('YYYY-MM-DD') and Open/High/Low/Close/Volume columns.
        bt_engine (bt.Cerebro): Engine with the strategy already added.

    Returns:
        pd.DataFrame: One row per logged bar, built from the strategy's `log` list.

    Raises:
        ValueError: If a required price column is missing from the CSV header.

    NOTE(review): every call adds another feed to the same engine and the first
    strategy result is read back, so reusing one engine for several files
    presumably keeps analysing the first file — confirm before looping over files.
    """
    # Map lower-cased header names to their column positions
    header = [str(name).strip().lower() for name in pd.read_csv(data_path, nrows=0).columns]
    required = ('open', 'high', 'low', 'close', 'volume')
    missing = [name for name in required if name not in header]
    if missing:
        raise ValueError(f"Missing column(s) {missing} in {data_path}")

    data = bt.feeds.GenericCSVData(  # Load data from CSV
        dataname=data_path,
        dtformat='%Y-%m-%d',
        timeframe=bt.TimeFrame.Days,
        compression=1,
        datetime=0,
        open=header.index('open'),
        high=header.index('high'),
        low=header.index('low'),
        close=header.index('close'),
        volume=header.index('volume'),
        openinterest=-1,
        headers=True
    )

    # Add data
    bt_engine.adddata(data)

    # Run backtest
    results = bt_engine.run()
    strat = results[0]

    signal_log = pd.DataFrame(strat.log)  # signal_log.to_csv("signal_log.csv", index=False)

    return signal_log

### 📝 Backtesting

In [2016]:
def calculate_returns(signal_log: pd.DataFrame, hold_period: list[int]=[2, 5, 10, 20, 50]) -> pd.DataFrame:
    """
    Add forward-return columns to the signal log, in place.

    For each holding period p, column 'return_{p}d' holds close[t+p] / close[t] - 1;
    the last p rows are NaN because no future close exists for them.

    Parameters:
        signal_log (pd.DataFrame): Frame with a 'close' column and either a 'date'
            column or an index already set. It is modified in place.
        hold_period (list[int]): Holding periods in rows. The default list is only read.

    Returns:
        pd.DataFrame: The same object as signal_log, with the new columns added.
    """
    # Only promote 'date' to the index once, so re-running on the same frame
    # does not fail with a KeyError.
    if 'date' in signal_log.columns:
        signal_log.set_index('date', inplace=True)

    for p in hold_period:
        signal_log[f'return_{p}d'] = signal_log['close'].shift(-p) / signal_log['close'] - 1

    return signal_log

def backtest_returns(signal_log: pd.DataFrame) -> pd.DataFrame:
    """
    Summarise forward returns of signal rows against an equally sized random sample.

    Parameters:
        signal_log (pd.DataFrame): Frame with a 'signal' column and one or more
            columns whose name contains 'return'.

    Returns:
        pd.DataFrame: Indexed by 'hold_period' (the digits in the column name), with
        'avg_return', 'win_rate', 'random_avg_return', 'random_win_rate' (rounded to
        3 decimals) and 'n_signals'.
    """
    flagged = signal_log[signal_log['signal'] == 1]
    # Benchmark: same number of rows drawn from the whole log with a fixed seed
    benchmark = signal_log.sample(len(flagged), random_state=42)

    def _mean_and_win_rate(series):
        mean_return = round(float(series.mean()), 3)
        share_positive = round(float((series > 0).mean()), 3)
        return mean_return, share_positive

    rows = []
    for col in signal_log.columns:
        if 'return' in col:
            strategy_avg, strategy_win = _mean_and_win_rate(flagged[col])
            random_avg, random_win = _mean_and_win_rate(benchmark[col])
            rows.append({
                'hold_period': int(''.join(ch for ch in col if ch.isdigit())),
                'avg_return': strategy_avg,
                'win_rate': strategy_win,
                'random_avg_return': random_avg,
                'random_win_rate': random_win,
                'n_signals': len(flagged)
            })

    return pd.DataFrame(rows).set_index('hold_period')

def stat_test(signal_log: pd.DataFrame) -> pd.DataFrame:
    """
    Compare forward returns of signal rows with an equally sized random sample
    using an unequal-variance t-test (ttest_ind with equal_var=False).

    Parameters:
        signal_log (pd.DataFrame): Frame with a 'signal' column and one or more
            columns whose name contains 'return'.

    Returns:
        pd.DataFrame: Indexed by 'hold_period' (the digits in the column name), with
        't_stat' and 'p_val' rounded to 2 decimals.
    """
    flagged = signal_log[signal_log['signal'] == 1]
    # Benchmark: same number of rows drawn from the whole log with a fixed seed
    benchmark = signal_log.sample(len(flagged), random_state=42)

    rows = []
    return_columns = [col for col in signal_log.columns if 'return' in col]
    for col in return_columns:
        outcome = ttest_ind(flagged[col].dropna(), benchmark[col].dropna(), equal_var=False)
        rows.append({
            'hold_period': int(''.join(ch for ch in col if ch.isdigit())),
            't_stat': round(outcome[0], 2),
            'p_val': round(outcome[1], 2)
        })

    return pd.DataFrame(rows).set_index('hold_period')

In [2017]:
def backtest(signal_log: pd.DataFrame):
    """
    Run the full evaluation on a signal log.

    Forward returns are added by calculate_returns, then the summary from
    backtest_returns and the t-test results from stat_test are placed side by side.

    Parameters:
        signal_log (pd.DataFrame): Signal log passed to calculate_returns.

    Returns:
        pd.DataFrame: Column-wise concatenation of the backtest_returns and
        stat_test results.
    """
    with_returns = calculate_returns(signal_log)
    summary = backtest_returns(with_returns)
    significance = stat_test(with_returns)
    return pd.concat([summary, significance], axis=1)

### 📈 Visualization

In [2018]:
def plot_ohcl(prices: pd.DataFrame, fig: go.Figure, row: int=1, col: int=1) -> None:
    """
    Add a candlestick trace of the price data to one subplot.

    Parameters:
        prices (pd.DataFrame): Frame with 'Open', 'High', 'Low' and 'Close' columns; its index is the x-axis.
        fig (go.Figure): Figure to add the trace to.
        row (int): Subplot row.
        col (int): Subplot column.
    """
    candles = go.Candlestick(
        x=prices.index,
        open=prices['Open'],
        high=prices['High'],
        low=prices['Low'],
        close=prices['Close'],
        name="Candlestick",
    )
    fig.add_trace(candles, row=row, col=col)
    fig.update_yaxes(title_text='Price', row=row, col=col)

def plot_signals(signals: pd.DataFrame, fig: go.Figure, row: int=1, col: int=1) -> None:
    """
    Mark every bar whose 'signal' equals 1 with a green up-arrow at its close.

    Parameters:
        signals (pd.DataFrame): Frame with 'signal' and 'close' columns; its index is the x-axis.
        fig (go.Figure): Figure to add the markers to.
        row (int): Subplot row.
        col (int): Subplot column.
    """
    entries = signals[signals['signal'] == 1]
    arrow_style = dict(symbol='arrow-up', color='green', size=10)
    markers = go.Scatter(
        x=entries.index,
        y=entries['close'],
        mode='markers',
        marker=arrow_style,
        name='Breakout',
    )
    fig.add_trace(markers, row=row, col=col)

def plot_volume(df: pd.DataFrame, fig: go.Figure, row: int=2, col: int=1) -> None:
    """
    Add a gray bar chart of the 'Volume' column to one subplot.

    Parameters:
        df (pd.DataFrame): Frame with a 'Volume' column; its index is the x-axis.
        fig (go.Figure): Figure to add the bars to.
        row (int): Subplot row.
        col (int): Subplot column.
    """
    volume_bars = go.Bar(x=df.index, y=df['Volume'], name='Volume', marker_color='gray')
    fig.add_trace(volume_bars, row=row, col=col)
    fig.update_yaxes(title_text='Volume', row=row, col=col)


def plot_macd(signals: pd.DataFrame, fig: go.Figure, row: int=4, col: int=1) -> None:
    """
    Draw the MACD line, its signal line and the MACD histogram in one subplot.

    Parameters:
        signals (pd.DataFrame): Frame with 'macd', 'macd_signal' and 'macd_hist' columns; its index is the x-axis.
        fig (go.Figure): Figure to add the traces to.
        row (int): Subplot row for the MACD traces.
        col (int): Subplot column for the MACD traces.
    """
    dates = signals.index
    # Added in this order: MACD line, signal line, histogram bars
    macd_traces = [
        go.Scatter(x=dates, y=signals['macd'], name='MACD', line=dict(color='blue')),
        go.Scatter(x=dates, y=signals['macd_signal'], name='MACD Signal', line=dict(color='orange')),
        go.Bar(x=dates, y=signals['macd_hist'], name='MACD Hist', marker_color='purple'),
    ]
    for trace in macd_traces:
        fig.add_trace(trace, row=row, col=col)

    fig.update_yaxes(title_text='MACD', row=row, col=col)

def plot_indicator(df: pd.DataFrame, indicator: str, fig: go.Figure, row: int, col: int, visible: bool=True):
    """
    Add one indicator column as a line trace named after the upper-cased column.

    Parameters:
        df (pd.DataFrame): Frame containing the indicator column; its index is the x-axis.
        indicator (str): Name of the column to plot.
        fig (go.Figure): Figure to add the trace to.
        row (int): Subplot row.
        col (int): Subplot column.
        visible (bool): When falsy the trace starts hidden and is listed in the legend only.
    """
    visibility = True if visible else 'legendonly'
    line = go.Scatter(x=df.index, y=df[indicator], name=indicator.upper(), visible=visibility)
    fig.add_trace(line, row=row, col=col)

def _kde_curve(values: pd.Series, n_points: int = 200):
    """Return (x, density) for a Gaussian KDE over values, or None when it cannot be estimated."""
    # gaussian_kde needs at least two points and non-zero spread
    if len(values) < 2:
        return None
    try:
        kde = gaussian_kde(values)
    except (ValueError, np.linalg.LinAlgError):
        return None
    grid = np.linspace(values.min(), values.max(), n_points)
    return grid, kde(grid)


def plot_returns(signals: pd.DataFrame, return_col: str, fig: go.Figure, row: int) -> None:
    """
    Compare the return distribution of signal rows with an equally sized random sample.

    Column 1 of the given row gets two histograms (strategy and random); column 2
    gets the matching Gaussian KDE curves. A KDE curve is skipped when its sample
    has fewer than two values or no spread, so a sparse signal set does not abort
    the whole figure.

    Parameters:
        signals (pd.DataFrame): Frame with a 'signal' column and the return column.
        return_col (str): Name of the return column to plot.
        fig (go.Figure): Figure with subplots in columns 1 and 2 of `row`.
        row (int): Subplot row to draw into.
    """
    signal_rows = signals[signals['signal'] == 1]
    # Benchmark: same number of rows drawn from the whole log with a fixed seed
    random_signals = signals.sample(len(signal_rows), random_state=42)

    strategy_returns = signal_rows[return_col].dropna()
    random_returns = random_signals[return_col].dropna()

    # Legend entries are shown for row 9 only, so they appear once for the histograms
    show_legend = row == 9

    fig.add_trace(go.Histogram(
        x=strategy_returns,
        marker_color='purple',
        name='Strategy', showlegend=show_legend
    ), row=row, col=1)

    fig.add_trace(go.Histogram(
        x=random_returns,
        marker_color='orange',
        name='Random', showlegend=show_legend
    ), row=row, col=1)

    # KDE curves: random first, then strategy
    for returns, color, label in ((random_returns, 'orange', 'Random'), (strategy_returns, 'purple', 'Strategy')):
        curve = _kde_curve(returns)
        if curve is None:
            continue
        x_values, density = curve
        fig.add_trace(go.Scatter(x=x_values, y=density, mode='lines', marker_color=color, name=label, showlegend=False), row=row, col=2)

    fig.update_xaxes(title_text=return_col, row=row)
    fig.update_yaxes(title_text='Count', row=row, col=1)
    fig.update_yaxes(title_text='PDF', row=row, col=2)

In [2019]:
def plot_breakout(data_path: str, signal_log: pd.DataFrame, bt_stats: pd.DataFrame):
    """
    Build and show the "Breakout Detection Dashboard" figure for one ticker.

    Layout (11 rows x 2 columns):
        row 1     candlesticks, entry markers, Bollinger bands, EMAs (EMAs start hidden)
        row 2     volume
        row 3     RSI with reference lines at 70 and 30
        row 4     MACD
        row 5     ADX with reference lines at 25 and 20
        row 6     table of backtest statistics
        rows 7+   one row per column of signal_log whose name contains 'return'
                  (histograms in column 1, KDE curves in column 2)

    Parameters:
        data_path (str): CSV with a 'Date' column and 'Open', 'High', 'Low', 'Close', 'Volume' columns.
        signal_log (pd.DataFrame): Signal log with the indicator columns, 'signal', 'close'
            and the return columns; its index is used as the x-axis.
        bt_stats (pd.DataFrame): Statistics table; its index is reset and every column is shown.
    """
    df_price = pd.read_csv(data_path)
    # NOTE(review): 'Date' is not parsed here, so this index presumably stays as strings — confirm
    df_price.set_index('Date', inplace=True)
    
    fig = make_subplots(
        rows=11, cols=2,
        shared_xaxes=True, vertical_spacing=0.05,
        row_heights=[0.2] + [0.12]*4 + [0.1] + [0.05]*5,
        specs = [
            [{"colspan": 2, "type": "candlestick"}, None],
            *[[{"colspan": 2, "type": "xy"}, None]] * 4, 
            [{"colspan": 2, "type": "domain"}, None], 
            *[[{"type": "xy"}, {"type": "xy"}]] * 5
        ]
    )
    # Price panel (row 1) and volume panel (row 2, plot_volume's default row)
    plot_ohcl(df_price, fig)
    plot_signals(signal_log, fig)
    plot_volume(df_price, fig)
    plot_indicator(signal_log, 'bb_upper', fig, 1, 1)
    plot_indicator(signal_log, 'bb_lower', fig, 1, 1)
    plot_indicator(signal_log, 'ema_20', fig, 1, 1, False)
    plot_indicator(signal_log, 'ema_50', fig, 1, 1, False)
    # Oscillator panels: RSI (row 3), MACD (row 4, plot_macd's default row), ADX (row 5)
    plot_indicator(signal_log, 'rsi', fig, 3, 1)
    plot_macd(signal_log, fig)
    plot_indicator(signal_log, 'adx', fig, 5, 1)
    
    # Return distributions start at row 7; the grid has five such rows (7-11)
    r_inc = 0
    for c in signal_log.columns:
        if 'return' in c:
            plot_returns(signal_log, c, fig, 7 + r_inc)
            r_inc += 1

    # Reference levels for RSI (row 3) and ADX (row 5)
    fig.add_hline(y=70, line_dash='dash', line_color='red', row=3, col=1)
    fig.add_hline(y=30, line_dash='dash', line_color='blue', row=3, col=1)
    fig.add_hline(y=25, line_dash='dash', line_color='red', row=5, col=1)
    fig.add_hline(y=20, line_dash='dash', line_color='gray', row=5, col=1)

    # Turn the index into a regular column so it shows up in the table
    bt_stats = bt_stats.reset_index()
    fig.add_trace(go.Table(
        header=dict(values=list(bt_stats.columns), fill_color='lightgrey', align='left'),
        cells=dict(values=[bt_stats[col] for col in bt_stats.columns], fill_color='white', align='left')
    ), row=6, col=1)

    fig.update_yaxes(title_text='RSI', row=3, col=1)
    fig.update_yaxes(title_text='ADX', row=5, col=1)

    # Label the date axis on row 5 and pin its range to the price data
    fig.update_xaxes(title_text="Date", range=[df_price.index.min(), df_price.index.max()], row=5, col=1)

    fig.update_layout(
        height=1800,
        title_text="Breakout Detection Dashboard",
        title_x=0.5,
        xaxis_rangeslider_visible=False,
        showlegend=True
    )
    fig.show()

### 📊 Results

In [2020]:
# Create the backtest engine
# Create the backtest engine
cerebro = bt.Cerebro()

# Register the strategy; entry_score_threshold is the argument MomentumStrategy.__init__ expects
cerebro.addstrategy(MomentumStrategy, entry_score_threshold=ENTRY_THRESHOLD)

0

In [2021]:
# import glob

# data = glob.glob('/home/lawre/Finovax-Quantitative-Researcher-Internship-Summer-2025/Momentum-Stock-Screener-Week8/data/*.csv')

# aggregate = []
# for f in data:
#     signal_log = get_signal_log(f, cerebro)

#     backtest_stats = backtest(signal_log)

#     aggregate.append(backtest_stats)

# aggregate_stats = sum(aggregate) / len(aggregate)

In [2022]:
# Price file for the ticker under study, relative to the working directory.
# fetch_stock_data writes its CSVs to the same "data" directory, so the notebook
# no longer depends on one machine's home-directory layout.
f = os.path.join("data", "NXPI.csv")
signal_log = get_signal_log(f, cerebro)

# backtest() also adds the forward-return columns to signal_log in place
backtest_stats = backtest(signal_log)

In [2023]:
# Render the dashboard from the price file, the signal log and the backtest statistics
plot_breakout(f, signal_log, backtest_stats)