https://nomads.ncep.noaa.gov/

https://nomads.ncep.noaa.gov/pub/data/nccf/com/aqm/prod/cs.20230331/

In [None]:
import shutil
import time
from pathlib import Path

# Every downloaded file is written into this directory (relative to the
# current working directory).
DOWNLOAD_DIR = Path("./downloads")
# Create it on first run; an already-existing directory is not an error.
DOWNLOAD_DIR.mkdir(parents=False, exist_ok=True)

In [None]:
import requests
from bs4 import BeautifulSoup

url = "https://nomads.ncep.noaa.gov/pub/data/nccf/com/aqm/prod/cs.20230331"

# Fetch the page with a (connect, read) timeout so a stalled server cannot
# hang the notebook, and fail loudly on an HTTP error instead of parsing an
# error page (which would silently produce an empty list of links).
listing = requests.get(url, timeout=(6.05, 30))
listing.raise_for_status()

markup = BeautifulSoup(listing.content, "html5lib")

In [None]:
# Collect every anchor (<a>) element found in the parsed page.
tags = markup.find_all(name="a")
tags

In [None]:
# Build an absolute URL for every anchor whose href starts with "aqm".
# `tag.get` is used instead of `tag['href']` so that an anchor without an
# href attribute is skipped rather than raising KeyError.
links = [
    f"{url}/{href}"
    for tag in tags
    if (href := tag.get("href", "")).startswith("aqm")
]
links

In [None]:
from pyrate_limiter import Duration, RequestRate, Limiter, SQLiteBucket


# Thread and process safe I/O rate limiter
# Both rates apply together: 1 request per second and 60 requests per minute.
rate_limits = (
    RequestRate(1, Duration.SECOND),
    RequestRate(60, Duration.MINUTE),
)
limiter = Limiter(*rate_limits, bucket_class=SQLiteBucket)

In [None]:
import threading

thread_local = threading.local()

# Multithreaded I/O
def get_thread_local_requests_session():
    """
    Return the `requests.Session` stored on `thread_local` for the calling
    thread, creating and storing a new one on first use.
    """
    if not hasattr(thread_local, "session"):
        thread_local.session = requests.Session()
    return thread_local.session


def delete_thread_local_requests_session():
    """
    Close and forget the calling thread's session, if it has one.

    Does nothing when no session is stored on `thread_local` for this thread.
    """
    try:
        stale_session = thread_local.session
        stale_session.close()
        delattr(thread_local, "session")
    except AttributeError:
        # No session stored for this thread: nothing to tear down.
        return

In [None]:
from requests import Response, RequestException, HTTPError

def get_url(url: str, max_tries: int = 5, stream=False) -> Response:
    """
    Fetch a URL through the shared rate limiter, retrying on failures.

    Attempts are separated by an exponential backoff (2, 4, 8, ... seconds).
    Errors without an HTTP response, 5xx responses and the 302, 404 and 429
    statuses are retried; any other 4xx response is raised immediately.

    Args:
        url: Address to request.
        max_tries: Total number of attempts; must be at least 1.
        stream: Forwarded to ``Session.get``. When true, the body of the
            returned response has not been read yet, so the caller should
            consume or close it.

    Returns:
        The first response whose ``ok`` attribute is true.

    Raises:
        ValueError: If ``max_tries`` is less than 1.
        RequestException: The last error once all attempts are used up, or
            immediately for a non-retryable client error.
        ConnectionError: A built-in connection error on the last attempt.

    Exceptions raised by the rate limiter itself are not caught here.
    """

    if max_tries < 1:
        # Without this guard the loop body never runs and None is returned,
        # contradicting the declared return type.
        raise ValueError(f"max_tries must be at least 1, got {max_tries}")

    session = get_thread_local_requests_session()

    for attempt in range(max_tries):

        # Exponential backoff: no wait before the first attempt, then 2**attempt seconds
        time.sleep(attempt and 2**attempt)

        try:
            with limiter.ratelimit("NOMADS", delay=True, max_delay=60):
                response = session.get(url, stream=stream, timeout=(6.05, 30))

                # For this notebook
                print(response.request.method, response.request.url)
                print(response.status_code, response.reason)

            if response.status_code == 302:
                # Treat this like an error.
                # NOMADS sometimes returns 302 but with no headers, no location to redirect.
                raise HTTPError("Unhandled 302", response=response)

            if response.ok:
                return response

            response.raise_for_status()

        except (ConnectionError, RequestException) as exc:
            # It looks like requests might be catching native ConnectionError exceptions and re-raising
            # them as its own requests.exceptions.ConnectionError (unrelated).
            # For that reason we're catching the generic RequestException,
            # which covers both requests.exceptions.HTTPError and requests.exceptions.ConnectionError.
            if attempt == max_tries - 1:
                raise

            # Built-in ConnectionError has no `response`; requests exceptions may carry None.
            # Compare against None explicitly: a Response is falsy when its status is an error.
            failed = getattr(exc, "response", None)
            status = getattr(failed, "status_code", None)

            # Don't retry on (most) client errors
            # NOMADS sometimes returns 404 for a valid url, so retry
            if status is not None and status < 500 and status not in {302, 429, 404}:
                raise

            # We are going to retry: release the failed response so that a
            # streamed, unread body does not keep its connection checked out.
            if failed is not None:
                failed.close()

            if status == 302:
                # These empty 302 responses seem to go away after a 30-second cooldown.
                # Only wait when another attempt will actually follow.
                time.sleep(30)

In [None]:
def download_file(path: Path, url: str) -> None:
    """
    Download `url` to `path`, retrying when the saved file is incomplete.

    The raw response body is copied to `path` and the resulting file size is
    compared with the Content-Length header. A mismatch, a missing or
    non-numeric header, or a FileNotFoundError triggers another attempt
    (up to 4, waiting 5, 25 and 125 seconds in between) on a fresh session.

    Args:
        path: Destination file; it is overwritten.
        url: Address to download.

    Raises:
        RuntimeError: If no attempt produced a file of the expected size.
        Any exception raised by `get_url` or while copying the body is
        propagated; a file written by this call is removed first.
    """
    opened = False

    try:
        # Outer retry loop for successful requests with incomplete downloads
        for attempt in range(4):

            time.sleep(attempt and 5**attempt)

            with get_url(url, max_tries=6, stream=True) as response:
                try:
                    expected_size = int(response.headers["Content-Length"])

                    with path.open(mode="wb") as f:
                        # From here on `path` holds data written by this call.
                        opened = True
                        shutil.copyfileobj(response.raw, f)  # https://stackoverflow.com/a/39217788/8793243

                    if path.stat().st_size == expected_size:

                        # For this notebook
                        print(f"Done downloading {path}")

                        return

                except (KeyError, ValueError, FileNotFoundError):
                    pass

            # With these errors, a fresh connection might help
            delete_thread_local_requests_session()

    except BaseException:
        # An error escaped (e.g. get_url gave up on a later attempt, or the
        # copy was interrupted): don't leave our partial file behind.
        if opened:
            path.unlink(missing_ok=True)
        raise

    # Make sure we don't end with a partial file
    path.unlink(missing_ok=True)

    raise RuntimeError(f"Unable to download {url}")

In [None]:
# Alternate basic version
# NOTE: this reuses the name `download_file`, so running this cell replaces
# any earlier definition of that name.
def download_file(dest: str, url: str):
    """
    Stream `url` straight into the file `dest` with a single request.

    Args:
        dest: Destination file path; it is overwritten.
        url: Address to download.

    Raises:
        requests.HTTPError: If the server answers with a 4xx/5xx status.
    """
    # The (connect, read) timeout keeps a stalled server from blocking forever.
    with requests.get(url, stream=True, timeout=(6.05, 30)) as response:
        # Check the status before opening `dest`, so an HTTP error page is
        # never saved as if it were the requested file.
        response.raise_for_status()
        with Path(dest).open(mode="wb") as f:
            shutil.copyfileobj(response.raw, f)

In [None]:
import os

# Limited resources on mybinder: use a single worker there.
# Anywhere else the value is None.
if os.environ.get('BINDER_LAUNCH_HOST') == 'https://mybinder.org/':
    MAX_THREADS = 1
else:
    MAX_THREADS = None

In [None]:
from concurrent.futures import ThreadPoolExecutor

# Submit one download job per link, then report every job that failed.
# Exceptions raised inside a worker are stored on its Future; without
# inspecting the futures, failed downloads would go completely unnoticed.
with ThreadPoolExecutor(max_workers=MAX_THREADS) as executor:
    futures = {}

    for link in links:
        print(f"submitting job for {link}")

        # The file name is everything after the last "/" of the link
        filename = link.rsplit('/', maxsplit=1)[1]

        futures[link] = executor.submit(download_file, DOWNLOAD_DIR / filename, link)

    for link, future in futures.items():
        # Blocks until this job has finished; returns None on success
        error = future.exception()
        if error is not None:
            print(f"FAILED {link}: {error!r}")