# 📥 Ingestion & Exploration Notebook (`ingest_explore.ipynb`)

**Project:** Market Forecast Capstone  
**Purpose:**  
- Initialize data infrastructure for the project  
- Create and connect to our local SQLite database (`data/market_data.db`)  
- Define table schemas (prices, macro, news, etc.)  
- Prepare for safe, rate-limited ingestion from external APIs

**This notebook does NOT train models.**  
This is the data layer: pull, align, store, sanity check.

**Key ideas:**  
- All timestamps aligned to America/New_York  
- Trading calendar = NYSE (no weekends, no holidays in final merged view)  
- We keep raw API payloads for reproducibility  
- We build daily tables that can be joined cleanly by `date`


## ⚖️ Data Access & Rate Limit Policy

We will be using:
- **Alpha Vantage** (market data, SPY + ~100 tickers, VIX)
  - Free tier historically allows ~5 calls/minute and ~500 calls/day.
  - We MUST respect pacing. We'll batch symbols and sleep between calls.

- **FRED (macro data)**
  - Very lenient. We can pull full history in one request per series.

- **The Guardian API / Alpha Vantage news**
  - We'll pull historical headlines in pages.
  - We only keep headlines in the window *(prev day 16:00 ET → current day 09:29 ET)* for each trading day.
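
The window rule above can be sketched as a small function. This is a minimal illustration, not the notebook's actual implementation: it assumes "prev day" means the previous calendar day, that 09:29 ET is inclusive (so the cutoff is 09:30:00), and that a set of trading days is supplied by the caller. The names `assign_trading_day` and `trading_days` are hypothetical.

```python
from datetime import date, datetime, time, timedelta, timezone
from typing import Optional, Set
from zoneinfo import ZoneInfo

ET = ZoneInfo("America/New_York")
WINDOW_OPEN = time(16, 0)    # 16:00 ET on the previous calendar day (assumption)
WINDOW_CLOSE = time(9, 30)   # exclusive bound, i.e. up to and including 09:29 ET

def assign_trading_day(published_utc: datetime, trading_days: Set[date]) -> Optional[date]:
    """Return the trading day whose pre-market window contains the headline, else None."""
    local = published_utc.astimezone(ET)
    if local.time() >= WINDOW_OPEN:
        candidate = local.date() + timedelta(days=1)   # evening headline feeds the next day
    elif local.time() < WINDOW_CLOSE:
        candidate = local.date()                       # early-morning headline feeds today
    else:
        return None                                    # published during market hours
    return candidate if candidate in trading_days else None

# Hypothetical headline published at 16:30 ET on 2024-01-02.
example = datetime(2024, 1, 2, 21, 30, tzinfo=timezone.utc)
print(assign_trading_day(example, {date(2024, 1, 3)}))
```

Headlines that fall outside every window, or whose candidate date is not a trading day, come back as `None` and would be dropped under this reading of the rule.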

Principles for ingestion:
1. We never spam an API in a tight loop with no delay.
2. We log when/what we pulled (source, timestamp, ticker).
3. We save raw responses under `raw/` so we can reproduce later.
4. SQLite is our source of truth for cleaned, daily-aligned data.
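
Principle 1 can be enforced with a small pacing helper. The sketch below is one possible approach, not code from this repo: `paced` is a hypothetical name, and the default of 5 calls per minute simply mirrors the historical Alpha Vantage figure quoted above. The `sleep` and `clock` parameters exist only so the helper can be exercised without real waiting.

```python
import time
from typing import Callable, Iterable, Iterator

def paced(items: Iterable[str], calls_per_minute: int = 5,
          sleep: Callable[[float], None] = time.sleep,
          clock: Callable[[], float] = time.monotonic) -> Iterator[str]:
    """Yield items no faster than calls_per_minute, sleeping between them as needed."""
    min_gap = 60.0 / calls_per_minute
    last_call = None
    for item in items:
        if last_call is not None:
            wait = min_gap - (clock() - last_call)   # time still owed since the last yield
            if wait > 0:
                sleep(wait)
        last_call = clock()
        yield item
```

A loop such as `for symbol in paced(symbols): fetch(symbol)` would then issue at most one request every 12 seconds, where `fetch` stands in for whichever ingestion function is in use.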


In [None]:
# If you're in Colab, start by configuring git identity
!git config --global user.name "JoeGiwa"
!git config --global user.email "joebot17@gmail.com"

# Clone your repo (replace with your actual URL)
!git clone https://github.com/JoeGiwa/market-forecast-capstone

# Move into it
%cd market-forecast-capstone

# (Optional) create a new branch for ingestion work
!git checkout -b feature/ingest_explore


In [None]:
# Install all required dependencies
!pip install -r requirements.txt


In [None]:
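# -n: keep an existing .env (and the real keys in it) instead of overwriting it with the template
!cp -n .env.template .env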

In [None]:
import os
import pathlib
import sqlite3
from datetime import datetime

# Load environment variables & verify keys
from src.utils.env_loader import load_env, check_env
load_env()
check_env()

# --- Project paths ---
ROOT_DIR = pathlib.Path.cwd()  # assuming you're running from repo root in Colab
DATA_DIR = ROOT_DIR / "data"
RAW_DIR = ROOT_DIR / "raw"
DB_PATH = DATA_DIR / "market_data.db"

# make sure needed folders exist
DATA_DIR.mkdir(exist_ok=True, parents=True)
RAW_DIR.mkdir(exist_ok=True, parents=True)

print("📂 Project root:", ROOT_DIR)
print("📂 Data dir:", DATA_DIR)
print("📂 Raw dir:", RAW_DIR)
print("💾 DB path will be:", DB_PATH)

# We'll open the SQLite connection later after we define schemas


## 🗄 Database Plan (SQLite @ `data/market_data.db`)

We will create the following core tables:

### 1. `prices`
Daily market data for SPY, VIX, and selected S&P 500 names  
Columns (initial draft):
- `date` (TEXT, 'YYYY-MM-DD')
- `symbol` (TEXT, e.g. 'SPY')
- `open`, `high`, `low`, `close`, `volume` (REAL/INTEGER)
- `adjusted_close` (REAL)
- `source` (TEXT, e.g. 'alpha_vantage')
- PRIMARY KEY (`date`, `symbol`)

Later we will compute features off of this (RSI, SMA20, etc.).

---

### 2. `macro`
Macro + rates data from FRED, forward-filled to each trading date
- `date` (TEXT)
- `cpi` (REAL)
- `unemployment_rate` (REAL)
- `fed_funds_rate` (REAL)
- `term_spread` (REAL)  -- (10y - 3m yield)
- PRIMARY KEY (`date`)
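
Forward-filling a low-frequency series onto trading dates can be sketched as follows. This is an illustration under assumptions: `forward_fill` is a hypothetical helper, the sample values are made up, and it treats the observation date as the date the value becomes known. FRED series are often published with a lag, so a production version might key on release dates instead.

```python
from bisect import bisect_right
from datetime import date
from typing import Dict, List, Optional

def forward_fill(observations: Dict[date, float],
                 trading_days: List[date]) -> Dict[date, Optional[float]]:
    """For each trading day, take the latest observation dated on or before it."""
    obs_dates = sorted(observations)
    filled: Dict[date, Optional[float]] = {}
    for day in trading_days:
        idx = bisect_right(obs_dates, day)   # number of observations dated <= day
        filled[day] = observations[obs_dates[idx - 1]] if idx else None
    return filled

# Hypothetical monthly observations (values are made up).
obs = {date(2024, 1, 1): 3.0, date(2024, 2, 1): 3.1}
days = [date(2023, 12, 29), date(2024, 1, 2), date(2024, 1, 31), date(2024, 2, 1)]
print(forward_fill(obs, days))
```

Trading days that precede the first observation get `None`, which maps to NULL in the `macro` table.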

---

### 3. `news_raw`
Raw news/headline entries (Guardian or Alpha Vantage news API)
- `id` (TEXT PRIMARY KEY)     -- headline/article ID if available
- `published_utc` (TEXT)      -- original timestamp
- `published_et` (TEXT)       -- converted to America/New_York
- `headline` (TEXT)
- `section` (TEXT)
- `source` (TEXT)             -- 'guardian' or 'alpha_vantage'
- `raw_json` (TEXT)           -- store raw payload for reproducibility

We don't model directly from this table. We aggregate daily sentiment next.

---

### 4. `news_daily`
Daily pre-market sentiment features aligned to each trading day
- `date` (TEXT)
- `headline_count` (INTEGER)
- `sent_mean` (REAL)
- `sent_median` (REAL)
- `sent_std` (REAL)
- `pct_pos` (REAL)
- `pct_neg` (REAL)
- `no_news_day` (INTEGER 0/1)
- (optional) section-level stats, e.g. `sent_business_mean`
- PRIMARY KEY (`date`)
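
The daily aggregation could look roughly like this. It is a sketch with assumptions stated up front: `daily_sentiment` is a hypothetical name, scores are assumed to be signed floats where positive means positive sentiment, `sent_std` uses the population standard deviation, and days without headlines get NULL statistics plus `no_news_day = 1`.

```python
from statistics import mean, median, pstdev
from typing import Any, Dict, List

def daily_sentiment(scores: List[float]) -> Dict[str, Any]:
    """Aggregate per-headline sentiment scores for one trading day."""
    n = len(scores)
    if n == 0:
        return {"headline_count": 0, "sent_mean": None, "sent_median": None,
                "sent_std": None, "pct_pos": None, "pct_neg": None, "no_news_day": 1}
    return {
        "headline_count": n,
        "sent_mean": mean(scores),
        "sent_median": median(scores),
        "sent_std": pstdev(scores),                   # population std (assumption)
        "pct_pos": sum(s > 0 for s in scores) / n,    # share of strictly positive scores
        "pct_neg": sum(s < 0 for s in scores) / n,    # share of strictly negative scores
        "no_news_day": 0,
    }

# Hypothetical scores for one day.
print(daily_sentiment([0.5, -0.5, 0.0, 1.0]))
```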

---

### 5. `politics`
Political / structural regime flags
- `date` (TEXT)
- `pres_dem` (INTEGER 0/1)
- `is_election_day` (INTEGER 0/1)
- `is_inauguration_day` (INTEGER 0/1)
- `is_shutdown_window` (INTEGER 0/1)
- PRIMARY KEY (`date`)

These will be optional in modeling, but we store them here.

---

### 6. `targets`
Future returns we are trying to predict
- `date` (TEXT)
- `target_return_1d` (REAL)
- `updown_1d` (INTEGER 0/1)
- `target_return_30d` (REAL)
- `updown_30d` (INTEGER 0/1)
- `target_return_60d` (REAL)
- `updown_60d` (INTEGER 0/1)
- PRIMARY KEY (`date`)
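
As a rough sketch of how these targets might be derived from a close series: the helpers below are hypothetical, use simple returns, and assume each horizon counts rows of trading days (so `30d` means 30 rows ahead, not 30 calendar days). The trailing rows have no known future price and stay `None`.

```python
from typing import List, Optional

def forward_returns(closes: List[float], horizon: int) -> List[Optional[float]]:
    """close[t + horizon] / close[t] - 1 for each t, or None where the future is unknown."""
    out: List[Optional[float]] = []
    for t, price in enumerate(closes):
        future = t + horizon
        out.append(closes[future] / price - 1 if future < len(closes) else None)
    return out

def updown(ret: Optional[float]) -> Optional[int]:
    """1 if the forward return is positive, 0 otherwise; None stays None."""
    return None if ret is None else int(ret > 0)

# Hypothetical closes (values are made up).
rets = forward_returns([100.0, 110.0, 99.0], horizon=1)
print(rets, [updown(r) for r in rets])
```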

---

### 7. `features`
Engineered daily features (technical indicators, breadth, volatility structure, Prophet residuals, etc.)
- `date` (TEXT PRIMARY KEY)
- Columns to be added later:
  - technical indicators (SMA20, RSI14, MACD, Bollinger, etc.)
  - breadth stats from top 50 / bottom 50 tickers
  - VIX level and ΔVIX
  - rolling correlations / autocorr
  - macro-aligned lagged fields if we decide to include them here
  - Prophet trend / residual

---

### 8. `ingestion_log`
Audit trail for reproducibility (optional but good practice)
- `timestamp_utc` (TEXT)
- `source` (TEXT)           -- 'alpha_vantage', 'fred', 'guardian'
- `action` (TEXT)           -- 'fetch_prices', 'fetch_macro', etc.
- `notes` (TEXT)
- no primary key required; we append

This lets you prove when/how data was added, which helps in grading and in interviews.


In [None]:
import sqlite3

# Connect to SQLite (will create the file if it doesn't exist)
conn = sqlite3.connect(DB_PATH)
cur = conn.cursor()

# 1. prices table
cur.execute("""
CREATE TABLE IF NOT EXISTS prices (
    date TEXT NOT NULL,
    symbol TEXT NOT NULL,
    open REAL,
    high REAL,
    low REAL,
    close REAL,
    adjusted_close REAL,
    volume REAL,
    source TEXT,
    PRIMARY KEY (date, symbol)
);
""")

# 2. macro table
cur.execute("""
CREATE TABLE IF NOT EXISTS macro (
    date TEXT PRIMARY KEY,
    cpi REAL,
    unemployment_rate REAL,
    fed_funds_rate REAL,
    term_spread REAL
);
""")

# 3. news_raw table
cur.execute("""
CREATE TABLE IF NOT EXISTS news_raw (
    id TEXT PRIMARY KEY,
    published_utc TEXT,
    published_et TEXT,
    headline TEXT,
    section TEXT,
    source TEXT,
    raw_json TEXT
);
""")

# 4. news_daily table
cur.execute("""
CREATE TABLE IF NOT EXISTS news_daily (
    date TEXT PRIMARY KEY,
    headline_count INTEGER,
    sent_mean REAL,
    sent_median REAL,
    sent_std REAL,
    pct_pos REAL,
    pct_neg REAL,
    no_news_day INTEGER
    -- optional: add section-specific columns later
);
""")

# 5. politics table
cur.execute("""
CREATE TABLE IF NOT EXISTS politics (
    date TEXT PRIMARY KEY,
    pres_dem INTEGER,
    is_election_day INTEGER,
    is_inauguration_day INTEGER,
    is_shutdown_window INTEGER
);
""")

# 6. targets table
cur.execute("""
CREATE TABLE IF NOT EXISTS targets (
    date TEXT PRIMARY KEY,
    target_return_1d REAL,
    updown_1d INTEGER,
    target_return_30d REAL,
    updown_30d INTEGER,
    target_return_60d REAL,
    updown_60d INTEGER
);
""")

# 7. features table
cur.execute("""
CREATE TABLE IF NOT EXISTS features (
    date TEXT PRIMARY KEY
    -- feature columns (SMA20, RSI14, MACD, etc.) will be added later via ALTER TABLE
);
""")

# 8. ingestion_log table
cur.execute("""
CREATE TABLE IF NOT EXISTS ingestion_log (
    timestamp_utc TEXT,
    source TEXT,
    action TEXT,
    notes TEXT
);
""")

conn.commit()
conn.close()

print("✅ SQLite database initialized at", DB_PATH)


## 🗃️ Database Schema and Incremental Update Strategy

This notebook initializes the unified SQLite database (`data/market_data.db`) used throughout the capstone.  
It consolidates **market**, **macroeconomic**, **news/sentiment**, and **political** data into one warehouse for simplicity, robustness, and efficient feature joins.

### Schema Overview
| Table | Purpose | Primary Key | Example Columns |
|--------|----------|---------------|-----------------|
| `prices` | Daily OHLCV for SPY, VIX, and selected S&P 500 tickers | `(date, symbol)` | open, high, low, close, adjusted_close, volume, source |
| `macro` | FRED macroeconomic indicators | `date` | cpi, unemployment_rate, fed_funds_rate, term_spread |
| `news_raw` | Raw Guardian / Alpha Vantage news entries | `id` | published_utc, published_et, headline, section, source, raw_json |
| `news_daily` | Aggregated daily sentiment features | `date` | headline_count, sent_mean, sent_median, sent_std, pct_pos, pct_neg, no_news_day |
| `politics` | Flags for key political/economic events | `date` | pres_dem, is_election_day, is_inauguration_day, is_shutdown_window |
| `features` | Engineered daily features | `date` | added later via ALTER TABLE (SMA20, RSI14, MACD, etc.) |
| `targets` | Prediction targets (1-day, 30-day, 60-day returns) | `date` | target_return_1d, updown_1d, target_return_30d, updown_30d |
| `ingestion_log` | Record of data pulls | none (append-only) | timestamp_utc, source, action, notes |

### Incremental Update Strategy
Every table except the append-only `ingestion_log` has a primary key, which prevents duplicates and enables incremental ingestion:
- **Prices:** upsert by `(date, symbol)`
- **Macro:** upsert by `date`
- **News:** upsert by `id` (article GUID)
- **Logs:** append-only with timestamp

This design allows:
- Partial backfills (e.g., new tickers)
- Regular updates (e.g., daily pipeline)
- Historical reproducibility (via ingestion_log)
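
The idempotency claim can be checked against a throwaway in-memory database. This is a minimal sketch with a cut-down `prices` table and made-up prices, not the project schema; the `upsert` helper is hypothetical.

```python
import sqlite3

conn = sqlite3.connect(":memory:")   # throwaway DB, not data/market_data.db
conn.execute(
    "CREATE TABLE prices (date TEXT, symbol TEXT, close REAL, PRIMARY KEY (date, symbol))"
)

def upsert(rows):
    with conn:  # commits on success, rolls back on error
        conn.executemany(
            "INSERT OR REPLACE INTO prices (date, symbol, close) VALUES (?, ?, ?)", rows
        )

upsert([("2024-01-02", "SPY", 472.65)])
upsert([("2024-01-02", "SPY", 472.65)])   # rerun: same key, no duplicate row
upsert([("2024-01-02", "SPY", 473.00)])   # correction: same key, value replaced

count, close = conn.execute("SELECT COUNT(*), MAX(close) FROM prices").fetchone()
print(count, close)
```

One design point worth knowing: `INSERT OR REPLACE` deletes the conflicting row and inserts a new one, so any column not supplied in the statement falls back to its default rather than keeping its old value.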


## 🔄 Step 3: Ingestion Layer Setup

We now prepare the helper functions that every data source (Alpha Vantage, FRED, Guardian) will use.

### Why this matters
- We want **incremental updates** (don’t re-pull 25 years every time).
- We want **clean inserts** into SQLite without creating duplicates.
- We want **traceability** (what we pulled, when, from which source).

### Core helper functions we’re about to define
1. `get_connection()`  
   - Opens a connection to `data/market_data.db`

2. `upsert_prices(df)` (pattern for all tables later)  
   - Inserts new rows into the `prices` table using `(date, symbol)` as the primary key.
   - If a row already exists for that `(date, symbol)`, it is replaced with the incoming values.

   We'll build this with SQL `INSERT OR REPLACE`, so you can safely rerun the same ingestion without creating duplicates.

3. `log_ingestion(source, action, notes="")`  
   - Appends to `ingestion_log`
   - Lets us record what symbols, date ranges, etc. we pulled

### Alpha Vantage rate limit notes
- Free tier is historically ~5 calls/minute and ~500 calls/day.
- We will respect this by:
  - pulling one ticker at a time,
  - inserting it,
  - sleeping between tickers (we'll add the sleep in a later cell),
  - using incremental logic to only request *missing* dates.

We'll start ingestion with SPY first, test the pipeline, then scale out to VIX and the 100 constituent tickers.
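
The "only request missing dates" rule boils down to a small date calculation. The helper below is a hypothetical sketch of that logic; it assumes the last stored date is a `'YYYY-MM-DD'` string (as in the `prices` table) or `None` when the symbol has never been loaded.

```python
from datetime import date, timedelta
from typing import Optional, Tuple

def next_fetch_window(last_stored: Optional[str], start_year: int,
                      today: date) -> Optional[Tuple[date, date]]:
    """Return (start, end) of the dates still missing, or None if already up to date."""
    if last_stored is None:
        start = date(start_year, 1, 1)                                # first load: full backfill
    else:
        start = date.fromisoformat(last_stored) + timedelta(days=1)  # day after the latest row
    return (start, today) if start <= today else None

print(next_fetch_window(None, 2016, date(2024, 1, 10)))
print(next_fetch_window("2024-01-05", 2016, date(2024, 1, 10)))
print(next_fetch_window("2024-01-10", 2016, date(2024, 1, 10)))
```

Returning `None` when nothing is missing lets the caller skip the API call entirely, which matters most on rate-limited tiers.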


In [None]:
# === DB Utilities (canonical) ===
import sqlite3
from datetime import datetime, timezone
from pathlib import Path

import pandas as pd

# Use the same DB_PATH defined in your Paths cell:
# DB_PATH = ROOT_DIR / "data" / "market_data.db"

def get_connection():
    return sqlite3.connect(DB_PATH)

def log_ingestion(source: str, action: str, notes: str = ""):
    ts = datetime.now(timezone.utc).isoformat()
    with get_connection() as conn:
        conn.execute(
            "INSERT INTO ingestion_log (timestamp_utc, source, action, notes) VALUES (?, ?, ?, ?)",
            (ts, source, action, notes),
        )
    print(f"✅ Logged ingestion: {source} | {action} | {ts} | {notes}")

def get_last_date_for_symbol(symbol: str):
    with get_connection() as conn:
        row = conn.execute(
            "SELECT MAX(date) FROM prices WHERE symbol = ?;",
            (symbol,)
        ).fetchone()
    return row[0] if row and row[0] is not None else None

def upsert_prices(df: pd.DataFrame):
    required = ["date","symbol","open","high","low","close","volume","adjusted_close","source"]
    for col in required:
        if col not in df.columns:
            raise ValueError(f"Missing required column: {col}")

    df = df.copy()
    df["date"] = pd.to_datetime(df["date"]).dt.strftime("%Y-%m-%d")
    rows = list(df[required].itertuples(index=False, name=None))

    with get_connection() as conn:
        conn.executemany("""
            INSERT OR REPLACE INTO prices
            (date, symbol, open, high, low, close, volume, adjusted_close, source)
            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
        """, rows)
    print(f"✅ Upserted {len(rows)} rows into prices.")



In [None]:
# --- Inspect all table schemas ---
def inspect_table_schema(connection, table_name):
    print(f"\n🧩 Schema for '{table_name}':")
    cursor = connection.cursor()
    cursor.execute(f"PRAGMA table_info({table_name});")
    rows = cursor.fetchall()
    for col in rows:
        print(f"  {col[1]:<25} | {col[2]:<10} | {'PK' if col[5] else ''}")
    print("-" * 60)

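# The schema cell closed its connection, so open a fresh one for inspection.
conn = get_connection()
for t in ["prices", "macro", "news_raw", "news_daily", "politics", "features", "targets", "ingestion_log"]:
    inspect_table_schema(conn, t)
conn.close()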


In [None]:
def ingest_symbol_alpaca(symbol: str, start_year: int = 2016) -> int:
    """Incrementally fetch daily bars for `symbol` from Alpaca (IEX feed) and upsert them into prices."""
    import os, time, requests
    import pandas as pd

    APCA_KEY_ID = os.getenv("APCA_API_KEY_ID")
    APCA_API_SECRET = os.getenv("APCA_API_SECRET_KEY")
    if not APCA_KEY_ID or not APCA_API_SECRET:
        raise RuntimeError("Missing Alpaca keys in .env")
    headers = {"APCA-API-KEY-ID": APCA_KEY_ID, "APCA-API-SECRET-KEY": APCA_API_SECRET}

    def iso(dt):
        return pd.to_datetime(dt, utc=True).strftime("%Y-%m-%dT%H:%M:%SZ")

    def fetch(symbol, start_dt, end_dt, limit_per_page=10000, sleep_sec=0.15):
        url = f"https://data.alpaca.markets/v2/stocks/{symbol}/bars"
        params = {"timeframe": "1Day", "start": iso(start_dt), "end": iso(end_dt),
                  "limit": limit_per_page, "feed": "iex"}
        all_rows, token = [], None
        while True:
            if token:
                params["page_token"] = token
            r = requests.get(url, headers=headers, params=params, timeout=60)
            if r.status_code != 200:
                raise RuntimeError(f"HTTP {r.status_code}: {r.text}")
            payload = r.json()
            # `or []` also covers a response where "bars" is present but null
            for b in payload.get("bars") or []:
                all_rows.append({
                    "date": pd.to_datetime(b["t"]).date(),
                    "open": float(b["o"]), "high": float(b["h"]),
                    "low": float(b["l"]), "close": float(b["c"]),
                    "volume": float(b["v"]), "adjusted_close": None,
                })
            token = payload.get("next_page_token")
            if not token:
                break
            time.sleep(sleep_sec)

        df = pd.DataFrame(all_rows)
        if df.empty:
            return df
        df["date"] = pd.to_datetime(df["date"])
        return df.sort_values("date").reset_index(drop=True)

    # incremental window (clamped to start_year)
    last = get_last_date_for_symbol(symbol)
    clamp = pd.Timestamp(f"{start_year}-01-01", tz="UTC")

    # tz-aware start: day after the latest stored row, else the clamp
    start_dt = (
        pd.to_datetime(last).tz_localize("UTC") + pd.Timedelta(days=1)
    ) if last else clamp

    # tz-aware end (always set, regardless of branch)
    end_dt = pd.Timestamp.now(tz="UTC")

    # ✅ Guard: nothing to fetch if we're already up to date
    if start_dt.date() > end_dt.date():
        print(f"ℹ️ {symbol}: already up to date (start={start_dt.date()} > end={end_dt.date()}).")
        return 0

    raw = fetch(symbol, start_dt, end_dt)
    if raw.empty:
        print(f"ℹ️ No new data for {symbol}.")
        return 0

    raw["symbol"] = symbol
    raw["source"] = "alpaca"
    final_df = raw[["date","symbol","open","high","low","close","volume","adjusted_close","source"]].copy()

    upsert_prices(final_df)
    inserted = len(final_df)
    log_ingestion("alpaca", "fetch_prices", f"symbol={symbol} range={final_df['date'].min().date()}→{final_df['date'].max().date()} rows={inserted}")
    print(f"✅ Done: {symbol}: inserted {inserted} rows")
    return inserted

# Call it on a symbol NOT yet in DB so you see output:
rows = ingest_symbol_alpaca("AAPL")
print("Inserted rows:", rows)



In [None]:
for sym in ["AAPL", "SPY"]:
    ingest_symbol_alpaca(sym, start_year=2016)

In [None]:
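# `final_df` is local to ingest_symbol_alpaca() (which already logs each fetch), so rebuild the note from the DB.
with get_connection() as conn:
    lo, hi, n = conn.execute("SELECT MIN(date), MAX(date), COUNT(*) FROM prices WHERE symbol = 'SPY';").fetchone()
log_ingestion(source="alpaca", action="fetch_prices", notes=f"symbol=SPY range={lo}→{hi} rows={n}")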


In [None]:
# Confirm rows landed
import pandas as pd
conn = get_connection()
print(pd.read_sql_query("SELECT COUNT(*) AS n FROM prices WHERE symbol='SPY';", conn))
print(pd.read_sql_query("SELECT * FROM ingestion_log ORDER BY rowid DESC LIMIT 3;", conn))
conn.close()


## 📈 Step 4 (Alt): Ingest SPY from **Alpaca** (free tier)

We’re pivoting to Alpaca’s free historical API for daily bars:

- Endpoint: `GET https://data.alpaca.markets/v2/stocks/{symbol}/bars`
- Auth: request headers  
  - `APCA-API-KEY-ID`  
  - `APCA-API-SECRET-KEY`
- Params:
  - `timeframe=1Day`
  - `start` / `end` (ISO 8601, UTC)
  - Pagination via `page_token`

We’ll:
- Use incremental logic (start from the day **after** our latest DB date if present; else from `DATA_START_YEAR`).
- Map Alpaca fields → our `prices` schema:
  - `t` → `date`
  - `o` → `open`
  - `h` → `high`
  - `l` → `low`
  - `c` → `close`
  - `v` → `volume`
  - `adjusted_close` → `None` (not provided by the free endpoint)
  - `source` → `"alpaca"`

Then we’ll call `upsert_prices(df)` + `log_ingestion(...)`.
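
The field mapping above can be written as a single function. This is an illustrative sketch: `bar_to_price_row` is a hypothetical name, the sample bar is made up, and the date is taken from the UTC timestamp, which assumes daily bars are stamped at midnight New York time so the UTC date matches the trading date.

```python
from datetime import datetime
from typing import Any, Dict

def bar_to_price_row(symbol: str, bar: Dict[str, Any]) -> Dict[str, Any]:
    """Map one Alpaca bar (keys t, o, h, l, c, v) onto the columns of the prices table."""
    # datetime.fromisoformat() only accepts a trailing "Z" from Python 3.11, so normalize it.
    ts = datetime.fromisoformat(bar["t"].replace("Z", "+00:00"))
    return {
        "date": ts.date().isoformat(),      # 'YYYY-MM-DD', taken from the UTC timestamp
        "symbol": symbol,
        "open": float(bar["o"]),
        "high": float(bar["h"]),
        "low": float(bar["l"]),
        "close": float(bar["c"]),
        "volume": float(bar["v"]),
        "adjusted_close": None,             # not provided by the free endpoint
        "source": "alpaca",
    }

# Hypothetical bar shaped like the fields listed above (values are made up).
sample_bar = {"t": "2024-01-02T05:00:00Z", "o": 472.16, "h": 473.67,
              "l": 470.49, "c": 472.65, "v": 1000000}
row = bar_to_price_row("SPY", sample_bar)
print(row["date"], row["close"], row["adjusted_close"])
```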


In [None]:
import time
import requests
import pandas as pd
from datetime import datetime, timedelta, timezone

APCA_KEY_ID = os.getenv("APCA_API_KEY_ID")
APCA_SECRET_KEY = os.getenv("APCA_API_SECRET_KEY")
if not APCA_KEY_ID or not APCA_SECRET_KEY or "your_" in APCA_KEY_ID or "your_" in APCA_SECRET_KEY:
    raise RuntimeError("Alpaca API keys missing. Please add APCA_API_KEY_ID and APCA_API_SECRET_KEY to your .env.")

SYMBOL = os.getenv("TICKER", "SPY")
DATA_START_YEAR = int(os.getenv("DATA_START_YEAR", "2016"))  # your .env can still say 2000; we'll clamp below

headers = {
    "APCA-API-KEY-ID": APCA_KEY_ID,
    "APCA-API-SECRET-KEY": APCA_SECRET_KEY,
}

def iso(dt):
    if isinstance(dt, str):
        return dt
    return dt.replace(tzinfo=timezone.utc).isoformat().replace("+00:00", "Z")

def fetch_alpaca_daily(symbol: str, start_dt: datetime, end_dt: datetime,
                       limit_per_page: int = 10000, sleep_sec: float = 0.2):
    """
    Fetch daily bars from Alpaca (IEX feed on free plan) with pagination.
    Returns DataFrame with: date, open, high, low, close, volume, adjusted_close(None)
    """
    url = f"https://data.alpaca.markets/v2/stocks/{symbol}/bars"
    params = {
        "timeframe": "1Day",
        "start": iso(start_dt),
        "end": iso(end_dt),
        "limit": limit_per_page,
        "feed": "iex",              # <-- IMPORTANT: free plan requires IEX feed
        # "adjustment": "raw",      # optional; you can set "all" if your plan supports it
    }

    all_rows = []
    page_token = None

    while True:
        if page_token:
            params["page_token"] = page_token
        r = requests.get(url, headers=headers, params=params, timeout=60)
        if r.status_code != 200:
            raise RuntimeError(f"Alpaca request failed HTTP {r.status_code}: {r.text}")
        payload = r.json()
        bars = payload.get("bars", [])
        for b in bars:
            all_rows.append({
                "date": pd.to_datetime(b["t"]).date(),
                "open": float(b["o"]),
                "high": float(b["h"]),
                "low": float(b["l"]),
                "close": float(b["c"]),
                "volume": float(b["v"]),
                "adjusted_close": None,  # not provided on free IEX endpoint
            })
        page_token = payload.get("next_page_token")
        if not page_token:
            break
        time.sleep(sleep_sec)

    df = pd.DataFrame(all_rows)
    if df.empty:
        return df
    df["date"] = pd.to_datetime(df["date"])
    df = df.sort_values("date").reset_index(drop=True)
    return df

# ---------- incremental window with clamp to IEX coverage ----------
last_date = get_last_date_for_symbol(SYMBOL)
iex_min_start = pd.Timestamp("2016-01-01")  # free plan historical coverage (approx)

if last_date:
    start_dt = pd.to_datetime(last_date) + pd.Timedelta(days=1)
else:
    requested_start = pd.Timestamp(f"{DATA_START_YEAR}-01-01")
    start_dt = max(requested_start, iex_min_start)  # clamp to IEX coverage

end_dt = pd.Timestamp(datetime.now(timezone.utc))

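if start_dt.date() > end_dt.date():
    # Already up to date: skip the request instead of sending start > end.
    print(f"ℹ️ {SYMBOL}: already up to date through {last_date}.")
    raw_df = pd.DataFrame()
else:
    print(f"📡 Alpaca (IEX): fetching {SYMBOL} 1D bars from {start_dt.date()} to {end_dt.date()} ...")
    raw_df = fetch_alpaca_daily(SYMBOL, start_dt, end_dt)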
print(f"⬇️ Pulled {len(raw_df)} rows for {SYMBOL} "
      f"({raw_df['date'].min().date() if not raw_df.empty else 'NA'} → {raw_df['date'].max().date() if not raw_df.empty else 'NA'})")

if raw_df.empty:
    print("ℹ️ No new data from Alpaca.")
else:
    raw_df["symbol"] = SYMBOL
    raw_df["source"] = "alpaca"
    final_df = raw_df[[
        "date", "symbol", "open", "high", "low", "close",
        "volume", "adjusted_close", "source"
    ]].copy()

    upsert_prices(final_df)
    notes = (f"symbol={SYMBOL} range={final_df['date'].min().date()}→{final_df['date'].max().date()} rows={len(final_df)}")
    log_ingestion(source="alpaca", action="fetch_prices", notes=notes)


## 🧭 Universe: load S&P 500 list, normalize, persist

Goal:

- Read the uploaded Excel once from `/content/StocksInSP500.xlsx` (the path the loader cell below reads)

- Normalize column names → (symbol, name, sector, weight)

- Upsert into SQLite `universe` table (idempotent)

- Save a normalized CSV to the repo: `data/reference/sp500_universe.csv` for reproducible future runs

- Build Top-50 and Bottom-50 lists (by weight if available, else alphabetical)

- Batch-ingest prices for the selected tickers (Alpaca, IEX)


In [None]:
with get_connection() as conn:
    conn.execute("""
        CREATE TABLE IF NOT EXISTS universe (
            symbol TEXT PRIMARY KEY,
            name   TEXT,
            sector TEXT,
            weight REAL,
            source TEXT
        );
    """)

print("✅ universe table ready.")


In [None]:
import pandas as pd
from pathlib import Path

# Paths
repo_csv = Path("data/reference/sp500_universe.csv")
repo_csv.parent.mkdir(parents=True, exist_ok=True)

# 1) Try repo CSV first (fast + reproducible), else read the uploaded Excel
if repo_csv.exists():
    dfu_raw = pd.read_csv(repo_csv)
    print(f"📄 Loaded repo CSV: {repo_csv}")
else:
    excel_path = Path("/content/StocksInSP500.xlsx")  # uploaded file path
    if not excel_path.exists():
        raise FileNotFoundError(f"Excel not found at {excel_path}. Upload it or commit the repo CSV first.")
    dfu_raw = pd.read_excel(excel_path)
    print(f"📄 Loaded Excel: {excel_path}")

# 2) Heuristic column detection (case-insensitive)
cols = {c.lower(): c for c in dfu_raw.columns}

def pick(*cands):
    for k in cands:
        if k in cols:
            return cols[k]
    return None

col_symbol = pick("symbol", "ticker", "ticker symbol", "sp500 ticker", "spx ticker")
col_name   = pick("security", "name", "company", "company name")
col_sector = pick("gics sector", "sector")
col_weight = pick("weight", "index weight", "spx weight", "wgt")

if not col_symbol:
    raise ValueError(f"Ticker column not found. Available columns: {list(dfu_raw.columns)}")

dfu = dfu_raw.copy()

# 3) Normalize columns → symbol, name, sector, weight, source
dfu["symbol"] = dfu[col_symbol].astype(str).str.strip().str.upper()
dfu["name"]   = dfu[col_name] if col_name else None
dfu["sector"] = dfu[col_sector] if col_sector else None
dfu["weight"] = pd.to_numeric(dfu[col_weight], errors="coerce") if col_weight else None
dfu["source"] = "snp_excel" if not repo_csv.exists() else "repo_csv"

dfu = dfu[["symbol","name","sector","weight","source"]].drop_duplicates(subset=["symbol"]).reset_index(drop=True)

# 4) Save normalized CSV to repo (so future loads don’t depend on Colab upload)
if not repo_csv.exists():
    dfu.to_csv(repo_csv, index=False)
    print(f"💾 Saved normalized universe to {repo_csv} (commit this to git).")

print(f"✅ Universe normalized. Rows: {len(dfu)} | Columns: {list(dfu.columns)}")
dfu.head()



In [None]:
with get_connection() as conn:
    conn.executemany("""
        INSERT OR REPLACE INTO universe (symbol, name, sector, weight, source)
        VALUES (?, ?, ?, ?, ?)
    """, list(dfu.itertuples(index=False, name=None)))
print(f"✅ Upserted {len(dfu)} rows into universe.")

In [None]:
with get_connection() as conn:
    u = pd.read_sql_query("SELECT * FROM universe", conn)

if u["weight"].notna().any():
    # Drop rows with no weight first; otherwise NaN sorts last and would fill bottom50.
    u_sorted = u.dropna(subset=["weight"]).sort_values("weight", ascending=False)
    basis = "weight"
else:
    # fallback: alphabetical (you can replace with market cap if present later)
    u_sorted = u.sort_values("symbol")
    basis = "alphabetical"

top50    = u_sorted.head(50)["symbol"].tolist()
bottom50 = u_sorted.tail(50)["symbol"].tolist()

print(f"✅ Built Top/Bottom 50 based on: {basis}")
print(f"Top50 sample: {top50[:10]}")
print(f"Bottom50 sample: {bottom50[:10]}")



In [None]:
import time

symbols = sorted(set(top50 + bottom50 + ["SPY"]))  # ensure SPY included once
print(f"📡 Ingesting {len(symbols)} symbols via Alpaca (IEX)...")

ok, fail = 0, []
for i, sym in enumerate(symbols, 1):
    try:
        inserted = ingest_symbol_alpaca(sym, start_year=2016)
        ok += 1
    except Exception as e:
        print(f"⚠️ {sym} failed: {e}")
        fail.append(sym)
    time.sleep(0.15)  # polite pacing (IEX is generous, this is just to be safe)

print(f"✅ Batch complete. OK={ok}, failed={len(fail)}")
if fail:
    print("Failed symbols:", fail)

In [None]:
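# Try VIX directly; if not supported on IEX free, use VIXY ETF as a proxy
try:
    ingest_symbol_alpaca("VIX", start_year=2016)
except Exception as e:
    print("VIX failed on Alpaca:", e)

# ingest_symbol_alpaca returns 0 without raising when the API sends back no bars,
# so check what is actually stored rather than relying on an exception alone.
if get_last_date_for_symbol("VIX") is None:
    print("No VIX rows stored; trying VIXY proxy...")
    try:
        ingest_symbol_alpaca("VIXY", start_year=2016)
    except Exception as e2:
        print("VIXY failed as well:", e2)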



<details>
<summary><b>(Legacy) Alpha Vantage — Reference Only (Do Not Run)</b></summary>

*(This section remains for archival/reference. Alpaca is our primary provider for v1. Alpha Vantage’s adjusted endpoint is now premium-only.)*

</details>


## 📈 Step 4: Alpha Vantage Ingestion (SPY first pass)

Goal of this step:
- Pull full daily historical OHLCV for SPY from Alpha Vantage
- Clean/standardize the data
- Upsert it into the `prices` table in `market_data.db`
- Log the ingestion event in `ingestion_log`

Key rules:
- We respect incremental updates.
  - If `prices` already has SPY through some date, we'll only request data we don't have.
- We tag all inserted rows with `source='alpha_vantage'`.
- We will store `adjusted_close` from Alpha Vantage (important for modeling returns).

Note:
- Alpha Vantage's free tier has historically been ~5 calls/minute and ~500/day; check the current limits before running.
- Here we're only pulling 1 ticker (SPY), so this is safe.

After SPY works:
- We'll apply the same flow to VIX and then to each ticker in the top 50 / bottom 50 list from the S&P 500 universe.


In [None]:
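RUN_ALPHA = False  # legacy path, off by default; set to True only if you have access to the adjusted endpoint
if not RUN_ALPHA:
    # Stop the cell here so the legacy code below never runs by accident.
    raise RuntimeError("Legacy Alpha Vantage cell is disabled. Set RUN_ALPHA = True to run it.")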

import requests
import pandas as pd
from datetime import datetime

# ------------
# Config
# ------------
ALPHA_KEY = os.getenv("ALPHA_VANTAGE_API_KEY")
if not ALPHA_KEY or "your_" in ALPHA_KEY:
    raise RuntimeError("Alpha Vantage API key missing or placeholder. Please update .env and rerun load_env().")

BASE_URL = "https://www.alphavantage.co/query"
SYMBOL = os.getenv("TICKER", "SPY")  # default SPY
DATA_START_YEAR = int(os.getenv("DATA_START_YEAR", "2000"))

print(f"📡 Fetching data for {SYMBOL} starting from {DATA_START_YEAR}...")

# ------------
# Helper: fetch raw daily data from Alpha Vantage
# ------------
def fetch_alpha_vantage_daily_adjusted(symbol: str, api_key: str):
    """
    Calls Alpha Vantage TIME_SERIES_DAILY_ADJUSTED for `symbol`
    Returns a pandas DataFrame with columns:
      date, open, high, low, close, adjusted_close, volume
    """
    params = {
        "function": "TIME_SERIES_DAILY_ADJUSTED",
        "symbol": symbol,
        "outputsize": "full",         # full history (they usually go back ~20+ years)
        "datatype": "json",
        "apikey": api_key,
    }
    r = requests.get(BASE_URL, params=params)
    if r.status_code != 200:
        raise RuntimeError(f"Alpha Vantage request failed with HTTP {r.status_code}")

    payload = r.json()

    # Alpha Vantage returns time series under "Time Series (Daily)"
    ts = payload.get("Time Series (Daily)")
    if ts is None:
        raise RuntimeError(f"Unexpected Alpha Vantage response: {payload}")

    records = []
    for day_str, vals in ts.items():
        # vals keys look like: '1. open', '2. high', etc.
        records.append({
            "date": day_str,
            "open": float(vals["1. open"]),
            "high": float(vals["2. high"]),
            "low": float(vals["3. low"]),
            "close": float(vals["4. close"]),
            "adjusted_close": float(vals["5. adjusted close"]),
            "volume": float(vals["6. volume"]),
        })

    df = pd.DataFrame(records)

    # Make sure it's sorted ascending by date
    df["date"] = pd.to_datetime(df["date"])
    df = df.sort_values("date").reset_index(drop=True)

    return df

# ------------
# Step 1: check last date we already have for this symbol
# ------------
last_date = get_last_date_for_symbol(SYMBOL)
print(f"📅 Last stored date for {SYMBOL} in DB:", last_date)

# ------------
# Step 2: fetch from Alpha Vantage
# ------------
raw_df = fetch_alpha_vantage_daily_adjusted(SYMBOL, ALPHA_KEY)
print(f"⬇️ Pulled {len(raw_df)} rows from Alpha Vantage for {SYMBOL} "
      f"({raw_df['date'].min().date()} → {raw_df['date'].max().date()})")

# ------------
# Step 3: filter for incremental load
# ------------
if last_date is not None:
    # keep only rows with date > last_date
    last_dt = pd.to_datetime(last_date)
    new_df = raw_df[raw_df["date"] > last_dt].copy()
else:
    # first time: filter by DATA_START_YEAR so we don't store ancient data if any
    cutoff = pd.Timestamp(f"{DATA_START_YEAR}-01-01")
    new_df = raw_df[raw_df["date"] >= cutoff].copy()

print(f"🧮 New rows to insert for {SYMBOL}: {len(new_df)}")

if new_df.empty:
    print("ℹ️ No new data to insert. Skipping upsert.")
else:
    # ------------
    # Step 4: shape to match `prices` table schema
    # ------------
    new_df["symbol"] = SYMBOL
    new_df["source"] = "alpha_vantage"

    # ensure columns exist even if we don't yet use some technical features here
    final_df = new_df[[
        "date", "symbol", "open", "high", "low", "close",
        "volume", "adjusted_close", "source"
    ]].copy()

    # ------------
    # Step 5: write into SQLite
    # ------------
    upsert_prices(final_df)

    # ------------
    # Step 6: log ingestion
    # ------------
    notes = (
        f"symbol={SYMBOL} "
        f"range={final_df['date'].min().date()}→{final_df['date'].max().date()} "
        f"rows={len(final_df)}"
    )
    log_ingestion(
        source="alpha_vantage",
        action="fetch_prices",
        notes=notes
    )


In [None]:
# Go to repo root
%cd /content/market-forecast-capstone

# Make folders if missing
!mkdir -p notebooks data/reference

# Copy your current notebook and the Excel you uploaded into the repo
# (If your notebook is named differently, adjust the src path accordingly.)
!cp /mnt/data/ingest_explore.ipynb notebooks/01_ingest_prices.ipynb
!cp -n /mnt/data/StocksInSP500.xlsx data/reference/ || true

# The universe step already writes data/reference/sp500_universe.csv inside the repo,
# so there is nothing to copy for the normalized CSV.

# Sanity check
!ls -lah notebooks
!ls -lah data/reference

In [None]:
!git config --global user.name "JoeGiwa"
!git config --global user.email "joebot17@gmail.com"


In [None]:
# FILL THESE IN (repo: JoeGiwa/market-forecast-capstone.git)
import subprocess

GITHUB_USER = "JoeGiwa"
REPO_NAME   = "market-forecast-capstone"
TOKEN       = "<REDACTED>"  # keep private; never commit a real token in this cell

remote = f"https://{TOKEN}@github.com/{GITHUB_USER}/{REPO_NAME}.git"

# Pass the arguments as a list so the token is never parsed by a shell;
# check=True makes a failed update raise instead of printing a false success.
subprocess.run(["git", "remote", "set-url", "origin", remote], check=True)
print("✅ Remote updated (with token). Ready to push.")



In [None]:
pwd


In [None]:
ls -lah


In [None]:
%cd /content/market-forecast-capstone


In [None]:

!git status

In [None]:
!git add notebooks/01_ingest_prices.ipynb data/reference/StocksInSP500.xlsx
!git commit -m "Add ingestion notebook and S&P500 Excel"

In [None]:
!git status