In [None]:
# Shared helpers

from pathlib import Path
import os, time, ssl, random
import urllib.request, urllib.error
import concurrent.futures
import re  # used later

# Root directory for everything this notebook downloads. The path is relative
# (resolved against the kernel's working directory), so no machine-specific
# absolute path is baked in. It is created when this cell runs; exist_ok makes
# re-running the cell harmless.
BASE_DIR = Path("./data")
BASE_DIR.mkdir(parents=True, exist_ok=True)

# Shared by every urllib request in this notebook:
#   _SSL_CTX    - default TLS context from ssl.create_default_context()
#   _USER_AGENT - explicit User-Agent header (some hosts answer 403 without one)
_SSL_CTX = ssl.create_default_context()
_USER_AGENT = "Mozilla/5.0 (compatible; dataset-downloader/1.0)"

def fetch_url_to_file(
    url: str,
    local: Path,
    retries: int = 3,
    sleep_s: float = 2.0,
    chunk_size: int = 1 << 20
) -> str:
    """
    Download 'url' to 'local' with retries and an atomic write (.part file).

    The body is streamed into '<local>.part' and only moved onto 'local' once
    the transfer finished and, when the server sent a Content-Length header,
    the number of bytes written matches it. A leftover '.part' file is always
    removed before returning.

    Args:
        url: address to fetch.
        local: destination path; parent directories are created as needed.
        retries: maximum number of attempts (0 means no attempt is made).
        sleep_s: base delay in seconds; the wait grows linearly with the
            attempt number and carries up to 25% random jitter.
        chunk_size: number of bytes read per iteration.

    Returns:
        A status line and never an exception:
        'skip  <name>'            - 'local' already exists and is non-empty
        'done  <name>'            - downloaded successfully
        'FAIL  <name>  (<error>)' - gave up; <error> is the last failure seen
    """
    tmp = local.with_suffix(local.suffix + ".part")
    last_error = "exhausted retries"

    try:
        if local.exists() and local.stat().st_size > 0:
            return f"skip  {local.name}"

        local.parent.mkdir(parents=True, exist_ok=True)

        for attempt in range(1, retries + 1):
            try:
                req = urllib.request.Request(url, headers={"User-Agent": _USER_AGENT, "Accept": "*/*"})
                with urllib.request.urlopen(req, context=_SSL_CTX, timeout=120) as resp, open(tmp, "wb") as f:
                    expected = resp.headers.get("Content-Length")
                    written = 0
                    while True:
                        block = resp.read(chunk_size)
                        if not block:
                            break
                        f.write(block)
                        written += len(block)

                # A dropped connection can look like a clean end-of-stream, so
                # compare against the advertised size before promoting the file.
                if expected is not None and expected.strip().isdigit() and written != int(expected):
                    raise OSError(f"incomplete download: got {written} of {expected.strip()} bytes")

                os.replace(tmp, local)  # atomic move
                return f"done  {local.name}"

            except urllib.error.HTTPError as e:
                # Only throttling / transient server errors are worth retrying.
                if e.code not in (429, 500, 502, 503, 504):
                    return f"FAIL  {local.name}  (HTTP {e.code}: {e.reason})"
                last_error = f"HTTP {e.code}: {e.reason}"

            except Exception as e:
                last_error = str(e)

            # Back off only when another attempt will actually follow.
            if attempt < retries:
                time.sleep(sleep_s * attempt * (1.0 + 0.25 * random.random()))

        return f"FAIL  {local.name}  ({last_error})"

    except OSError as e:
        # Filesystem problems during setup (stat / mkdir) are reported, not raised,
        # so one bad path does not abort a whole parallel run.
        return f"FAIL  {local.name}  ({e})"

    finally:
        # Best-effort cleanup; a missing temp file is the normal case.
        try:
            tmp.unlink()
        except OSError:
            pass

def run_parallel(items, worker_fn, max_workers: int = 8):
    """
    Apply 'worker_fn' to every element of 'items' on a thread pool and print
    each returned message.

    Messages are printed in the order of 'items' (not completion order). The
    pool is shut down before the function returns. Returns None.
    """
    pool = concurrent.futures.ThreadPoolExecutor(max_workers=max_workers)
    with pool:
        messages = pool.map(worker_fn, items)
        for line in messages:
            print(line)


In [None]:
# PRISM — daily precipitation (ppt), 4 km, CONUS (1991–2013)
# Source: https://prism.oregonstate.edu/time_series/us/4km/an/ppt/daily/YYYY/
# Files : PRISM_ppt_stable_4kmD2_YYYYMMDD_bil.zip
# Cite  : Daly et al., 2008; PRISM Climate Group, OSU

from datetime import date, timedelta

# PRISM settings read by prism_tasks() and the run guard in this cell:
#   PRISM_OUT      - directory that receives the daily zip files (one sub-folder per year)
#   PRISM_WORKERS  - thread count handed to run_parallel
#   PRISM_Y0/Y1    - first and last year, both inclusive
#   PRISM_BASE     - URL prefix; '/ppt/<YYYY>/<file>' is appended per day
# NOTE(review): PRISM_BASE is not the same host/path as the "Source" URL in the
# cell header - confirm which one is current before a full run.
PRISM_OUT    = BASE_DIR / "prism" / "ppt_daily_zip"
PRISM_WORKERS= 8
PRISM_Y0, PRISM_Y1 = 1991, 2013
PRISM_BASE   = "https://data.prism.oregonstate.edu/daily"

def _date_range(y0: int, y1: int):
    d, end, step = date(y0,1,1), date(y1,12,31), timedelta(days=1)
    while d <= end:
        yield d
        d += step

def prism_tasks():
    """
    Yield one (url, local_path) pair per day between PRISM_Y0 and PRISM_Y1.

    Files are grouped by year both in the remote URL and under PRISM_OUT.
    """
    for day in _date_range(PRISM_Y0, PRISM_Y1):
        year_dir = f"{day:%Y}"
        zip_name = f"PRISM_ppt_stable_4kmD2_{day:%Y%m%d}_bil.zip"
        remote = f"{PRISM_BASE}/ppt/{year_dir}/{zip_name}"
        yield (remote, PRISM_OUT / year_dir / zip_name)

def prism_worker(task):
    """Unpack a (url, local_path) task and download it; returns the status line."""
    source_url, destination = task
    return fetch_url_to_file(source_url, destination)

# Download guard: the cell only defines helpers unless PRISM_RUN is set to True.
PRISM_RUN = False  # flip True to download
if PRISM_RUN:
    # Materialize the task list so the file count can be reported up front.
    tasks = list(prism_tasks())
    print(f"{len(tasks):,} files → {PRISM_OUT}")
    run_parallel(tasks, prism_worker, max_workers=PRISM_WORKERS)

In [None]:
# CHIRPS v2.0 — daily precipitation, 0.05° global (1991–2013)
# Dir   : https://data.chc.ucsb.edu/products/CHIRPS-2.0/global_daily/netcdf/p05/
# Files : chirps-v2.0.YYYY.days_p05.nc
# Cite  : Funk et al., 2015 (Sci Data)

# CHIRPS settings read by chirps_tasks() and the run guard in this cell:
#   CHIRPS_OUT      - directory that receives the yearly NetCDF files
#   CHIRPS_WORKERS  - thread count handed to run_parallel
#   CHIRPS_Y0/Y1    - first and last year, both inclusive
#   CHIRPS_BASE     - URL prefix; the file name is appended directly
CHIRPS_OUT     = BASE_DIR / "chirps" / "p05_daily_nc"
CHIRPS_WORKERS = 6
CHIRPS_Y0, CHIRPS_Y1 = 1991, 2013
CHIRPS_BASE    = "https://data.chc.ucsb.edu/products/CHIRPS-2.0/global_daily/netcdf/p05"

def chirps_tasks():
    """Yield one (url, local_path) pair per year from CHIRPS_Y0 through CHIRPS_Y1 inclusive."""
    first, last = CHIRPS_Y0, CHIRPS_Y1
    for year in range(first, last + 1):
        nc_name = f"chirps-v2.0.{year}.days_p05.nc"
        yield (f"{CHIRPS_BASE}/{nc_name}", CHIRPS_OUT / nc_name)

def chirps_worker(task):
    """Unpack a (url, local_path) task and download it; returns the status line."""
    source_url, destination = task
    return fetch_url_to_file(source_url, destination)

# Download guard: nothing is fetched unless CHIRPS_RUN is set to True.
CHIRPS_RUN = False
if CHIRPS_RUN:
    # Materialize the task list so the file count can be reported up front.
    tasks = list(chirps_tasks())
    print(f"{len(tasks):,} files → {CHIRPS_OUT}")
    run_parallel(tasks, chirps_worker, max_workers=CHIRPS_WORKERS)

In [None]:
# ERA5 — hourly single levels (example: t2m, tp), 1991–2013
# Dataset : reanalysis-era5-single-levels
# Setup   : pip install cdsapi ; create ~/.cdsapirc with your CDS key
# Output  : one NetCDF per year (adjust area/vars as needed)
# Cite    : Hersbach et al., 2020; Copernicus C3S

import cdsapi

# ERA5 settings read by era5_download_year() and the run guard in this cell:
#   ERA5_OUT     - directory that receives one NetCDF file per year
#   ERA5_Y0/Y1   - first and last year, both inclusive
#   ERA5_AREA    - bounding box sent as the request's "area" (order noted inline)
#   ERA5_VARS    - variable names sent as the request's "variable"
ERA5_OUT    = BASE_DIR / "era5" / "hourly_single_levels"
ERA5_Y0, ERA5_Y1 = 1991, 2013
ERA5_AREA   = [55.0, -95.0, 40.0, -74.0]  # N, W, S, E (edit if needed)
ERA5_VARS   = ["2m_temperature", "total_precipitation"]

def era5_download_year(y: int):
    """
    Retrieve one year of hourly ERA5 single-level data into ERA5_OUT.

    The request covers all months, days 01-31 and all 24 hours of year 'y' for
    ERA5_VARS over ERA5_AREA, in NetCDF format. The file is downloaded to
    '<target>.part' and renamed only after the retrieve call returns, so an
    interrupted run never leaves a truncated file that a later run would skip.

    Prints 'skip  <name>' when a non-empty target already exists, otherwise
    'done  <name>' after a successful download. Errors raised by the CDS client
    propagate to the caller. Returns None.
    """
    target = ERA5_OUT / f"era5_hourly_t2m_tp_{y}.nc"
    if target.exists() and target.stat().st_size > 0:
        print(f"skip  {target.name}")
        return

    # Do not depend on the caller having created the output directory.
    target.parent.mkdir(parents=True, exist_ok=True)
    tmp = target.with_suffix(target.suffix + ".part")

    c = cdsapi.Client()
    try:
        c.retrieve(
            "reanalysis-era5-single-levels",
            {
                "product_type": "reanalysis",
                "variable": ERA5_VARS,
                "year": f"{y}",
                "month": [f"{m:02d}" for m in range(1, 13)],
                "day":   [f"{d:02d}" for d in range(1, 32)],  # CDS accepts 1..31
                "time":  [f"{h:02d}:00" for h in range(24)],
                "format": "netcdf",
                "area": ERA5_AREA,
            },
            str(tmp),
        )
        os.replace(tmp, target)  # atomic move
    finally:
        # Best-effort cleanup of a partial download; absent after a successful move.
        try:
            tmp.unlink()
        except OSError:
            pass
    print(f"done  {target.name}")

# Download guard: nothing is requested from CDS unless ERA5_RUN is set to True.
ERA5_RUN = False
if ERA5_RUN:
    ERA5_OUT.mkdir(parents=True, exist_ok=True)
    # Years are fetched one after another (no thread pool for this source).
    for y in range(ERA5_Y0, ERA5_Y1 + 1):
        era5_download_year(y)

In [None]:
# RDRS v2.1 (1-hour) — Ouranos THREDDS NetCDF Subset (NCSS)
# Catalog: https://pavics.ouranos.ca/.../thredds/catalog/datasets/reanalyses/catalog.html
# Dataset: datasets/reanalyses/1hr_RDRSv2.1_NAM.ncml
# Note   : Confirm variable names on NCSS UI
# Cite   : (project docs / Ouranos)

from urllib.parse import urlencode

# RDRS settings read by rdrs_ncss_url_for_year(), rdrs_worker() and the run guard:
#   RDRS_OUT      - directory that receives one subset file per year
#   RDRS_WORKERS  - thread count handed to run_parallel
#   RDRS_Y0/Y1    - first and last year, both inclusive
RDRS_OUT     = BASE_DIR / "rdrs" / "v2_1_ncss"
RDRS_WORKERS = 4
RDRS_Y0, RDRS_Y1 = 1991, 2013

# NCSS endpoint of the dataset; the query string is appended per year.
NCSS_BASE = (
    "https://pavics.ouranos.ca/twitcher/ows/proxy/thredds"
    "/ncss/datasets/reanalyses/1hr_RDRSv2.1_NAM.ncml"
)

# edit to your domain and vars
# RDRS_BBOX keys map to the query as: maxy->north, miny->south, maxx->east, minx->west.
# RDRS_VARS is sent as one repeated "var" parameter per entry.
RDRS_BBOX = dict(miny=40.0, minx=-95.0, maxy=55.0, maxx=-74.0)
RDRS_VARS = ["tas", "pr"]

def rdrs_ncss_url_for_year(y: int) -> str:
    """
    Build the NCSS subset URL for calendar year 'y'.

    The query carries the variables in RDRS_VARS, the bounding box in RDRS_BBOX,
    a time window from Jan 1 00:00Z to Dec 31 23:00Z of that year, and asks for
    NetCDF output.
    """
    box = RDRS_BBOX
    query_pairs = [
        ("var", RDRS_VARS),
        ("north", box["maxy"]),
        ("south", box["miny"]),
        ("east", box["maxx"]),
        ("west", box["minx"]),
        ("time_start", f"{y}-01-01T00:00:00Z"),
        ("time_end", f"{y}-12-31T23:00:00Z"),
        ("accept", "netcdf"),
    ]
    query = urlencode(query_pairs, doseq=True)
    return f"{NCSS_BASE}?{query}"

def rdrs_worker(y: int):
    """Download the RDRS subset for year 'y' into RDRS_OUT; returns the status line."""
    destination = RDRS_OUT / f"RDRSv2.1_{y}_subset.nc"
    return fetch_url_to_file(rdrs_ncss_url_for_year(y), destination)

# Download guard: nothing is fetched unless RDRS_RUN is set to True.
RDRS_RUN = False
if RDRS_RUN:
    # One task per year; each worker call receives the year itself.
    years = list(range(RDRS_Y0, RDRS_Y1 + 1))
    print(f"{len(years)} files → {RDRS_OUT}")
    run_parallel(years, rdrs_worker, max_workers=RDRS_WORKERS)

In [None]:
# EMDNA — download selected files (1991–2013) directly from FRDR (Dataverse APIs)
# DOI  : 10.20383/101.0275
# Docs : Native API (list files) + Data Access API (download by file id)
# Cite : Tang et al., 2021 (ESSD)

import json, urllib.request, urllib.parse

# EMDNA settings:
#   EMDNA_OUT    - directory that receives the selected files
#   FRDR_HOST    - repository host used for both the listing and the downloads
#   DATASET_DOI  - persistent identifier of the dataset
EMDNA_OUT   = BASE_DIR / "emdna"
FRDR_HOST   = "https://www.frdr-dfdr.ca"
DATASET_DOI = "doi:10.20383/101.0275"

# File-listing endpoint for the latest published version; the DOI is
# percent-encoded before being placed in the query string.
LIST_URL = (f"{FRDR_HOST}/api/datasets/:persistentId/versions/:latest-published/files"
            f"?persistentId={urllib.parse.quote(DATASET_DOI)}")

# Filter to years/variables you need
#   YEAR_MIN/YEAR_MAX - inclusive year window applied to the year found in a file label
#   KEEP_VARS         - substrings; a label must contain at least one (case-insensitive)
#   _year_re          - finds a four-digit run starting with 19 or 20 in the label
YEAR_MIN, YEAR_MAX = 1991, 2013
KEEP_VARS = ("pr", "tmean", "trange")  # adapt as needed
_year_re = re.compile(r"(19|20)\d{2}")

def _is_keep(label: str) -> bool:
    """
    Decide whether a file label passes the variable and year filters.

    The label is lower-cased first. It is kept when KEEP_VARS is empty or one
    of its entries occurs in the label, and the first 19xx/20xx year found in
    the label lies within YEAR_MIN..YEAR_MAX (inclusive). Labels without such a
    year are rejected.
    """
    lowered = label.lower()
    variable_ok = not KEEP_VARS or any(token in lowered for token in KEEP_VARS)
    if not variable_ok:
        return False
    hit = _year_re.search(lowered)
    return hit is not None and YEAR_MIN <= int(hit.group(0)) <= YEAR_MAX

def emdna_list_files():
    """
    Fetch the dataset's file listing and return the entries that pass _is_keep.

    The request carries the notebook's User-Agent, like the download helper,
    because some hosts reject requests without one. Malformed parts of the
    response (missing or non-list "data", non-dict entries, non-string labels)
    are ignored rather than raising.

    Returns:
        A list of {"id": <file id>, "label": <file label>} dicts; empty when
        the response has no usable entries.

    Raises:
        Network and JSON decoding errors propagate to the caller.
    """
    req = urllib.request.Request(
        LIST_URL,
        headers={"User-Agent": _USER_AGENT, "Accept": "application/json"},
    )
    with urllib.request.urlopen(req, context=_SSL_CTX, timeout=120) as r:
        payload = json.load(r)

    files = payload.get("data") if isinstance(payload, dict) else None
    if not isinstance(files, list):
        files = []  # covers "data": null and error-shaped responses

    recs = []
    for f in files:
        if not isinstance(f, dict):
            continue
        meta  = f.get("dataFile") or {}
        fid   = meta.get("id")
        # Fall back to the stored file name when the entry has no label.
        label = f.get("label") or meta.get("filename")
        if fid and isinstance(label, str) and label and _is_keep(label):
            recs.append({"id": fid, "label": label})
    return recs

def emdna_tasks():
    """
    Yield one (url, local_path) pair per file returned by emdna_list_files().

    The label comes from the remote server, so only its final path component is
    used for the local file name. An absolute path or '..' segments in a label
    would otherwise place the download outside EMDNA_OUT. Labels that reduce to
    nothing usable are skipped.
    """
    for rec in emdna_list_files():
        fid, label = rec["id"], rec["label"]
        # Normalize backslashes so Windows-style separators are stripped too.
        safe_name = Path(str(label).replace("\\", "/")).name
        if not safe_name or safe_name == "..":
            continue
        url   = f"{FRDR_HOST}/api/access/datafile/{fid}?format=original"
        local = EMDNA_OUT / safe_name
        yield (url, local)

def emdna_worker(task):
    """Unpack a (url, local_path) task and download it; returns the status line."""
    source_url, destination = task
    return fetch_url_to_file(source_url, destination)

# Download guard: neither the listing nor any file is fetched unless EMDNA_RUN is True.
EMDNA_RUN = False
if EMDNA_RUN:
    # Building the list triggers the listing request, then reports the file count.
    tasks = list(emdna_tasks())
    print(f"{len(tasks):,} files → {EMDNA_OUT}")
    run_parallel(tasks, emdna_worker, max_workers=6)


# Alternatively, download any EMDNA ensemble member manually from the dataset page:
# https://www.frdr-dfdr.ca/repo/dataset/4bb24ee2-73e1-43a8-a929-126d2eb2bfa3

In [None]:
# MERRA-2 — hourly single-level diagnostics (daily granules), 1991–2013
# Products: M2T1NXSLV.5.12.4 (T2M etc.), M2T1NXFLX.5.12.4 (precip rate/flux)
# Streams : 1980–1991:100, 1992–2000:200, 2001–2010:300, 2011–present:400
# Host   : goldsmr4.gesdisc.eosdis.nasa.gov (may vary to goldsmr5,…)
# Auth   : Earthdata Login via ~/.netrc (DO NOT commit credentials)
# Cite   : Gelaro et al., 2017

import requests
from datetime import date, timedelta

# MERRA-2 settings:
#   MERRA_OUT          - root output directory (sub-folders per product tag and year)
#   MERRA_HOST         - data host that the collection path is appended to
#   COLL_SLV/COLL_FLX  - collection paths of the two products
#   DATE0/DATE1        - first and last day, both inclusive
#   PRODUCTS           - (tag, collection) pairs to download; the tag names the local sub-folder
MERRA_OUT   = BASE_DIR / "merra2"
MERRA_HOST  = "https://goldsmr4.gesdisc.eosdis.nasa.gov"
COLL_SLV    = "MERRA2/M2T1NXSLV.5.12.4"
COLL_FLX    = "MERRA2/M2T1NXFLX.5.12.4"
DATE0, DATE1 = date(1991, 1, 1), date(2013, 12, 31)
PRODUCTS    = [("slv", COLL_SLV), ("flx", COLL_FLX)]  # trim as needed

def _stream(y: int) -> int:
    if y <= 1991:  return 100
    if y <= 2000:  return 200
    if y <= 2010:  return 300
    return 400

def _daily_url(prod_coll: str, d: date):
    """
    Build the remote URL and file name of one daily granule.

    'prod_coll' is a collection path such as 'MERRA2/M2T1NXSLV.5.12.4'; the
    part between the slash and the first dot (e.g. M2T1NXSLV) decides whether
    the file-name group is 'slv' or 'flx'. Returns a (url, file_name) tuple.
    """
    collection = prod_coll.split("/")[1]
    short_name = collection.split(".", 1)[0]  # e.g., M2T1NXSLV
    if "SLV" in short_name.upper():
        group = "slv"
    else:
        group = "flx"
    stream_id = _stream(d.year)
    fname = f"MERRA2_{stream_id}.tavg1_2d_{group}_Nx.{d:%Y%m%d}.nc4"
    return f"{MERRA_HOST}/data/{prod_coll}/{d:%Y/%m}/{fname}", fname

def _date_range(d0: date, d1: date):
    d, step = d0, timedelta(days=1)
    while d <= d1:
        yield d
        d += step

def merra_worker(task):
    """
    Download one MERRA-2 granule described by a (url, local_path) task.

    The existence check runs before any network traffic, so already-downloaded
    files cost no request. The body is streamed to '<local>.part' and renamed
    onto 'local' only after the transfer completed, so a failed or interrupted
    download never leaves a partial file that a later run would skip.

    Returns:
        'skip  <name>'            - 'local' already exists and is non-empty
        'done  <name>'            - downloaded successfully
        'FAIL  <name>  (<error>)' - any error during the request or the write
    """
    url, local = task
    tmp = local.with_suffix(local.suffix + ".part")
    try:
        if local.exists() and local.stat().st_size > 0:
            return f"skip  {local.name}"

        local.parent.mkdir(parents=True, exist_ok=True)
        # Session uses ~/.netrc for Earthdata during redirects; the context
        # manager releases its connections when the download ends.
        with requests.Session() as s:
            with s.get(url, stream=True, timeout=120) as r:
                r.raise_for_status()
                with open(tmp, "wb") as f:
                    for chunk in r.iter_content(chunk_size=1 << 20):
                        if chunk:
                            f.write(chunk)
        os.replace(tmp, local)  # atomic move
        return f"done  {local.name}"
    except Exception as e:
        return f"FAIL  {local.name}  ({e})"
    finally:
        # Best-effort cleanup; the temp file is absent after a successful move.
        try:
            tmp.unlink()
        except OSError:
            pass

def merra_tasks():
    """
    Yield one (url, local_path) pair per product in PRODUCTS and per day
    between DATE0 and DATE1; local files go to MERRA_OUT/<tag>/<year>/.
    """
    for tag, coll in PRODUCTS:
        product_dir = MERRA_OUT / tag
        for day in _date_range(DATE0, DATE1):
            remote, granule = _daily_url(coll, day)
            yield (remote, product_dir / f"{day:%Y}" / granule)

# Download guard: nothing is fetched unless MERRA_RUN is set to True.
MERRA_RUN = False  # WARNING: very large if True for full period+both products
if MERRA_RUN:
    # One task per product per day; the list is built first to report the count.
    tasks = list(merra_tasks())
    print(f"{len(tasks):,} daily files → {MERRA_OUT}  (consider narrowing dates/products)")
    run_parallel(tasks, merra_worker, max_workers=4)