In [None]:
import os
from datetime import datetime, timedelta, timezone
from concurrent.futures import ThreadPoolExecutor, as_completed

import pandas as pd
import requests


CSV_PATH = "sp1500_tickers_list.csv"      # input CSV with a 'symbol' column
SYMBOL_COL = "symbol"                     # column name containing tickers
OUT_CSV = "sp1500_history_30d.csv"        # output CSV for price history

N_DAYS = 30                               # number of trading days to keep per symbol
LOOKBACK_DAYS = 90                        # calendar days window to request from Massive
WORKERS = 20                              # max parallel threads
REF_DATE_STR = None                       # e.g. "2025-01-15" or None for "today UTC"

# Massive API config
BASE = "https://api.massive.com"
TIMESPAN = "day"
MULTIPLIER = 1
LIMIT = 50000
ADJUSTED = True
SORT = "asc"


def iso_date(dt_utc: datetime) -> str:
    """Format a timezone-aware datetime as YYYY-MM-DD."""
    return dt_utc.strftime("%Y-%m-%d")


def human_date(ms: int) -> str:
    """Convert milliseconds since epoch (UTC) to YYYY-MM-DD."""
    return datetime.fromtimestamp(ms / 1000, tz=timezone.utc).strftime("%Y-%m-%d")


def fetch_aggs(api_key: str, ticker: str, start_iso: str, end_iso: str) -> list[dict]:
    """
    Fetch daily aggregate bars for a ticker between start_iso and end_iso.
    Raises requests.HTTPError on non-200 responses.
    """
    url = f"{BASE}/v2/aggs/ticker/{ticker}/range/{MULTIPLIER}/{TIMESPAN}/{start_iso}/{end_iso}"
    headers = {"Authorization": f"Bearer {api_key}"}
    params = {
        "adjusted": str(ADJUSTED).lower(),
        "sort": SORT,
        "limit": LIMIT,
    }

    resp = requests.get(url, headers=headers, params=params, timeout=30)
    resp.raise_for_status()
    data = resp.json()
    return data.get("results") or []


def per_ticker_job(
    api_key: str,
    ticker: str,
    ref_dt_utc: datetime,
    lookback_days: int,
    n_days: int,
) -> list[dict]:
    """
    For a single ticker:
        - Fetch daily bars over a calendar window (lookback_days)
        - Sort by timestamp and keep the last n_days bars
        - Return a list of row dicts, one per trading day
    """
    start_utc = ref_dt_utc - timedelta(days=lookback_days)
    start_iso = iso_date(start_utc)
    end_iso = iso_date(ref_dt_utc)

    bars = fetch_aggs(api_key, ticker, start_iso, end_iso)
    if not bars:
        return []

    # sort ascending by timestamp, then take the last n_days trading sessions
    bars.sort(key=lambda b: b["t"])
    if len(bars) > n_days:
        bars = bars[-n_days:]

    rows = []
    for b in bars:
        rows.append(
            {
                "symbol": ticker,
                "date": human_date(b["t"]),
                "open": b.get("o"),
                "high": b.get("h"),
                "low": b.get("l"),
                "close": b.get("c"),
                "volume": b.get("v"),
                "vwap": b.get("vw"),
            }
        )
    return rows


# =========================
# Run in the notebook cell
# =========================

# 1. Set API Key
api_key = 'enter_api_key_here'

# 2. Load tickers
if not os.path.exists(CSV_PATH):
    raise FileNotFoundError(f"CSV file not found: {CSV_PATH}")

df_symbols = pd.read_csv(CSV_PATH)

if SYMBOL_COL not in df_symbols.columns:
    raise KeyError(
        f"Symbol column '{SYMBOL_COL}' not found. "
        f"Available columns: {list(df_symbols.columns)}"
    )

symbols = (
    df_symbols[SYMBOL_COL]
    .astype(str)
    .str.strip()
    .str.upper()
    .dropna()
    .unique()
    .tolist()
)

if not symbols:
    raise RuntimeError("No symbols found in input CSV. Nothing to do.")

# 3. Reference date
if REF_DATE_STR:
    ref_dt_utc = datetime.strptime(REF_DATE_STR, "%Y-%m-%d").replace(tzinfo=timezone.utc)
else:
    ref_dt_utc = datetime.now(timezone.utc)

print(
    f"Fetching last {N_DAYS} trading days for {len(symbols)} symbols "
    f"(lookback {LOOKBACK_DAYS} calendar days, ref={iso_date(ref_dt_utc)})"
)

# 4. Parallel fetch
all_rows: list[dict] = []

with ThreadPoolExecutor(max_workers=WORKERS) as executor:
    futures = {
        executor.submit(
            per_ticker_job,
            api_key,
            symbol,
            ref_dt_utc,
            LOOKBACK_DAYS,
            N_DAYS,
        ): symbol
        for symbol in symbols
    }

    for fut in as_completed(futures):
        symbol = futures[fut]
        rows = fut.result()
        print(f"{symbol}: {len(rows)} rows")
        all_rows.extend(rows)

if not all_rows:
    raise RuntimeError("No price data returned for any symbol.")

# 5. Build DataFrame and save
out_df = pd.DataFrame(all_rows)
out_df.sort_values(["symbol", "date"], inplace=True)
out_df.to_csv(OUT_CSV, index=False)

print(
    f"\nWrote price history for {out_df['symbol'].nunique()} symbols "
    f"and {len(out_df)} total rows to {OUT_CSV}"
)

out_df.head()

usage: ipykernel_launcher.py [-h] [--csv CSV] [--symbol-col SYMBOL_COL]
                             [--out-csv OUT_CSV] [--workers WORKERS]
                             [--ref-date REF_DATE] [--n-days N_DAYS]
                             [--lookback-days LOOKBACK_DAYS] [--verbose]
ipykernel_launcher.py: error: unrecognized arguments: --f=/Users/brendantorok/Library/Jupyter/runtime/kernel-v37b4f5183853663b8ffa040766db8e1ada0352761.json


SystemExit: 2