# Download and process USBR reservoirs

The following notebook provides code to scrape the USBR reservoir data in order to find which USBR reservoirs are in the hydrofabric.

In [None]:
import os
from pathlib import Path

import yaml
from dotenv import load_dotenv

# Changes the current working dir to be the project root.
# The relative chdir is skipped when .pyiceberg.yaml is already visible from the cwd,
# so re-running this cell does not climb another four directories.
# NOTE(review): this treats .pyiceberg.yaml as the project-root marker — confirm no copy
# of that file lives next to the notebook itself.
current_working_dir = Path.cwd()
if (current_working_dir / ".pyiceberg.yaml").exists():
    print(f"Current working dir is already the project root: {current_working_dir}")
else:
    os.chdir(current_working_dir / "../../../../")
    print(
        f"Changed current working dir from {current_working_dir} to: {Path.cwd()}. This notebook must run at the project root"
    )

# dotenv_path has to point at the .env file itself; a directory is not read as a .env file
load_dotenv(dotenv_path=Path.cwd() / ".env")

pyiceberg_file = Path.cwd() / ".pyiceberg.yaml"
if not pyiceberg_file.exists():
    raise FileNotFoundError(
        f"Cannot find .pyiceberg.yaml in {Path.cwd()}. "
        "Please download this from NGWPC confluence or create one at the project root"
    )
# PYICEBERG_HOME names the directory that holds .pyiceberg.yaml, not the file
os.environ["PYICEBERG_HOME"] = str(pyiceberg_file.parent)

# Loading the local pyiceberg config settings
try:
    with open(pyiceberg_file, encoding="utf-8") as file:
        pyiceberg_config = yaml.safe_load(file)
except FileNotFoundError as e:
    # Only reachable if the file disappears between the existence check and the open
    raise FileNotFoundError(f".pyiceberg YAML file not found in cwd: {Path.cwd()}") from e
except yaml.YAMLError as e:
    raise yaml.YAMLError(f"Error parsing .pyiceberg YAML file: {e}") from e

Get all of the USBR data. For more information on the endpoint, see https://data.usbr.gov/rise/

In [None]:
import httpx

# Page through every location exposed by the USBR RISE API. No location-type filter is
# sent with the request, so all location types are collected in `all_data`.
base_url = "https://data.usbr.gov/rise/api/location"
items_per_page = 100
all_data = []
page = 1
# Set to True only when the loop ends because the API returned an empty page
fetch_complete = False

print("Retrieving USBR locations...")

with httpx.Client(timeout=30.0) as client:
    while True:
        try:
            response = client.get(
                base_url,
                params={"page": page, "itemsPerPage": items_per_page},
                headers={"Accept": "application/vnd.api+json"},
            )
            response.raise_for_status()
            content = response.json()
        except httpx.RequestError as e:
            print(f"Error retrieving page {page}: {e}")
            break
        except httpx.HTTPStatusError as e:
            print(f"HTTP error retrieving page {page}: {e}")
            break

        data_page = content.get("data", [])

        # An empty (or missing) "data" list is treated as the end of the listing
        if not data_page:
            fetch_complete = True
            break

        all_data.extend(data_page)
        print(f"Retrieved page {page}, total entries: {len(all_data)}")
        page += 1

print(f"Total entries retrieved: {len(all_data)}")
if not fetch_complete:
    # Make a partial download visible instead of letting later cells use it silently
    print(f"WARNING: retrieval stopped early at page {page}; the entries above are incomplete")

Process the data returned by the requests into a list of locations, then filter that list down to lakes/reservoirs

In [None]:
import pandas as pd

# Process the data
location_type = "Lake/Reservoir"  # ensuring we only care about lakes and reservoirs
# Column order of the resulting frame; also guarantees the columns exist when no rows are found
location_columns = ["locationName", "longitude", "latitude", "locationType", "location_id"]
locations = []
location_types = {}

for item in all_data:
    # `or {}` covers both a missing key and an explicit null value
    attrs = item.get("attributes") or {}
    coords_data = attrs.get("locationCoordinates") or {}
    coords = coords_data.get("coordinates")

    # Tally every location type seen, including entries without usable coordinates
    loc_type = attrs.get("locationTypeName")
    if loc_type:
        location_types[loc_type] = location_types.get(loc_type, 0) + 1

    # Only entries with exactly two coordinate values are kept; they are read as (longitude, latitude)
    if coords and len(coords) == 2:
        locations.append(
            {
                "locationName": attrs.get("locationName"),
                "longitude": coords[0],
                "latitude": coords[1],
                "locationType": loc_type,
                "location_id": attrs.get("_id"),
            }
        )

print("\nLocation types found:")
for loc_type, count in sorted(location_types.items(), key=lambda x: x[1], reverse=True):
    print(f"  {loc_type}: {count}")

# Convert to DataFrame and filter.
# Passing `columns` keeps the filter below from raising KeyError when `locations` is empty.
reservoirs_df = pd.DataFrame(locations, columns=location_columns)

# Set `location_type` to None/"" above to keep every location type
if location_type:
    reservoirs_df = reservoirs_df[reservoirs_df["locationType"] == location_type].copy()
    print(f"\nFiltered to {len(reservoirs_df)} '{location_type}' entries")

reservoirs_df.head()

Read in the hydrofabric in order to determine which lakes are already represented in the hydrofabric

In [None]:
import geopandas as gpd
import pandas as pd


def to_geopandas(df: pd.DataFrame, crs: str = "EPSG:5070") -> gpd.GeoDataFrame:
    """Build a GeoDataFrame from a pandas DataFrame holding WKB-encoded geometries

    Parameters
    ----------
    df: pd.DataFrame
        Table with a ``geometry`` column; that column is decoded with ``GeoSeries.from_wkb``
    crs: str, optional
        The CRS passed to the GeoDataFrame constructor, by default "EPSG:5070"

    Returns
    -------
    gpd.GeoDataFrame
        The contents of ``df`` with the decoded geometries set as the geometry

    Raises
    ------
    ValueError
        Raised if ``df`` does not have a ``geometry`` column
    """
    has_geometry_column = "geometry" in df.columns
    if not has_geometry_column:
        raise ValueError("The provided table does not have a geometry column.")

    decoded_geometry = gpd.GeoSeries.from_wkb(df["geometry"])
    return gpd.GeoDataFrame(df, geometry=decoded_geometry, crs=crs)

In [None]:
from pyiceberg.catalog import load_catalog

# Loading SQL Catalog
# The catalog can be downloaded through the icefabric repo; only the conus_hf namespace is read below
sql_catalog_settings = pyiceberg_config["catalog"]["sql"]
catalog = load_catalog(
    name="sql",
    type=sql_catalog_settings["type"],
    uri=sql_catalog_settings["uri"],
    warehouse=sql_catalog_settings["warehouse"],
)

# Loading Glue Catalog (alternative, left disabled)
# catalog = load_catalog("glue", **{
#     "type": "glue",
#     "glue.region": "us-east-1"
# })


def _scan_to_pandas(table_identifier: str) -> pd.DataFrame:
    """Run an unfiltered scan of one catalog table and return it as a pandas DataFrame."""
    return catalog.load_table(table_identifier).scan().to_pandas()


# The two tables passed through to_geopandas are converted to GeoDataFrames; the rest stay plain DataFrames
lakes = to_geopandas(_scan_to_pandas("conus_hf.lakes"))
divides = to_geopandas(_scan_to_pandas("conus_hf.divides"))
hydrolocations = _scan_to_pandas("conus_hf.hydrolocations")
pois = _scan_to_pandas("conus_hf.pois")
network = _scan_to_pandas("conus_hf.network")

In [None]:
from shapely.geometry import Point

# One point per reservoir, built from its (longitude, latitude) pair
geometry = [
    Point(lon_lat)
    for lon_lat in zip(reservoirs_df.longitude, reservoirs_df.latitude, strict=True)
]
reservoirs_gdf = gpd.GeoDataFrame(reservoirs_df, geometry=geometry, crs="EPSG:4326")

# Bring the reservoirs into the CRS of the lakes layer before any distance-based operation
crs_differs = reservoirs_gdf.crs != lakes.crs
if crs_differs:
    print(f"Reprojecting reservoirs from {reservoirs_gdf.crs} to {lakes.crs}")
    reservoirs_gdf = reservoirs_gdf.to_crs(lakes.crs)

# Buffer distance applied around each reservoir point. Adjust this value as needed.
# NOTE(review): the value is in the units of the lakes CRS — presumably metres; confirm.
buffer_meters = 1000
print(f"Applying {buffer_meters}m buffer to reservoir points...")

# Work on a copy so reservoirs_gdf keeps the original point geometries
reservoirs_buffered = reservoirs_gdf.copy()
buffered_geometry = reservoirs_buffered.geometry.buffer(buffer_meters)
reservoirs_buffered.geometry = buffered_geometry

# Keep every (buffered reservoir, lake) pair whose geometries intersect
print("Performing spatial join...")
matches = gpd.sjoin(
    left_df=reservoirs_buffered,
    right_df=lakes,
    how="inner",
    predicate="intersects",
)

print(f"Found {len(matches)} reservoir-lake matches")
print(f"Matched {matches['locationName'].nunique()} unique reservoirs")

matches.head()

In [None]:
# Keep a single hydrolocation row per poi_id (the first one encountered), so each
# match joins to at most one hydrolocation row
hydrolocations_first = hydrolocations.drop_duplicates(subset=["poi_id"], keep="first")

# Inner join on poi_id: matches without a corresponding hydrolocation row are dropped
matches_extended = pd.merge(left=matches, right=hydrolocations_first, on="poi_id", how="inner")

In [None]:
# Write the extended matches to disk as Parquet and as a GeoPackage.
# The output directory is created first so a fresh checkout does not fail on the write.
Path("data/usbr").mkdir(parents=True, exist_ok=True)
matches_extended.to_parquet("data/usbr/reservoir_matches.parquet")
matches_extended.to_file("data/usbr/reservoir_matches.gpkg", driver="GPKG")