# HW6 — Data Grab (BoE OIS + Nasdaq FX + Emerging Market Swap Curves)

This notebook fetches the **required inputs** for *FX Carry Strategy*:

- UK OIS short rate via **BoE IUDSOIA** (preferred, simplest)
- Spot FX via **Nasdaq Data Link Tables API** `EDI/CUR` for: GBP, TRY, ZAR, PKR, BRL, NGN
- Emerging-market swap curves from the **“Emerging Mkt YC”** file (class website download)

Outputs are written to `data_clean/` as Parquet files.


In [1]:

from __future__ import annotations

import os
import io
import re
import json
import zipfile
import datetime as dt
from pathlib import Path
from typing import Any, Iterable, Optional

import numpy as np
import pandas as pd
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

# -----------------------------
# Paths
# -----------------------------
NOTEBOOK_DIR = Path.cwd().resolve()
RAW_DIR   = NOTEBOOK_DIR / "data_raw"
CLEAN_DIR = NOTEBOOK_DIR / "data_clean"
RAW_DIR.mkdir(parents=True, exist_ok=True)
CLEAN_DIR.mkdir(parents=True, exist_ok=True)

# -----------------------------
# Load .env (Nasdaq API key; optional EMYC url)
# -----------------------------
def _load_dotenv(env_path: Path = NOTEBOOK_DIR / ".env") -> None:
    if not env_path.exists():
        return
    try:
        from dotenv import load_dotenv
    except Exception as e:
        raise RuntimeError("Install python-dotenv: `pip install python-dotenv`") from e
    load_dotenv(dotenv_path=env_path, override=True)

_load_dotenv()

# -----------------------------
# HTTP session with retry
# -----------------------------
def _session() -> requests.Session:
    s = requests.Session()
    retry = Retry(
        total=8,
        backoff_factor=0.5,
        status_forcelist=(429, 500, 502, 503, 504),
        allowed_methods=("GET", "HEAD"),
        raise_on_status=False,
    )
    s.mount("https://", HTTPAdapter(max_retries=retry))
    s.headers.update({"User-Agent": "hw6-data-grab/1.0"})
    return s

SESSION = _session()

def _download(url: str, dst: Path, *, force: bool = False, timeout: int = 180) -> Path:
    dst.parent.mkdir(parents=True, exist_ok=True)
    if dst.exists() and not force:
        return dst
    r = SESSION.get(url, stream=True, timeout=timeout)
    r.raise_for_status()
    tmp = dst.with_suffix(dst.suffix + ".tmp")
    with tmp.open("wb") as f:
        for chunk in r.iter_content(chunk_size=1 << 20):
            if chunk:
                f.write(chunk)
    tmp.replace(dst)
    return dst

print("NOTEBOOK_DIR:", NOTEBOOK_DIR)
print("RAW_DIR     :", RAW_DIR)
print("CLEAN_DIR   :", CLEAN_DIR)
print("NASDAQ key loaded:", bool(os.getenv("NASDAQ_DATA_LINK_API_KEY") or os.getenv("QUANDL_API_KEY")))


NOTEBOOK_DIR: C:\Users\baile\Box\Winter26\QTS\HW_Assignment_Files\HW6_carry
RAW_DIR     : C:\Users\baile\Box\Winter26\QTS\HW_Assignment_Files\HW6_carry\data_raw
CLEAN_DIR   : C:\Users\baile\Box\Winter26\QTS\HW_Assignment_Files\HW6_carry\data_clean
NASDAQ key loaded: True


In [2]:

# -----------------------------
# Bank of England: IUD series (CSV endpoint)
# -----------------------------
def _boe_date_str(ts: pd.Timestamp) -> str:
    ts = pd.Timestamp(ts).normalize()
    return f"{ts.day:02d}/{ts.strftime('%b')}/{ts.year}"

def _find_csv_header_row(text: str) -> int:
    lines = text.splitlines()
    for i, ln in enumerate(lines):
        s = ln.strip().strip("\ufeff")
        if not s:
            continue
        if re.match(r'^"?DATE"?[,;]', s.upper()):
            return i
    return 0

def fetch_boe_iud_series(
    series_code: str,
    start: str = "1963-01-01",
    end: str | None = None,
    *,
    cache_dir: Path = RAW_DIR / "boe",
    force: bool = False,
    rate_unit: str = "decimal",  # "decimal" or "percent"
) -> pd.DataFrame:
    '''
    Fetch BoE IADB series via CSV endpoint.

    Returns: DataFrame indexed by date with one float column named `series_code`.
    BoE rates are typically in percent; set rate_unit="decimal" to divide by 100.
    '''
    end_ts = pd.Timestamp(end) if end else pd.Timestamp.today().normalize()
    start_ts = pd.Timestamp(start).normalize()

    cache_dir.mkdir(parents=True, exist_ok=True)
    out_path = cache_dir / f"{series_code}_{start_ts.date()}_{end_ts.date()}_{rate_unit}.parquet"
    if out_path.exists() and not force:
        return pd.read_parquet(out_path)

    urls = [
        "https://www.bankofengland.co.uk/boeapps/database/_iadb-fromshowcolumns.asp",
        "https://www.bankofengland.co.uk/boeapps/iadb/fromshowcolumns.asp",
    ]

    params = {
        "csv.x": "yes",
        "Datefrom": _boe_date_str(start_ts),
        "Dateto": _boe_date_str(end_ts),
        "SeriesCodes": series_code,
        "CSVF": "CN",       # try columnar first
        "UsingCodes": "Y",
        "VPD": "Y",
    }

    last_err: Exception | None = None
    for url in urls:
        for csvf in ("CN", "TN"):
            params["CSVF"] = csvf
            try:
                r = SESSION.get(url, params=params, timeout=120)
                r.raise_for_status()
                txt = r.content.decode("utf-8", errors="replace")
                if "<html" in txt.lower():
                    raise RuntimeError(f"BoE returned HTML (not CSV): {url} CSVF={csvf}")

                skip = _find_csv_header_row(txt)
                df = pd.read_csv(io.StringIO(txt), skiprows=skip)
                df.columns = [c.strip().strip("\ufeff") for c in df.columns]

                # Tabular layout: DATE + series column
                if "DATE" in df.columns and series_code in df.columns:
                    out = df[["DATE", series_code]].copy()
                    out["DATE"] = pd.to_datetime(out["DATE"], errors="coerce", dayfirst=True)
                    out[series_code] = pd.to_numeric(out[series_code], errors="coerce")
                    out = out.dropna(subset=["DATE"]).drop_duplicates(subset=["DATE"]).sort_values("DATE")
                    out = out.set_index("DATE")
                else:
                    # Columnar layout: DATE + SERIES + VALUE
                    lcols = {c.lower(): c for c in df.columns}
                    date_col = lcols.get("date") or next((c for c in df.columns if "date" in c.lower()), None)
                    series_col = lcols.get("series") or lcols.get("seriescode") or lcols.get("series_code")
                    value_col = lcols.get("value") or lcols.get("obs_value") or lcols.get("rate")
                    if not (date_col and series_col and value_col):
                        raise RuntimeError(f"Unrecognized CSV layout: cols={df.columns.tolist()}")
                    out = df[[date_col, series_col, value_col]].copy()
                    out = out[out[series_col].astype(str).str.strip().eq(series_code)]
                    out[date_col] = pd.to_datetime(out[date_col], errors="coerce", dayfirst=True)
                    out[value_col] = pd.to_numeric(out[value_col], errors="coerce")
                    out = out.dropna(subset=[date_col]).drop_duplicates(subset=[date_col]).sort_values(date_col)
                    out = out.set_index(date_col)[[value_col]].rename(columns={value_col: series_code})

                if rate_unit == "decimal":
                    out[series_code] = out[series_code] / 100.0

                out.to_parquet(out_path)
                return out

            except Exception as e:
                last_err = e
                continue

    raise RuntimeError(f"Failed to fetch BoE series {series_code}. Last error: {last_err}")

# Required by assignment:
sonia = fetch_boe_iud_series("IUDSOIA", rate_unit="decimal")
sonia.tail()


Unnamed: 0_level_0,IUDSOIA
DATE,Unnamed: 1_level_1
2026-02-06,0.037279
2026-02-09,0.037271
2026-02-10,0.03727
2026-02-11,0.037274
2026-02-12,0.037272


In [3]:

# -----------------------------
# OPTIONAL: BoE OIS curve archive (oisddata.zip)
# The assignment allows using this in place of IUDSOIA, but it is not required if IUDSOIA works.
# This cell is safe-by-default: it will NOT run unless you set USE_OIS_ARCHIVE=True.
# -----------------------------
USE_OIS_ARCHIVE = False

BOE_OIS_DAILY_ZIP = "https://www.bankofengland.co.uk/-/media/boe/files/statistics/yield-curves/oisddata.zip"

_TENOR_RE = re.compile(r"^(?P<num>\d+(?:\.\d+)?)\s*(?P<unit>D|W|M|Y|YR|YEARS?)$", re.I)

def _tenor_to_years(label: str) -> float | None:
    s = str(label).strip().upper()
    if s in {"ON", "O/N", "OVERNIGHT"}:
        return 1.0 / 365.0
    m = _TENOR_RE.match(s.replace(" ", ""))
    if not m:
        # common patterns: '1W', '3M', '6M', '10Y'
        m = re.match(r"^(\d+(?:\.\d+)?)(D|W|M|Y)$", s)
    if not m:
        return None
    num = float(m.group(1))
    unit = m.group(2)
    if unit == "D":
        return num / 365.0
    if unit == "W":
        return (7.0 * num) / 365.0
    if unit == "M":
        return num / 12.0
    if unit in {"Y", "YR"}:
        return num
    return None

def _find_header_row(mat: pd.DataFrame) -> int | None:
    # locate a row containing 'date' and multiple tenor-like labels
    for i in range(min(len(mat), 60)):
        row = mat.iloc[i].astype(str).str.strip().str.lower()
        if (row == "date").any():
            # require at least 3 tenor-ish labels in the row
            hits = 0
            for v in row.values:
                if _tenor_to_years(v) is not None:
                    hits += 1
            if hits >= 3:
                return i
    return None

def _parse_ois_sheet(xls: pd.ExcelFile, sheet: str) -> pd.DataFrame | None:
    mat = pd.read_excel(xls, sheet_name=sheet, header=None)
    hdr = _find_header_row(mat)
    if hdr is None:
        return None

    header = mat.iloc[hdr].astype(str).str.strip()
    data = mat.iloc[hdr+1:].copy()
    data.columns = header

    # normalize date col
    date_col = next((c for c in data.columns if str(c).strip().lower() == "date"), None)
    if date_col is None:
        return None

    data[date_col] = pd.to_datetime(data[date_col], errors="coerce", dayfirst=True)
    data = data.dropna(subset=[date_col])

    # tenor columns
    ten_cols = []
    ten_map = {}
    for c in data.columns:
        if c == date_col:
            continue
        ty = _tenor_to_years(str(c))
        if ty is not None:
            ten_cols.append(c)
            ten_map[c] = ty

    if len(ten_cols) < 3:
        return None

    out = data[[date_col] + ten_cols].melt(id_vars=[date_col], var_name="tenor_label", value_name="rate_pct")
    out["tenor_years"] = out["tenor_label"].map(ten_map).astype(float)
    out["rate_pct"] = pd.to_numeric(out["rate_pct"], errors="coerce")
    out = out.dropna(subset=["rate_pct", "tenor_years"])
    out = out.rename(columns={date_col: "date"})
    out["rate"] = out["rate_pct"] / 100.0  # decimal
    out = out[["date", "tenor_years", "rate"]].sort_values(["date", "tenor_years"]).reset_index(drop=True)
    return out

def fetch_boe_ois_archive_tidy(*, force: bool = False) -> pd.DataFrame:
    zip_path = _download(BOE_OIS_DAILY_ZIP, RAW_DIR / "boe" / "oisddata.zip", force=force)
    tidy_frames: list[pd.DataFrame] = []
    with zipfile.ZipFile(zip_path) as z:
        members = [n for n in z.namelist() if not n.endswith("/")]
        for name in sorted(members):
            if "__MACOSX" in name or Path(name).name.startswith("."):
                continue
            ext = Path(name).suffix.lower()
            if ext not in {".xlsx", ".xls"}:
                continue
            b = z.read(name)
            xls = pd.ExcelFile(io.BytesIO(b))
            for sh in xls.sheet_names:
                parsed = _parse_ois_sheet(xls, sh)
                if parsed is None or parsed.empty:
                    continue
                parsed["__source_file"] = name
                parsed["__sheet"] = sh
                tidy_frames.append(parsed)

    if not tidy_frames:
        raise RuntimeError("Could not locate any parseable OIS curve tables inside oisddata.zip.")
    out = pd.concat(tidy_frames, ignore_index=True).drop_duplicates(subset=["date","tenor_years","__source_file","__sheet"])
    return out

if USE_OIS_ARCHIVE:
    ois_curve = fetch_boe_ois_archive_tidy()
    ois_curve.to_parquet(CLEAN_DIR / "boe_ois_curve_tidy.parquet", index=False)
    print("Saved:", CLEAN_DIR / "boe_ois_curve_tidy.parquet", "rows:", len(ois_curve))
    display(ois_curve.head())
else:
    print("Skipping BoE OIS curve archive (set USE_OIS_ARCHIVE=True if you need it).")


Skipping BoE OIS curve archive (set USE_OIS_ARCHIVE=True if you need it).


In [4]:

# -----------------------------
# Nasdaq Data Link Tables API utilities
# -----------------------------
def nasdaq_api_key() -> str:
    key = os.getenv("NASDAQ_DATA_LINK_API_KEY") or os.getenv("QUANDL_API_KEY")
    if not key:
        raise RuntimeError("Missing NASDAQ_DATA_LINK_API_KEY (or QUANDL_API_KEY). Put it in .env or your environment.")
    return key

def nasdaq_datatable_metadata(datatable_code: str, *, api_key: str | None = None) -> dict[str, Any]:
    api_key = api_key or nasdaq_api_key()
    url = f"https://data.nasdaq.com/api/v3/datatables/{datatable_code}/metadata.json"
    r = SESSION.get(url, params={"api_key": api_key}, timeout=60)
    r.raise_for_status()
    return r.json()

def nasdaq_datatable_fetch(
    datatable_code: str,
    params: dict[str, Any],
    *,
    api_key: str | None = None,
    max_pages: int = 10_000,
) -> pd.DataFrame:
    api_key = api_key or nasdaq_api_key()
    url = f"https://data.nasdaq.com/api/v3/datatables/{datatable_code}.json"

    cursor: str | None = None
    out: list[pd.DataFrame] = []

    for _ in range(max_pages):
        p = dict(params)
        p["api_key"] = api_key
        if cursor:
            p["qopts.cursor_id"] = cursor

        r = SESSION.get(url, params=p, timeout=120)
        r.raise_for_status()
        js = r.json()

        cols = [c["name"] for c in js["datatable"]["columns"]]
        data = js["datatable"]["data"]
        out.append(pd.DataFrame(data, columns=cols))

        cursor = js.get("meta", {}).get("next_cursor_id")
        if not cursor:
            break

    return pd.concat(out, ignore_index=True) if out else pd.DataFrame()


In [5]:

# -----------------------------
# FX from EDI/CUR
# -----------------------------
def _guess_fx_columns(meta: dict[str, Any]) -> tuple[str, str, str]:
    cols = [c["name"] for c in meta["datatable"]["columns"]]
    lc = {c.lower(): c for c in cols}

    date_col = lc.get("date") or next((c for c in cols if "date" in c.lower()), None)
    if not date_col:
        raise RuntimeError(f"Could not identify date column from columns={cols}")

    ccy_col = next((c for c in cols if c.lower() in {"code", "currency", "iso_code", "ccy", "ticker"}), None)
    if not ccy_col:
        ccy_col = next((c for c in cols if "cur" in c.lower() or "code" in c.lower() or "tick" in c.lower()), None)
    if not ccy_col:
        raise RuntimeError(f"Could not identify currency column from columns={cols}")

    candidates = [c for c in cols if c not in {date_col, ccy_col}]
    prefer = next((c for c in candidates if c.lower() in {"rate", "value", "fx", "exchange_rate"}), None)
    value_col = prefer or (candidates[0] if candidates else None)
    if not value_col:
        raise RuntimeError(f"Could not identify value column from columns={cols}")

    return date_col, ccy_col, value_col

def fetch_fx_edi_cur(
    currencies: Iterable[str],
    start: str = "1990-01-01",
    end: str | None = None,
    *,
    cache_dir: Path = RAW_DIR / "nasdaq",
    force: bool = False,
) -> pd.DataFrame:
    cache_dir.mkdir(parents=True, exist_ok=True)
    end_ts = pd.Timestamp(end) if end else pd.Timestamp.today().normalize()
    start_ts = pd.Timestamp(start).normalize()

    out_path = cache_dir / f"EDI_CUR_{start_ts.date()}_{end_ts.date()}_{'-'.join(currencies)}.parquet"
    if out_path.exists() and not force:
        return pd.read_parquet(out_path)

    meta = nasdaq_datatable_metadata("EDI/CUR")
    date_col, ccy_col, val_col = _guess_fx_columns(meta)

    frames: list[pd.DataFrame] = []
    for ccy in currencies:
        params = {
            f"{date_col}.gte": str(start_ts.date()),
            f"{date_col}.lte": str(end_ts.date()),
            ccy_col: ccy,
            "qopts.columns": ",".join([date_col, ccy_col, val_col]),
        }
        df = nasdaq_datatable_fetch("EDI/CUR", params=params)
        if df.empty:
            continue
        df = df.rename(columns={date_col: "date", ccy_col: "ccy", val_col: "value"})
        df["date"] = pd.to_datetime(df["date"], errors="coerce")
        df["value"] = pd.to_numeric(df["value"], errors="coerce")
        df = df.dropna(subset=["date", "value"]).sort_values("date")
        frames.append(df)

    out = pd.concat(frames, ignore_index=True) if frames else pd.DataFrame(columns=["date", "ccy", "value"])
    out.to_parquet(out_path, index=False)
    return out

FX_CODES = ["GBP", "TRY", "ZAR", "PKR", "BRL", "NGN"]
fx_long = fetch_fx_edi_cur(FX_CODES, start="1990-01-01")

fx_wide = (
    fx_long.pivot_table(index="date", columns="ccy", values="value", aggfunc="last")
           .sort_index()
)

# We keep both interpretations because EDI/CUR is USD-based but can be quoted either way by vendor.
fx_ccy_per_usd = fx_wide.copy()
fx_usd_per_ccy = 1.0 / fx_wide.replace(0, np.nan)

fx_long.to_parquet(CLEAN_DIR / "nasdaq_edi_cur_fx_long.parquet", index=False)
fx_ccy_per_usd.to_parquet(CLEAN_DIR / "nasdaq_edi_cur_ccy_per_usd_wide.parquet")
fx_usd_per_ccy.to_parquet(CLEAN_DIR / "nasdaq_edi_cur_usd_per_ccy_wide.parquet")

print("Saved FX files to:", CLEAN_DIR)
display(fx_long.tail())


Saved FX files to: C:\Users\baile\Box\Winter26\QTS\HW_Assignment_Files\HW6_carry\data_clean


Unnamed: 0,date,ccy,value
56364,2026-02-12,NGN,1354.01
56365,2026-02-13,NGN,1355.58
56366,2026-02-14,NGN,1353.578275
56367,2026-02-15,NGN,1353.578275
56368,2026-02-16,NGN,1354.15


In [6]:

# -----------------------------
# Emerging market swap curves ("Emerging Mkt YC" file from class website)
#
# Expected output: tidy long dataframe with columns:
#   date, ccy, tenor_years, swap_rate
# where swap_rate is in DECIMAL (e.g., 0.12 for 12%).
#
# How to provide the file:
#   (A) Place it in ./data_raw/ (recommended), OR
#   (B) Set EMERGING_MKT_YC_URL in .env to a direct-download URL.
# -----------------------------
EM_CCY = ["TRY", "ZAR", "PKR", "BRL", "NGN"]

def _find_emerging_yc_file() -> Path | None:
    pats = [
        "*Emerging*Mkt*YC*.*",
        "*Emerging*YC*.*",
        "*Emerging*Market*YC*.*",
        "*Mkt*YC*.*",
    ]
    search_dirs = [RAW_DIR, NOTEBOOK_DIR, NOTEBOOK_DIR.parent]
    for d in search_dirs:
        for pat in pats:
            hits = sorted(d.glob(pat))
            if hits:
                return hits[0]
    return None

def _flatten_columns(cols) -> list[str]:
    if isinstance(cols, pd.MultiIndex):
        flat = []
        for a, b in cols.to_list():
            a = "" if a is None else str(a).strip()
            b = "" if b is None else str(b).strip()
            if a and b:
                flat.append(f"{a}_{b}")
            else:
                flat.append(a or b)
        return flat
    return [str(c).strip() for c in cols]

_CCY_TEN_RE = re.compile(
    r"^(?P<ccy>[A-Z]{3})[\s_\-]*(?P<ten>\d+(?:\.\d+)?)(?P<unit>Y|YR|YEARS?|M|MO|MONTHS?)$",
    re.I
)
_TEN_CCY_RE = re.compile(
    r"^(?P<ten>\d+(?:\.\d+)?)(?P<unit>Y|YR|YEARS?|M|MO|MONTHS?)[\s_\-]*(?P<ccy>[A-Z]{3})$",
    re.I
)

def _tenor_years_from(ten: float, unit: str) -> float:
    u = unit.upper()
    if u in {"Y", "YR", "YEAR", "YEARS"}:
        return float(ten)
    if u in {"M", "MO", "MONTH", "MONTHS"}:
        return float(ten) / 12.0
    raise ValueError(f"Unsupported tenor unit: {unit}")

def _coerce_rate_to_decimal(s: pd.Series) -> pd.Series:
    x = pd.to_numeric(s, errors="coerce")
    med = float(np.nanmedian(x.values)) if np.isfinite(x).any() else np.nan
    # Heuristic: swap rates in emerging mkts are rarely < 1% in percent units; percent series typically > 1.
    if np.isfinite(med) and med > 1.5:
        return x / 100.0
    return x

def _tidy_from_wide(df: pd.DataFrame) -> pd.DataFrame:
    df = df.copy()
    df.columns = _flatten_columns(df.columns)

    # locate date column
    date_col = next((c for c in df.columns if c.lower() == "date"), None)
    if date_col is None:
        # common alternative: first column is date-like
        c0 = df.columns[0]
        maybe_date = pd.to_datetime(df[c0], errors="coerce", dayfirst=True)
        if maybe_date.notna().mean() >= 0.8:
            date_col = c0
        else:
            raise RuntimeError("Could not locate date column in Emerging Mkt YC file.")

    df = df.rename(columns={date_col: "date"})
    df["date"] = pd.to_datetime(df["date"], errors="coerce", dayfirst=True)
    df = df.dropna(subset=["date"]).sort_values("date")

    records = []
    for c in df.columns:
        if c == "date":
            continue
        col = str(c).strip()
        m = _CCY_TEN_RE.match(col)
        if not m:
            m = _TEN_CCY_RE.match(col)
        if not m:
            continue

        ccy = m.group("ccy").upper()
        ten = float(m.group("ten"))
        unit = m.group("unit")
        ty = _tenor_years_from(ten, unit)

        if ccy not in EM_CCY:
            continue

        tmp = pd.DataFrame({"date": df["date"].values, "swap_rate": _coerce_rate_to_decimal(df[c])})
        tmp["ccy"] = ccy
        tmp["tenor_years"] = ty
        records.append(tmp)

    if not records:
        raise RuntimeError("Could not parse any {CCY,Tenor} columns from Emerging Mkt YC wide layout.")

    out = pd.concat(records, ignore_index=True)
    out = out.dropna(subset=["swap_rate"])
    out = out[["date", "ccy", "tenor_years", "swap_rate"]].sort_values(["ccy","date","tenor_years"]).reset_index(drop=True)
    return out

def _tidy_from_long(df: pd.DataFrame) -> pd.DataFrame:
    df = df.copy()
    df.columns = [str(c).strip() for c in df.columns]
    lcols = {c.lower(): c for c in df.columns}

    date_col = lcols.get("date") or next((c for c in df.columns if "date" in c.lower()), None)
    ccy_col = lcols.get("ccy") or lcols.get("currency") or lcols.get("code")
    ten_col = lcols.get("tenor") or lcols.get("maturity") or lcols.get("term")
    rate_col = lcols.get("swap_rate") or lcols.get("rate") or lcols.get("value")

    if not (date_col and ccy_col and ten_col and rate_col):
        raise RuntimeError("Emerging Mkt YC long layout not recognized (need date/ccy/tenor/rate).")

    out = df[[date_col, ccy_col, ten_col, rate_col]].rename(
        columns={date_col: "date", ccy_col: "ccy", ten_col: "tenor", rate_col: "swap_rate"}
    )
    out["date"] = pd.to_datetime(out["date"], errors="coerce", dayfirst=True)
    out["ccy"] = out["ccy"].astype(str).str.upper().str.strip()

    # tenor can be numeric years or labels like '5Y'
    def _parse_ten(x: object) -> float | None:
        if x is None or (isinstance(x, float) and np.isnan(x)):
            return None
        s = str(x).strip().upper()
        if re.fullmatch(r"\d+(?:\.\d+)?", s):
            return float(s)
        m = re.match(r"^(\d+(?:\.\d+)?)(Y|YR|YEARS?|M|MO|MONTHS?)$", s)
        if not m:
            return None
        return _tenor_years_from(float(m.group(1)), m.group(2))

    out["tenor_years"] = out["tenor"].map(_parse_ten)
    out["swap_rate"] = _coerce_rate_to_decimal(out["swap_rate"])
    out = out.dropna(subset=["date", "ccy", "tenor_years", "swap_rate"])
    out = out[out["ccy"].isin(EM_CCY)]
    return out[["date","ccy","tenor_years","swap_rate"]].sort_values(["ccy","date","tenor_years"]).reset_index(drop=True)

def load_emerging_mkt_yc(*, force_download: bool = False) -> pd.DataFrame:
    url = os.getenv("EMERGING_MKT_YC_URL")
    if url:
        dst = RAW_DIR / "Emerging_Mkt_YC.xlsx"
        _download(url, dst, force=force_download)
        path = dst
    else:
        path = _find_emerging_yc_file()

    if path is None:
        raise FileNotFoundError(
            "Missing Emerging Mkt YC file. Download it from the class site and place it in ./data_raw/ "
            "(or set EMERGING_MKT_YC_URL in .env to a direct-download URL)."
        )

    suf = path.suffix.lower()
    if suf in {".parquet"}:
        df = pd.read_parquet(path)
        # try both long and wide tidiers
        try:
            return _tidy_from_long(df)
        except Exception:
            return _tidy_from_wide(df)

    if suf in {".csv", ".txt"}:
        df = pd.read_csv(path)
        try:
            return _tidy_from_long(df)
        except Exception:
            return _tidy_from_wide(df)

    if suf in {".xlsx", ".xls"}:
        # first try multiindex header (common in financial spreadsheets)
        try:
            df2 = pd.read_excel(path, header=[0,1])
            df2.columns = _flatten_columns(df2.columns)
            try:
                return _tidy_from_wide(df2)
            except Exception:
                pass
        except Exception:
            pass

        # fall back: try each sheet
        sheets = pd.read_excel(path, sheet_name=None, header=None)
        for sh, mat in sheets.items():
            # try to find a header row containing date and currency/tenor columns
            # heuristic: scan first 50 rows for 'date'
            hdr = None
            for i in range(min(len(mat), 60)):
                row = mat.iloc[i].astype(str).str.strip().str.lower()
                if (row == "date").any():
                    hdr = i
                    break
            if hdr is None:
                continue
            df = pd.read_excel(path, sheet_name=sh, header=hdr)
            df.columns = _flatten_columns(df.columns)
            try:
                return _tidy_from_wide(df)
            except Exception:
                try:
                    return _tidy_from_long(df)
                except Exception:
                    continue

        raise RuntimeError(f"Could not parse Emerging Mkt YC from: {path}")

    raise RuntimeError(f"Unsupported file type for Emerging Mkt YC: {path}")

# Load + save
em_yc = load_emerging_mkt_yc()
em_yc.to_parquet(CLEAN_DIR / "emerging_mkt_swap_curves.parquet", index=False)

print("Saved:", CLEAN_DIR / "emerging_mkt_swap_curves.parquet", "rows:", len(em_yc))
display(em_yc.head())

# quick requirement check: do we have 1Y and 5Y in each currency?
req_tenors = {1.0, 5.0}
chk = (
    em_yc.assign(tenor_round=em_yc["tenor_years"].round(6))
         .groupby("ccy")["tenor_round"]
         .apply(lambda s: req_tenors.issubset(set(s.unique())))
)
print("Has both 1Y and 5Y per currency?")
display(chk)


FileNotFoundError: Missing Emerging Mkt YC file. Download it from the class site and place it in ./data_raw/ (or set EMERGING_MKT_YC_URL in .env to a direct-download URL).

In [7]:

# -----------------------------
# Final: write required artifacts + sanity checks
# -----------------------------
sonia.to_parquet(CLEAN_DIR / "boe_IUDSOIA_sonia.parquet")

# Also save a simple USD-vs-GBP series for convenience
if "GBP" in fx_ccy_per_usd.columns:
    usdgbp = fx_ccy_per_usd[["GBP"]].rename(columns={"GBP": "GBP_per_USD"})
    gbpusd = fx_usd_per_ccy[["GBP"]].rename(columns={"GBP": "USD_per_GBP"})
    usdgbp.to_parquet(CLEAN_DIR / "usdgbp_ccy_per_usd.parquet")
    gbpusd.to_parquet(CLEAN_DIR / "gbpusd_usd_per_ccy.parquet")

print("Files in data_clean/:")
for p in sorted(CLEAN_DIR.glob("*.parquet")):
    print(" -", p.name)

print("\nCoverage checks:")
print("SONIA:", sonia.index.min().date(), "→", sonia.index.max().date(), "n=", len(sonia))
print("FX:", fx_long["date"].min().date(), "→", fx_long["date"].max().date(), "n=", len(fx_long))
print("EM YC:", em_yc["date"].min().date(), "→", em_yc["date"].max().date(), "n=", len(em_yc))


Files in data_clean/:
 - boe_IUDSOIA_sonia.parquet
 - boe_ois_daily_archive_raw.parquet
 - gbpusd_usd_per_ccy.parquet
 - nasdaq_edi_cur_ccy_per_usd_wide.parquet
 - nasdaq_edi_cur_fx_long.parquet
 - nasdaq_edi_cur_usd_per_ccy_wide.parquet
 - usdgbp_ccy_per_usd.parquet

Coverage checks:
SONIA: 1997-01-02 → 2026-02-12 n= 7357
FX: 2000-01-01 → 2026-02-16 n= 56369


NameError: name 'em_yc' is not defined