In [1]:
#!/usr/bin/env python3
"""
stage04_ingest.py

Single-file ingestion workflow for:
- API pull (Alpha Vantage preferred; yfinance fallback)
- Web scrape (Wikipedia S&P 500 constituents)
- Validation, saving to data/raw/
- Documentation written to notebooks/

Usage:
    python stage04_ingest.py

Ensure you have an optional .env in the repo root with:
    API_KEY=your_alpha_vantage_key
    TICKER=AAPL
If you do not create a .env, the script will fall back to yfinance.
"""

import os
import sys
import subprocess
import datetime
import time
import io
import textwrap

# ---------- Helper: ensure required packages ----------
# pip distribution names (what `pip install` expects), not import names.
REQUIRED = ["requests", "python-dotenv", "pandas", "beautifulsoup4", "yfinance", "lxml"]

# Distributions whose importable module name differs from the pip name.
# Without this mapping, "beautifulsoup4" (imported as `bs4`) can never be
# imported and would be passed to pip on every single run.
_PIP_TO_IMPORT_NAME = {
    "python-dotenv": "dotenv",
    "beautifulsoup4": "bs4",
}


def ensure_packages(pkgs, import_names=None):
    """Install (via pip) every package in *pkgs* that cannot be imported.

    Args:
        pkgs: pip distribution names to check, e.g. "beautifulsoup4".
        import_names: optional mapping {pip name: module name} merged over the
            built-in defaults (python-dotenv -> dotenv, beautifulsoup4 -> bs4).
            Names without a mapping are imported as-is.

    Returns:
        The list of pip names that were missing and handed to pip (empty when
        everything was already importable).

    Raises:
        subprocess.CalledProcessError: if ``pip install`` exits non-zero.
    """
    import importlib

    name_map = dict(_PIP_TO_IMPORT_NAME)
    if import_names:
        name_map.update(import_names)

    missing = []
    for pkg in pkgs:
        try:
            importlib.import_module(name_map.get(pkg, pkg))
        except ImportError:  # includes ModuleNotFoundError
            missing.append(pkg)
    if missing:
        print("Installing missing packages:", missing)
        # Use the running interpreter's pip so packages land in this environment.
        subprocess.check_call([sys.executable, "-m", "pip", "install", *missing])
        print("Installation finished.")
    return missing

# Executed at import time, before the third-party imports below; may run pip.
ensure_packages(pkgs=REQUIRED)

# ---------- Imports (after ensuring packages) ----------
import requests
import pandas as pd
from bs4 import BeautifulSoup
from dotenv import load_dotenv
import yfinance as yf

# ---------- Paths & timestamps ----------
# All paths hang off the current working directory (expected to be the repo root).
ROOT = os.getcwd()
RAW_DIR = os.path.join(ROOT, "data", "raw")
NOTEBOOKS_DIR = os.path.join(ROOT, "notebooks")
os.makedirs(RAW_DIR, exist_ok=True)
os.makedirs(NOTEBOOKS_DIR, exist_ok=True)

# Run identifier (local time, minute resolution) embedded in every output name.
# NOTE: two runs started within the same minute share TS, so their CSVs
# overwrite each other and they append to the same log file.
TS = datetime.datetime.now().strftime("%Y%m%d-%H%M")
LOG_PATH = os.path.join(RAW_DIR, f"ingestion_log_{TS}.txt")

# ---------- Create .env.example if not present ----------
# Placeholder key written into .env.example. It is treated as "no key" below so
# that a .env copied verbatim from the example does not send it as a real key.
_PLACEHOLDER_API_KEY = "dummy_key_123"
ENV_EXAMPLE_PATH = os.path.join(ROOT, ".env.example")
if not os.path.exists(ENV_EXAMPLE_PATH):
    with open(ENV_EXAMPLE_PATH, "w", encoding="utf-8") as f:
        f.write(f"API_KEY={_PLACEHOLDER_API_KEY}\nTICKER=AAPL\n")
    print(f"Created .env.example at {ENV_EXAMPLE_PATH} (use this to create your local .env)")

# ---------- Load environment ----------
# python-dotenv locates a .env file and loads it into os.environ; by default it
# does not override variables that are already set in the process environment.
load_dotenv()
API_KEY = os.getenv("API_KEY")  # Alpha Vantage key; None/empty -> yfinance fallback
if API_KEY == _PLACEHOLDER_API_KEY:
    API_KEY = None
# `or` (instead of a getenv default) also covers an empty `TICKER=` line in .env.
TICKER = os.getenv("TICKER") or "AAPL"

# ---------- Utility: write log ----------
def log(msg, to_console=True):
    """Append one timestamped line to LOG_PATH, echoing it to stdout by default.

    Args:
        msg: message to record (interpolated via an f-string).
        to_console: when True (the default) the stamped line is also printed.
    """
    stamped = f"[{datetime.datetime.now().isoformat()}] {msg}"
    if to_console:
        print(stamped)
    # The file is reopened per call, so each line is flushed when the `with` closes it.
    with open(LOG_PATH, "a", encoding="utf-8") as log_file:
        log_file.write(f"{stamped}\n")


log("Starting ingestion workflow")

# ---------- Validation helpers ----------
def validate_api_df(df: pd.DataFrame):
    """
    Validate (and lightly normalize) a price-history DataFrame from the API pull.

    Checks performed:
    - a date/timestamp column exists (a DatetimeIndex is moved into a column
      as a fallback) and is parsed with ``pd.to_datetime(errors="coerce")``;
      unparseable dates become NaT and only trigger a logged warning
    - the frame has at least one row
    - columns named open, high, low, close, volume exist (case-insensitive;
      any column containing "close", e.g. "Adj Close", satisfies "close").
      Only presence is checked, not dtype.
    - NA counts and shape are recorded

    The caller's frame is not modified; a normalized copy is returned.

    Returns:
        (ok, diag, df):
        - ``ok`` is False if the date column, any required column, or all
          rows are missing.
        - ``diag`` is a dict with 'columns', 'date_col' or 'date_error',
          'empty' (only when there are no rows), 'missing_<name>' flags,
          'found_required', 'shape' and 'na_counts'.
        - ``df`` is the copy with its date column converted.
    """
    diag = {}
    ok = True
    df = df.copy()  # never mutate the caller's frame

    # Map lower-cased names to the real labels; str() guards non-string labels
    # (e.g. integers or tuples), on which .lower() would raise.
    cols_lower = {str(c).lower(): c for c in df.columns}
    diag['columns'] = list(df.columns)
    log(f"API DataFrame columns: {diag['columns']}")

    # find date column
    date_col = None
    for candidate in ["timestamp", "date", "datetime", "time", "trade_time"]:
        if candidate in cols_lower:
            date_col = cols_lower[candidate]
            break
    if date_col is None and isinstance(df.index, pd.DatetimeIndex):
        # The dates live in the index; move them into a regular column.
        df = df.reset_index()
        if isinstance(df.columns[0], str):
            date_col = df.columns[0]
    if date_col is None:
        log("Validation error: no date/timestamp column found")
        ok = False
        diag['date_error'] = "no date column"
    else:
        df[date_col] = pd.to_datetime(df[date_col], errors="coerce")
        if df[date_col].isna().any():
            log("Warning: some date parsing failed (NA present)")
        diag['date_col'] = date_col

    if df.empty:
        log("Validation error: DataFrame has no rows")
        ok = False
        diag['empty'] = True

    # required price/volume columns (common names, case-insensitive)
    found_required = []
    for req in ["open", "high", "low", "close", "volume"]:
        matches = [c for c in df.columns if str(c).lower() == req]
        if not matches and req == "close":
            # also accept variants such as "Adj Close" / "adjusted_close"
            matches = [c for c in df.columns if "close" in str(c).lower()]
        if matches:
            found_required.append(matches[0])
        else:
            diag[f"missing_{req}"] = True
            ok = False

    diag['found_required'] = found_required
    diag['shape'] = df.shape
    diag['na_counts'] = df.isna().sum().to_dict()

    return ok, diag, df

def validate_scrape_df(df: pd.DataFrame, required_cols=None):
    """
    Check a scraped table for required columns and summarize it.

    Args:
        df: the scraped table.
        required_cols: optional iterable of exact column names that must be
            present; a falsy value (None, empty list) skips the check.

    Returns:
        (ok, diag): ``ok`` is False only when a required column is absent.
        ``diag`` holds 'columns', 'missing' (only when something is absent),
        'na_counts' and 'shape', in that key order.
    """
    diag = {'columns': list(df.columns)}
    absent = [name for name in (required_cols or []) if name not in df.columns]
    if absent:
        diag['missing'] = absent
    diag['na_counts'] = df.isna().sum().to_dict()
    diag['shape'] = df.shape
    return not absent, diag

# ---------- API Pull ----------
def fetch_via_alphavantage(ticker, apikey, outputsize="compact",
                           function="TIME_SERIES_DAILY_ADJUSTED"):
    """
    Fetch a daily time series for *ticker* from Alpha Vantage as CSV.

    Args:
        ticker: symbol to query.
        apikey: Alpha Vantage API key.
        outputsize: passed through to the API ("compact" by default).
        function: Alpha Vantage time-series function (default
            TIME_SERIES_DAILY_ADJUSTED). NOTE(review): this endpoint may be
            restricted to premium keys; "TIME_SERIES_DAILY" is the usual
            alternative -- confirm against the current Alpha Vantage docs.

    Returns:
        DataFrame parsed from the CSV response body.

    Raises:
        RuntimeError: on a non-200 status, an empty body, or when the service
            answers with a JSON message (e.g. invalid key, rate limit) instead
            of CSV. Such messages arrive with HTTP 200, so the status check
            alone does not catch them.
        requests.RequestException: on connection errors or timeouts.
    """
    import json

    base = "https://www.alphavantage.co/query"
    params = {
        "function": function,
        "symbol": ticker,
        "outputsize": outputsize,
        "datatype": "csv",
        "apikey": apikey,
    }
    headers = {"User-Agent": "stage04-ingest-script/1.0 (python requests)"}
    r = requests.get(base, params=params, headers=headers, timeout=30)
    if r.status_code != 200:
        raise RuntimeError(f"AlphaVantage request failed: {r.status_code}")

    body = r.text.strip()
    if not body:
        raise RuntimeError("AlphaVantage returned an empty response")
    # Problems come back as a JSON object even though CSV was requested; parsing
    # that as CSV would yield a bogus one-column frame instead of an error.
    if body.startswith("{"):
        try:
            payload = json.loads(body)
        except ValueError:
            payload = None
        if isinstance(payload, dict) and payload:
            # typically keyed "Error Message", "Note" or "Information"
            message = "; ".join(f"{key}: {value}" for key, value in payload.items())
        else:
            message = body[:200]
        raise RuntimeError(f"AlphaVantage returned an error instead of CSV: {message}")

    # the CSV has columns like timestamp, open, high, low, close, adjusted close, volume, ...
    return pd.read_csv(io.StringIO(body))

def fetch_via_yfinance(ticker, period="2y", interval="1d"):
    """
    Fetch price history for *ticker* via yfinance.

    Args:
        ticker: symbol to query.
        period: look-back window passed to ``Ticker.history`` (default "2y").
        interval: bar size passed to ``Ticker.history`` (default "1d").

    Returns:
        DataFrame whose first column "timestamp" holds the former index,
        followed by the columns yfinance returns (with auto_adjust=False these
        include the unadjusted OHLC plus "Adj Close").

    Raises:
        RuntimeError: if yfinance returns no rows.
    """
    hist = yf.Ticker(ticker).history(period=period, interval=interval, auto_adjust=False)
    if hist.empty:
        raise RuntimeError("yfinance returned empty DataFrame")
    df = hist.reset_index()  # index -> first column
    # yfinance names the index "Date" for daily-or-longer bars and "Datetime"
    # for intraday bars; normalize either to 'timestamp' for compatibility.
    for date_name in ("Date", "Datetime"):
        if date_name in df.columns:
            df = df.rename(columns={date_name: "timestamp"})
            break
    return df

api_df = None      # frame that will be saved; None = nothing usable yet
api_source = None  # "alphavantage" or "yfinance" once api_df is set
api_ok = False     # validation result for api_df
api_diag = {}      # validation diagnostics for api_df

if API_KEY:
    log("API_KEY found in .env; attempting Alpha Vantage")
    try:
        api_df = fetch_via_alphavantage(TICKER, API_KEY)
        api_source = "alphavantage"
        ok, diag, api_df = validate_api_df(api_df)
        api_ok = ok
        api_diag = diag
        if ok:
            log("Alpha Vantage fetch & validation OK")
        else:
            # Discard data that failed validation so the yfinance fallback below
            # runs, instead of silently saving an unusable file.
            log(f"Alpha Vantage fetch completed but validation failed ({diag}). Will fallback to yfinance.")
            api_df = None
            api_source = None
    except Exception as e:
        log(f"Alpha Vantage fetch failed: {e}. Will fallback to yfinance.")
        # The fetch may have succeeded before validation raised; clear any
        # partial result so the fallback below actually runs.
        api_df = None
        api_source = None
        API_KEY = None  # force fallback

if api_df is None:
    log("Using yfinance fallback for API pull")
    try:
        api_df = fetch_via_yfinance(TICKER)
        api_source = "yfinance"
        ok, diag, api_df = validate_api_df(api_df)
        api_ok = ok
        api_diag = diag
        if ok:
            log("yfinance fetch & validation OK")
        else:
            # Last available source: keep the data, the issues go to the diag file.
            log("yfinance fetch completed but validation flagged issues")
    except Exception as e:
        log(f"yfinance fetch failed: {e}")
        api_df = None
        api_source = None

# Save API CSV if we have it
if api_df is not None:
    api_filename = f"api_{api_source}_{TICKER}_{TS}.csv"
    api_path = os.path.join(RAW_DIR, api_filename)
    # Normalize the date column name to 'timestamp' if it is not already.
    cols_lower = {str(c).lower(): c for c in api_df.columns}
    if "timestamp" not in cols_lower:
        for candidate in ["date", "datetime"]:
            if candidate in cols_lower:
                api_df = api_df.rename(columns={cols_lower[candidate]: "timestamp"})
                break
    # Convert the remaining columns to numbers (int or float) where every value
    # parses; columns that fail are left untouched. This reproduces the
    # deprecated pd.to_numeric(errors="ignore") behaviour.
    for c in api_df.columns:
        if str(c).lower() not in ("timestamp", "date", "datetime", "symbol"):
            try:
                api_df[c] = pd.to_numeric(api_df[c])
            except (ValueError, TypeError):
                pass
    api_df.to_csv(api_path, index=False)
    log(f"Saved API CSV to {api_path}")
    # write diagnostics
    with open(os.path.join(RAW_DIR, f"api_diag_{TS}.txt"), "w", encoding="utf-8") as fh:
        fh.write("Source: " + (api_source or "none") + "\n")
        fh.write("Ticker: " + TICKER + "\n")
        fh.write("Validation OK: " + str(api_ok) + "\n")
        fh.write("Validation diag:\n")
        fh.write(textwrap.indent(str(api_diag), "  ") + "\n")
else:
    log("No API data available; skipping API CSV save")

# ---------- Scrape small table (Wikipedia S&P 500 constituents) ----------
SCRAPE_URL = "https://en.wikipedia.org/wiki/List_of_S%26P_500_companies"
scrape_df = None
scrape_diag = {}

try:
    headers = {"User-Agent": "stage04-ingest-script/1.0 (python requests)"}
    resp = requests.get(SCRAPE_URL, headers=headers, timeout=30)
    resp.raise_for_status()
    soup = BeautifulSoup(resp.text, "lxml")

    # Prefer the table with id 'constituents' (matches the page at time of writing).
    table = soup.find("table", {"id": "constituents"})
    if table is None:
        # Fallback: first table carrying both the 'wikitable' and 'sortable'
        # classes. A CSS selector is needed because find() with
        # {"class": "wikitable sortable"} only matches that exact attribute
        # string and misses tables with extra classes.
        table = soup.select_one("table.wikitable.sortable")
    if table is None:
        raise RuntimeError("Couldn't find a suitable table on the page")

    # parse headers from the first row of the table
    headers_row = table.find("tr")
    headers_cols = [th.get_text(strip=True) for th in headers_row.find_all("th")] if headers_row else []
    if not headers_cols:
        raise RuntimeError("Table has no header row")
    # parse data rows, padding short rows / truncating long ones to the header width
    rows = []
    for tr in table.find_all("tr")[1:]:
        cols = [td.get_text(strip=True) for td in tr.find_all(["td", "th"])]
        if not cols:
            continue
        cols += [""] * (len(headers_cols) - len(cols))  # no-op when long enough
        rows.append(cols[:len(headers_cols)])
    scrape_df = pd.DataFrame(rows, columns=headers_cols)

    # Normalize the ticker column to 'Symbol': reuse an existing symbol/ticker
    # column (any case) if present, otherwise assume the first column holds it.
    if "Symbol" not in scrape_df.columns:
        symbol_candidates = [
            c for c in scrape_df.columns if c.lower() in ("symbol", "ticker", "ticker symbol")
        ]
        symbol_col = symbol_candidates[0] if symbol_candidates else scrape_df.columns[0]
        scrape_df = scrape_df.rename(columns={symbol_col: "Symbol"})

    # Validate required columns
    required = ["Symbol", "Security"]
    ok, diag = validate_scrape_df(scrape_df, required_cols=required)
    scrape_diag = diag
    if ok:
        log("Scrape validation OK")
    else:
        log(f"Scrape validation issues: {diag.get('missing', [])}")

    # Save scrape CSV
    scrape_filename = f"scrape_wikipedia_sp500_constituents_{TS}.csv"
    scrape_path = os.path.join(RAW_DIR, scrape_filename)
    scrape_df.to_csv(scrape_path, index=False)
    log(f"Saved scraped CSV to {scrape_path}")

    # Save scrape diagnostics
    with open(os.path.join(RAW_DIR, f"scrape_diag_{TS}.txt"), "w", encoding="utf-8") as fh:
        fh.write("Source URL: " + SCRAPE_URL + "\n")
        fh.write("Validation OK: " + str(ok) + "\n")
        fh.write("Validation diag:\n")
        fh.write(textwrap.indent(str(scrape_diag), "  ") + "\n")
except Exception as e:
    # Best-effort step: a scrape failure is logged but does not abort the run.
    log(f"Scraping failed: {e}")

# ---------- Documentation & notebook note ----------
doc_path = os.path.join(NOTEBOOKS_DIR, "stage04_ingestion_notes.md")
# Values interpolated into the Markdown notes. LOG_PATH already lives in RAW_DIR,
# so it is the same string as joining RAW_DIR with its basename.
generated_at = datetime.datetime.now().isoformat()
doc_text = f"""\
# Stage 04 — Data Acquisition & Ingestion (auto-generated notes)

**Timestamp:** {generated_at}

## API ingestion
- Preferred source: Alpha Vantage (CSV) using `TIME_SERIES_DAILY_ADJUSTED` endpoint.
- Fallback: `yfinance` Python package.
- Config / env:
  - .env (local, not committed): API_KEY (Alpha Vantage key), TICKER (e.g., AAPL)
  - .env.example is present in repo root.
- Example Alpha Vantage params used:
  - function=TIME_SERIES_DAILY_ADJUSTED, symbol={TICKER}, outputsize=compact, datatype=csv
- Validations performed:
  - check presence of a date/timestamp column and convert to datetime
  - check for numeric columns: open, high, low, close, volume (or nearest variants)
  - report NA counts and DataFrame shape
- Output file: data/raw/api_{{source}}_{TICKER}_{TS}.csv

## Scraping ingestion
- Source URL: {SCRAPE_URL}
- Table: S&P 500 constituents (table id='constituents' or first wikitable sortable)
- Validation:
  - required columns: Symbol, Security
  - check NA counts and shape
- Output file: data/raw/scrape_wikipedia_sp500_constituents_{TS}.csv

## Assumptions & Risks
- Alpha Vantage has rate limits (5 calls/minute for free tier). Use API key responsibly.
- Wikipedia's HTML structure can change; the scraper uses common selectors and falls back to generic table selection.
- Do NOT commit `.env` with real API keys. Use `.env.example` in your repo.
- Data freshness depends on API / source availability.

## Files created by this run
- API CSV, scrape CSV in `data/raw/`
- diagnostics text files in `data/raw/`
- ingestion log: {LOG_PATH}
"""

# The notes file is not timestamped: each run overwrites the previous notes.
with open(doc_path, mode="w", encoding="utf-8") as notes_file:
    notes_file.write(doc_text)
log(f"Wrote ingestion documentation to {doc_path}")

# ---------- Final summary printed ----------
log("Ingestion workflow completed")
# Every output of this run carries TS in its filename. Sort the listing so the
# summary is deterministic (os.listdir returns entries in arbitrary order).
summary_lines = ["Summary of outputs:"]
summary_lines += [
    f" - {os.path.join(RAW_DIR, fn)}"
    for fn in sorted(os.listdir(RAW_DIR))
    if TS in fn
]
for line in summary_lines:
    log(line)

# end of script

Installing missing packages: ['beautifulsoup4']
Installation finished.
Created .env.example at c:\Users\sarda\Desktop\bootcamp_darshit_sarda\homework\.env.example (use this to create your local .env)
[2025-08-26T10:00:58.906969] Starting ingestion workflow
[2025-08-26T10:00:58.908963] Using yfinance fallback for API pull
[2025-08-26T10:00:59.694449] API DataFrame columns: ['timestamp', 'Open', 'High', 'Low', 'Close', 'Adj Close', 'Volume', 'Dividends', 'Stock Splits']
[2025-08-26T10:00:59.696647] yfinance fetch & validation OK
[2025-08-26T10:00:59.708207] Saved API CSV to c:\Users\sarda\Desktop\bootcamp_darshit_sarda\homework\data\raw\api_yfinance_AAPL_20250826-1000.csv
[2025-08-26T10:01:00.306170] Scrape validation OK
[2025-08-26T10:01:00.309389] Saved scraped CSV to c:\Users\sarda\Desktop\bootcamp_darshit_sarda\homework\data\raw\scrape_wikipedia_sp500_constituents_20250826-1000.csv
[2025-08-26T10:01:00.312100] Wrote ingestion documentation to c:\Users\sarda\Desktop\bootcamp_darshit_s