Source Repo: https://github.com/wholidi/Project/tree/main/Minervini

In [None]:
import os
import yfinance as yf
import pandas as pd

DATA_DIR = "yfinance_data"
os.makedirs(DATA_DIR, exist_ok=True)  # create the cache folder up front

def get_yfinance_data(symbol: str, start_date: str, end_date: str, interval: str = "1d") -> pd.DataFrame:
    """
    Return OHLCV history for one ticker, preferring the on-disk CSV cache.

    The cache file is keyed by symbol only (``<DATA_DIR>/<symbol>.csv``). On a
    cache hit the file is returned as stored, and ``start_date``, ``end_date``
    and ``interval`` are not consulted. On a cache miss the data is requested
    from Yahoo Finance, normalised to lowercase column names, written to the
    cache, and returned.

    Args:
        symbol (str): Ticker symbol (e.g., "AAPL", "SPY").
        start_date (str): Start date passed to Yahoo Finance on a cache miss.
        end_date (str): End date passed to Yahoo Finance on a cache miss.
        interval (str): Data interval passed to Yahoo Finance on a cache miss.
                        Default is "1d".

    Returns:
        pd.DataFrame: On a live fetch, a frame with columns open, high, low,
                      close, volume. On a cache hit, whatever the cached CSV
                      contains, indexed by its first column. An empty
                      DataFrame if Yahoo Finance returns no rows.
    """
    cache_file = os.path.join(DATA_DIR, f"{symbol}.csv")  # 🔹 File name without date

    if not os.path.exists(cache_file):
        # Cache miss: go to the live API
        print(f"Calling live API for {symbol}")
        history = yf.Ticker(symbol).history(start=start_date, end=end_date, interval=interval)

        if history.empty:
            print(f"⚠️ No data found for {symbol}")
            return pd.DataFrame()

        # Drop timezone info from the index before caching
        history.index = history.index.tz_localize(None)

        # Yahoo's capitalised names -> lowercase names, keeping only these five columns
        column_map = {name: name.lower() for name in ("Open", "High", "Low", "Close", "Volume")}
        history = history.rename(columns=column_map)
        history = history[list(column_map.values())]

        # Persist so the next call for this symbol is served from disk
        history.to_csv(cache_file)
        print(f"✅ Saved {symbol} data to {cache_file} ({len(history)} rows)")
        return history

    # Cache hit: first CSV column is the index
    cached = pd.read_csv(cache_file, index_col=0, parse_dates=True)
    print(f"📄 Loaded {symbol} data from file ({len(cached)} rows)")
    return cached

In [None]:
import datetime

# Index return using Yahoo Finance (SPY as proxy for S&P 500).
# Sample "now" once so both window bounds come from the same instant.
_now = datetime.datetime.now()
start_date = (_now - datetime.timedelta(days=365)).strftime("%Y-%m-%d")
end_date = _now.strftime("%Y-%m-%d")

index_symbol = "SPY"
index_df = get_yfinance_data(index_symbol, start_date=start_date, end_date=end_date)

# Fail with a clear message instead of an opaque KeyError on an empty frame
if index_df.empty or "close" not in index_df.columns:
    raise ValueError(f"No usable price data for index symbol {index_symbol}")

# Ensure 'close' is float
index_df["close"] = index_df["close"].astype(float)

# Compute daily percent change (kept on the full frame for inspection below)
index_df["Percent Change"] = index_df["close"].pct_change()

# Compute cumulative return over exactly [start_date, end_date].
# get_yfinance_data() returns a cached file as-is, so the frame may cover a
# different range than requested; calculate_rs() slices each stock to this
# same window, and the index must use the same one for the ratio to be fair.
_index_close = index_df.loc[start_date:end_date, "close"]
if _index_close.empty:
    raise ValueError(
        f"No {index_symbol} rows between {start_date} and {end_date}; the cached file may be stale"
    )
index_return = (_index_close.pct_change() + 1).cumprod().iloc[-1]

print(f"S&P 500 proxy return (SPY): {index_return:.2f}x")
print(index_df.head())

In [None]:
import os
import requests
import pandas as pd
from io import StringIO

TICKERS_FILE = "tickers_sp500.csv"  # 🔹 file to store full table with sanitized tickers

def tickers_sp500() -> list:
    """
    Return the S&P 500 ticker symbols, using a local CSV cache when present.

    If ``TICKERS_FILE`` exists, its ``Symbol`` column is returned as-is.
    Otherwise the constituents table is downloaded from Wikipedia, dots in the
    symbols are replaced with dashes (e.g. BRK.B -> BRK-B), the full table is
    saved to ``TICKERS_FILE``, and the symbols are returned.

    Returns:
        list: Ticker symbols with missing values dropped.

    Raises:
        requests.HTTPError: If Wikipedia answers with an HTTP error status.
        requests.Timeout: If the request does not complete within the timeout.
        ValueError: If no table with a ``Symbol`` column is found on the page.
    """
    # 🔹 Load from file if exists
    if os.path.exists(TICKERS_FILE):
        df = pd.read_csv(TICKERS_FILE)
        tickers = df['Symbol'].dropna().tolist()
        print(f"📄 Loaded {len(tickers)} tickers from {TICKERS_FILE}")
        return tickers

    # 🔹 Fetch live data from Wikipedia
    url = 'https://en.wikipedia.org/wiki/List_of_S%26P_500_companies'
    headers = {"User-Agent": "Mozilla/5.0"}
    # Bound the wait and reject error pages so a bad response is never parsed or cached
    response = requests.get(url, headers=headers, timeout=30)
    response.raise_for_status()
    df_list = pd.read_html(StringIO(response.text), header=0)

    # Take the first table that has a 'Symbol' column rather than assuming position 0
    df = next((table for table in df_list if 'Symbol' in table.columns), None)
    if df is None:
        raise ValueError(f"No table with a 'Symbol' column found at {url}")

    # 🔹 Sanitize tickers in the DataFrame
    df['Symbol'] = df['Symbol'].str.replace('.', '-', regex=False)

    # 🔹 Save full table with sanitized tickers to CSV
    df.to_csv(TICKERS_FILE, index=False)
    print(f"✅ Saved full S&P 500 table with sanitized tickers to {TICKERS_FILE}")

    # 🔹 Return list of tickers
    return df['Symbol'].dropna().tolist()

In [None]:
# Build the ticker universe once (served from the CSV cache when it exists);
# later cells iterate over `tickers`.
tickers = tickers_sp500()
print(f"✅ Loaded {len(tickers)} S&P 500 tickers")
# Preview the first 50 symbols as a quick sanity check
print(tickers[:50])

In [None]:
# Global dictionary to store all ticker data
yahoo_data = {}
successful_tickers = []
failed_tickers = []

def load_yahoo_data(tickers):
    """
    Fetch data once and store in global yahoo_data dictionary.
    Writes passed and failed tickers to text files.
    """
    global yahoo_data

    for symbol in tickers:
        df = get_yfinance_data(symbol, start_date=start_date, end_date=end_date)

        if df.empty or "close" not in df.columns:
            print(f"⚠️ Skipping {symbol}: No data or bad format")
            failed_tickers.append(symbol)
            continue

        yahoo_data[symbol] = df
        successful_tickers.append(symbol)
        print(f"✅ Loaded {symbol} successfully")

    # Write failed tickers to file
    with open("failed_tickers.txt", "w") as f:
        for ticker in failed_tickers:
            f.write(f"{ticker}\n")

    # Write successful tickers to file (optional)
    with open("successful_tickers.txt", "w") as f:
        for ticker in successful_tickers:
            f.write(f"{ticker}\n")

    print(f"\n📄 {len(successful_tickers)} tickers loaded successfully and saved to successful_tickers.txt")
    print(f"📄 {len(failed_tickers)} tickers failed and saved to failed_tickers.txt")


In [None]:
# Populate the global yahoo_data / successful_tickers / failed_tickers for the whole universe
load_yahoo_data(tickers)

In [None]:
def calculate_rs(index_return, start_date, end_date, index_symbol="SPY"):
    """
    Calculate RS scores using preloaded yahoo_data and precomputed index_return.

    For every symbol in the global ``successful_tickers`` (except
    ``index_symbol``, which is skipped), the close prices between
    ``start_date`` and ``end_date`` are taken from ``yahoo_data`` and the
    cumulative return over that window is divided by ``index_return``.

    Args:
        index_return: Cumulative return multiple of the index over the window.
        start_date: Inclusive start label used to slice each price frame.
        end_date: Inclusive end label used to slice each price frame.
        index_symbol: Symbol to leave out of the result. Default is "SPY".

    Returns:
        list: One RS score per processed symbol, in ``successful_tickers``
              order with ``index_symbol`` omitted. A symbol with no rows in
              the window gets NaN, so the list stays aligned with the ticker
              order instead of the whole run aborting.
    """
    returns_multiples = []

    for symbol in successful_tickers:
        if symbol == index_symbol:
            continue  # skip the index itself

        closes = yahoo_data[symbol].loc[start_date:end_date]["close"].astype(float)
        growth = (closes.pct_change() + 1).cumprod()

        if growth.empty:
            # Nothing in the window (e.g. a stale cached file): keep a placeholder
            # so positions still line up with the ticker list.
            returns_multiples.append(float("nan"))
            print(f"⚠️ {symbol} has no rows between {start_date} and {end_date} – RS Score set to NaN")
            continue

        rs_score = growth.iloc[-1] / index_return
        returns_multiples.append(rs_score)
        print(f"✅ {symbol} processed – RS Score: {rs_score:.2f}")

    return returns_multiples

# Compute RS scores for the loaded tickers against the precomputed index return
returns_multiples = calculate_rs(index_return, start_date, end_date)
# Report the number of scores actually produced: calculate_rs() skips the index
# symbol, so this can be smaller than len(successful_tickers).
print(f"\nCalculated RS scores for {len(returns_multiples)} tickers.")

In [None]:
# 📊 Create RS Rating DataFrame
rs_df = pd.DataFrame({
    'Ticker': successful_tickers,
    'Returns_multiple': returns_multiples
})
rs_df['RS_Rating'] = rs_df['Returns_multiple'].rank(pct=True) * 100

# ✅ Filter top 30% by RS Rating (adjust quantile as needed)
top_30_percent = rs_df['RS_Rating'].quantile(0.7)
rs_top_df = rs_df[rs_df['RS_Rating'] >= top_30_percent]

print(rs_top_df[:10])
# Print sorted top RS stocks
print(rs_top_df.sort_values(by='RS_Rating', ascending=False).head(10))

# ✅ Export all RS-rated stocks without filtering
rs_df.to_csv("rs_full_list.csv", index=False)
print(f"📊 Full RS stock list: {len(rs_df)} saved to rs_full_list.csv")

In [None]:
# 📌 Initialize export DataFrame
detailedExportList = pd.DataFrame(columns=[
    'Stock', 'Price', '50 Day MA', '150 Day MA', '200 Day MA', 
    '52 Week Low', '52 Week High', 'RS_Rating', 'Status'  # 🔹 CHANGED
])

# ✅ Iterate only over top RS stocks
for stock in rs_top_df['Ticker']:
    try:
        # Use already loaded data
        df = yahoo_data.get(stock)
        if df is None or df.empty:
            print(f"⚠️ No data for {stock}, skipping")
            continue

        # Clean and prep columns
        df.columns = [col.split(". ")[-1] for col in df.columns]
        df[['close', 'high', 'low']] = df[['close', 'high', 'low']].astype(float)

        # Compute SMAs
        df['SMA_50'] = df['close'].rolling(window=50).mean()
        df['SMA_150'] = df['close'].rolling(window=150).mean()
        df['SMA_200'] = df['close'].rolling(window=200).mean()

        # Grab latest values
        currentClose = df['close'].iloc[-1]
        SMA_50 = df['SMA_50'].iloc[-1]
        SMA_150 = df['SMA_150'].iloc[-1]
        SMA_200 = df['SMA_200'].iloc[-1]

        # 52-week high/low
        low_52week = round(df['low'].iloc[-260:].min(), 2)
        high_52week = round(df['high'].iloc[-260:].max(), 2)

        # RS Rating
        RS_Rating = round(rs_top_df.loc[rs_top_df['Ticker'] == stock, 'RS_Rating'].iloc[0])

        # Handle insufficient SMA data
        if pd.isnull([SMA_50, SMA_150, SMA_200]).any():
            status_msg = f"❌ Skipped {stock}: Not enough data for SMAs ({len(df)} rows)"
            print(status_msg)
        else:
            # Compare 200-day SMA 20 days ago
            SMA_200_20 = df['SMA_200'].iloc[-20] if len(df) >= 220 else 0

            # Minervini conditions
            condition_1 = currentClose > SMA_150 > SMA_200
            condition_2 = SMA_150 > SMA_200
            condition_3 = SMA_200 > SMA_200_20
            condition_4 = SMA_50 > SMA_150 > SMA_200
            condition_5 = currentClose > SMA_50
            condition_6 = currentClose >= 1.3 * low_52week
            condition_7 = currentClose >= 0.75 * high_52week

            conditions = [condition_1, condition_2, condition_3, condition_4,
                          condition_5, condition_6, condition_7]

            if all(conditions):
                status_msg = "✅ Passed Minervini"
                print(f"✅ {stock} passed Minervini")
            else:
                failed = [str(i+1) for i, c in enumerate(conditions) if not c]
                status_msg = f"❌ Failed Minervini conditions: {', '.join(failed)}"
                print(f"❌ {stock} failed Minervini: {', '.join(failed)}")

        # Append to export list once per stock
        detailedExportList = pd.concat([detailedExportList, pd.DataFrame([{
            'Stock': stock,
            'Price': currentClose,
            '50 Day MA': SMA_50,
            '150 Day MA': SMA_150,
            '200 Day MA': SMA_200,
            '52 Week Low': low_52week,
            '52 Week High': high_52week,
            'RS_Rating': RS_Rating,
            'Status': status_msg
        }])], ignore_index=True)

    except Exception as e:
        print(f"⚠️ Could not process {stock}: {e}")

# ✅ Split results
passed_df = detailedExportList[detailedExportList['Status'] == "✅ Passed Minervini"]
failed_df = detailedExportList[detailedExportList['Status'] != "✅ Passed Minervini"]

# # 📤 Export to Excel
# passed_df.to_excel("Minervini_Passed.xlsx", index=False)
# failed_df.to_excel("Minervini_Failed.xlsx", index=False)

# 📤 Export to CSV
passed_df.to_csv("Minervini_Passed.csv", index=False)
failed_df.to_csv("Minervini_Failed.csv", index=False)

print(f"\n📄 {len(passed_df)} stocks passed Minervini, saved to Minervini_Passed.csv")
print(f"📄 {len(failed_df)} stocks failed Minervini, saved to Minervini_Failed.csv")
