In [None]:
BASE='https://api.stlouisfed.org'

In [5]:
import time, random
from collections import deque

class PerMinuteLimiter:
    def __init__(self, max_per_minute=110):  # headroom under 120/min
        self.max = max_per_minute
        self.ts = deque()

    def wait(self):
        now = time.time()
        while self.ts and now - self.ts[0] >= 60:
            self.ts.popleft()
        if len(self.ts) >= self.max:
            sleep_for = 60 - (now - self.ts[0])
            time.sleep(max(0, sleep_for))
        # tiny jitter to avoid lockstep retries
        time.sleep(random.uniform(0.0, 0.08))
        self.ts.append(time.time())

limiter = PerMinuteLimiter()

# pip install pandas requests python-dateutil
import os, time, requests, pandas as pd
from dateutil.relativedelta import relativedelta
START, END = "1980-01-01", "2019-12-31"  # data range of interest
SLEEP=0.05 # sleep
DEF_MIN_REALTIME, DEF_MAX_REALTIME='1776-07-04', '9999-12-31'  # default FRED realtime range
TITLE_EXCLUDE_RX = (
    "forecast|projection|expectation|outlook|estimate|nowcast|"
    "discontinued|lag|lead|advance|future"
)

def _get(path, **params):
    params.update({"api_key": API_KEY, "file_type": "json", **params})
    max_retries = 20
    backoff = 1

    for attempt in range(max_retries):
        limiter.wait()  # <-- global pacing (~2 rps)
        r = requests.get(f"{BASE}{path}", params=params, timeout=60)

        if r.status_code == 429:
            ra = r.headers.get("Retry-After")
            if ra is not None:
                time.sleep(float(ra))   # honor server exactly
                backoff = 1             # reset our own backoff
            else:
                time.sleep(backoff)
                backoff = min(backoff * 1.5, 60)
            continue

        try:
            r.raise_for_status()
            return r.json()
        except requests.HTTPError as e:
            if attempt < max_retries - 1:
                time.sleep(backoff)
                backoff = min(backoff * 2, 60)
                continue
            raise

    raise RuntimeError(f"Failed after {max_retries} retries for path {path}")

# ---------- Category traversal ----------
def get_category_children(category_id: int):
    js = _get("/fred/category/children", category_id=category_id)
    return js.get("categories", [])

def get_category_series(category_id: int, realtime_start='1980-01-01', limit=1000, sleep=SLEEP):
    out, offset = [], 0
    while True:
        js = _get("/fred/category/series", category_id=category_id,
                  limit=limit, offset=offset, order_by="series_id",
                  realtime_start=realtime_start)
        print(f'{js.get("count",0)} series so far...', end='\r')
        ser = js.get("seriess", [])
        out.extend(ser)
        if len(ser) < limit: break
        offset += limit
        time.sleep(sleep)
    return out

def prime_categories():
    """Children of category 0 (the 8 top buckets)."""
    kids = get_category_children(0)
    return pd.DataFrame(kids)[["id","name"]].sort_values("name").reset_index(drop=True)
# ---------- Category traversal (recursive/iterative) ----------
def iter_category_ids(root_id: int):
    """Depth-first walk yielding root and all descendant category IDs."""
    stack, seen = [root_id], set()
    while stack:
        cid = stack.pop()
        if cid in seen:
            continue
        seen.add(cid)
        yield cid
        for kid in get_category_children(cid):
            stack.append(kid["id"])

def get_all_series_under_category(root_id: int, realtime_start='1980-01-01', limit=1000, sleep=SLEEP):
    """Collect series metadata for root category and all descendants; de-duplicate by series_id."""
    series_by_id = {}
    for cid in iter_category_ids(root_id):
        for m in get_category_series(cid, realtime_start=realtime_start, limit=limit, sleep=sleep):
            sid = m["id"]
            if sid not in series_by_id:   # keep first seen; avoids duplicates across multiple cats
                series_by_id[sid] = m
    return list(series_by_id.values())

# ---------- First release (ALFRED) ----------
def first_release_date_for_series(series_id: str) -> str | None:
    """Return the earliest release date for the series' release (YYYY-MM-DD)."""
    rel = _get("/fred/series/release", series_id=series_id).get("releases", [])
    if not rel: return None
    rel_id = rel[0]["id"]
    dates = _get("/fred/release/dates", release_id=rel_id).get("release_dates", [])
    if not dates: return None
    return dates[-1]['date']  # earliest vintage date

# ---------- Observations helpers ----------
def _obs_to_series(js, series_id: str, freq: str) -> pd.Series:
    obs = pd.DataFrame(js.get("observations", []))
    if obs.empty: return pd.Series(dtype="float64", name=series_id)
    obs["date"] = pd.to_datetime(obs["date"])
    obs["value"] = pd.to_numeric(obs["value"].replace(".", pd.NA), errors="coerce")
    s = obs.set_index("date")["value"]
    return s.rename(series_id)

def fetch_first_vintage_monthly(series_id: str, series_freq_short: str) -> pd.Series:
    """
    Pull earliest-vintage data and return MONTHLY series converted if needed:
      - D/W -> monthly via FRED server-side frequency='m', aggregation_method='avg'
      - M   -> monthly as-is
    """
    first_vintage = first_release_date_for_series(series_id)
    if not first_vintage:
        return pd.Series(dtype="float64", name=series_id)

    params_common = dict(
        series_id=series_id,
        realtime_start=first_vintage, realtime_end=first_vintage,
        observation_start=START, observation_end=END
    )

    if series_freq_short in ("D", "W"):  # higher-than-monthly -> aggregate to monthly
        js = _get("/fred/series/observations", **params_common,
                  frequency="m", aggregation_method="avg")
        return _obs_to_series(js, series_id, "M")

    elif series_freq_short == "M":
        js = _get("/fred/series/observations", **params_common)
        return _obs_to_series(js, series_id, "M")

    else:
        # Unknown/other frequencies: try FRED's monthly conversion
        js = _get("/fred/series/observations", **params_common,
                  frequency="m", aggregation_method="avg")
        return _obs_to_series(js, series_id, "M")


# ---------- Filters ----------
def meta_passes_title_filters(meta: dict) -> bool:
    title = (meta.get("title") or "")
    return pd.Series(title).str.contains(TITLE_EXCLUDE_RX, case=False, regex=True).iloc[0] == False

def is_discontinued(meta: dict) -> bool:
    title = (meta.get("title") or "")
    return "discontinued" in title.lower()

def keep_by_frequency(meta: dict) -> bool:
    # Accept monthly or quarterly as-is; allow D/W to be aggregated to monthly; skip others if you prefer.
    f = (meta.get("frequency_short") or "").upper()
    return f in ("M","D","W")

def series_is_complete_monthly(s: pd.Series) -> bool:
    """Return True if the Series has no missing values."""
    # print na count
    return (not s.empty) and s.notna().all()

# ---------- Pipeline (one top category as example) ----------
from tqdm import tqdm

def build_category_panel(cat_id: int, include_descendants: bool = True, rate_sleep=SLEEP):
    if include_descendants:
        meta_list = get_all_series_under_category(cat_id, realtime_start=START, sleep=rate_sleep)
    else:
        meta_list = get_category_series(cat_id, realtime_start=START, sleep=rate_sleep)

    print(f"Meta candidates before filters: {len(meta_list)}")
    meta_list = [m for m in meta_list
                 if keep_by_frequency(m) and meta_passes_title_filters(m) and not is_discontinued(m)]
    n=len(meta_list)
    save_points = {int(n * p) for p in [0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9]}  # indices for 20%, 40%, 60%, 80%

    done_ids = set()
    kept = []
    for i, m in enumerate(tqdm(meta_list, desc="Fetching series", unit="series"), start=1):
        sid = m["id"]
        fshort = (m.get("frequency_short") or "").upper()
        try:
            s = fetch_first_vintage_monthly(sid, fshort)
            if series_is_complete_monthly(s):
                kept.append(s)
        except requests.HTTPError:
            pass
            # Save progress at 20%, 40%, 60%, 80%
        if i in save_points:
            with open("done_ids.txt", "w") as f:
                f.write("\n".join(map(str, done_ids)))
            tqdm.write(f"Progress saved at {i/n:.0%} ({i}/{n})")

        time.sleep(rate_sleep)  # rate limit

    if kept:
        return pd.concat(kept, axis=1)
    else:
        return pd.DataFrame(index=pd.period_range("1980-01","2019-12",freq="M").to_timestamp("M"))

if __name__ == "__main__":
  cid=32263
  cname='International Data'
  panel = build_category_panel(cid)
  print(f"  Panel shape: {panel.shape}")
  panel.to_csv(f"fred_category_{cid}_{cname.lower().replace(' ','_')}_first_vintage_monthly_1980_2019.csv")

6058 series so far....

ConnectionError: ('Connection aborted.', RemoteDisconnected('Remote end closed connection without response'))