In [5]:
pip install yfinance pandas sqlalchemy

Note: you may need to restart the kernel to use updated packages.


In [1]:
import logging
import sqlite3
import time
from datetime import datetime, timedelta
from pathlib import Path

import pandas as pd
import yfinance as yf
from sqlalchemy import create_engine, text

In [3]:
# Logging setup
# Send every log record to both pipeline.log and the notebook/console output.
#
# force=True removes and closes any handlers already attached to the root logger
# before installing these. Without it, basicConfig() silently does nothing when the
# root logger is already configured (for example when this cell is re-run), so
# edits to the level/format would not take effect and pipeline.log might never be
# written.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s  %(levelname)-8s  %(message)s",
    handlers=[
        # Explicit UTF-8 so non-ASCII log text (the pipeline's summary line contains
        # an em dash) is written the same way regardless of the platform's locale.
        logging.FileHandler("pipeline.log", encoding="utf-8"),
        logging.StreamHandler(),
    ],
    force=True,
)
# Logger shared by every function in this notebook.
log = logging.getLogger(__name__)

In [5]:
# Config — tunable parameters for the whole pipeline.
# NOTE: TICKER is also captured as a default argument by the ingest functions at
# definition time, so re-run their cells after changing it here.

# SQLite database file; get_engine() interpolates it into the engine URL.
DB_PATH = "quant.db"
# Symbol passed to both ingest steps by run_etl().
TICKER = "SPY"
PRICE_PERIOD = "2y"          # `period` argument for Ticker.history(): 2 years of price history
# `interval` argument for Ticker.history() (daily bars).
PRICE_INTERVAL = "1d"
MAX_EXPIRATIONS = 5          # how many option expiry dates to pull (the first N returned)
# Maximum number of calls retry_fetch() makes before giving up.
RETRY_ATTEMPTS = 3
RETRY_DELAY = 5              # seconds retry_fetch() sleeps between failed attempts

In [7]:
# Database helpers 
def get_engine():
    """Build a SQLAlchemy engine for the SQLite file named by ``DB_PATH``.

    Returns:
        The engine returned by ``create_engine`` with SQL echoing turned off.
    """
    sqlite_url = f"sqlite:///{DB_PATH}"
    return create_engine(sqlite_url, echo=False)


def create_tables(engine):
    """Ensure the ``price_history``, ``options_chain`` and ``etl_log`` tables exist.

    The schema is kept as one multi-statement script; it is split on ``;`` and
    each non-empty statement is executed separately on a single connection,
    followed by one commit.

    Args:
        engine: SQLAlchemy engine to run the DDL against.
    """
    ddl = """
    CREATE TABLE IF NOT EXISTS price_history (
        id          INTEGER PRIMARY KEY AUTOINCREMENT,
        ticker      TEXT    NOT NULL,
        date        TEXT    NOT NULL,
        open        REAL,
        high        REAL,
        low         REAL,
        close       REAL,
        adj_close   REAL,
        volume      INTEGER,
        ingested_at TEXT    DEFAULT (datetime('now')),
        UNIQUE(ticker, date)
    );

    CREATE TABLE IF NOT EXISTS options_chain (
        id              INTEGER PRIMARY KEY AUTOINCREMENT,
        ticker          TEXT NOT NULL,
        expiration      TEXT NOT NULL,
        option_type     TEXT NOT NULL,   -- 'call' or 'put'
        strike          REAL NOT NULL,
        last_price      REAL,
        bid             REAL,
        ask             REAL,
        volume          INTEGER,
        open_interest   INTEGER,
        implied_vol     REAL,
        in_the_money    INTEGER,         -- boolean 0/1
        ingested_at     TEXT DEFAULT (datetime('now')),
        UNIQUE(ticker, expiration, option_type, strike)
    );

    CREATE TABLE IF NOT EXISTS etl_log (
        id          INTEGER PRIMARY KEY AUTOINCREMENT,
        run_time    TEXT DEFAULT (datetime('now')),
        ticker      TEXT,
        step        TEXT,
        status      TEXT,
        rows_loaded INTEGER,
        message     TEXT
    );
    """
    # Splitting on ";" leaves an empty trailing chunk after the last statement;
    # filter(None, ...) drops it (and any other blank chunk).
    chunks = (piece.strip() for piece in ddl.strip().split(";"))
    with engine.connect() as conn:
        for statement in filter(None, chunks):
            conn.execute(text(statement))
        conn.commit()
    log.info("Tables verified / created.")


def log_etl_event(engine, ticker, step, status, rows=0, message=""):
    """Append one audit row to the ``etl_log`` table and commit it.

    Args:
        engine: SQLAlchemy engine to write through.
        ticker: Symbol the event refers to.
        step: Pipeline step name (stored in the ``step`` column).
        status: Outcome label (stored in the ``status`` column).
        rows: Row count stored in ``rows_loaded``.
        message: Free-text detail stored in ``message``.
    """
    insert_sql = text(
        "INSERT INTO etl_log (ticker, step, status, rows_loaded, message) "
        "VALUES (:ticker, :step, :status, :rows, :message)"
    )
    bind_values = dict(ticker=ticker, step=step, status=status, rows=rows, message=message)
    with engine.connect() as conn:
        conn.execute(insert_sql, bind_values)
        conn.commit()

In [9]:
# Retry wrapper
def retry_fetch(fn, *args, label="fetch", **kwargs):
    """Call ``fn(*args, **kwargs)``, retrying up to ``RETRY_ATTEMPTS`` times on failure.

    Args:
        fn: Callable to invoke.
        *args: Positional arguments forwarded to ``fn``.
        label: Short tag used in log and error messages.
        **kwargs: Keyword arguments forwarded to ``fn``.

    Returns:
        Whatever ``fn`` returns on the first successful attempt.

    Raises:
        RuntimeError: If every attempt raised. The last underlying exception is
            attached as ``__cause__`` and summarised in the message, so callers
            that only record ``str(error)`` still capture the reason.
    """
    last_exc = None
    for attempt in range(1, RETRY_ATTEMPTS + 1):
        try:
            return fn(*args, **kwargs)
        except Exception as exc:
            # `exc` is unbound once the except block ends; keep a reference for chaining.
            last_exc = exc
            log.warning(f"[{label}] Attempt {attempt}/{RETRY_ATTEMPTS} failed: {exc}")
            # No point sleeping after the final attempt.
            if attempt < RETRY_ATTEMPTS:
                time.sleep(RETRY_DELAY)

    failure = f"[{label}] All {RETRY_ATTEMPTS} attempts failed."
    if last_exc is not None:
        failure += f" Last error: {last_exc!r}"
    raise RuntimeError(failure) from last_exc

In [11]:
# Price history
def ingest_price_history(engine, ticker=TICKER):
    """Download price history for ``ticker`` and upsert it into ``price_history``.

    The history is requested with ``PRICE_PERIOD`` / ``PRICE_INTERVAL`` through
    ``retry_fetch``. Column names are normalised to snake_case, gaps in the price
    columns are forward- then back-filled, missing volume becomes 0, and the
    result is written with ``INSERT OR REPLACE`` keyed on ``(ticker, date)``.
    Every outcome is recorded through ``log_etl_event``.

    Args:
        engine: SQLAlchemy engine for the target database.
        ticker: Symbol to download.

    Returns:
        The cleaned DataFrame that was written (columns: ticker, date, open,
        high, low, close, adj_close, volume). An empty DataFrame is returned if
        the fetch failed or yielded no rows.
    """
    log.info(f"Ingesting price history for {ticker} ({PRICE_PERIOD})...")

    def _fetch():
        return yf.Ticker(ticker).history(
            period=PRICE_PERIOD, interval=PRICE_INTERVAL, auto_adjust=False
        )

    try:
        df = retry_fetch(_fetch, label="price_history")
    except RuntimeError as e:
        log_etl_event(engine, ticker, "price_history", "FAILED", message=str(e))
        log.error(str(e))
        return pd.DataFrame()

    if df.empty:
        msg = f"No price data returned for {ticker}."
        log.warning(msg)
        log_etl_event(engine, ticker, "price_history", "WARN", message=msg)
        return df

    # Normalise columns: move the date index into a column, snake_case the names.
    df = df.reset_index()
    df.columns = [c.lower().replace(" ", "_") for c in df.columns]
    df["ticker"] = ticker

    # Convert date to string (SQLite-friendly)
    df["date"] = pd.to_datetime(df["date"]).dt.strftime("%Y-%m-%d")

    # Handle missing data. Count NaNs only in the columns that are actually
    # filled, so NaNs in unrelated columns of the download cannot trigger a
    # misleading "Filled 0 missing price values." message.
    price_cols = ["open", "high", "low", "close", "adj_close"]
    filled_cols = price_cols + ["volume"]
    missing_before = int(df[filled_cols].isnull().sum().sum())
    # Forward-fill gaps from the previous row, then back-fill any leading NaN.
    df[price_cols] = df[price_cols].ffill().bfill()
    df["volume"] = df["volume"].fillna(0).astype(int)
    missing_after = int(df[filled_cols].isnull().sum().sum())
    if missing_before:
        log.info(f"  Filled {missing_before - missing_after} missing price values.")

    # Keep only the columns we store (all are guaranteed to exist at this point).
    cols = ["ticker", "date", "open", "high", "low", "close", "adj_close", "volume"]
    df = df[cols]

    # Upsert (INSERT OR REPLACE in SQLite). Passing a list of parameter dicts
    # makes this one executemany call instead of one execute per row.
    upsert_sql = text(
        "INSERT OR REPLACE INTO price_history "
        "(ticker, date, open, high, low, close, adj_close, volume) "
        "VALUES (:ticker, :date, :open, :high, :low, :close, :adj_close, :volume)"
    )
    records = df.to_dict("records")
    with engine.connect() as conn:
        conn.execute(upsert_sql, records)
        conn.commit()
    rows_loaded = len(records)

    log.info(f"  Loaded {rows_loaded} price rows for {ticker}.")
    log_etl_event(engine, ticker, "price_history", "OK", rows=rows_loaded)
    return df

In [13]:
# Options chain 
def ingest_options_chain(engine, ticker=TICKER):
    """Download the first ``MAX_EXPIRATIONS`` option chains for ``ticker`` and upsert them.

    Calls and puts for each expiration are concatenated, yfinance's camelCase
    columns are renamed to the table's snake_case names, missing numeric values
    are replaced by zero, and every row is written to ``options_chain`` with
    ``INSERT OR REPLACE``. Inserts are best-effort: a row that fails is skipped,
    and the number of skipped rows is reported.

    Args:
        engine: SQLAlchemy engine for the target database.
        ticker: Symbol to download.

    Returns:
        The combined, cleaned DataFrame of option rows, or an empty DataFrame if
        expirations could not be fetched, none exist, or no chain was retrieved.
    """
    log.info(f"Ingesting options chain for {ticker}...")

    def _fetch_expirations():
        # Read `.options` inside the retried callable so that a failure while
        # resolving the expiration list is retried too, not just the constructor.
        handle = yf.Ticker(ticker)
        return handle, handle.options

    try:
        t, expirations = retry_fetch(_fetch_expirations, label="options_ticker")
    except Exception as e:
        log_etl_event(engine, ticker, "options_chain", "FAILED", message=str(e))
        log.error(f"Could not fetch expirations: {e}")
        return pd.DataFrame()

    if not expirations:
        msg = "No option expirations found."
        log.warning(msg)
        log_etl_event(engine, ticker, "options_chain", "WARN", message=msg)
        return pd.DataFrame()

    # Limit to nearest N expirations
    expirations = expirations[:MAX_EXPIRATIONS]
    log.info(f"  Pulling {len(expirations)} expiration dates: {list(expirations)}")

    all_frames = []
    for exp in expirations:
        try:
            chain = retry_fetch(t.option_chain, exp, label=f"chain_{exp}")
            for opt_type, frame in [("call", chain.calls), ("put", chain.puts)]:
                frame = frame.copy()
                frame["ticker"] = ticker
                frame["expiration"] = exp
                frame["option_type"] = opt_type
                all_frames.append(frame)
        except Exception as e:
            # One bad expiration should not abort the others.
            log.warning(f"  Skipping expiration {exp}: {e}")
            continue

    if not all_frames:
        log.error("No options data collected.")
        log_etl_event(engine, ticker, "options_chain", "FAILED", message="No data")
        return pd.DataFrame()

    df = pd.concat(all_frames, ignore_index=True)

    # Normalise column names (yfinance uses camelCase)
    rename_map = {
        "contractSymbol": "contract_symbol",
        "lastPrice": "last_price",
        "openInterest": "open_interest",
        "impliedVolatility": "implied_vol",
        "inTheMoney": "in_the_money",
    }
    df = df.rename(columns=rename_map)

    def _filled(column, default):
        """Return ``df[column]`` with NaN replaced by ``default``.

        If the column is absent, return a constant Series aligned to ``df.index``.
        An empty fallback Series would align to nothing and leave the whole
        column NaN instead of ``default``.
        """
        if column in df.columns:
            return df[column].fillna(default)
        return pd.Series(default, index=df.index)

    # Handle missing values
    df["bid"] = _filled("bid", 0.0)
    df["ask"] = _filled("ask", 0.0)
    df["volume"] = _filled("volume", 0).astype(int)
    df["open_interest"] = _filled("open_interest", 0).astype(int)
    df["implied_vol"] = _filled("implied_vol", 0.0)
    df["in_the_money"] = _filled("in_the_money", False).astype(int)

    # Built once instead of on every loop iteration.
    upsert_sql = text(
        "INSERT OR REPLACE INTO options_chain "
        "(ticker, expiration, option_type, strike, last_price, "
        "bid, ask, volume, open_interest, implied_vol, in_the_money) "
        "VALUES (:ticker, :expiration, :option_type, :strike, :last_price, "
        ":bid, :ask, :volume, :open_interest, :implied_vol, :in_the_money)"
    )
    bind_names = [
        "ticker", "expiration", "option_type", "strike", "last_price",
        "bid", "ask", "volume", "open_interest", "implied_vol", "in_the_money",
    ]

    rows_loaded = 0
    rows_skipped = 0
    first_error = None
    with engine.connect() as conn:
        for _, row in df.iterrows():
            try:
                conn.execute(upsert_sql, {name: row.get(name, None) for name in bind_names})
                rows_loaded += 1
            except Exception as e:
                # Best-effort load: keep going, but remember that rows were dropped.
                rows_skipped += 1
                if first_error is None:
                    first_error = e
                log.debug(f"  Row insert error (skipping): {e}")
        conn.commit()

    log.info(f"  Loaded {rows_loaded} option rows for {ticker}.")
    if rows_skipped:
        # Surface dropped rows at a level that is visible with the INFO
        # configuration, and do not record a partially failed load as a clean "OK".
        msg = f"Skipped {rows_skipped} option rows that failed to insert (first error: {first_error})"
        log.warning(f"  {msg}")
        log_etl_event(engine, ticker, "options_chain", "WARN", rows=rows_loaded, message=msg)
    else:
        log_etl_event(engine, ticker, "options_chain", "OK", rows=rows_loaded)
    return df

In [15]:
# Entry point 
def run_etl():
    """Run the full pipeline: create tables, then ingest prices and options for ``TICKER``.

    Returns:
        A ``(price_df, options_df)`` tuple holding the results of the two
        ingest steps.
    """
    banner = "=" * 60

    for line in (banner, "ETL PIPELINE START", banner):
        log.info(line)

    db_engine = get_engine()
    create_tables(db_engine)

    price_df = ingest_price_history(db_engine, TICKER)
    options_df = ingest_options_chain(db_engine, TICKER)

    summary = f"ETL COMPLETE — Price rows: {len(price_df)}, Option rows: {len(options_df)}"
    for line in (banner, summary, banner):
        log.info(line)
    return price_df, options_df


# Start the pipeline when this code runs as the main program rather than being imported.
# NOTE(review): in a Jupyter kernel __name__ is "__main__", so executing this cell
# launches a full ETL run — the log output recorded below this cell is consistent with that.
if __name__ == "__main__":
    run_etl()

2026-02-17 22:30:25,864  INFO      ETL PIPELINE START
2026-02-17 22:30:25,887  INFO      Tables verified / created.
2026-02-17 22:30:25,888  INFO      Ingesting price history for SPY (2y)...
2026-02-17 22:30:27,098  INFO        Loaded 500 price rows for SPY.
2026-02-17 22:30:27,101  INFO      Ingesting options chain for SPY...
2026-02-17 22:30:27,272  INFO        Pulling 5 expiration dates: ['2026-02-18', '2026-02-19', '2026-02-20', '2026-02-23', '2026-02-24']
2026-02-17 22:30:28,353  INFO        Loaded 1046 option rows for SPY.
2026-02-17 22:30:28,354  INFO      ETL COMPLETE — Price rows: 500, Option rows: 1046
