In [1]:
import yfinance as yf
import pandas as pd
import os
import warnings
import datetime

# Root folder for every CSV this notebook reads or writes.
# Set the AI_TRADER_DATA_DIR environment variable to relocate it; when the
# variable is absent the original location is used, so existing runs are unchanged.
BASE_DIR = os.environ.get("AI_TRADER_DATA_DIR", "/Users/dannyyu/Desktop/AI_Trader/data")
TICKERS = ['AAPL', 'MSFT', 'GOOGL', 'AMZN', 'META', 'NVDA', 'TSLA', 'INTC', 'AMD', 'IBM']
START_DATE = "2022-01-01"
# Evaluated once, when this cell runs; re-run the cell to move the end of the window.
END_DATE = datetime.datetime.today()

# Create the output folders (no-op when they already exist)
for _subfolder in ("prices", "fundamentals", "metadata"):
    os.makedirs(os.path.join(BASE_DIR, _subfolder), exist_ok=True)

In [2]:
"""
Returns the most recent available closing price on or before target_date
by checking historical data within the past `lookback_days`.
"""
def get_closest_price(ticker_obj, target_date, lookback_days=3):
    """Return the latest available closing price on or before ``target_date``.

    Price history is requested for the window
    ``[target_date - lookback_days, target_date + 1 day)`` and the last
    non-missing ``Close`` value in that window is returned.

    Args:
        ticker_obj: Object exposing ``history(start=..., end=...)`` and a
            ``ticker`` attribute (a ``yf.Ticker`` in this notebook).
        target_date: ``pandas.Timestamp`` (or compatible) to price at.
        lookback_days: How many calendar days before ``target_date`` to search.

    Returns:
        The closing price, or ``None`` when the window holds no usable price
        or the lookup raised. A message is printed in both ``None`` cases.
    """
    try:
        start_date = target_date - pd.Timedelta(days=lookback_days)
        # Push the end one day past target_date so target_date itself is included.
        end_date = target_date + pd.Timedelta(days=1)

        hist = ticker_obj.history(start=start_date, end=end_date)

        # Skip rows whose Close is missing so a NaN is never reported as a price.
        closes = hist["Close"].dropna() if not hist.empty else None
        if closes is None or closes.empty:
            print(f"No price data found for {ticker_obj.ticker} between {start_date.date()} and {end_date.date()}")
            return None

        return closes.iloc[-1]

    except Exception as e:
        print(f"Error retrieving price for {ticker_obj.ticker} near {target_date.date()}: {e}")
        return None

In [3]:
def get_fundamentals_history(ticker, years=3):
    """Collect a table of fundamental indicators for one ticker.

    The returned frame holds one row per source:
      * one "info" row stamped with today's date, read from ``Ticker.info``;
      * one "quarterly" row per quarterly report dated within the last 450 days;
      * one "yearly" row per annual report that is at most ``years`` calendar
        years old and whose year is not already covered by a row above.

    Args:
        ticker: Symbol passed to ``yf.Ticker``.
        years: Maximum age, in calendar years, of annual reports to include.

    Returns:
        pandas.DataFrame with the columns Date, DilutedEPS, PE, Revenue,
        CashFlow, EBITDA, GrossProfit, OperatingMargin, ROE, DebtToEquity and
        Source. A report that cannot be processed (for example because a
        required statement row is missing) is skipped with a printed message.
    """
    ticker_obj = yf.Ticker(ticker)
    info = ticker_obj.info
    today = pd.Timestamp.today()

    # Get quarterly and annual statements
    q_income_stmt = ticker_obj.quarterly_financials
    q_balance_sheet = ticker_obj.quarterly_balance_sheet
    q_cashflow_stmt = ticker_obj.quarterly_cashflow

    y_income_stmt = ticker_obj.financials
    y_balance_sheet = ticker_obj.balance_sheet
    y_cashflow_stmt = ticker_obj.cashflow

    fundamentals = []

    def _round_or_none(value, digits):
        """Round ``value``; pass ``None`` through instead of raising TypeError."""
        return None if value is None else round(value, digits)

    # Helper to build one row from the statements for a given report date
    def extract_fundamentals(date, is_quarterly=True):
        # Defined before the try so the error message below can always use it.
        stmt_src = "quarterly" if is_quarterly else "yearly"
        try:
            inc = q_income_stmt if is_quarterly else y_income_stmt
            bal = q_balance_sheet if is_quarterly else y_balance_sheet
            cf  = q_cashflow_stmt if is_quarterly else y_cashflow_stmt

            revenue = inc.loc["Total Revenue", date]
            gross_profit = inc.loc["Gross Profit", date]
            ebitda = inc.loc["EBITDA", date]
            net_income = inc.loc["Net Income", date]

            operating_cashflow = cf.loc["Operating Cash Flow", date]
            total_equity = bal.loc["Common Stock Equity", date]
            total_debt = bal.loc["Total Debt", date]

            roe = net_income / total_equity if total_equity else None
            # Expressed as a percentage (x100).
            debt_to_equity = (total_debt / total_equity)*100 if total_equity else None

            # Operating margin is operating income over revenue. Dividing net
            # income by revenue gives net margin, which is a different metric.
            # The row is optional: without it the margin is left empty.
            if "Operating Income" in inc.index:
                operating_income = inc.loc["Operating Income", date]
            else:
                operating_income = None
            if operating_income is not None and revenue:
                operating_margin = operating_income / revenue
            else:
                operating_margin = None

            diluted_eps = inc.loc["Diluted EPS", date]
            hist_price = get_closest_price(ticker_obj, date)
            # NOTE: the fallback is the *current* trailing PE from `info`,
            # not a value for this report date.
            pe_ratio = (hist_price / diluted_eps) if hist_price and diluted_eps else info.get("trailingPE", None)

            return {
                "Date": date.strftime('%Y-%m-%d'),
                "DilutedEPS": diluted_eps,
                "PE": pe_ratio,
                "Revenue": revenue,
                "CashFlow": operating_cashflow,
                "EBITDA": ebitda,
                "GrossProfit": gross_profit,
                # The ratios may be None (zero equity / zero revenue); rounding
                # must not raise, otherwise the whole report would be dropped.
                "OperatingMargin": _round_or_none(operating_margin, 8),
                "ROE": _round_or_none(roe, 3),
                "DebtToEquity": _round_or_none(debt_to_equity, 5),
                "Source": stmt_src
            }

        except Exception as e:
            print(f"Error processing {ticker} for {date.date()} [{stmt_src}]: {e}")
            return None

    # Include most recent static values
    fundamentals.append({
        "Date": today.strftime('%Y-%m-%d'),
        "DilutedEPS": info.get("trailingEps", None),
        "PE": info.get("trailingPE", None),
        "Revenue": info.get("totalRevenue", None),
        "CashFlow": info.get("operatingCashflow", None),
        "EBITDA": info.get("ebitda", None),
        "GrossProfit": info.get("grossProfits", None),
        "OperatingMargin": info.get("operatingMargins", None),
        "ROE": info.get("returnOnEquity", None),
        "DebtToEquity": info.get("debtToEquity", None),
        "Source": "info"
    })

    # Add quarterly reports (within last ~15 months = 450 days); only dates
    # present in all three quarterly statements are considered.
    q_dates = q_income_stmt.columns.intersection(q_balance_sheet.columns).intersection(q_cashflow_stmt.columns)
    for q_date in q_dates:
        if (today - q_date).days <= 450:
            row = extract_fundamentals(q_date, is_quarterly=True)
            if row:
                fundamentals.append(row)

    # Add yearly reports, excluding any years already covered
    y_dates = y_income_stmt.columns.intersection(y_balance_sheet.columns).intersection(y_cashflow_stmt.columns)
    included_years = {pd.to_datetime(f["Date"]).year for f in fundamentals}

    for y_date in y_dates:
        if y_date.year in included_years or today.year - y_date.year > years:
            continue
        row = extract_fundamentals(y_date, is_quarterly=False)
        if row:
            fundamentals.append(row)

    return pd.DataFrame(fundamentals)

In [4]:
def read_CSV_File(ticker,folderName):
    """Load ``<BASE_DIR>/<folderName>/<ticker>_<folderName>.csv`` as a DataFrame.

    The first CSV column becomes the index.
    """
    csv_path = f"{BASE_DIR}/{folderName}/{ticker}_{folderName}.csv"
    return pd.read_csv(csv_path, index_col=0, low_memory=False)

In [5]:
# Combine the historical price table with the fundamental indicators, indexed on Date.
# The two tables are stacked and the fundamental values are spread over the price
# rows by date order (they are not joined on Year); a Year column is added at the end.
def merge_table(price, fund):
    """Attach fundamental indicators to every price row.

    Both inputs are copied, so the caller's frames are not modified. Their
    indexes are parsed as UTC datetimes and must be named "Date" (the code
    sorts on a "Date" column after resetting the index).

    Steps:
      1. Stack price rows and fundamental rows into one frame.
      2. Sort newest-first and back-fill the fundamental columns, so each row
         takes the values of the nearest report further down, i.e. dated at or
         before it.
      3. Forward-fill what is still empty (rows older than every report) from
         the nearest later report.
      4. Sort chronologically, add "Year", keep only rows that have a "Close"
         value, and drop the helper columns "Source" and "FundamentalReportDate".

    Returns:
        DataFrame indexed by "Date".
    """
    price_rows = price.copy()
    price_rows.index = pd.to_datetime(price_rows.index, utc=True)
    fund_rows = fund.copy()
    fund_rows.index = pd.to_datetime(fund_rows.index, utc=True)

    # Record which report each value originates from
    fund_rows["FundamentalReportDate"] = fund_rows.index

    # Turn the Date index into an ordinary column on both sides
    price_rows = price_rows.reset_index()
    fund_rows = fund_rows.reset_index()

    # Fundamental value columns (everything except 'Date' and 'FundamentalReportDate')
    value_cols = fund_rows.columns.drop(["Date", "FundamentalReportDate"], errors='ignore')

    stacked = pd.concat([price_rows, fund_rows], ignore_index=True, sort=False)
    stacked = stacked.sort_values("Date", ascending=False)

    # Back-fill first, then forward-fill whatever is still missing
    for method in ("bfill", "ffill"):
        stacked[value_cols] = getattr(stacked[value_cols], method)()
        stacked["FundamentalReportDate"] = getattr(stacked["FundamentalReportDate"], method)()

    # Back to chronological order
    stacked = stacked.sort_values("Date")
    stacked["Year"] = stacked["Date"].dt.year

    result = (
        stacked
        .dropna(subset=["Close"])
        .drop(columns=["Source","FundamentalReportDate"], errors="ignore")
        .set_index("Date")
    )
    return result

In [6]:
def has_divident(metadata):
    """Add "Year" and "HasDividend" columns to ``metadata`` and return it.

    "Year" is taken from the index parsed as UTC datetimes. "HasDividend" is 1
    for every row of a year in which at least one "Dividends" value is
    positive, otherwise 0. The frame passed in is modified in place.
    """
    def _paid_any(dividends):
        return int((dividends > 0).any())

    index_as_dates = pd.to_datetime(metadata.index, utc=True)
    metadata["Year"] = index_as_dates.year

    flag_per_year = metadata.groupby('Year')['Dividends'].apply(_paid_any)
    metadata['HasDividend'] = metadata['Year'].map(flag_per_year)
    return metadata

In [7]:
def missing_values(table):
    """Print a per-column report of missing values and say whether any exist.

    The company label is read from the first row of the "Company" column. When
    the table is empty or has no such column the label "<unknown>" is used
    instead of raising.

    Args:
        table: DataFrame to inspect.

    Returns:
        True when at least one column contains a missing value, else False.
    """
    total_rows = len(table)
    if total_rows and "Company" in table.columns:
        # First row, so a single-row table works.
        company = str(table["Company"].iloc[0])
    else:
        company = "<unknown>"
    print("Analyzing "+company)

    missing_counts = table.isna().sum()
    # Only columns that actually have gaps are reported.
    missing_counts = missing_counts[missing_counts > 0]
    for column, num_missing in missing_counts.items():
        percent_missing = 100 * num_missing / total_rows
        print(f"📊 '{column}': {num_missing} missing ({percent_missing:.2f}%)")

    hasMissing = not missing_counts.empty
    if not hasMissing:
        print(company + " has no missing values")
    return hasMissing

In [8]:
def get_shares_outstanding(table, year):
    """Return the (estimated) number of shares outstanding for ``year``.

    For the current calendar year the figure is ``sharesOutstanding`` from
    ``Ticker.info``. For any other year it is estimated as the annual
    "Net Income" divided by the first "DilutedEPS" value ``table`` holds for
    that year.

    Args:
        table: DataFrame with "Company", "Year" and "DilutedEPS" columns; the
            ticker symbol is read from the first "Company" value.
        year: Calendar year to look up.

    Returns:
        The share count rounded to a whole number, or NaN (with a printed
        explanation) when the inputs needed for the year are unavailable, so
        that per-share values derived from it become NaN instead of the
        caller crashing.
    """
    ticker_symbol = table['Company'].iloc[0]
    ticker = yf.Ticker(ticker_symbol)
    unavailable = float("nan")

    current_year = datetime.datetime.now().year
    if year == current_year:
        shares = ticker.info.get('sharesOutstanding', None)
        if shares is None:
            print(f"No sharesOutstanding reported for {ticker_symbol}; cannot normalize {year}")
            return unavailable
        return round(shares, 0)

    # Work on a copy: relabelling the columns must not alter the frame the
    # Ticker object handed out.
    income = ticker.financials.copy()
    income.columns = pd.to_datetime(income.columns)
    matching_cols = [col for col in income.columns if col.year == year]
    if not matching_cols or 'Net Income' not in income.index:
        print(f"No annual net income for {ticker_symbol} in {year}; cannot estimate shares")
        return unavailable
    net_income = income.at['Net Income', matching_cols[0]]

    year_rows = table[table["Year"] == year]
    if year_rows.empty:
        print(f"No rows for {ticker_symbol} in {year}; cannot estimate shares")
        return unavailable
    eps = year_rows['DilutedEPS'].iloc[0]

    # A missing or zero EPS (or missing net income) cannot yield a share count.
    if pd.isna(eps) or eps == 0 or pd.isna(net_income):
        print(f"Unusable EPS/net income for {ticker_symbol} in {year}; cannot estimate shares")
        return unavailable

    estimated_shares = net_income / eps
    return round(estimated_shares,0)

In [9]:
# Adds per-share versions of Revenue, CashFlow, EBITDA and GrossProfit, dividing
# each year's rows by the shares outstanding obtained for that year.
def normalize_per_share(table):
    """Append ``<metric>_perShare`` columns to ``table`` and return it.

    One share count is fetched per distinct "Year" through
    ``get_shares_outstanding`` and applied to all rows of that year. The new
    columns are converted to float and rounded to 5 decimals. ``table`` is
    modified in place.
    """
    metrics = ('Revenue', 'CashFlow', 'EBITDA', 'GrossProfit')
    per_share_cols = [f"{metric}_perShare" for metric in metrics]

    # Create the target columns up front, in a fixed order
    for target in per_share_cols:
        table[target] = None

    for year in table["Year"].unique():
        in_year = table['Year'] == year
        share_count = get_shares_outstanding(table, year)
        for metric, target in zip(metrics, per_share_cols):
            table.loc[in_year, target] = table.loc[in_year, metric] / share_count

    table[per_share_cols] = table[per_share_cols].astype(float).round(5)
    return table

In [10]:
def combine_metadata_files(folderName="metadata"):
    """Concatenate every per-ticker CSV in ``<BASE_DIR>/<folderName>``.

    A file is included when it is named ``<ticker>_<folderName>.csv``; the
    aggregate file ``MASTER_<folderName>.csv`` is skipped so a previous result
    is never folded back in. Files are processed in sorted name order, which
    makes the row order of the result reproducible.

    Args:
        folderName: Sub-folder of ``BASE_DIR`` to scan; it also determines the
            expected file-name suffix.

    Returns:
        The concatenated DataFrame (original indexes kept), or ``None`` after
        printing a message when no matching file exists.
    """
    suffix = f"_{folderName}.csv"
    master_name = f"MASTER{suffix}"
    folder = os.path.join(BASE_DIR, folderName)

    combined = []
    for file in sorted(os.listdir(folder)):
        if not file.endswith(suffix) or file == master_name:
            continue
        # Strip the suffix rather than splitting on "_" so a ticker that
        # itself contains an underscore is not truncated.
        ticker = file[:-len(suffix)]
        combined.append(read_CSV_File(ticker, folderName))

    if combined:
        return pd.concat(combined, ignore_index=False)

    print(f"No '*{suffix}' files found in {folder}")
    return None

In [11]:
def compute_percentile(table):
    """Add a ``<indicator>_Pct`` column for each fundamental indicator.

    Each value is the indicator's percentile rank among all rows sharing the
    same "Year", rounded to 4 decimals. ``table`` is modified in place and
    returned.
    """
    ranked_columns = (
        "OperatingMargin",
        "ROE",
        "DebtToEquity",
        "Revenue_perShare",
        "CashFlow_perShare",
        "EBITDA_perShare",
        "GrossProfit_perShare",
    )
    for name in ranked_columns:
        yearly_rank = table.groupby('Year')[name].rank(pct=True)
        table[f"{name}_Pct"] = yearly_rank.round(4)
    return table

In [12]:
## Preliminary data collection
# For every ticker: download daily prices and the fundamentals table, save each
# as a CSV, and remember the company name and sector for the overview file.
meta_data = []
for ticker in TICKERS:
    print(f"Fetching data for {ticker}...")
    stock = yf.Ticker(ticker)

    # --- Prices ---
    price_path = os.path.join(BASE_DIR, "prices", f"{ticker}_prices.csv")
    df_price = stock.history(start=START_DATE, end=END_DATE,interval="1d").reset_index()
    df_price.to_csv(price_path, index=False)

    # --- Fundamentals ---
    fund_path = os.path.join(BASE_DIR, "fundamentals", f"{ticker}_fundamentals.csv")
    df_fund = get_fundamentals_history(ticker)
    df_fund.to_csv(fund_path, index=False)

    # --- Metadata for overview ---
    meta_data.append(dict(
        Ticker=ticker,
        Name=stock.info.get("shortName", ""),
        Sector=stock.info.get("sector", ""),
    ))

# === SAVE COMPANY LIST ===
df_meta = pd.DataFrame(meta_data)
company_list_path = os.path.join(BASE_DIR, "metadata", "company_list.csv")
df_meta.to_csv(company_list_path, index=False)

print("Preliminary data collection complete!")

Fetching data for AAPL...
Fetching data for MSFT...
Fetching data for GOOGL...
Fetching data for AMZN...
Fetching data for META...
Fetching data for NVDA...
Fetching data for TSLA...
Fetching data for INTC...
Fetching data for AMD...
Fetching data for IBM...
Preliminary data collection complete!


In [13]:
# Join each ticker's saved prices with its saved fundamentals, tag the rows with
# the ticker symbol and store the result as <ticker>_metadata.csv.
for ticker in TICKERS:
    fund = read_CSV_File(ticker, "fundamentals")
    price = read_CSV_File(ticker, "prices")
    merged = merge_table(price, fund).assign(Company=ticker)
    metadata_path = os.path.join(BASE_DIR, "metadata", f"{ticker}_metadata.csv")
    merged.to_csv(metadata_path, index=True)

In [14]:
## Fill the missing Diluted EPS and PE values for IBM
# EPS estimate: net income of the 2024-12-31 annual report divided by the
# *current* shares outstanding (used as a proxy for the historical share count).
ibm_ticker = yf.Ticker("IBM")
ibm = read_CSV_File("IBM", "metadata")
ibm_fin = ibm_ticker.financials
shares_outstanding = ibm_ticker.info.get("sharesOutstanding", None)
net_income = ibm_fin["2024-12-31"]["Net Income"]
eps_estimate = net_income / shares_outstanding

# PE estimate: closing price nearest to 2024-12-31 over the EPS estimate above
hist_price = get_closest_price(ibm_ticker, pd.to_datetime("2024-12-31"))
pe_ratio = hist_price / eps_estimate

# NOTE: every missing value in the column receives the same single estimate.
ibm['DilutedEPS'] = ibm['DilutedEPS'].fillna(eps_estimate).round(2)
ibm['PE'] = ibm['PE'].fillna(pe_ratio).round(5)
ibm_path = os.path.join(BASE_DIR, "metadata","IBM_metadata.csv")
ibm.to_csv(ibm_path, index=True)


## Fill the missing PE ratio for INTC from the latest price and the last row's EPS
intc_ticker = yf.Ticker("INTC")
intc = read_CSV_File("INTC", "metadata")
price = intc_ticker.history(period="1d")["Close"].iloc[-1]
eps = intc["DilutedEPS"].iloc[-1]
pe = price / eps
intc["PE"] = intc["PE"].fillna(pe).round(5)
intc_path = os.path.join(BASE_DIR, "metadata","INTC_metadata.csv")
intc.to_csv(intc_path, index=True)


In [15]:
# Per ticker: add the dividend flag, round the ratio columns, report missing
# values, add the per-share columns and write the metadata file back.
for ticker in TICKERS:
    table = has_divident(read_CSV_File(ticker, "metadata"))
    for column, digits in (("DebtToEquity", 3), ("PE", 6), ("ROE", 3)):
        table[column] = table[column].round(digits)
    missing_value = missing_values(table)
    table = normalize_per_share(table)
    metadata_path = os.path.join(BASE_DIR, "metadata", f"{ticker}_metadata.csv")
    table.to_csv(metadata_path, index=True)

Analyzing AAPL
AAPL has no missing values
Analyzing MSFT
MSFT has no missing values
Analyzing GOOGL
GOOGL has no missing values
Analyzing AMZN
AMZN has no missing values
Analyzing META
META has no missing values
Analyzing NVDA
NVDA has no missing values
Analyzing TSLA
TSLA has no missing values
Analyzing INTC
INTC has no missing values
Analyzing AMD
AMD has no missing values
Analyzing IBM
IBM has no missing values


In [16]:
# Stack all per-ticker metadata files, add the per-year percentile columns and
# save the result as the master table.
master = compute_percentile(combine_metadata_files())
master_path = os.path.join(BASE_DIR, "metadata", "MASTER_metadata.csv")
master.to_csv(master_path, index=True)