Context: parameters relevant to the propagation of the West Nile Virus.

Years: 2017 to 2024

Months: July to September

Aggregation: yearly mean (July to September only for each year).

Parameters: 
* Climate -> Relative Humidity (2m), Precipitation, Max Temperature (2m), Min Temperature (2m), Wind Speed (10m). 
* Water -> Land Surface Temperature Day, Land Surface Temperature Night, Chlorophyll
* NDVI (mean) -> Agricultural (or croplands/farmlands), Forested land cover (Deciduous forest, Evergreen forest, Mixed forest), Rangeland (Grassland, Shrubland, Pasture, Natural grazing land) 

Region: County-level in the United States.

In [1]:
import pandas as pd
import geopandas as gpd

import numpy as np
import os
import time
import requests
from concurrent.futures import ThreadPoolExecutor, as_completed
from tqdm import tqdm
from datetime import datetime

### OpenMeteo API - Obtainable Parameters:
* Climate -> Relative Humidity (2m), Precipitation, Max Temperature (2m), Min Temperature (2m), Wind Speed (10m). 
* Need source geoids (county fips codes) and the corresponding lat/lon using the TIGER API.

In [2]:
CENSUS_COUNTIES = "https://www2.census.gov/geo/tiger/GENZ2023/shp/cb_2023_us_county_500k.zip"
# connecticut used counties up until 2022, then switched to planning regions, which my WNV case data reflects
# thus, I will get the lat/lon and corresponding data for both time periods (and FIPS code variations). 
LEGACY_COUNTIES_CT = "https://www2.census.gov/geo/tiger/GENZ2018/shp/cb_2018_us_county_500k.zip"

In [3]:
df_all = pd.read_csv("https://raw.githubusercontent.com/Amoreno73/wnv_embeddings/refs/heads/main/notebooks/national_embeddings/national_wnv_case_data/all_data_normalized.csv")
df_all["GEOID"] = df_all["GEOID"].astype(str).str.zfill(5)
counties = gpd.read_file(CENSUS_COUNTIES).to_crs(4326)
legacy_counties = gpd.read_file(LEGACY_COUNTIES_CT).to_crs(4326)

In [4]:
legacy_subset = legacy_counties[legacy_counties["GEOID"].str.startswith("09")]

In [5]:
all_counties = pd.concat([counties, legacy_subset], ignore_index=True)
all_counties # now includes both legacy and new fips codes for CT

Unnamed: 0,STATEFP,COUNTYFP,COUNTYNS,GEOIDFQ,GEOID,NAME,NAMELSAD,STUSPS,STATE_NAME,LSAD,ALAND,AWATER,geometry,AFFGEOID
0,01,003,00161527,0500000US01003,01003,Baldwin,Baldwin County,AL,Alabama,06,4117725048,1132887203,"POLYGON ((-88.02858 30.22676, -88.02399 30.230...",
1,01,069,00161560,0500000US01069,01069,Houston,Houston County,AL,Alabama,06,1501742235,4795415,"POLYGON ((-85.71209 31.19727, -85.70934 31.198...",
2,01,005,00161528,0500000US01005,01005,Barbour,Barbour County,AL,Alabama,06,2292160151,50523213,"POLYGON ((-85.74803 31.61918, -85.74544 31.618...",
3,01,119,00161585,0500000US01119,01119,Sumter,Sumter County,AL,Alabama,06,2340898915,24634880,"POLYGON ((-88.41492 32.36452, -88.41471 32.366...",
4,05,091,00069166,0500000US05091,05091,Miller,Miller County,AR,Arkansas,06,1616257232,36848741,"POLYGON ((-94.04343 33.55158, -94.04332 33.552...",
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
3238,09,011,00212799,,09011,New London,,,,06,1722716728,276657755,"POLYGON ((-72.46673 41.5839, -72.42886 41.5889...",0500000US09011
3239,09,005,00212796,,09005,Litchfield,,,,06,2384116952,62334525,"POLYGON ((-73.51795 41.67086, -73.51678 41.687...",0500000US09005
3240,09,013,00212668,,09013,Tolland,,,,06,1062807467,17549693,"POLYGON ((-72.51733 41.8699, -72.51692 41.8736...",0500000US09013
3241,09,015,00212801,,09015,Windham,,,,06,1328478475,21477921,"POLYGON ((-72.25208 41.72706, -72.25264 41.728...",0500000US09015


In [6]:
all_counties.columns

Index(['STATEFP', 'COUNTYFP', 'COUNTYNS', 'GEOIDFQ', 'GEOID', 'NAME',
       'NAMELSAD', 'STUSPS', 'STATE_NAME', 'LSAD', 'ALAND', 'AWATER',
       'geometry', 'AFFGEOID'],
      dtype='str')

In [7]:
# Ensure county geometries are in WGS84 first
all_counties = all_counties.to_crs("EPSG:4326")

# Compute centroids in projected CRS, then convert back to WGS84 for API
centroids = all_counties.to_crs("EPSG:5070").geometry.centroid.to_crs("EPSG:4326")

all_counties["longitude"] = centroids.x
all_counties["latitude"] = centroids.y

# Hard guard before requests
bad = all_counties[
    ~all_counties["latitude"].between(-90, 90) |
    ~all_counties["longitude"].between(-180, 180)
]
assert bad.empty, f"Invalid lat/lon rows found: {len(bad)}"


In [8]:
cols_to_keep = ['GEOID', 'NAME', 'STUSPS', 'STATE_NAME',
       'geometry', 'latitude', 'longitude']

all_counties = all_counties[cols_to_keep]

In [9]:
# keep only the ones present in the national WNV embedding analysis. 
counties_interest = [c for c in set(all_counties["GEOID"]) if c in set(df_all["GEOID"])]
len(counties_interest)

3150

In [10]:
all_counties = all_counties[all_counties["GEOID"].isin(counties_interest)]
all_counties

Unnamed: 0,GEOID,NAME,STUSPS,STATE_NAME,geometry,latitude,longitude
0,01003,Baldwin,AL,Alabama,"POLYGON ((-88.02858 30.22676, -88.02399 30.230...",30.727083,-87.722293
1,01069,Houston,AL,Alabama,"POLYGON ((-85.71209 31.19727, -85.70934 31.198...",31.153231,-85.302390
2,01005,Barbour,AL,Alabama,"POLYGON ((-85.74803 31.61918, -85.74544 31.618...",31.869536,-85.393439
3,01119,Sumter,AL,Alabama,"POLYGON ((-88.41492 32.36452, -88.41471 32.366...",32.590808,-88.198762
4,05091,Miller,AR,Arkansas,"POLYGON ((-94.04343 33.55158, -94.04332 33.552...",33.312183,-93.892121
...,...,...,...,...,...,...,...
3238,09011,New London,,,"POLYGON ((-72.46673 41.5839, -72.42886 41.5889...",41.486562,-72.101497
3239,09005,Litchfield,,,"POLYGON ((-73.51795 41.67086, -73.51678 41.687...",41.792223,-73.245412
3240,09013,Tolland,,,"POLYGON ((-72.51733 41.8699, -72.51692 41.8736...",41.854871,-72.336511
3241,09015,Windham,,,"POLYGON ((-72.25208 41.72706, -72.25264 41.728...",41.829889,-71.987442


In [11]:
print(all_counties["latitude"].isna().sum())
print(all_counties["longitude"].isna().sum())

0
0


I will be iterating over `all_counties` to get the mean yearly weather for each `GEOID`, using the Open-Meteo API.

In [12]:
def filter_and_aggregate_by_year(data, year, county_id, lat, lon):
    '''
    Filter raw API data for a specific year (July-Sept) and aggregate to yearly metrics.

    Parameters:
    -----------
    data : dict
        Raw JSON response from Open-Meteo API
    year : int
        Year to filter for
    county_id : str
        GEOID of the county

    Returns:
    --------
    DataFrame with one row containing aggregated yearly metrics
    '''

    # Define date range for this year's mosquito season
    start_date = f"{year}-07-01"
    end_date = f"{year}-09-30"

    aggregated = {
        'GEOID': county_id,
        'year': year,
        'latitude': lat,
        'longitude': lon
    }

    # Process daily data (temperature)
    if "daily" in data and data["daily"]:
        daily_df = pd.DataFrame(data["daily"])
        daily_df['time'] = pd.to_datetime(daily_df['time'])

        # Filter for this year's July-Sept period
        mask = (daily_df['time'] >= start_date) & (daily_df['time'] <= end_date)
        daily_filtered = daily_df[mask]

        if not daily_filtered.empty:
            aggregated['mean_temp_2m_max'] = daily_filtered['temperature_2m_max'].mean()
            aggregated['mean_temp_2m_min'] = daily_filtered['temperature_2m_min'].mean()
        else:
            aggregated['mean_temp_2m_max'] = None
            aggregated['mean_temp_2m_min'] = None
    else:
        aggregated['mean_temp_2m_max'] = None
        aggregated['mean_temp_2m_min'] = None

    # Process hourly data (humidity, wind, precipitation)
    if "hourly" in data and data["hourly"]:
        hourly_df = pd.DataFrame(data["hourly"])
        hourly_df['time'] = pd.to_datetime(hourly_df['time'])

        # Filter for this year's July-Sept period
        mask = (hourly_df['time'] >= start_date) & (hourly_df['time'] <= end_date)
        hourly_filtered = hourly_df[mask].copy()

        if not hourly_filtered.empty:
            # Add date column for daily aggregation
            hourly_filtered['date'] = hourly_filtered['time'].dt.date

            # Aggregate hourly data to daily first
            daily_agg = hourly_filtered.groupby('date').agg({
                'relative_humidity_2m': 'mean',
                'wind_speed_10m': 'mean',
                'precipitation': 'sum'
            }).reset_index()

            # Then take mean across all days in the period
            aggregated['mean_humidity_2m'] = daily_agg['relative_humidity_2m'].mean()
            aggregated['mean_wind_speed_10m'] = daily_agg['wind_speed_10m'].mean()
            aggregated['mean_precipitation'] = daily_agg['precipitation'].mean()  # Mean daily precipitation
        else:
            aggregated['mean_humidity_2m'] = None
            aggregated['mean_wind_speed_10m'] = None
            aggregated['mean_precipitation'] = None
    else:
        aggregated['mean_humidity_2m'] = None
        aggregated['mean_wind_speed_10m'] = None
        aggregated['mean_precipitation'] = None

    return pd.DataFrame([aggregated])


def request_with_backoff(url, params, max_retries=8, base_wait_seconds=65):
    '''
    Robust request wrapper for Open-Meteo rate limiting.
    Retries on HTTP 429 and API payload errors mentioning limit exceeded.
    '''
    for attempt in range(max_retries):
        response = requests.get(url, params=params, timeout=60)

        # HTTP-level rate limit
        if response.status_code == 429:
            wait = min(120, base_wait_seconds + attempt * 10) + np.random.uniform(0, 2)
            print(f"Rate limited (HTTP 429). Waiting {wait:.1f}s then retrying...")
            time.sleep(wait)
            continue

        # API-level error payload
        try:
            payload = response.json()
            if isinstance(payload, dict) and payload.get("error") and "limit exceeded" in str(payload.get("reason", "")).lower():
                wait = min(120, base_wait_seconds + attempt * 10) + np.random.uniform(0, 2)
                print(f"Rate limited ({payload.get('reason')}). Waiting {wait:.1f}s then retrying...")
                time.sleep(wait)
                continue
        except Exception:
            payload = None

        response.raise_for_status()
        return payload if payload is not None else response.json()

    raise RuntimeError(f"Rate limit retries exhausted after {max_retries} attempts")


def fetch_county_all_years(row, years=range(2017, 2025), save_dir="weather_data"):
    '''
    Fetch ALL years for a single county in ONE API call.
    Caches each year separately for resumability.

    Parameters:
    -----------
    row : Series
        Must contain: GEOID, latitude, longitude
    years : range
        Years to fetch (default: 2017-2024)
    save_dir : str
        Directory to cache results

    Returns:
    --------
    DataFrame with one row per year, aggregated metrics
    '''

    county_id = row["GEOID"]
    lat = row["latitude"]
    lon = row["longitude"]

    # Guard against bad CRS / projected coordinates
    if not (-90 <= float(lat) <= 90) or not (-180 <= float(lon) <= 180):
        print(f"Skipping county {county_id}: invalid lat/lon ({lat}, {lon})")
        return None

    # Check if all years are already cached
    all_cached = True
    cached_results = []

    for year in years:
        filename = os.path.join(save_dir, f"county_{county_id}_year_{year}.parquet")
        if os.path.exists(filename):
            try:
                cached_results.append(pd.read_parquet(filename))
            except Exception as e:
                print(f"Failed to read cache for county {county_id}, year {year}: {e}")
                all_cached = False
                break
        else:
            all_cached = False
            break

    # If all years cached, return immediately
    if all_cached and cached_results:
        return pd.concat(cached_results, ignore_index=True)

    # Make API call for entire date range (all years at once)
    url = "https://archive-api.open-meteo.com/v1/archive"
    params = {
        "latitude": lat,
        "longitude": lon,
        "start_date": f"{min(years)}-07-01",  # July 1 of first year
        "end_date": f"{max(years)}-09-30",    # Sept 30 of last year
        "daily": ["temperature_2m_max", "temperature_2m_min"],
        "hourly": ["relative_humidity_2m", "wind_speed_10m", "precipitation"],
        "timezone": "auto"
    }

    try:
        # small per-request delay for gentler pacing
        time.sleep(0.2)
        data = request_with_backoff(url, params)

        # Check if data is valid
        if "hourly" not in data and "daily" not in data:
            raise ValueError(f"No weather data returned for county {county_id}")

        # Process and split by year
        results = []
        os.makedirs(save_dir, exist_ok=True)

        for year in years:
            # Filter and aggregate for this specific year
            year_aggregated = filter_and_aggregate_by_year(data, year, county_id, lat, lon)
            results.append(year_aggregated)

            # Cache each year separately (for resumability)
            filename = os.path.join(save_dir, f"county_{county_id}_year_{year}.parquet")
            year_aggregated.to_parquet(filename, index=False)

        return pd.concat(results, ignore_index=True)

    except requests.exceptions.Timeout:
        print(f"Timeout for county {county_id} at ({lat}, {lon})")
        return None
    except requests.exceptions.RequestException as e:
        print(f"Request failed for county {county_id}: {e}")
        return None
    except Exception as e:
        print(f"Unexpected error for county {county_id} at ({lat}, {lon}): {e}")
        return None


def get_missing_counties(counties_df, years, save_dir="weather_data"):
    '''
    Identify counties that need data (not fully cached).

    Parameters:
    -----------
    counties_df : DataFrame
        Must have GEOID column
    years : range
        Years to check
    save_dir : str
        Cache directory

    Returns:
    --------
    DataFrame of counties that need fetching
    '''

    missing_counties = []

    for _, row in counties_df.iterrows():
        county_id = row['GEOID']

        # Check if all years are cached
        all_years_cached = True
        for year in years:
            filename = os.path.join(save_dir, f"county_{county_id}_year_{year}.parquet")
            if not os.path.exists(filename):
                all_years_cached = False
                break

        if not all_years_cached:
            missing_counties.append(row)

    if missing_counties:
        return pd.DataFrame(missing_counties)
    else:
        return pd.DataFrame()


def chunk_dataframe(df, batch_size):
    '''Yield DataFrame chunks of size batch_size.'''
    for start in range(0, len(df), batch_size):
        yield df.iloc[start:start + batch_size]


def fetch_all_weather_data(
    counties_df,
    years=range(2017, 2025),
    max_workers=1,
    save_dir="weather_data",
    batch_size=25,
    batch_pause_seconds=65,
):
    '''
    Fetch weather data for all counties across all years with batching + throttling.

    Parameters:
    -----------
    counties_df : DataFrame
        Must have columns: GEOID, latitude, longitude
    years : range
        Years to fetch (default: 2017-2024)
    max_workers : int
        Number of parallel API requests (recommended 1-2 to avoid throttling)
    save_dir : str
        Directory to cache results
    batch_size : int
        Number of counties processed per batch before pausing
    batch_pause_seconds : int
        Sleep time between batches

    Returns:
    --------
    DataFrame with columns: GEOID, year, mean_temp_2m_max, mean_temp_2m_min,
                           mean_humidity_2m, mean_wind_speed_10m, mean_precipitation
    '''

    print(f"Total counties: {len(counties_df)}")
    print(f"Years: {list(years)}")
    print(f"Batch size: {batch_size}")
    print(f"Batch pause: {batch_pause_seconds}s")

    # Check what's already cached
    missing_counties = get_missing_counties(counties_df, years, save_dir)

    if missing_counties.empty:
        print("All data already cached! Loading from cache...")
    else:
        print(f"Counties needing fetch: {len(missing_counties)}/{len(counties_df)}")

    failed_counties = []

    if not missing_counties.empty:
        batches = list(chunk_dataframe(missing_counties.reset_index(drop=True), batch_size))

        for batch_idx, batch_df in enumerate(batches, start=1):
            print(f"\nBatch {batch_idx}/{len(batches)} | counties: {len(batch_df)}")

            with ThreadPoolExecutor(max_workers=max_workers) as executor:
                futures = {
                    executor.submit(fetch_county_all_years, row, years, save_dir): row['GEOID']
                    for _, row in batch_df.iterrows()
                }

                for future in tqdm(as_completed(futures), total=len(futures), desc=f"Batch {batch_idx}"):
                    county_id = futures[future]
                    try:
                        result = future.result()
                        if result is None or result.empty:
                            failed_counties.append(county_id)
                    except Exception as e:
                        print(f"Task failed for county {county_id}: {e}")
                        failed_counties.append(county_id)

            if batch_idx < len(batches):
                print(f"Pausing {batch_pause_seconds}s to respect API limits...")
                time.sleep(batch_pause_seconds)

    # Load ALL data from cache (including newly fetched)
    print("\nLoading all data from cache...")
    all_results = []

    for _, row in tqdm(counties_df.iterrows(), total=len(counties_df), desc="Loading cached data"):
        county_id = row['GEOID']
        county_data = []

        for year in years:
            filename = os.path.join(save_dir, f"county_{county_id}_year_{year}.parquet")
            if os.path.exists(filename):
                try:
                    county_data.append(pd.read_parquet(filename))
                except Exception as e:
                    print(f"Failed to load cache for county {county_id}, year {year}: {e}")

        if county_data:
            all_results.append(pd.concat(county_data, ignore_index=True))

    if all_results:
        final_df = pd.concat(all_results, ignore_index=True)
        final_df = final_df.sort_values(['GEOID', 'year']).reset_index(drop=True)

        final_df = final_df[[
            'GEOID',
            'mean_temp_2m_max', 'mean_temp_2m_min',
            'mean_humidity_2m', 'mean_wind_speed_10m',
            'mean_precipitation',
            'year'
        ]]

        print(f"\n{'='*60}")
        print("SUMMARY")
        print(f"{'='*60}")
        print(f"Total rows: {len(final_df)}")
        print(f"Unique counties: {final_df['GEOID'].nunique()}")
        print(f"Years covered: {sorted(final_df['year'].unique())}")
        print(f"Failed counties: {len(set(failed_counties))}")

        if failed_counties:
            failed_unique = sorted(set(failed_counties))
            print(f"Failed county GEOIDs (sample): {failed_unique[:10]}{'...' if len(failed_unique) > 10 else ''}")

        missing_data = final_df.isnull().sum()
        if missing_data.any():
            print("\nMissing values per column:")
            print(missing_data[missing_data > 0])

        return final_df

    print("No data retrieved!")
    return pd.DataFrame()


def save_weather_summary(weather_df, output_path="county_weather_2017_2024.csv"):
    '''Save final weather data and print summary statistics.'''

    weather_df.to_csv(output_path, index=False)
    print(f"\nWeather data saved to: {output_path}")
    print(f"File size: {os.path.getsize(output_path) / 1e6:.2f} MB")

    print("\nSample data:")
    print(weather_df.head(10).to_string())

    print(f"\n{'='*60}")
    print("Statistics by Year")
    print(f"{'='*60}")

    for year in sorted(weather_df['year'].unique()):
        year_data = weather_df[weather_df['year'] == year]
        print(f"\n{year}:")
        print(f"  Counties: {len(year_data)}")
        print(f"  Temp max: {year_data['mean_temp_2m_max'].mean():.1f} C (+/-{year_data['mean_temp_2m_max'].std():.1f})")
        print(f"  Temp min: {year_data['mean_temp_2m_min'].mean():.1f} C (+/-{year_data['mean_temp_2m_min'].std():.1f})")
        print(f"  Humidity: {year_data['mean_humidity_2m'].mean():.1f}% (+/-{year_data['mean_humidity_2m'].std():.1f})")
        print(f"  Wind: {year_data['mean_wind_speed_10m'].mean():.1f} m/s (+/-{year_data['mean_wind_speed_10m'].std():.1f})")
        print(f"  Precip: {year_data['mean_precipitation'].mean():.2f} mm/day (+/-{year_data['mean_precipitation'].std():.2f})")



In [None]:
# ========================================
# USAGE EXAMPLE
# ========================================

if __name__ == "__main__":
    
    # Load your counties data
    # Assuming df_all has GEOID, latitude, longitude
    # Get unique counties (avoid duplicates)
    all_counties = all_counties[['GEOID', 'latitude', 'longitude']].drop_duplicates().reset_index(drop=True)
    
    print(f"Processing {len(all_counties)} unique counties")
    
    # Fetch weather data
    weather_df = fetch_all_weather_data(
        counties_df=all_counties,
        years=range(2017, 2025),  # 2017-2024 (8 years)
        max_workers=1,  # keep low to avoid API throttling
        save_dir="weather_data",
        batch_size=25,
        batch_pause_seconds=65
    )
    
    # Save results
    if not weather_df.empty:
        save_weather_summary(weather_df, output_path="county_weather_2017_2024.csv")
        
        # Optional: Merge with main dataframe
        print("\nMerging with main dataframe...")
        
        # Pivot to wide format (one row per GEOID)
        weather_wide = weather_df.pivot(
            index='GEOID',
            columns='year',
            values=['mean_temp_2m_max', 'mean_temp_2m_min', 'mean_humidity_2m', 
                    'mean_wind_speed_10m', 'mean_precipitation']
        )
        
        # Flatten column names: mean_temp_2m_max_2017, mean_temp_2m_max_2018, ...
        weather_wide.columns = [f'{col}_{year}' for col, year in weather_wide.columns]
        weather_wide = weather_wide.reset_index()
        
        # Merge with main dataframe
        df_all_with_weather = df_all.merge(weather_wide, on='GEOID', how='left')
        
        print(f"✅ Final dataframe shape: {df_all_with_weather.shape}")
        print(f"   Original columns: {df_all.shape[1]}")
        print(f"   New weather columns: {weather_wide.shape[1] - 1}")  # -1 for GEOID
        
        # Save merged dataframe
        df_all_with_weather.to_csv("df_all_with_weather.csv", index=False)
        print(f"✅ Merged dataframe saved to: df_all_with_weather.csv")
    else:
        print("❌ No weather data retrieved!")

Processing 3150 unique counties
Total counties: 3150
Years: [2017, 2018, 2019, 2020, 2021, 2022, 2023, 2024]
Batch size: 40
Batch pause: 20s
Counties needing fetch: 3118/3150

Batch 1/78 | counties: 40


Batch 1:  12%|█▎        | 5/40 [00:09<01:01,  1.77s/it]

Rate limited (HTTP 429). Waiting 65.2s then retrying...


Batch 1:  18%|█▊        | 7/40 [00:13<00:59,  1.79s/it]

Rate limited (HTTP 429). Waiting 65.7s then retrying...


Batch 1:  35%|███▌      | 14/40 [01:29<01:54,  4.40s/it]

Rate limited (HTTP 429). Waiting 65.2s then retrying...
Rate limited (HTTP 429). Waiting 66.6s then retrying...
Rate limited (HTTP 429). Waiting 76.0s then retrying...
Rate limited (HTTP 429). Waiting 76.5s then retrying...


Batch 1:  50%|█████     | 20/40 [04:04<03:04,  9.22s/it]

Rate limited (HTTP 429). Waiting 66.8s then retrying...


Batch 1:  52%|█████▎    | 21/40 [04:05<02:11,  6.91s/it]

Rate limited (HTTP 429). Waiting 66.7s then retrying...


Batch 1:  52%|█████▎    | 21/40 [05:14<04:44, 14.97s/it]


Rate limited (HTTP 429). Waiting 65.1s then retrying...


In [None]:
df_all_with_weather