In [4]:
import polars as pl
import requests as req
import os
from dotenv import load_dotenv
from pathlib import Path
from io import StringIO
import time
import json


In [5]:
load_dotenv()

True

In [6]:
# Get NOAA token from environment variables
NOAA_TOKEN = os.getenv("NOAA_TOKEN")

# Define parameters
dataset = 'GHCND'
location = 'FIPS:19'
start = '2024-06-01'
end = '2024-06-30'
datatypes = 'TMAX,TMIN,PRCP'
units = 'standard'
limit = 100

# Build URL with f-string
url = f"https://www.ncei.noaa.gov/cdo-web/api/v2/data?datasetid={dataset}&locationid={location}&startdate={start}&enddate={end}&datatypeid={datatypes}&units={units}&limit={limit}"

headers = {'token': NOAA_TOKEN}
print(f"Full URL: {url}")


Full URL: https://www.ncei.noaa.gov/cdo-web/api/v2/data?datasetid=GHCND&locationid=FIPS:19&startdate=2024-06-01&enddate=2024-06-30&datatypeid=TMAX,TMIN,PRCP&units=standard&limit=100


In [4]:
response = req.get(url, headers=headers, timeout=120)

# Only parse JSON if status is 200
if response.status_code == 200:
    data = response.json()
else:
    print("\nError : ", response.status_code)

In [5]:
df_raw = pl.DataFrame(data['results'])
print(df_raw)
print("Unique datatypes:", df_raw['datatype'].unique())

shape: (100, 5)
┌─────────────────────┬──────────┬───────────────────┬────────────┬───────┐
│ date                ┆ datatype ┆ station           ┆ attributes ┆ value │
│ ---                 ┆ ---      ┆ ---               ┆ ---        ┆ ---   │
│ str                 ┆ str      ┆ str               ┆ str        ┆ f64   │
╞═════════════════════╪══════════╪═══════════════════╪════════════╪═══════╡
│ 2024-06-01T00:00:00 ┆ PRCP     ┆ GHCND:US1IAAD0002 ┆ ,,N,0800   ┆ 0.0   │
│ 2024-06-01T00:00:00 ┆ PRCP     ┆ GHCND:US1IAAL0003 ┆ T,,N,0700  ┆ 0.0   │
│ 2024-06-01T00:00:00 ┆ PRCP     ┆ GHCND:US1IAAL0005 ┆ ,,N,0700   ┆ 0.0   │
│ 2024-06-01T00:00:00 ┆ PRCP     ┆ GHCND:US1IAAL0006 ┆ ,,N,0700   ┆ 0.01  │
│ 2024-06-01T00:00:00 ┆ PRCP     ┆ GHCND:US1IAAL0007 ┆ ,,N,0700   ┆ 0.0   │
│ …                   ┆ …        ┆ …                 ┆ …          ┆ …     │
│ 2024-06-01T00:00:00 ┆ PRCP     ┆ GHCND:US1IAEM0003 ┆ ,,N,0700   ┆ 1.0   │
│ 2024-06-01T00:00:00 ┆ PRCP     ┆ GHCND:US1IAFM0001 ┆ ,,N,0800   ┆ 0.07

In [6]:
def fetch_all_iowa_weather(start_date, end_date):
    """
    Fetch all Iowa weather data with pagination.
    """
    NOAA_TOKEN = os.getenv('NOAA_TOKEN')

    all_results = []
    offset = 1
    limit = 1000  # Max allowed by NOAA

    base_url = "https://www.ncei.noaa.gov/cdo-web/api/v2/data"

    print(f"Fetching Iowa weather: {start_date} to {end_date}")

    while True:
        # Build URL with current offset
        url = f"{base_url}?datasetid=GHCND&locationid=FIPS:19&startdate={start_date}&enddate={end_date}&datatypeid=TMAX,TMIN,PRCP&units=standard&limit={limit}&offset={offset}"

        headers = {'token': NOAA_TOKEN}

        print(f"Fetching offset {offset}...", end=' ')

        try:
            response = req.get(url, headers=headers, timeout=300)

            if response.status_code != 200:
                print("\nError : ", response.status_code)
                break

            data = response.json()

            if 'results' not in data or len(data['results']) == 0:
                print("No more results")
                break

            batch_size = len(data['results'])
            all_results.extend(data['results'])

            print(f"got {batch_size} records (total: {len(all_results)})")

            # Check if we've reached the end
            if batch_size < limit:
                print("Reached end of data")
                break

            # Check metadata to see if there are more results
            if 'metadata' in data and 'resultset' in data['metadata']:
                total_count = data['metadata']['resultset']['count']
                if len(all_results) >= total_count:
                    print(f"Fetched all {total_count} records")
                    break

            offset += limit

            # NOAA rate limit: 5 requests per second
            # Be conservative: 4 requests per second
            time.sleep(0.3)

        except Exception as e:
            print(f"\nError: {e}")
            break

    print(f"\nTotal records fetched: {len(all_results)}")

    # Convert to Polars DataFrame
    df = pl.DataFrame(all_results)

    return df


# Fetch all data for June 2024
df_raw = fetch_all_iowa_weather('2024-06-01', '2024-06-30')

print("\n=== Final Dataset ===")
print(df_raw)

Fetching Iowa weather: 2024-06-01 to 2024-06-30
Fetching offset 1... got 1000 records (total: 1000)
Fetching offset 1001... got 1000 records (total: 2000)
Fetching offset 2001... got 1000 records (total: 3000)
Fetching offset 3001... got 1000 records (total: 4000)
Fetching offset 4001... got 1000 records (total: 5000)
Fetching offset 5001... got 1000 records (total: 6000)
Fetching offset 6001... got 1000 records (total: 7000)
Fetching offset 7001... got 1000 records (total: 8000)
Fetching offset 8001... got 1000 records (total: 9000)
Fetching offset 9001... got 1000 records (total: 10000)
Fetching offset 10001... got 1000 records (total: 11000)
Fetching offset 11001... got 1000 records (total: 12000)
Fetching offset 12001... got 1000 records (total: 13000)
Fetching offset 13001... 
Error :  503

Total records fetched: 13000

=== Final Dataset ===
shape: (13_000, 5)
┌─────────────────────┬──────────┬───────────────────┬────────────┬───────┐
│ date                ┆ datatype ┆ station    

In [7]:
def fetch_robust(
    start_date: str,
    end_date: str,
    max_retries: int = 3,
    checkpoint_file: str = 'weather_checkpoint.json'
) -> pl.DataFrame:
    """
    Weather fetcher more robust checkpointing in case of common 503 error
    - retries transient errors (503, etc)
    - checkpoints progress every N records to a file
    - resumes from checkpoint if interrupted

    Args:
        start_date (str): Start date in YYYY-MM-DD format.
        end_date (str): End date in YYYY-MM-DD format.
        max_retries (int): Maximum number of retries for failed requests.
        checkpoint_file (str): File to save checkpoint data.

    Returns:
        pl.DataFrame: DataFrame containing the fetched weather data.
    """
    NOAA_TOKEN = os.getenv('NOAA_TOKEN')
    checkpoint_path = Path(checkpoint_file)

    # Load checkpoint if it exists
    if checkpoint_path.exists():
        with open(checkpoint_path, 'r') as f:
            checkpoint_data = json.load(f)
        all_results = checkpoint_data['results']
        offset = checkpoint_data['offset'] + 1000
        print(f"Resuming from checkpoint: {len(all_results)} records, offset {offset}")
    else:
        print("Starting fresh fetch")
        all_results = []
        offset = 1

    base_url = 'https://www.ncei.noaa.gov/cdo-web/api/v2/data'
    limit = 1000  # Max allowed by NOAA

    try:
        while True:
            url = f"{base_url}?datasetid=GHCND&locationid=FIPS:19&startdate={start_date}&enddate={end_date}&datatypeid=TMAX,TMIN,PRCP&units=standard&limit={limit}&offset={offset}"

            # Retry logic
            success = False
            for attempt in range(max_retries):
                try:
                    response = req.get(url, headers={'token': NOAA_TOKEN}, timeout=300)

                    if response.status_code == 503:
                        wait = 2 ** attempt
                        print(f"503 error, retrying in {wait} seconds...")
                        time.sleep(wait)
                        continue

                    if response.status_code != 200:
                        print("\nError : ", response.status_code)
                        break

                    data = response.json()

                    if 'results' not in data or len(data['results']) == 0:
                        print("Done")
                        success = True
                        break

                    batch_size = len(data['results'])
                    all_results.extend(data['results'])

                    print(f"{batch_size} records (total: {len(all_results)})")

                    # Checkpoint every 5000 records
                    if len(all_results) % 5000 < 1000:
                        with open(checkpoint_path, 'w') as f:
                            json.dump({
                                'start_date': start_date,
                                'end_date': end_date,
                                'last_offset': offset,
                                'total_records': len(all_results),
                                'results': all_results
                            }, f)
                        print("Saved")

                    if batch_size < limit:
                        success = True
                        break

                    offset += limit
                    time.sleep(0.3)  # Rate limiting
                    success = True
                    break

                except Exception as e:
                    print(f"Error: {e}, attempt {attempt + 1} of {max_retries}")
                    if attempt < max_retries - 1:
                        time.sleep(2 ** attempt)

            if not success or (data and ('results' not in data or len(data['results']) == 0)):
                break

        # Clean up on success
        if checkpoint_path.exists():
            checkpoint_path.unlink()

        return pl.DataFrame(all_results)
    except KeyboardInterrupt:
        print("Interrupted, saving checkpoint...")
        with open(checkpoint_path, 'w') as f:
            json.dump({
                'start_date': start_date,
                'end_date': end_date,
                'last_offset': offset - 1000,
                'total_records': len(all_results),
                'results': all_results
            }, f)
        print("Checkpoint saved. Exiting.")
        return pl.DataFrame(all_results) if all_results else pl.DataFrame()


In [8]:
df = fetch_robust('2024-06-01', '2024-06-30')
print("\n=== Robust Fetch Dataset ===")
print(df)

Starting fresh fetch
1000 records (total: 1000)
1000 records (total: 2000)
1000 records (total: 3000)
1000 records (total: 4000)
1000 records (total: 5000)
Saved
1000 records (total: 6000)
1000 records (total: 7000)
1000 records (total: 8000)
1000 records (total: 9000)
1000 records (total: 10000)
Saved
1000 records (total: 11000)
1000 records (total: 12000)
1000 records (total: 13000)
503 error, retrying in 1 seconds...
1000 records (total: 14000)
1000 records (total: 15000)
Saved
1000 records (total: 16000)
1000 records (total: 17000)
1000 records (total: 18000)
1000 records (total: 19000)
900 records (total: 19900)
900 records (total: 20800)
Saved
900 records (total: 21700)
900 records (total: 22600)
900 records (total: 23500)
503 error, retrying in 1 seconds...
900 records (total: 24400)
900 records (total: 25300)
Saved
900 records (total: 26200)
900 records (total: 27100)
900 records (total: 28000)
900 records (total: 28900)
900 records (total: 29800)
900 records (total: 30700)
Sav

In [None]:
import requests
import polars as pl
import os
from dotenv import load_dotenv
from concurrent.futures import ThreadPoolExecutor, as_completed, wait, FIRST_COMPLETED
import time
from threading import Lock

load_dotenv()

def fetch_batch_safe(offset, start_date, end_date, token, max_retries=3):
    """Fetch a single batch with retries."""
    url = f"https://www.ncei.noaa.gov/cdo-web/api/v2/data?datasetid=GHCND&locationid=FIPS:19&startdate={start_date}&enddate={end_date}&datatypeid=TMAX,TMIN,PRCP&units=standard&limit=1000&offset={offset}"

    headers = {'token': token}

    for attempt in range(max_retries):
        try:
            response = requests.get(url, headers=headers, timeout=120)

            if response.status_code == 503:
                wait_time = 2 ** attempt
                print(f"  503 at offset {offset}, retry in {wait_time}s")
                time.sleep(wait_time)
                continue

            if response.status_code == 502:
                return (offset, None, 0)

            if response.status_code != 200:
                return (offset, None, 0)

            data = response.json()

            if 'results' not in data:
                return (offset, [], 0)

            results = data['results']
            return (offset, results, len(results))

        except Exception as e:
            if attempt < max_retries - 1:
                time.sleep(2 ** attempt)
            else:
                return (offset, None, 0)

    return (offset, None, 0)


def fetch_concurrent_sliding_window(start_date, end_date, max_workers=4):
    """
    Fetch with proper sliding window - FIXED VERSION.

    Key fix: Use while loop + wait() instead of as_completed()
    """
    NOAA_TOKEN = os.getenv('NOAA_TOKEN')

    if not NOAA_TOKEN:
        print("ERROR: NOAA_TOKEN not found!")
        return None

    print(f"Fetching Iowa weather: {start_date} to {end_date} (max {max_workers} workers)")

    all_results = []
    next_offset = 1
    empty_batches = 0
    max_empty = 3

    results_lock = Lock()
    start_time = time.monotonic()

    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        active_futures = {}

        # Submit initial batch
        for i in range(max_workers):
            offset = next_offset
            future = executor.submit(fetch_batch_safe, offset, start_date, end_date, NOAA_TOKEN)
            active_futures[future] = offset
            next_offset += 1000

        print(f"Submitted initial {len(active_futures)} requests\n")

        # Process completions with while loop
        while active_futures:
            # Wait for at least one to complete
            done, pending = wait(active_futures.keys(), return_when=FIRST_COMPLETED, timeout=120)

            if not done:
                print("Timeout waiting for futures")
                break

            # Process all completed futures
            for future in done:
                offset = active_futures.pop(future)

                try:
                    fetch_offset, results, batch_size = future.result()

                    if results is None:
                        empty_batches += 1
                        print(f"Offset {fetch_offset:,}: Failed")

                        if empty_batches >= max_empty:
                            print(f"Stopping after {max_empty} failures")
                            # Cancel remaining
                            for f in active_futures.keys():
                                f.cancel()
                            active_futures.clear()
                            break

                    elif batch_size == 0:
                        empty_batches += 1
                        print(f"Offset {fetch_offset:,}: Empty")

                        if empty_batches >= max_empty:
                            print(f"End of data (after {max_empty} empty batches)")
                            for f in active_futures.keys():
                                f.cancel()
                            active_futures.clear()
                            break

                    else:
                        # Success!
                        empty_batches = 0

                        with results_lock:
                            all_results.extend(results)

                        print(f"Offset {fetch_offset:,}: {batch_size} records (total: {len(all_results):,})")

                        # Submit next batch (keep window full)
                        if next_offset < 100000:  # Safety limit
                            new_future = executor.submit(fetch_batch_safe, next_offset, start_date, end_date, NOAA_TOKEN)
                            active_futures[new_future] = next_offset
                            next_offset += 1000

                        time.sleep(0.25)  # Rate limit

                except Exception as e:
                    print(f"Offset {offset:,}: Exception {e}")
                    empty_batches += 1

            # Check stopping condition
            if empty_batches >= max_empty:
                break

    elapsed = time.monotonic() - start_time

    print(f"\n{'='*60}")
    print(f"Completed in {elapsed:.1f}s ({elapsed/60:.1f} min)")
    print(f"Total records: {len(all_results):,}")
    print(f"Speed: {len(all_results) / elapsed:.0f} records/sec")
    print(f"{'='*60}")

    return pl.DataFrame(all_results) if all_results else None



In [6]:
# Run with debug output
print("="*60)
print("STARTING CONCURRENT FETCH")
print("="*60)

df = fetch_concurrent_sliding_window('2024-06-01', '2024-06-30', max_workers=4)

if df is not None:
    print(f"\n=== Final Dataset ===")
    print(f"Shape: {df.shape}")
    print(df.head(10))
else:
    print("\nNo data returned")

STARTING CONCURRENT FETCH
Fetching Iowa weather: 2024-06-01 to 2024-06-30 (max 4 workers)
Submitted initial 4 requests

  503 at offset 3001, retry in 1s
Offset 1: 1000 records (total: 1,000)
Offset 2,001: 1000 records (total: 2,000)
Offset 1,001: 1000 records (total: 3,000)
  503 at offset 4001, retry in 1s
Offset 3,001: 1000 records (total: 4,000)
Offset 5,001: 1000 records (total: 5,000)
Offset 6,001: 1000 records (total: 6,000)
Offset 4,001: 1000 records (total: 7,000)
Offset 7,001: 1000 records (total: 8,000)
Offset 9,001: 1000 records (total: 9,000)
Offset 8,001: 1000 records (total: 10,000)
Offset 10,001: 1000 records (total: 11,000)
Offset 11,001: 1000 records (total: 12,000)
Offset 12,001: 1000 records (total: 13,000)
Offset 13,001: 1000 records (total: 14,000)
Offset 15,001: 1000 records (total: 15,000)
Offset 14,001: 1000 records (total: 16,000)
Offset 17,001: 1000 records (total: 17,000)
Offset 16,001: 1000 records (total: 18,000)
Offset 19,001: 900 records (total: 18,900)


In [7]:
print("\n=== Concurrent Sliding Window Fetch Dataset ===")
print(df)


=== Concurrent Sliding Window Fetch Dataset ===
shape: (19_900, 5)
┌─────────────────────┬──────────┬───────────────────┬────────────┬───────┐
│ date                ┆ datatype ┆ station           ┆ attributes ┆ value │
│ ---                 ┆ ---      ┆ ---               ┆ ---        ┆ ---   │
│ str                 ┆ str      ┆ str               ┆ str        ┆ f64   │
╞═════════════════════╪══════════╪═══════════════════╪════════════╪═══════╡
│ 2024-06-01T00:00:00 ┆ PRCP     ┆ GHCND:US1IAAD0002 ┆ ,,N,0800   ┆ 0.0   │
│ 2024-06-01T00:00:00 ┆ PRCP     ┆ GHCND:US1IAAL0003 ┆ T,,N,0700  ┆ 0.0   │
│ 2024-06-01T00:00:00 ┆ PRCP     ┆ GHCND:US1IAAL0005 ┆ ,,N,0700   ┆ 0.0   │
│ 2024-06-01T00:00:00 ┆ PRCP     ┆ GHCND:US1IAAL0006 ┆ ,,N,0700   ┆ 0.01  │
│ 2024-06-01T00:00:00 ┆ PRCP     ┆ GHCND:US1IAAL0007 ┆ ,,N,0700   ┆ 0.0   │
│ …                   ┆ …        ┆ …                 ┆ …          ┆ …     │
│ 2024-06-29T00:00:00 ┆ TMIN     ┆ GHCND:USC00133487 ┆ ,,7,0700   ┆ 61.0  │
│ 2024-06-29T00:00:0

In [9]:
# Check what dates are actually in your data
date_min = df['date'].str.strptime(pl.Date, '%Y-%m-%dT%H:%M:%S').min()
date_max = df['date'].str.strptime(pl.Date, '%Y-%m-%dT%H:%M:%S').max()

print("Actual date range in data:")
print(f"Earliest: {date_min}")
print(f"Latest: {date_max}")

# Count records per day
daily_counts = (
    df
    .with_columns([
        pl.col('date').str.strptime(pl.Date, '%Y-%m-%dT%H:%M:%S').alias('date_parsed')
    ])
    .group_by('date_parsed')
    .agg([
        pl.len().alias('num_records')
    ])
    .sort('date_parsed')
)

print("\nRecords per day:")
print(daily_counts)

# Check if June 30 exists
june_30_count = daily_counts.filter(pl.col('date_parsed') == pl.date(2024, 6, 30))
print(f"\nJune 30 records: {len(june_30_count)}")

Actual date range in data:
Earliest: 2024-06-01
Latest: 2024-06-30

Records per day:
shape: (30, 2)
┌─────────────┬─────────────┐
│ date_parsed ┆ num_records │
│ ---         ┆ ---         │
│ date        ┆ u32         │
╞═════════════╪═════════════╡
│ 2024-06-01  ┆ 674         │
│ 2024-06-02  ┆ 669         │
│ 2024-06-03  ┆ 684         │
│ 2024-06-04  ┆ 659         │
│ 2024-06-05  ┆ 717         │
│ …           ┆ …           │
│ 2024-06-26  ┆ 681         │
│ 2024-06-27  ┆ 647         │
│ 2024-06-28  ┆ 689         │
│ 2024-06-29  ┆ 670         │
│ 2024-06-30  ┆ 638         │
└─────────────┴─────────────┘

June 30 records: 1


In [10]:
df.write_parquet('C:\\Users\\samb2\\Documents\\GitHub\\ag-data-engineering-dashboard\\data\\raw\\iowa_weather_june2024.parquet')