# Ingest plan - StatCan and CMHC sources

This notebook is the checklist for all inputs used by the Affordability Stress Index (ASI). It does not analyze results. It tells us what to download, where to save it, and how to keep the files consistent so later notebooks can run.

Use this notebook when you:
- add or replace a data source
- refresh data for a new reporting cycle
- want to show a collaborator where the raw data comes from

In [1]:
# Core imports + shared helpers
from __future__ import annotations

import time
from dataclasses import dataclass, asdict
from pathlib import Path

import pandas as pd
import requests
from bs4 import BeautifulSoup
from urllib.parse import urljoin, urlsplit

PROJECT = Path.cwd().resolve()
if PROJECT.name == "notebooks":
    PROJECT = PROJECT.parent

DATA_ROOT = PROJECT / "data"
RAW = DATA_ROOT / "raw"
PROCESSED = DATA_ROOT / "processed"
for path in (RAW, PROCESSED):
    path.mkdir(parents=True, exist_ok=True)

STATCAN_BASE = "https://www150.statcan.gc.ca/t1/wds/rest"
FULL_TABLE_CSV = f"{STATCAN_BASE}/getFullTableDownloadCSV"
DEFAULT_HEADERS = {
    "User-Agent": "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120 Safari/537.36",
    "Accept": "application/json, text/plain, */*",
}


def ensure_raw_destination(path: str | Path) -> Path:
    dest = Path(path)
    if not dest.is_absolute():
        dest = RAW / dest
    dest = dest.resolve()
    if RAW not in dest.parents and dest != RAW:
        raise ValueError(f"Destination {dest} must live under {RAW}")
    dest.parent.mkdir(parents=True, exist_ok=True)
    return dest


def retry_request(
    url: str,
    *,
    method: str = "GET",
    retries: int = 3,
    backoff: float = 1.5,
    **kwargs,
) -> requests.Response:
    """Issue an HTTP request with simple exponential backoff."""

    base_kwargs = dict(kwargs)
    last_error: Exception | None = None
    for attempt in range(1, retries + 1):
        try:
            request_kwargs = dict(base_kwargs)
            headers = DEFAULT_HEADERS.copy()
            extra_headers = request_kwargs.pop("headers", None)
            if extra_headers:
                headers.update(extra_headers)
            resp = requests.request(
                method,
                url,
                headers=headers,
                timeout=30,
                **request_kwargs,
            )
            resp.raise_for_status()
            return resp
        except Exception as exc:  # pragma: no cover - network operations are best-effort
            last_error = exc
            if attempt == retries:
                raise
            time.sleep(backoff * attempt)
    if last_error:
        raise last_error
    raise RuntimeError("retry_request exhausted without raising but no response returned")


@dataclass
class Dataset:
    dataset: str
    provider: str
    metric: str
    pid: str | None
    frequency: str
    geo_scope: str
    delivery: str
    target_file: Path | None
    automation_status: str
    status_note: str
    page_url: str | None = None
    direct_url: str | None = None

    def destination(self) -> Path | None:
        if self.target_file is None:
            return None
        return ensure_raw_destination(self.target_file)

    @property
    def table_number(self) -> str | None:
        if not self.pid:
            return None
        pid = str(self.pid)
        if len(pid) == 8:
            return f"{pid[:2]}-{pid[2:4]}-{pid[4:]}"
        return pid

## Workflow at a glance
- List each StatCan table ID (PID) and its expected filename in `data/raw/`.
- Record CMHC landing pages for tables that still require manual download.
- Use the helper functions to download files or build a manifest of what is missing.
- Hand the raw files to the ingest notebooks, which refuse to run if required files are missing.

In [2]:
# Curated dataset catalog (StatCan + CMHC)
datasets: list[Dataset] = [
    Dataset(
        dataset="cpi_all_items",
        provider="statcan",
        metric="Consumer Price Index, all-items (NSA)",
        pid="18100004",
        frequency="Monthly",
        geo_scope="Canada + provinces (CMA deflators derived downstream)",
        delivery="download_statcan_table",
        target_file=RAW / "cpi_all_items_18100004.csv",
        automation_status="automatic",
        status_note="Verify the latest CPI release (usually mid-month) before re-running.",
    ),
    Dataset(
        dataset="median_household_income",
        provider="statcan",
        metric="Median after-tax income by economic family type (CIS)",
        pid="11100035",
        frequency="Annual",
        geo_scope="Canada, provinces, and major CMAs",
        delivery="download_statcan_table",
        target_file=RAW / "median_household_income_11100035.csv",
        automation_status="automatic",
        status_note="CIS table provides CMA-level coverage for major metros; confirm vector availability for smaller metros before modeling.",
    ),
    Dataset(
        dataset="population_estimates",
        provider="statcan",
        metric="Population estimates, July 1 (CMA/CA, 2021 boundaries)",
        pid="17100148",
        frequency="Annual",
        geo_scope="Census metropolitan areas and agglomerations",
        delivery="download_statcan_table",
        target_file=RAW / "population_estimates_17100148.csv",
        automation_status="automatic",
        status_note="Release every February; used to scale metrics per 100k residents.",
    ),
    Dataset(
        dataset="unemployment_rate",
        provider="statcan",
        metric="Labour force characteristics by CMA (3-month moving avg, SA)",
        pid="14100459",
        frequency="Monthly",
        geo_scope="Census metropolitan areas",
        delivery="download_statcan_table",
        target_file=RAW / "unemployment_rate_14100459.csv",
        automation_status="automatic",
        status_note="Seasonally adjusted 3-month moving average preferred for stability.",
    ),
    Dataset(
        dataset="rental_market_rents",
        provider="cmhc",
        metric="Rental Market Report data tables",
        pid=None,
        frequency="Annual",
        geo_scope="Canada + major CMAs",
        delivery="download_cmhc_asset",
        target_file=RAW / "rental_market_report_latest.xlsx",
        automation_status="semi-automatic",
        status_note="Uses the last verified CMHC Azure blob URL; update when the 2026 release ships.",
        page_url="https://www.cmhc-schl.gc.ca/professionals/housing-markets-data-and-research/housing-data/rental-market/rental-market-report-data-tables",
    ),
    Dataset(
        dataset="housing_starts",
        provider="cmhc",
        metric="Monthly housing starts + under construction",
        pid=None,
        frequency="Monthly",
        geo_scope="Canada + CMAs",
        delivery="download_cmhc_asset",
        target_file=RAW / "housing_starts_latest.xlsx",
        automation_status="semi-automatic",
        status_note="Pinned to the November 2025 CMHC housing starts release; refresh when the next workbook is published.",
        page_url="https://www.cmhc-schl.gc.ca/professionals/housing-markets-data-and-research/housing-data/data-tables/housing-market-data/monthly-housing-starts-construction-data-tables",
    ),
]


def build_dataset_catalog(datasets: list[Dataset]) -> pd.DataFrame:
    catalog_records: list[dict[str, object]] = []
    for ds in datasets:
        record = asdict(ds)
        record["table_number"] = ds.table_number
        destination = ds.destination()
        record["target_file"] = str(destination) if destination else None
        catalog_records.append(record)

    return (
        pd.DataFrame(catalog_records)
        .sort_values("dataset")
        .reset_index(drop=True)
    )[
        [
            "dataset",
            "provider",
            "metric",
            "pid",
            "table_number",
            "frequency",
            "geo_scope",
            "delivery",
            "automation_status",
            "page_url",
            "direct_url",
            "target_file",
            "status_note",
        ]
    ]


dataset_catalog = build_dataset_catalog(datasets)

dataset_catalog

Unnamed: 0,dataset,provider,metric,pid,table_number,frequency,geo_scope,delivery,automation_status,page_url,direct_url,target_file,status_note
0,cpi_all_items,statcan,"Consumer Price Index, all-items (NSA)",18100004.0,18-10-0004,Monthly,Canada + provinces (CMA deflators derived down...,download_statcan_table,automatic,,,/Users/andrewharris/Projects/housing-affordabi...,Verify the latest CPI release (usually mid-mon...
1,housing_starts,cmhc,Monthly housing starts + under construction,,,Monthly,Canada + CMAs,download_cmhc_asset,semi-automatic,https://www.cmhc-schl.gc.ca/professionals/hous...,,/Users/andrewharris/Projects/housing-affordabi...,Pinned to the November 2025 CMHC housing start...
2,median_household_income,statcan,Median after-tax income by economic family typ...,11100035.0,11-10-0035,Annual,"Canada, provinces, and major CMAs",download_statcan_table,automatic,,,/Users/andrewharris/Projects/housing-affordabi...,CIS table provides CMA-level coverage for majo...
3,population_estimates,statcan,"Population estimates, July 1 (CMA/CA, 2021 bou...",17100148.0,17-10-0148,Annual,Census metropolitan areas and agglomerations,download_statcan_table,automatic,,,/Users/andrewharris/Projects/housing-affordabi...,Release every February; used to scale metrics ...
4,rental_market_rents,cmhc,Rental Market Report data tables,,,Annual,Canada + major CMAs,download_cmhc_asset,semi-automatic,https://www.cmhc-schl.gc.ca/professionals/hous...,,/Users/andrewharris/Projects/housing-affordabi...,Uses the last verified CMHC Azure blob URL; up...
5,unemployment_rate,statcan,Labour force characteristics by CMA (3-month m...,14100459.0,14-10-0459,Monthly,Census metropolitan areas,download_statcan_table,automatic,,,/Users/andrewharris/Projects/housing-affordabi...,Seasonally adjusted 3-month moving average pre...


### Catalog guidance
- `pid` and `table_number` are the IDs for StatCan tables. If they change, update them here first.
- `automation_status` shows whether a file can be fetched by script or needs manual download.
- `target_file` is the exact filename expected in `data/raw/`.
- `status_note` is plain-language context on lags or coverage limits.

In [3]:
# Download helpers — thin wrappers that other notebooks/scripts can call
import shutil
import zipfile


def download_statcan_table(pid: str | int, dest_path: Path, *, lang: str = "en") -> Path:
    """Fetch a full StatCan table (CSV) via WDS and write it to dest_path."""
    dest = ensure_raw_destination(dest_path)
    if dest.exists():
        print(f"{dest.name} already present; skipping download.")
        return dest

    manifest_url = f"{FULL_TABLE_CSV}/{pid}/{lang}"
    manifest = retry_request(manifest_url, method="GET").json()
    if isinstance(manifest, dict):
        object_url = manifest.get("object")
        status = manifest.get("status", "SUCCESS")
        if status != "SUCCESS":
            raise RuntimeError(f"StatCan returned status={status} for pid={pid}: {manifest}")
    elif isinstance(manifest, list) and manifest:
        object_url = manifest[0].get("object")
    else:
        raise RuntimeError(f"Unexpected manifest payload for pid={pid}: {manifest}")

    if not object_url:
        raise RuntimeError(f"Manifest did not include a download object for pid={pid}")

    zip_path = dest.with_suffix(dest.suffix + ".zip")
    resp = retry_request(object_url, stream=True)
    with open(zip_path, "wb") as fh:
        for chunk in resp.iter_content(1024 * 1024):
            if chunk:
                fh.write(chunk)
    resp.close()

    with zipfile.ZipFile(zip_path, "r") as zf:
        csv_members = [name for name in zf.namelist() if name.lower().endswith(".csv")]
        if not csv_members:
            raise RuntimeError(f"Zip for pid={pid} did not include a CSV file: {zf.namelist()}")
        first_csv = csv_members[0]
        with zf.open(first_csv) as src, open(dest, "wb") as dst:
            shutil.copyfileobj(src, dst)

    zip_path.unlink(missing_ok=True)
    print(f"Wrote {dest.relative_to(PROJECT)}")
    return dest


def download_cmhc_asset(url: str, dest_path: Path) -> Path:
    """Download an XLSX/CSV from CMHC once a stable direct URL is known."""
    dest = ensure_raw_destination(dest_path)
    if dest.exists():
        print(f"{dest.name} already present; skipping download.")
        return dest
    resp = retry_request(url, stream=True, headers={"Accept": "application/octet-stream"})
    ctype = (resp.headers.get("Content-Type") or "").lower()
    if "text/html" in ctype:
        resp.close()
        raise RuntimeError(f"Expected a file download, received HTML for {url}")
    with open(dest, "wb") as fh:
        for chunk in resp.iter_content(1024 * 256):
            if chunk:
                fh.write(chunk)
    resp.close()
    print(f"Wrote {dest.relative_to(PROJECT)}")
    return dest


def scrape_cmhc_direct_url(page_url: str) -> str | None:
    """Grab the first XLSX/CSV link from a CMHC landing page (best-effort)."""
    try:
        resp = retry_request(page_url, headers={"Accept": "text/html"})
    except Exception as exc:  # pragma: no cover - network guard
        print(f"Failed to load CMHC page {page_url}: {exc}")
        return None
    soup = BeautifulSoup(resp.text, "html.parser")
    candidates: list[str] = []

    def consider(raw_url: str | None):
        if not raw_url:
            return
        normalized = urljoin(page_url, raw_url.strip())
        path_with_no_query = urlsplit(normalized).path.lower()
        # CMHC blob URLs often end with .xlsx but add ?rev= query params
        if path_with_no_query.endswith((".xlsx", ".xls", ".csv")):
            candidates.append(normalized)

    for tag in soup.find_all("a"):
        consider(tag.get("href"))
    for tag in soup.find_all("input"):
        consider(tag.get("value"))

    seen = set()
    for link in candidates:
        if link not in seen:
            seen.add(link)
            return link
    return None


def build_run_manifest(catalog: pd.DataFrame) -> pd.DataFrame:
    """Summarize what needs to run, whether the file already exists, and next steps."""
    def action(row: pd.Series) -> str:
        if row["provider"] == "statcan" and row["pid"]:
            return f"download_statcan_table({row['pid']}, target_file)"
        if row["provider"] == "cmhc" and row.get("direct_url"):
            return "download_cmhc_asset(direct_url, target_file)"
        if row["provider"] == "cmhc":
            return "Attempt scrape_cmhc_direct_url(page) or download manually"
        return "Review configuration"

    manifest = catalog.copy()
    manifest["target_file"] = manifest["target_file"].apply(lambda p: str(ensure_raw_destination(Path(p))) if isinstance(p, str) else None)
    manifest["exists"] = manifest["target_file"].apply(lambda p: Path(p).exists() if isinstance(p, str) else False)
    manifest["action"] = manifest.apply(action, axis=1)
    return manifest[[
        "dataset",
        "provider",
        "automation_status",
        "action",
        "target_file",
        "exists",
        "status_note",
    ]]


## Operating instructions
1. Run the catalog cell and confirm every `target_file` lives under `data/raw/`.
2. Run the helper cell so the download functions are available in memory.
3. Optional: run the auto-download cell to fetch missing StatCan tables and any CMHC files with a direct link.
4. Re-run `build_run_manifest` to confirm what is still missing.

### Automated refresh (saves into `data/raw/`)
When `AUTO_DOWNLOAD = True`, the script downloads missing StatCan tables and tries to fetch CMHC files if a direct URL is known. If CMHC does not provide a stable URL, the script will stop and remind you to download the file manually.

In [4]:
from datetime import datetime

AUTO_DOWNLOAD = True  # Set False to preview actions without fetching files
FORCE_DOWNLOAD = False  # Set True to re-download even if the file already exists

refresh_records = []
run_started = datetime.utcnow().isoformat()

for ds in datasets:
    dest = ds.destination()
    record = {
        "dataset": ds.dataset,
        "provider": ds.provider,
        "target_file": str(dest) if dest else None,
        "run_started_utc": run_started,
        "result": "skipped",
        "notes": "",
    }

    if dest is None:
        record["result"] = "missing_target"
        record["notes"] = "No target_file configured"
        refresh_records.append(record)
        continue

    if not AUTO_DOWNLOAD:
        record["notes"] = "AUTO_DOWNLOAD disabled"
        refresh_records.append(record)
        continue

    if ds.provider == "statcan" and ds.pid:
        if dest.exists() and not FORCE_DOWNLOAD:
            record["result"] = "exists"
            record["notes"] = "File already present"
        else:
            download_statcan_table(ds.pid, dest)
            record["result"] = "downloaded"
    elif ds.provider == "cmhc":
        resolved_url = ds.direct_url
        scrape_note = ""
        if not resolved_url and ds.page_url:
            resolved_url = scrape_cmhc_direct_url(ds.page_url)
            if resolved_url:
                scrape_note = "Scraped direct URL from landing page"
        if not resolved_url:
            record["result"] = "manual_required"
            record["notes"] = "No direct_url available — download from landing page"
        else:
            if dest.exists() and not FORCE_DOWNLOAD:
                record["result"] = "exists"
                record["notes"] = "File already present"
                if scrape_note:
                    record["notes"] += f"; {scrape_note}"
            else:
                note_parts: list[str] = [scrape_note] if scrape_note else []
                try:
                    download_cmhc_asset(resolved_url, dest)
                    record["result"] = "downloaded"
                except requests.HTTPError as exc:
                    status_code = exc.response.status_code if exc.response else "unknown"
                    note_parts.append(
                        f"HTTPError {status_code} — direct_url likely stale; grab manually"
                    )
                    record["result"] = "manual_required"
                except Exception as exc:  # pragma: no cover - defensive fallback
                    note_parts.append(f"Unexpected download error: {exc}")
                    record["result"] = "manual_required"
                finally:
                    record["notes"] = "; ".join(part for part in note_parts if part)
    else:
        record["result"] = "unknown_provider"
        record["notes"] = f"Unhandled provider {ds.provider}"

    refresh_records.append(record)

refresh_df = pd.DataFrame(refresh_records)
refresh_df

Failed to load CMHC page https://www.cmhc-schl.gc.ca/professionals/housing-markets-data-and-research/housing-data/rental-market/rental-market-report-data-tables: 404 Client Error: Page not found for url: https://www.cmhc-schl.gc.ca/404


Unnamed: 0,dataset,provider,target_file,run_started_utc,result,notes
0,cpi_all_items,statcan,/Users/andrewharris/Projects/housing-affordabi...,2026-01-04T17:53:37.182238,exists,File already present
1,median_household_income,statcan,/Users/andrewharris/Projects/housing-affordabi...,2026-01-04T17:53:37.182238,exists,File already present
2,population_estimates,statcan,/Users/andrewharris/Projects/housing-affordabi...,2026-01-04T17:53:37.182238,exists,File already present
3,unemployment_rate,statcan,/Users/andrewharris/Projects/housing-affordabi...,2026-01-04T17:53:37.182238,exists,File already present
4,rental_market_rents,cmhc,/Users/andrewharris/Projects/housing-affordabi...,2026-01-04T17:53:37.182238,manual_required,No direct_url available — download from landin...
5,housing_starts,cmhc,/Users/andrewharris/Projects/housing-affordabi...,2026-01-04T17:53:37.182238,exists,File already present; Scraped direct URL from ...


In [5]:
# Manifest of pending actions (re-run after each refresh)
if "dataset_catalog" not in globals():
    dataset_catalog = build_dataset_catalog(datasets)
manifest = build_run_manifest(dataset_catalog)
manifest

Unnamed: 0,dataset,provider,automation_status,action,target_file,exists,status_note
0,cpi_all_items,statcan,automatic,"download_statcan_table(18100004, target_file)",/Users/andrewharris/Projects/housing-affordabi...,True,Verify the latest CPI release (usually mid-mon...
1,housing_starts,cmhc,semi-automatic,Attempt scrape_cmhc_direct_url(page) or downlo...,/Users/andrewharris/Projects/housing-affordabi...,True,Pinned to the November 2025 CMHC housing start...
2,median_household_income,statcan,automatic,"download_statcan_table(11100035, target_file)",/Users/andrewharris/Projects/housing-affordabi...,True,CIS table provides CMA-level coverage for majo...
3,population_estimates,statcan,automatic,"download_statcan_table(17100148, target_file)",/Users/andrewharris/Projects/housing-affordabi...,True,Release every February; used to scale metrics ...
4,rental_market_rents,cmhc,semi-automatic,Attempt scrape_cmhc_direct_url(page) or downlo...,/Users/andrewharris/Projects/housing-affordabi...,True,Uses the last verified CMHC Azure blob URL; up...
5,unemployment_rate,statcan,automatic,"download_statcan_table(14100459, target_file)",/Users/andrewharris/Projects/housing-affordabi...,True,Seasonally adjusted 3-month moving average pre...
