In [1]:
import requests
import pandas as pd
import math
from datetime import date
from concurrent.futures import ThreadPoolExecutor, as_completed
from tqdm import tqdm
from pyproj import Transformer
import geopandas as gpd
from shapely.geometry import LineString
import os
import json
import time
import random
from requests.exceptions import RequestException

API_ROOT = "https://geoglows.ecmwf.int/api/v2"
LAYER_URL = "https://livefeeds3.arcgis.com/arcgis/rest/services/GEOGLOWS/GlobalWaterModel_Medium/MapServer/0/query"

# Create a session pool for better connection management
def create_session():
    session = requests.Session()
    session.headers.update({"User-Agent": "geoglows-comid-fetcher/1.0"})
    return session

# Thread-local session storage
import threading
_thread_local = threading.local()

def get_session():
    if not hasattr(_thread_local, 'session'):
        _thread_local.session = create_session()
    return _thread_local.session

# -----------------------------
# Enhanced request function for discharge data specifically
# -----------------------------
def safe_get_discharge_json(url, retries=5, base_delay=0.1, max_delay=5):
    """
    Specialized function for discharge API with aggressive retry logic for 500 errors
    """
    session = get_session()

    for attempt in range(retries):
        try:
            # Add small random jitter to distribute requests
            if attempt > 0:
                jitter = random.uniform(0, base_delay)
                time.sleep(jitter)

            r = session.get(url, timeout=30)

            if r.status_code == 200:
                return r.json()
            elif r.status_code == 500:
                # 500 errors seem to be transient, retry with exponential backoff
                if attempt < retries - 1:
                    wait_time = min(base_delay * (2 ** attempt), max_delay)
                    time.sleep(wait_time)
                    continue
                else:
                    return None
            else:
                r.raise_for_status()

        except RequestException as e:
            if attempt < retries - 1:
                wait_time = min(base_delay * (2 ** attempt), max_delay)
                time.sleep(wait_time)
            else:
                return None

    return None

# -----------------------------
# STEP 1: Fetch COMID geometries (unchanged)
# -----------------------------
def fetch_comids_with_geometry(base_url, where_clause="1=1", batch_size=2000, cache_file="comid_geometry_cache.json"):
    if os.path.exists(cache_file):
        with open(cache_file, "r") as f:
            print(f"🔄 Loading COMID geometries from cache: {cache_file}")
            return json.load(f)

    transformer = Transformer.from_crs("EPSG:3857", "EPSG:4326", always_xy=True)
    all_comids_with_geometry = {}
    offset = 0

    while True:
        params = {
            "where": where_clause,
            "outFields": "comid,streamorder",
            "returnGeometry": "true",
            "f": "json",
            "resultOffset": offset,
            "resultRecordCount": batch_size,
        }
        response = requests.get(base_url, params=params)
        response.raise_for_status()
        data = response.json()

        features = data.get("features", [])
        if not features:
            break

        for feature in features:
            attrs = feature.get("attributes", {})
            geometry = feature.get("geometry", {})
            comid = attrs.get("comid")
            streamorder = attrs.get("streamorder")

            if comid is not None and "paths" in geometry:
                raw_path = geometry["paths"][0]
                # Transform from Web Mercator (EPSG:3857) to WGS84 (EPSG:4326)
                # always_xy=True ensures transform returns (longitude, latitude)
                latlon_path = [transformer.transform(x, y) for x, y in raw_path]
                all_comids_with_geometry[comid] = {
                    "polyline": latlon_path,
                    "streamorder": streamorder,
                }

        offset += batch_size

    with open(cache_file, "w") as f:
        json.dump(all_comids_with_geometry, f)
        print(f"✅ Saved COMID geometries to cache: {cache_file}")

    # Debug: Check coordinate ranges for Nicaragua
    if all_comids_with_geometry:
        sample_comid = next(iter(all_comids_with_geometry))
        sample_coords = all_comids_with_geometry[sample_comid]["polyline"]
        lons = [coord[0] for coord in sample_coords]
        lats = [coord[1] for coord in sample_coords]
        print(f"🔍 Coordinate check - Lon range: {min(lons):.2f} to {max(lons):.2f}, Lat range: {min(lats):.2f} to {max(lats):.2f}")
        print(f"   Nicaragua should be roughly: Lon -87 to -83, Lat 10 to 15")

    return all_comids_with_geometry

# -----------------------------
# STEP 2: Keep return periods fetching as-is
# -----------------------------
def fetch_return_periods(comid_list, cache_file="return_periods_cache.json", max_workers=100):
    if os.path.exists(cache_file):
        with open(cache_file, "r") as f:
            print(f"🔄 Loading return periods from cache: {cache_file}")
            return json.load(f)

    def fetch_single_rp(comid):
        session = get_session()
        url = f"{API_ROOT}/returnperiods/{comid}?format=json&bias_corrected=true"
        try:
            r = session.get(url, timeout=30)
            r.raise_for_status()
            js = r.json()
            if "return_periods" in js:
                return comid, js["return_periods"]
        except:
            pass
        return comid, None

    rp_dict = {}
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = {executor.submit(fetch_single_rp, comid): comid for comid in comid_list}
        for fut in tqdm(as_completed(futures), total=len(futures), desc="Fetching return periods"):
            comid, data = fut.result()
            if data:
                rp_dict[comid] = data

    with open(cache_file, "w") as f:
        json.dump(rp_dict, f)
        print(f"✅ Saved return periods to cache: {cache_file}")

    return rp_dict

# -----------------------------
# STEP 3: Optimized discharge fetching
# -----------------------------
def get_retrospective_day_robust(river_id: int, day: str, rp_data: dict):
    """Robust discharge fetching with specialized error handling"""
    try:
        retro_url = f"{API_ROOT}/retrospectivedaily/{river_id}?format=json&start_date={day}&end_date={day}&bias_corrected=true"

        js = safe_get_discharge_json(retro_url)

        if not js:
            return None

        date_str = js['datetime'][0]
        discharge = js[str(river_id)][0]
        unit = js['metadata']['units']['long']

        return_periods = rp_data.get(str(river_id)) or rp_data.get(int(river_id))
        return_period = 0
        if return_periods:
            for rp, threshold in sorted(return_periods.items(), key=lambda x: x[1]):
                if discharge >= threshold:
                    return_period = int(rp)

        return {
            "comid": river_id,
            "date": date_str,
            "discharge_m3s": discharge,
            "unit": unit,
            "exceeds_rp": return_period
        }

    except Exception as e:
        return None

# -----------------------------
# STEP 4: Adaptive discharge fetching with failure recovery
# -----------------------------
def fetch_all_discharge_adaptive(comid_list, target_date, rp_data, initial_workers=80, min_workers=20):
    """
    Adaptive approach: start with high concurrency, reduce if too many failures
    """
    failed_comids = []
    successful_results = []
    current_workers = initial_workers

    def process_batch(comids, workers):
        results = []
        failures = []

        with ThreadPoolExecutor(max_workers=workers) as executor:
            futures = {
                executor.submit(get_retrospective_day_robust, comid, target_date, rp_data): comid
                for comid in comids
            }

            for fut in tqdm(as_completed(futures), total=len(futures),
                          desc=f"Discharge ({workers} workers)"):
                result = fut.result()
                original_comid = futures[fut]

                if result:
                    results.append(result)
                else:
                    failures.append(original_comid)

        return results, failures

    # First attempt with high concurrency
    print(f"📈 First pass: trying {len(comid_list)} COMIDs with {current_workers} workers...")
    results, failures = process_batch(comid_list, current_workers)
    successful_results.extend(results)

    # Retry failed ones with reduced concurrency
    retry_count = 1
    max_retries = 5

    while failures and retry_count <= max_retries:
        current_workers = max(min_workers, current_workers // 2)
        print(f"🔄 Retry {retry_count}: {len(failures)} failed COMIDs with {current_workers} workers...")

        # Add some delay before retry
        time.sleep(2)

        retry_results, new_failures = process_batch(failures, current_workers)
        successful_results.extend(retry_results)
        failures = new_failures
        retry_count += 1

    if failures:
        print(f"⚠️ Final status: {len(failures)} COMIDs still failed after {max_retries} retries")
        print(f"Failed COMIDs: {failures[:10]}{'...' if len(failures) > 10 else ''}")

    success_rate = len(successful_results) / len(comid_list) * 100
    print(f"✅ Overall success rate: {success_rate:.1f}% ({len(successful_results)}/{len(comid_list)})")

    return successful_results

# -----------------------------
# Alternative: Chunked processing for very large datasets
# -----------------------------
def fetch_all_discharge_chunked(comid_list, target_date, rp_data, chunk_size=500, max_workers=60):
    """
    Process in chunks - good for very large datasets (6000+ COMIDs)
    """
    all_results = []
    chunks = [comid_list[i:i + chunk_size] for i in range(0, len(comid_list), chunk_size)]

    print(f"📊 Processing {len(comid_list)} COMIDs in {len(chunks)} chunks of {chunk_size}")

    for chunk_num, chunk in enumerate(chunks, 1):
        print(f"\n🔄 Chunk {chunk_num}/{len(chunks)} ({len(chunk)} COMIDs)...")

        # Use adaptive approach for each chunk
        chunk_results = fetch_all_discharge_adaptive(chunk, target_date, rp_data,
                                                   initial_workers=max_workers, min_workers=20)
        all_results.extend(chunk_results)

        # Short break between chunks
        if chunk_num < len(chunks):
            time.sleep(1)

    return all_results

# -----------------------------
# STEP 5: Main execution with options
# -----------------------------
if __name__ == "__main__":
    where_clause = "rivercountry='Nicaragua'"
    target_date = "20200315"
    # target_date = "20201117"

    # Step 1: Get COMID geometries
    print("🔍 Fetching geometries...")
    geometry_dict = fetch_comids_with_geometry(LAYER_URL, where_clause=where_clause)

    # Use all COMIDs or limit for testing
    comid_list = list(geometry_dict.keys())  # Remove [:100] to use all
    print(f"Found {len(comid_list)} COMIDs")

    # Step 2: Fetch return periods (keep high concurrency - this works fine)
    print(f"📊 Fetching return periods for {len(comid_list)} COMIDs...")
    rp_data = fetch_return_periods(comid_list, max_workers=100)

    # Step 3: Choose your discharge fetching strategy:

    if len(comid_list) <= 1000:
        # For smaller datasets: use adaptive approach
        print(f"📈 Using adaptive approach for {len(comid_list)} COMIDs...")
        discharge_data = fetch_all_discharge_adaptive(comid_list, target_date, rp_data)
    else:
        # For large datasets (6000+): use chunked approach
        print(f"📈 Using chunked approach for {len(comid_list)} COMIDs...")
        discharge_data = fetch_all_discharge_chunked(comid_list, target_date, rp_data,
                                                   chunk_size=500, max_workers=60)

    # Step 4: Combine with geometry
    combined_records = []
    for item in discharge_data:
        comid = item["comid"]
        if comid in geometry_dict:
            line = geometry_dict[comid]["polyline"]
            streamorder = geometry_dict[comid]["streamorder"]
            geom = LineString(line)
            record = {**item, "geometry": geom, "streamorder": streamorder}
            combined_records.append(record)

    # Step 5: Create GeoDataFrame and save
    if combined_records:
        gdf = gpd.GeoDataFrame(combined_records, geometry="geometry", crs="EPSG:4326")
        out_file = f"nicaragua_discharge_{target_date}.geojson"
        gdf.to_file(out_file, driver="GeoJSON")

        final_success_rate = len(combined_records) / len(comid_list) * 100
        print(f"\n🎉 FINAL RESULTS:")
        print(f"✅ Saved {len(gdf)} features to {out_file}")
        print(f"📊 Final success rate: {final_success_rate:.1f}% ({len(combined_records)}/{len(comid_list)})")
    else:
        print("❌ No data retrieved successfully")

🔍 Fetching geometries...
🔄 Loading COMID geometries from cache: comid_geometry_cache.json
Found 6592 COMIDs
📊 Fetching return periods for 6592 COMIDs...
🔄 Loading return periods from cache: return_periods_cache.json
📈 Using chunked approach for 6592 COMIDs...
📊 Processing 6592 COMIDs in 14 chunks of 500

🔄 Chunk 1/14 (500 COMIDs)...
📈 First pass: trying 500 COMIDs with 60 workers...


Discharge (60 workers): 100%|██████████| 500/500 [02:36<00:00,  3.20it/s]


🔄 Retry 1: 110 failed COMIDs with 30 workers...


Discharge (30 workers): 100%|██████████| 110/110 [00:37<00:00,  2.93it/s]


✅ Overall success rate: 100.0% (500/500)

🔄 Chunk 2/14 (500 COMIDs)...
📈 First pass: trying 500 COMIDs with 60 workers...


Discharge (60 workers): 100%|██████████| 500/500 [02:37<00:00,  3.18it/s]


🔄 Retry 1: 137 failed COMIDs with 30 workers...


Discharge (30 workers): 100%|██████████| 137/137 [00:47<00:00,  2.89it/s]


✅ Overall success rate: 100.0% (500/500)

🔄 Chunk 3/14 (500 COMIDs)...
📈 First pass: trying 500 COMIDs with 60 workers...


Discharge (60 workers): 100%|██████████| 500/500 [02:45<00:00,  3.02it/s]


🔄 Retry 1: 163 failed COMIDs with 30 workers...


Discharge (30 workers): 100%|██████████| 163/163 [00:47<00:00,  3.41it/s]


✅ Overall success rate: 100.0% (500/500)

🔄 Chunk 4/14 (500 COMIDs)...
📈 First pass: trying 500 COMIDs with 60 workers...


Discharge (60 workers): 100%|██████████| 500/500 [02:34<00:00,  3.25it/s]


🔄 Retry 1: 118 failed COMIDs with 30 workers...


Discharge (30 workers): 100%|██████████| 118/118 [01:04<00:00,  1.83it/s]


🔄 Retry 2: 17 failed COMIDs with 20 workers...


Discharge (20 workers): 100%|██████████| 17/17 [00:16<00:00,  1.05it/s]


✅ Overall success rate: 100.0% (500/500)

🔄 Chunk 5/14 (500 COMIDs)...
📈 First pass: trying 500 COMIDs with 60 workers...


Discharge (60 workers): 100%|██████████| 500/500 [02:33<00:00,  3.26it/s]


🔄 Retry 1: 151 failed COMIDs with 30 workers...


Discharge (30 workers): 100%|██████████| 151/151 [00:52<00:00,  2.89it/s]


🔄 Retry 2: 1 failed COMIDs with 20 workers...


Discharge (20 workers): 100%|██████████| 1/1 [00:06<00:00,  6.82s/it]


✅ Overall success rate: 100.0% (500/500)

🔄 Chunk 6/14 (500 COMIDs)...
📈 First pass: trying 500 COMIDs with 60 workers...


Discharge (60 workers): 100%|██████████| 500/500 [02:46<00:00,  3.01it/s]


🔄 Retry 1: 130 failed COMIDs with 30 workers...


Discharge (30 workers): 100%|██████████| 130/130 [01:08<00:00,  1.89it/s]


🔄 Retry 2: 30 failed COMIDs with 20 workers...


Discharge (20 workers): 100%|██████████| 30/30 [00:23<00:00,  1.29it/s]


✅ Overall success rate: 100.0% (500/500)

🔄 Chunk 7/14 (500 COMIDs)...
📈 First pass: trying 500 COMIDs with 60 workers...


Discharge (60 workers): 100%|██████████| 500/500 [02:13<00:00,  3.74it/s]


🔄 Retry 1: 151 failed COMIDs with 30 workers...


Discharge (30 workers): 100%|██████████| 151/151 [00:49<00:00,  3.03it/s]


✅ Overall success rate: 100.0% (500/500)

🔄 Chunk 8/14 (500 COMIDs)...
📈 First pass: trying 500 COMIDs with 60 workers...


Discharge (60 workers): 100%|██████████| 500/500 [02:55<00:00,  2.84it/s]


🔄 Retry 1: 188 failed COMIDs with 30 workers...


Discharge (30 workers): 100%|██████████| 188/188 [01:29<00:00,  2.11it/s]


🔄 Retry 2: 14 failed COMIDs with 20 workers...


Discharge (20 workers): 100%|██████████| 14/14 [00:19<00:00,  1.38s/it]


✅ Overall success rate: 100.0% (500/500)

🔄 Chunk 9/14 (500 COMIDs)...
📈 First pass: trying 500 COMIDs with 60 workers...


Discharge (60 workers): 100%|██████████| 500/500 [02:40<00:00,  3.11it/s]


🔄 Retry 1: 217 failed COMIDs with 30 workers...


Discharge (30 workers): 100%|██████████| 217/217 [01:29<00:00,  2.44it/s]


🔄 Retry 2: 2 failed COMIDs with 20 workers...


Discharge (20 workers): 100%|██████████| 2/2 [00:09<00:00,  4.53s/it]


✅ Overall success rate: 100.0% (500/500)

🔄 Chunk 10/14 (500 COMIDs)...
📈 First pass: trying 500 COMIDs with 60 workers...


Discharge (60 workers): 100%|██████████| 500/500 [02:40<00:00,  3.11it/s]


🔄 Retry 1: 194 failed COMIDs with 30 workers...


Discharge (30 workers): 100%|██████████| 194/194 [01:09<00:00,  2.80it/s]


🔄 Retry 2: 4 failed COMIDs with 20 workers...


Discharge (20 workers): 100%|██████████| 4/4 [00:09<00:00,  2.49s/it]


✅ Overall success rate: 100.0% (500/500)

🔄 Chunk 11/14 (500 COMIDs)...
📈 First pass: trying 500 COMIDs with 60 workers...


Discharge (60 workers): 100%|██████████| 500/500 [03:10<00:00,  2.63it/s]


🔄 Retry 1: 222 failed COMIDs with 30 workers...


Discharge (30 workers): 100%|██████████| 222/222 [01:10<00:00,  3.13it/s]


✅ Overall success rate: 100.0% (500/500)

🔄 Chunk 12/14 (500 COMIDs)...
📈 First pass: trying 500 COMIDs with 60 workers...


Discharge (60 workers): 100%|██████████| 500/500 [03:07<00:00,  2.67it/s]


🔄 Retry 1: 173 failed COMIDs with 30 workers...


Discharge (30 workers): 100%|██████████| 173/173 [00:57<00:00,  2.99it/s]


✅ Overall success rate: 100.0% (500/500)

🔄 Chunk 13/14 (500 COMIDs)...
📈 First pass: trying 500 COMIDs with 60 workers...


Discharge (60 workers): 100%|██████████| 500/500 [02:45<00:00,  3.01it/s]


🔄 Retry 1: 148 failed COMIDs with 30 workers...


Discharge (30 workers): 100%|██████████| 148/148 [01:05<00:00,  2.27it/s]


🔄 Retry 2: 3 failed COMIDs with 20 workers...


Discharge (20 workers): 100%|██████████| 3/3 [00:09<00:00,  3.16s/it]


✅ Overall success rate: 100.0% (500/500)

🔄 Chunk 14/14 (92 COMIDs)...
📈 First pass: trying 92 COMIDs with 60 workers...


Discharge (60 workers): 100%|██████████| 92/92 [00:35<00:00,  2.57it/s]


🔄 Retry 1: 29 failed COMIDs with 30 workers...


Discharge (30 workers): 100%|██████████| 29/29 [00:19<00:00,  1.51it/s]


✅ Overall success rate: 100.0% (92/92)

🎉 FINAL RESULTS:
✅ Saved 6592 features to nicaragua_discharge_20200315.geojson
📊 Final success rate: 100.0% (6592/6592)
