## Imports

In [12]:
import itertools
import math
import time
import warnings
from datetime import datetime, timedelta
from itertools import islice
from typing import Callable, Dict, List, Optional, Tuple

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import plotly.graph_objects as go
import plotly.io as pio
import seaborn as sns
import yfinance as yf
from plotly.subplots import make_subplots
from tqdm.notebook import tqdm

warnings.simplefilter(action='ignore', category=FutureWarning)

## Functions

In [5]:
def download_data(symbols: List[str], period: str, interval: str, batch_size: int = 100, delay: float = 0.1) -> Dict[str, pd.DataFrame]:
    """
    Download OHLCV data for multiple symbols using Yahoo Finance.

    Symbols are requested in batches of ``batch_size``. A batch whose request
    raises is reported with ``print`` and skipped; a batch that comes back
    empty is skipped silently. The function sleeps ``delay`` seconds after
    every batch, whether it succeeded or not.

    Args:
        symbols (List[str]): List of ticker symbols to download.
        period (str): Data period (e.g., "1d", "5d", "1mo", "1y").
        interval (str): Candlestick interval (e.g., "1m", "15m", "1h", "1d").
        batch_size (int, optional): Number of symbols per request batch. Defaults to 100.
        delay (float, optional): Delay (in seconds) between batches to avoid rate limiting. Defaults to 0.1.

    Returns:
        Dict[str, pd.DataFrame]: Mapping of symbol → OHLCV DataFrame.
            Each DataFrame has a "Datetime" column, the fields returned for that
            symbol (e.g. Open, High, Low, Close, Volume) and a "Symbol" column.
            Symbols with no columns, or whose columns are entirely NaN, are omitted.
    """
    results: Dict[str, pd.DataFrame] = {}

    num_batches = (len(symbols) - 1) // batch_size + 1
    for i in tqdm(range(0, len(symbols), batch_size), desc="Downloading batches", total=num_batches):
        batch = symbols[i:i + batch_size]

        try:
            raw_df = yf.download(
                tickers=batch,
                period=period,
                interval=interval,
                group_by="ticker",
                progress=False,
                threads=True,
                ignore_tz=True,
                auto_adjust=False
            )
        except Exception as e:
            # Best effort: one failing batch must not abort the whole run.
            print(f"Error downloading batch {batch}: {e}")
            time.sleep(delay)
            continue

        if raw_df.empty:
            time.sleep(delay)
            continue

        # Normalize structure: move the time index into a regular column.
        raw_df = raw_df.reset_index(drop=False)

        # reset_index always inserts the former index as the FIRST column, so it
        # is identified by position. Matching on the label "Date" alone breaks
        # when the index is called "Datetime" or has no name at all.
        is_multi = isinstance(raw_df.columns, pd.MultiIndex)
        flat_columns = []
        for position, col in enumerate(raw_df.columns):
            if position == 0:
                flat_columns.append("Datetime")
            elif is_multi:
                ticker, field = col
                flat_columns.append(f"{ticker}_{field}")
            else:
                # Flat columns carry no ticker level; the fields belong to the
                # first symbol of the batch.
                flat_columns.append(f"{batch[0]}_{col}")
        raw_df.columns = flat_columns

        # Extract data per symbol
        for symbol in batch:
            prefix = f"{symbol}_"
            cols = [col for col in raw_df.columns if col.startswith(prefix)]
            if not cols:
                continue

            # A symbol whose every value is NaN carries no data (presumably a
            # failed download) and would only pollute downstream processing.
            if not raw_df[cols].notna().to_numpy().any():
                continue

            df_symbol = raw_df[["Datetime"] + cols].copy()
            # Strip the exact prefix instead of splitting on the first "_", so
            # symbols that themselves contain "_" keep correct field names.
            df_symbol.columns = ["Datetime"] + [col[len(prefix):] for col in cols]
            df_symbol["Symbol"] = symbol

            results[symbol] = df_symbol

        time.sleep(delay)

    return results

In [7]:
def write_list_to_file(data: List[str], filepath: str) -> None:
    """
    Write a list of strings to a file as comma-separated values.

    The file is opened in write mode with UTF-8 encoding, so existing content
    is replaced. The items are joined with "," and written without a trailing
    newline; an empty list produces an empty file.

    Args:
        data (List[str]): List of strings to write.
        filepath (str): Path to the output file.

    Raises:
        IOError: If opening, joining or writing fails. The original exception
            is attached as ``__cause__``.
    """
    try:
        with open(filepath, "w", encoding="utf-8") as file:
            file.write(",".join(data))
    except Exception as e:
        # Chain explicitly so the traceback shows the real cause.
        raise IOError(f"Failed to write to file {filepath}: {e}") from e

def read_list_from_file(filepath: str) -> List[str]:
    """
    Read a comma-separated list of strings from a file.

    The whole file is read as UTF-8, surrounding whitespace is stripped, and
    the remainder is split on ",". Individual items are not stripped.

    Args:
        filepath (str): Path to the input file.

    Returns:
        List[str]: List of strings read from the file; an empty list when the
            file is empty or contains only whitespace.

    Raises:
        FileNotFoundError: If the file does not exist.
        IOError: If the file cannot be read for any other reason.
    """
    # Only the I/O lives inside the try; parsing happens afterwards.
    try:
        with open(filepath, "r", encoding="utf-8") as file:
            content = file.read().strip()
    except FileNotFoundError as e:
        raise FileNotFoundError(f"File not found: {filepath}") from e
    except Exception as e:
        raise IOError(f"Failed to read from file {filepath}: {e}") from e

    return content.split(",") if content else []

In [10]:
# Load the ticker universe from the comma-separated symbols file.
symbols_list = read_list_from_file("data/all-symbols-june-2025.txt")

In [11]:
# Download 1000 days of daily candles for every symbol, 100 tickers per request.
stock_data = download_data(
    symbols_list,
    period="1000d",
    interval="1d",
    batch_size=100,
)

Downloading batches:   0%|          | 0/55 [00:00<?, ?it/s]


2 Failed downloads:
['X', 'JNPR']: YFPricesMissingError('possibly delisted; no price data found  (period=1000d) (Yahoo error = "No data found, symbol may be delisted")')

1 Failed download:
['AZEK']: YFPricesMissingError('possibly delisted; no price data found  (period=1000d) (Yahoo error = "No data found, symbol may be delisted")')

1 Failed download:
['DIST']: YFPricesMissingError('possibly delisted; no price data found  (period=1000d) (Yahoo error = "No data found, symbol may be delisted")')

2 Failed downloads:
['JVSA', 'OCX']: YFPricesMissingError('possibly delisted; no price data found  (period=1000d) (Yahoo error = "No data found, symbol may be delisted")')

1 Failed download:
['AKYA']: YFPricesMissingError('possibly delisted; no price data found  (period=1000d) (Yahoo error = "No data found, symbol may be delisted")')

1 Failed download:
['KRON']: YFPricesMissingError('possibly delisted; no price data found  (period=1000d) (Yahoo error = "No data found, symbol may be delisted")')

In [13]:
def apply_to_dict(
    stock_dict: Dict[str, pd.DataFrame],
    function: Callable[..., pd.DataFrame],
    **kwargs
) -> Dict[str, pd.DataFrame]:
    """
    Apply a transformation function to each DataFrame in a dictionary.

    Each DataFrame is copied before it is handed to ``function``, so the
    frames in ``stock_dict`` are not modified. Processing stops at the first
    symbol that fails.

    Args:
        stock_dict (Dict[str, pd.DataFrame]): Mapping of symbol → DataFrame.
        function (Callable[..., pd.DataFrame]): Function to apply to each DataFrame.
        **kwargs: Additional keyword arguments for the function.

    Returns:
        Dict[str, pd.DataFrame]: New mapping with the transformed DataFrames.

    Raises:
        RuntimeError: If ``function`` raises for any symbol; the message names
            the symbol and the original exception is attached as ``__cause__``.
    """
    new_stock_dict: Dict[str, pd.DataFrame] = {}
    for symbol, df in tqdm(stock_dict.items(), desc="Processing symbols", total=len(stock_dict)):
        df_copy = df.copy()
        try:
            new_stock_dict[symbol] = function(df_copy, **kwargs)
        except Exception as e:
            # Chain explicitly so the underlying failure stays in the traceback.
            raise RuntimeError(f"Error processing {symbol}: {e}") from e
    return new_stock_dict

def calculate_ma(df: pd.DataFrame, period: int, source: str = "Close", method: str = "EMA") -> pd.DataFrame:
    """
    Calculate a moving average (EMA or SMA) and add it as a new column.

    The column is named ``{METHOD}_{source}_{period}`` with the method
    upper-cased, e.g. ``EMA_Close_20``. For both methods the first
    ``period - 1`` rows are NaN: the SMA needs a full window, and the EMA
    warm-up rows are blanked explicitly. ``df`` is modified in place and
    also returned.

    Args:
        df (pd.DataFrame): DataFrame containing a source column (e.g., 'Close').
        period (int): Lookback period for the moving average. Must be >= 1.
        source (str, optional): Column to calculate MA on. Defaults to "Close".
        method (str, optional): Type of moving average ("EMA" or "SMA",
            case-insensitive). Defaults to "EMA".

    Returns:
        pd.DataFrame: DataFrame with the new MA column added.

    Raises:
        RuntimeError: If the method is not EMA/SMA, the source column is
            missing, the period is < 1, or the calculation itself fails.
            The underlying exception is attached as ``__cause__``.
    """
    label = str(method).upper()
    try:
        # Reject unknown methods instead of silently falling back to an SMA
        # stored under a misleading column name.
        if label not in ("EMA", "SMA"):
            raise ValueError(f"Unsupported method '{method}'; expected 'EMA' or 'SMA'.")
        if source not in df.columns:
            raise ValueError(f"Column '{source}' not found in DataFrame.")
        if period < 1:
            raise ValueError(f"Period must be a positive integer, got {period}.")

        if label == "EMA":
            ma_series = df[source].ewm(span=period, adjust=False).mean()
            # Blank the warm-up rows so the EMA starts where an SMA of the
            # same period would.
            ma_series.iloc[:period - 1] = float("nan")
        else:  # SMA
            ma_series = df[source].rolling(window=period).mean()

        df[f"{label}_{source}_{period}"] = ma_series
        return df
    except Exception as e:
        raise RuntimeError(
            f"Failed to calculate {label} on column '{source}' with period {period}: {e}"
        ) from e

def calculate_atr(df: pd.DataFrame, period: int = 14) -> pd.DataFrame:
    """
    Add the Average True Range and its percentage of the close.

    The true range of a row is the largest of: High - Low,
    |High - previous Close| and |Low - previous Close|. The ATR is the
    simple rolling mean of the true range over ``period`` rows. ``df`` is
    modified in place and also returned.

    Args:
        df (pd.DataFrame): DataFrame with 'High', 'Low', and 'Close' columns.
        period (int, optional): ATR lookback period. Defaults to 14.

    Returns:
        pd.DataFrame: DataFrame with added 'ATR_{period}' and 'ATR%' columns.

    Raises:
        RuntimeError: If any step of the calculation fails.
    """
    try:
        high, low = df["High"], df["Low"]
        previous_close = df["Close"].shift()

        # Row-wise maximum of the three range candidates; NaN candidates
        # (first row has no previous close) are ignored by max().
        candidates = [
            high - low,
            (high - previous_close).abs(),
            (low - previous_close).abs(),
        ]
        true_range = pd.concat(candidates, axis=1).max(axis=1)

        atr_name = f"ATR_{period}"
        df[atr_name] = true_range.rolling(period).mean()
        df["ATR%"] = df[atr_name] / df["Close"] * 100
    except Exception as e:
        raise RuntimeError(f"Failed to calculate ATR: {e}")
    return df
    
def calculate_macd(df: pd.DataFrame, fast: int = 12, slow: int = 26, signal: int = 9) -> pd.DataFrame:
    """
    Calculate MACD, Signal, and Histogram.

    MACD is the fast EMA of 'Close' minus the slow EMA, the signal line is
    an EMA of MACD, and the histogram is MACD minus the signal line. All
    EMAs use ``adjust=False``. ``df`` is modified in place and also returned.

    Args:
        df (pd.DataFrame): DataFrame with a 'Close' column.
        fast (int, optional): Span of the fast EMA. Defaults to 12.
        slow (int, optional): Span of the slow EMA. Defaults to 26.
        signal (int, optional): Span of the signal-line EMA. Defaults to 9.

    Returns:
        pd.DataFrame: DataFrame with added 'MACD', 'MACD_Signal', 'MACD_Hist' columns.

    Raises:
        RuntimeError: If the calculation fails (e.g. 'Close' is missing);
            the underlying exception is attached as ``__cause__``.
    """
    try:
        close = df["Close"]
        fast_ema = close.ewm(span=fast, adjust=False).mean()
        slow_ema = close.ewm(span=slow, adjust=False).mean()

        df["MACD"] = fast_ema - slow_ema
        df["MACD_Signal"] = df["MACD"].ewm(span=signal, adjust=False).mean()
        df["MACD_Hist"] = df["MACD"] - df["MACD_Signal"]
        return df
    except Exception as e:
        raise RuntimeError(f"Failed to calculate MACD: {e}") from e

def label_candle_color(df: pd.DataFrame) -> pd.DataFrame:
    """
    Label each candlestick as Green, Red, or Doji.

    A row is "Green" when Close > Open, "Red" when Close < Open and "Doji"
    otherwise (including rows where either value is NaN, since both
    comparisons are then False). ``df`` is modified in place and also returned.

    Args:
        df (pd.DataFrame): DataFrame with 'Open' and 'Close' columns.

    Returns:
        pd.DataFrame: DataFrame with added 'Candle_Color' column.

    Raises:
        RuntimeError: If labelling fails (e.g. a required column is missing);
            the underlying exception is attached as ``__cause__``.
    """
    try:
        # Build both masks before touching df so a missing column leaves the
        # frame unmodified. Column-wise comparison replaces the per-row apply
        # and also works for a frame with zero rows.
        is_green = df["Close"] > df["Open"]
        is_red = df["Close"] < df["Open"]

        df["Candle_Color"] = "Doji"
        df.loc[is_green, "Candle_Color"] = "Green"
        df.loc[is_red, "Candle_Color"] = "Red"
        return df
    except Exception as e:
        raise RuntimeError(f"Failed to label candle color: {e}") from e

def mark_crossovers(
    df: pd.DataFrame,
    short_ma_params: Tuple[int, str, str],
    long_ma_params: Tuple[int, str, str],
) -> pd.DataFrame:
    """
    Detect moving average crossovers (Bullish or Bearish).

    The MA columns are looked up as ``{METHOD}_{source}_{period}`` with the
    method upper-cased. The result is written to
    ``{short_col}_{long_col}_Crossover`` with one of three values:
    "Bullish" when the short MA is above the long MA on this row and was at
    or below it on the previous row, "Bearish" for the mirror case, and
    "No" otherwise. ``df`` is modified in place and also returned.

    Args:
        df (pd.DataFrame): DataFrame containing the required MA columns.
        short_ma_params (Tuple[int, str, str]): (period, source, method) for the short MA.
        long_ma_params (Tuple[int, str, str]): (period, source, method) for the long MA.

    Returns:
        pd.DataFrame: DataFrame with an added crossover signal column.

    Raises:
        RuntimeError: If the short period is not smaller than the long
            period, a required MA column is missing, or the parameters are
            malformed. The underlying exception is attached as ``__cause__``.
    """
    try:
        short_p, short_s, short_m = short_ma_params
        long_p, long_s, long_m = long_ma_params

        if short_p >= long_p:
            raise ValueError(
                f"Short MA must have smaller period than Long MA.\nFound Short MA Period: {short_p}, Long MA Period: {long_p}"
            )

        # Upper-case the method so the lookup works for "ema" as well as "EMA".
        short_col = f"{str(short_m).upper()}_{short_s}_{short_p}"
        long_col = f"{str(long_m).upper()}_{long_s}_{long_p}"
        crossover_col = f"{short_col}_{long_col}_Crossover"

        if short_col not in df.columns or long_col not in df.columns:
            raise ValueError(
                f"Required columns '{short_col}' and/or '{long_col}' not found in DataFrame."
            )

        short_ma, long_ma = df[short_col], df[long_col]
        prev_short, prev_long = short_ma.shift(1), long_ma.shift(1)

        # Comparisons against NaN are False, so the first row and MA warm-up
        # rows can never be flagged.
        cross_up = (short_ma > long_ma) & (prev_short <= prev_long)
        cross_down = (short_ma < long_ma) & (prev_short >= prev_long)

        df[crossover_col] = "No"
        df.loc[cross_up, crossover_col] = "Bullish"
        df.loc[cross_down, crossover_col] = "Bearish"

        return df
    except Exception as e:
        raise RuntimeError(f"Failed to calculate MA Crossover: {e}") from e

def process_symbol_df(
    df: pd.DataFrame,
    ma_params: Optional[List[Tuple[int, str, str]]] = None,
    crossover_mas: Optional[List[Tuple[Tuple[int, str, str], Tuple[int, str, str]]]] = None,
) -> pd.DataFrame:
    """
    Clean OHLCV data and compute multiple technical indicators.

    Steps, in order:
      1. Coerce Open/High/Low/Close/Volume to numeric (bad values become
         NaN) and round the four price columns to 2 decimals.
      2. Add each requested moving average plus an ``Above_{ma_col}`` flag
         that is True where Close is above that MA.
      3. Add ATR, MACD and candle-colour columns.
      4. Add one crossover column per requested MA pair.

    ``df`` is modified in place and also returned.

    Args:
        df (pd.DataFrame): Symbol DataFrame with OHLCV columns.
        ma_params (List[Tuple[int, str, str]], optional): List of MA configs as
            (period, source, method). Defaults to EMAs of Close with periods
            8, 20, 34, 50 and 200.
        crossover_mas (List[Tuple[Tuple, Tuple]], optional): List of MA crossover pairs.
            Example: [((8, "Close", "EMA"), (20, "Close", "EMA"))]
            Defaults to that single 8/20 EMA pair.

    Returns:
        pd.DataFrame: DataFrame with added indicators.
    """
    # Defaults are built per call rather than shared as mutable default arguments.
    if ma_params is None:
        ma_params = [
            (8, "Close", "EMA"), (20, "Close", "EMA"), (34, "Close", "EMA"), (50, "Close", "EMA"), (200, "Close", "EMA")
        ]
    if crossover_mas is None:
        crossover_mas = [
            ((8, "Close", "EMA"), (20, "Close", "EMA"))
        ]

    # Ensure numeric OHLCV and round the price columns (Volume is not rounded).
    for col in ("Open", "High", "Low", "Close", "Volume"):
        if col not in df.columns:
            continue
        df[col] = pd.to_numeric(df[col], errors="coerce")
        if col != "Volume":
            df[col] = df[col].round(2)

    # --- Calculate EMAs, SMAs and above/below flags ---
    for period, source, method in ma_params:
        df = calculate_ma(df, period=period, source=source, method=method)
        # calculate_ma names its column with the upper-cased method, so the
        # lookup must do the same or a lowercase method raises KeyError.
        ma_col = f"{method.upper()}_{source}_{period}"
        df[f"Above_{ma_col}"] = df["Close"] > df[ma_col]

    # --- Other indicators ---
    df = calculate_atr(df)
    df = calculate_macd(df)
    df = label_candle_color(df)

    # --- Add crossover signals ---
    for short_params, long_params in crossover_mas:
        df = mark_crossovers(df, short_params, long_params)

    return df

In [14]:
# Add the indicator and crossover columns to every downloaded symbol.
stock_data_labeled = apply_to_dict(stock_dict=stock_data, function=process_symbol_df)

Processing symbols:   0%|          | 0/5443 [00:00<?, ?it/s]

In [28]:
def find_potential_entries(
    stock_data_labeled: Dict[str, pd.DataFrame],
    short_ema: int = 8,
    long_ema: int = 20,
    min_crossover_vol: int = 500_000,
    min_price: float = 20.0,
    max_atr: float = 5.77,
    shift: int = 0,  # 0 = last row, 1 = one row before, etc.
    trend_emas: Tuple[int, ...] = (34, 50, 200),
) -> List[str]:
    """
    Scan all stocks and return list of symbols that may have a long entry for a given session.

    A symbol qualifies when, on the inspected row:
      - the ``EMA_Close_{short_ema}_EMA_Close_{long_ema}_Crossover`` column is "Bullish",
      - every ``Above_EMA_Close_{p}`` flag for ``p`` in ``trend_emas`` is True,
      - MACD > 0 and MACD_Signal < 0,
      - Volume >= ``min_crossover_vol``, Close >= ``min_price`` and
        ATR% <= ``max_atr``.

    Symbols whose frame is empty, has ``shift`` rows or fewer, or lacks the
    crossover column are skipped. A missing filter column counts as a failed
    condition, and so does a NaN value.

    Args:
        stock_data_labeled (dict): Dict of {symbol: DataFrame} with indicators and crossovers.
        short_ema (int): Short EMA period.
        long_ema (int): Long EMA period.
        min_crossover_vol (int): Minimum volume on crossover day.
        min_price (float): Minimum entry price.
        max_atr (float): Maximum ATR% allowed.
        shift (int): Number of rows to shift backwards (0 = last row).
        trend_emas (Tuple[int, ...]): EMA periods the close must be above.
            Defaults to (34, 50, 200).

    Returns:
        List[str]: Symbols that may have entry setups, in dictionary order.

    Raises:
        ValueError: If ``shift`` is negative.
    """
    # A negative shift would make iloc[-1 - shift] count from the start of
    # the frame and silently inspect the wrong session.
    if shift < 0:
        raise ValueError(f"shift must be >= 0, got {shift}")

    crossover_col = f"EMA_Close_{short_ema}_EMA_Close_{long_ema}_Crossover"
    trend_cols = [f"Above_EMA_Close_{p}" for p in trend_emas]

    candidates = []
    for symbol, df in stock_data_labeled.items():
        if df.empty or shift >= len(df) or crossover_col not in df.columns:
            continue

        row = df.iloc[-1 - shift]  # shifted row
        if row[crossover_col] != "Bullish":
            continue

        # Missing columns fall back to values that fail the check; NaN fails
        # every comparison and is rejected explicitly for the flags.
        trend_flags = (row.get(col, False) for col in trend_cols)
        qualifies = (
            all(pd.notna(flag) and bool(flag) for flag in trend_flags)
            and row.get("MACD", 0) > 0
            and row.get("MACD_Signal", 0) < 0
            and row.get("Volume", 0) >= min_crossover_vol
            and row.get("Close", 0) >= min_price
            and row.get("ATR%", 100) <= max_atr
        )

        if qualifies:
            candidates.append(symbol)

    return candidates

In [31]:
# Screen the most recent row (shift=0) of every symbol for entry setups.
potential_entries = find_potential_entries(stock_data_labeled=stock_data_labeled, shift=0)