In [None]:
import yfinance as yf
import pandas as pd
from google.colab import drive
import time
import os

In [None]:
# Mount Google Drive so files under /content/drive are readable from this notebook
drive.mount('/content/drive')

# Force a re-sync of the mounted Drive: unmount it, then mount it again
drive.flush_and_unmount() # Unmounts only; the remount is done by the mount() call on the next line
drive.mount('/content/drive', force_remount=True) # Forces remounting

Mounted at /content/drive
Mounted at /content/drive


In [None]:
# Load the S&P 500 constituents list from the mounted Drive.
sp500_csv_path = "/content/drive/MyDrive/S&P 500 List.csv"
tickers_df = pd.read_csv(sp500_csv_path)
## Piotroski_df = pd.read_csv("/content/drive/MyDrive/Piotroski Scores.csv") #<-- This to give the data scores

# Ticker symbols with missing entries dropped; adjust the column name if the CSV uses another one.
ticker_column = tickers_df["Ticker"]
tickers = ticker_column.dropna().tolist()

In [None]:
# Exploratory check: look up a single ticker and list the metadata fields
# available in its `info` dictionary (e.g. 'sector' and 'marketCap', which the
# valuation loop further down reads).
stock = yf.Ticker("AAPL")
print(stock.info.keys())  # Shows all available metadata fields

dict_keys(['address1', 'city', 'state', 'zip', 'country', 'phone', 'website', 'industry', 'industryKey', 'industryDisp', 'sector', 'sectorKey', 'sectorDisp', 'longBusinessSummary', 'fullTimeEmployees', 'companyOfficers', 'auditRisk', 'boardRisk', 'compensationRisk', 'shareHolderRightsRisk', 'overallRisk', 'governanceEpochDate', 'compensationAsOfEpochDate', 'irWebsite', 'executiveTeam', 'maxAge', 'priceHint', 'previousClose', 'open', 'dayLow', 'dayHigh', 'regularMarketPreviousClose', 'regularMarketOpen', 'regularMarketDayLow', 'regularMarketDayHigh', 'dividendRate', 'dividendYield', 'exDividendDate', 'payoutRatio', 'fiveYearAvgDividendYield', 'beta', 'trailingPE', 'forwardPE', 'volume', 'regularMarketVolume', 'averageVolume', 'averageVolume10days', 'averageDailyVolume10Day', 'bid', 'ask', 'bidSize', 'askSize', 'marketCap', 'fiftyTwoWeekLow', 'fiftyTwoWeekHigh', 'priceToSalesTrailing12Months', 'fiftyDayAverage', 'twoHundredDayAverage', 'trailingAnnualDividendRate', 'trailingAnnualDividen

The first part of this process is data collection, and the yfinance (Yahoo Finance) library is the tool used for it.

In [None]:
def calculate_dcf_value(fcf, shares_outstanding, net_debt, growth_rate, discount_rate=0.10, terminal_growth_rate=0.02):
    """
    Estimate intrinsic value per share using a simplified 5-year DCF model.

    The model discounts five years of projected free cash flow, adds a
    discounted Gordon Growth terminal value, subtracts net debt and divides
    by the share count.

    Args:
        fcf: Free Cash Flow (annual).
        shares_outstanding: number of shares.
        net_debt: total debt minus cash.
        growth_rate: estimated FCF growth rate (based on industry).
        discount_rate: rate used to discount future cash flows.
        terminal_growth_rate: perpetual growth rate applied after year 5.

    Returns:
        The intrinsic value per share as a float, or None when the value
        cannot be computed: an input is missing, non-numeric, NaN or
        infinite; shares_outstanding is not positive; or discount_rate is
        not greater than terminal_growth_rate (the Gordon Growth formula is
        undefined or negative in that case). The result may be negative
        when FCF is negative or net debt exceeds enterprise value.
    """
    import math  # local import: math is not part of the notebook's import cell

    horizon_years = 5

    try:
        values = [
            float(v)
            for v in (fcf, shares_outstanding, net_debt, growth_rate, discount_rate, terminal_growth_rate)
        ]
        # NaN/inf would otherwise flow through the arithmetic and come back as
        # a NaN "valuation" that callers cannot distinguish from a real number.
        if not all(math.isfinite(v) for v in values):
            raise ValueError("all inputs must be finite numbers")
        fcf, shares_outstanding, net_debt, growth_rate, discount_rate, terminal_growth_rate = values

        if discount_rate <= terminal_growth_rate:
            raise ValueError("discount_rate must be greater than terminal_growth_rate")

        if shares_outstanding <= 0:
            return None

        # Present value of the projected Free Cash Flows for years 1..5
        projected_fcfs = [
            (fcf * ((1 + growth_rate) ** year)) / ((1 + discount_rate) ** year)
            for year in range(1, horizon_years + 1)
        ]

        # Terminal Value using Gordon Growth Model: the final projected year's
        # FCF grown one more year at the terminal rate, capitalised, then
        # discounted back from the end of the projection horizon.
        final_year_fcf = fcf * ((1 + growth_rate) ** horizon_years)
        terminal_value = (final_year_fcf * (1 + terminal_growth_rate)) / (discount_rate - terminal_growth_rate)
        terminal_value_discounted = terminal_value / ((1 + discount_rate) ** horizon_years)

        # Total enterprise value
        enterprise_value = sum(projected_fcfs) + terminal_value_discounted

        # Equity value = Enterprise Value - Net Debt
        equity_value = enterprise_value - net_debt

        # Intrinsic value per share
        return equity_value / shares_outstanding

    except (TypeError, ValueError, ArithmeticError) as e:
        print(f"❌ Error in DCF calculation: {e}")
        return None

In [None]:
# Estimated industry FCF growth rates, keyed by the sector name that is looked
# up from each ticker's `info["sector"]`. Sectors not listed here fall back to
# the default passed to `.get()` at the call site.
industry_growth_estimates = {
    "Technology": 0.10,
    "Healthcare": 0.07,
    "Financial Services": 0.05,
    "Consumer Defensive": 0.04,
    "Consumer Cyclical": 0.06,
    "Energy": 0.03,
    "Industrials": 0.05,
    "Utilities": 0.02,
    "Real Estate": 0.03,
    # Yahoo Finance reports this sector as "Basic Materials"; without that key
    # materials companies silently received the generic default growth rate.
    "Basic Materials": 0.04,
    "Materials": 0.04,  # kept for backward compatibility with other data sources
    "Communication Services": 0.06
}

def calculate_piotroski_score(fin, bal, cf, ticker=None):
    """
    Compute a Piotroski-style score plus two supporting metrics.

    Each statement is a DataFrame indexed by line-item label; column 0 is
    treated as the current period and column 1 as the previous period.

    Args:
        fin: income statement (rows read: "Net Income", "Gross Profit",
            "Total Revenue").
        bal: balance sheet (rows read: "Total Assets", "Long Term Debt",
            "Current Assets", "Current Liabilities",
            "Ordinary Shares Number", "Retained Earnings").
        cf: cash-flow statement (rows read: "Operating Cash Flow",
            "Capital Expenditure").
        ticker: optional symbol used only in the error message.

    Returns:
        (score, retained_earnings, free_cash_flow). `score` is an int from
        0 to 9 (one point per check below). If any required row or period is
        missing, or the computation fails, a warning is printed and
        (0, 0, 0) is returned.

    NOTE(review): the nine checks below include both `net_income > 0` and
    `roa > 0` and have no "change in ROA" check, so this differs from the
    textbook Piotroski F-score — confirm whether that is intended.
    """
    try:
        net_income = fin.loc["Net Income"].iloc[0]
        total_assets = bal.loc["Total Assets"].iloc[0]
        total_assets_prev = bal.loc["Total Assets"].iloc[1]
        cfo = cf.loc["Operating Cash Flow"].iloc[0]

        # Ratios fall back to 0 when the denominator is zero.
        roa = net_income / total_assets if total_assets else 0

        ltd_curr = bal.loc["Long Term Debt"].iloc[0]
        ltd_prev = bal.loc["Long Term Debt"].iloc[1]

        # Current ratio (liquidity), current vs. previous period
        ca_curr = bal.loc["Current Assets"].iloc[0]
        cl_curr = bal.loc["Current Liabilities"].iloc[0]
        ca_prev = bal.loc["Current Assets"].iloc[1]
        cl_prev = bal.loc["Current Liabilities"].iloc[1]
        cr_curr = ca_curr / cl_curr if cl_curr else 0
        cr_prev = ca_prev / cl_prev if cl_prev else 0

        shares_curr = bal.loc["Ordinary Shares Number"].iloc[0]
        shares_prev = bal.loc["Ordinary Shares Number"].iloc[1]

        # Gross margin, current vs. previous period
        gp_curr = fin.loc["Gross Profit"].iloc[0]
        rev_curr = fin.loc["Total Revenue"].iloc[0]
        gp_prev = fin.loc["Gross Profit"].iloc[1]
        rev_prev = fin.loc["Total Revenue"].iloc[1]
        gm_curr = gp_curr / rev_curr if rev_curr else 0
        gm_prev = gp_prev / rev_prev if rev_prev else 0

        # Asset turnover, current vs. previous period
        at_curr = rev_curr / total_assets if total_assets else 0
        at_prev = rev_prev / total_assets_prev if total_assets_prev else 0

        capex = cf.loc["Capital Expenditure"].iloc[0]
        fcf = cfo + capex  # Because CapEx is negative
        retained_earnings = bal.loc["Retained Earnings"].iloc[0]

        checks = [
            net_income > 0,
            roa > 0,
            cfo > 0,
            cfo > net_income,
            ltd_curr < ltd_prev,
            cr_curr > cr_prev,
            shares_curr <= shares_prev,
            gm_curr > gm_prev,
            at_curr > at_prev,
        ]
        # bool() normalises numpy booleans so the score is a plain int.
        score = sum(bool(passed) for passed in checks)

        # The original returned the undefined name `free_cash_flow`, which made
        # every call fall into the error path below and return (0, 0, 0).
        return score, retained_earnings, fcf

    except Exception as e:
        # Prefer the explicit argument; fall back to a notebook-level `ticker`
        # (set by the processing loop) so existing log output is unchanged,
        # and never fail here when neither is available.
        label = ticker if ticker is not None else globals().get("ticker", "unknown ticker")
        print(f"⚠️ Error scoring {label}: {e}")
        return 0, 0, 0  # Give a 0 score with 0 metrics checked


# Output containers
combined_data = []  # per-ticker price-history DataFrames, tagged with score columns
summary_list = []   # per-ticker valuation summary dicts
errors = []         # (ticker, error message) for tickers whose processing raised

MAX_FETCH_ATTEMPTS = 3     # tries per ticker before the fetch error is recorded
REQUEST_PAUSE_SECONDS = 3  # pause between tickers; also the base of the retry backoff


def _latest_row_value(statement, label, default=0):
    """Return row `label`'s first-column value, or `default` if the row is absent or NaN."""
    if label not in statement.index:
        return default
    value = statement.loc[label].iloc[0]
    return default if pd.isna(value) else value


def _valuation_label(current_price, intrinsic_value):
    """Classify price vs. DCF estimate; "Unknown" when the estimate is missing, NaN or not positive."""
    if intrinsic_value is None or current_price is None:
        return "Unknown"
    # A NaN or non-positive estimate makes the price/value ratio meaningless
    # (it would otherwise be labelled "Fairly Valued" or "Undervalued").
    if pd.isna(intrinsic_value) or pd.isna(current_price) or intrinsic_value <= 0:
        return "Unknown"
    ratio = current_price / intrinsic_value
    if ratio < 0.75:
        return "Undervalued"
    if ratio > 1.25:
        return "Overvalued"
    return "Fairly Valued"


for i, ticker in enumerate(tickers):
    # Throttle at the top of the loop so that `continue` cannot skip the pause.
    if i:
        time.sleep(REQUEST_PAUSE_SECONDS)
    print(f"[{i+1}/{len(tickers)}] Fetching {ticker}...")

    try:
        # Fetch once, retrying with exponential backoff. The successful result
        # is reused below instead of issuing a second, redundant request.
        for attempt in range(1, MAX_FETCH_ATTEMPTS + 1):
            try:
                stock = yf.Ticker(ticker)
                hist = stock.history(period="5y", interval="3mo")
                break
            except Exception as e:
                if attempt == MAX_FETCH_ATTEMPTS:
                    raise  # handled by the outer except, which records the ticker
                wait_seconds = REQUEST_PAUSE_SECONDS * 2 ** attempt
                print(f"⚠️ Attempt {attempt}/{MAX_FETCH_ATTEMPTS} failed for {ticker}: {e}. Retrying in {wait_seconds}s...")
                time.sleep(wait_seconds)

        if hist.empty:
            print(f"❌ No price data for {ticker}")
            continue

        fin = stock.quarterly_financials
        bal = stock.quarterly_balance_sheet
        cf  = stock.quarterly_cashflow

        score, retained_earnings, fcf = calculate_piotroski_score(fin, bal, cf)
        if score is None:
            continue

        # reset_index() returns a new frame, so the fetched history is not mutated.
        hist = hist.reset_index()
        hist['Ticker'] = ticker
        hist['Piotroski'] = score
        hist['RetainedEarnings'] = retained_earnings
        hist['FreeCashFlow'] = fcf
        combined_data.append(hist)

        # DCF valuation
        annual_fcf = fcf * 4  # annualise the quarterly figure
        cash = bal.loc["Cash And Cash Equivalents"].iloc[0]
        # Line items are rows of `bal`, so the row index is checked here;
        # DataFrame.get() looks up columns and always returned the default 0.
        # NOTE(review): confirm the exact short-term debt row label in the
        # yfinance balance sheet (it may be reported as "Current Debt").
        short_term_debt = _latest_row_value(bal, "Short Term Debt", 0)
        debt = bal.loc["Long Term Debt"].iloc[0] + short_term_debt
        net_debt = debt - cash
        shares_outstanding = bal.loc["Ordinary Shares Number"].iloc[0]
        info = stock.info  # read once; used for both sector and market cap
        sector = info.get("sector", None)
        growth_rate = industry_growth_estimates.get(sector, 0.05)

        intrinsic_value = calculate_dcf_value(
            fcf=annual_fcf,
            shares_outstanding=shares_outstanding,
            net_debt=net_debt,
            growth_rate=growth_rate
        )

        current_price = hist['Close'].iloc[-1]
        valuation_label = _valuation_label(current_price, intrinsic_value)

        summary_data = {
            'Ticker': ticker,
            'Piotroski': score,
            'RetainedEarnings': retained_earnings,
            'FreeCashFlow': fcf,
            'MarketCap': info.get('marketCap', None),
            'Sector': sector,
            'GrowthRate': growth_rate,
            'DCF_Estimate': intrinsic_value,
            'CurrentPrice': current_price,
            'ValuationLabel': valuation_label
        }
        summary_list.append(summary_data)

    except Exception as e:
        print(f"❌ Error with {ticker}: {e}")
        errors.append((ticker, str(e)))

# Final saves
# pd.concat raises ValueError on an empty list (e.g. when every ticker failed
# because of rate limiting). Guard it so the summary and, above all, the error
# log below are still written in that case.
if combined_data:
    final_df = pd.concat(combined_data, ignore_index=True)
    final_df.to_csv("nyse_5yr_piotroski.csv", index=False)
    print("✅ DONE! Saved historical data to nyse_5yr_piotroski.csv")
else:
    final_df = pd.DataFrame()  # keep the name defined for any later cells
    print("⚠️ No historical data collected; nyse_5yr_piotroski.csv was not written")

summary_df = pd.DataFrame(summary_list)
summary_df.to_csv("valuation_summary.csv", index=False)
print("✅ Saved summary data to valuation_summary.csv")

# One row per ticker whose processing raised, with the error message.
pd.DataFrame(errors, columns=["Ticker", "Error"]).to_csv("missing_data_tickers.csv", index=False)

[1/502] Fetching AAPL...
[1/502] Fetching AAPL...
[1/502] Fetching AAPL...
❌ Error with AAPL: Too Many Requests. Rate limited. Try after a while.
[2/502] Fetching NVDA...


KeyboardInterrupt: 