In [None]:
import dataretrieval.nwis as nwis
import pandas as pd
import pyarrow.parquet as pq
from datetime import datetime
import duckdb
import logging
import os
from typing import Optional
from pathlib import Path

In [None]:
# USGS site numbers (kept as strings so leading zeros survive) used by the
# cells below; the trailing comment on each entry gives the station name.
usgs_sites = [
    "09152500",   # Gunnison River Near Grand Junction, CO
    "09095500",   # Colorado River Near Cameo, CO
    "09106150",   # Colorado River Below Grand Valley Div NR Palisade, CO
    "09106485",   # Colorado River Above Gunnison River at Grand Junction, CO
    "09163500",   # Colorado River Near Colorado-Utah State Line
    "09306500",   # White River Near Watson, Utah
    "09251000",   # Yampa River Near Maybell, CO
    "09260050",   # Yampa River at Deerlodge Park, CO
    "09260000",   # Little Snake River Near Lily, CO
    "09261000",   # Green River Near Jensen, UT
    "09315000",   # Green River at Green River, UT
    "09302000",   # Duchesne River Near Randlett, UT
    "09180000",   # Dolores River Near Cisco, UT
    "09328960",   # Colorado River at Gypsum Canyon Near Hite, UT
    "09147022",   # Ridgway Reservoir Near Ridgway, CO
    "09041395",   # Wolford Mtn Reservoir Nr Kremmling, CO
    "09379900",   # Lake Powell at Glen Canyon Dam, AZ
]

In [None]:
def fetch_data(site: str, pcode: str, start_date: str, end_date: str, service_code: str = 'iv') -> Optional[pd.DataFrame]:
    """
    Request the records for one site and one parameter code from NWIS.

    Parameters:
        site: USGS site number
        pcode: Parameter code, forwarded as ``parameterCd``
        start_date: Forwarded as ``start`` to nwis.get_record()
        end_date: Forwarded as ``end`` to nwis.get_record()
        service_code: NWIS service name, forwarded as ``service`` (default 'iv')
    Returns:
        The DataFrame produced by nwis.get_record(), or None when the
        request raised (logged as an error) or came back empty (logged
        as a warning).
    """
    request = dict(
        sites=site,
        service=service_code,
        start=start_date,
        end=end_date,
        parameterCd=pcode,
    )

    # Any failure of the request is logged and reported as "no data".
    try:
        records = nwis.get_record(**request)
    except Exception as e:
        logging.error(f"Error fetching data for site {site} and parameter {pcode}: {e}")
        return None

    if not records.empty:
        return records

    logging.warning(f"No data returned for site {site} and parameter {pcode}.")
    return None


In [None]:
def transform_data(df: pd.DataFrame, site: str, pcode: str) -> pd.DataFrame:
    """
    Reshape a raw NWIS 'iv' frame into the standardized long layout.

    Parameters:
        df: Raw dataframe from nwis.get_record(); its index is moved into
            a column and must yield a 'datetime' column
        site: USGS site number, written to every output row
        pcode: Parameter code (e.g., '00060' for discharge), written to
            every output row
    Returns:
        A DataFrame with the columns
        ['site', 'datetime', 'parameter', 'value', 'approval_status', 'year'],
        with rows lacking a site, datetime or numeric value removed.
        A bare empty DataFrame is returned when df is None or empty.
    Raises:
        ValueError: if there is no 'datetime' column, or if the frame does
            not contain exactly one value column and exactly one column
            whose name ends in 'cd'.
    """
    # Nothing to do: hand back an empty frame instead of failing.
    if df is None or df.empty:
        logging.warning(f"No data to transform for site {site} and parameter {pcode}.")
        return pd.DataFrame()

    # The timestamp lives in the index of the raw frame.
    raw = df.reset_index()

    if 'datetime' not in raw.columns:
        logging.error(f"Missing 'datetime' column in data for site {site} and parameter {pcode}.")
        raise ValueError("Missing 'datetime' column.")

    # Classify every column in one pass: names ending in 'cd' are code
    # columns; anything else except the identifier columns is a value column.
    code_cols = []
    value_cols = []
    for name in raw.columns:
        if name.endswith('cd'):
            code_cols.append(name)
        elif name not in ('site_no', 'datetime'):
            value_cols.append(name)

    if len(value_cols) != 1:
        logging.error(f"Expected exactly one value column for site {site} and parameter {pcode}. Found: {value_cols}")
        raise ValueError("Expected exactly one value column.")
    if len(code_cols) != 1:
        logging.error(f"Expected exactly one code column for site {site} and parameter {pcode}. Found: {code_cols}")
        raise ValueError("Expected exactly one code column.")

    (value_col,) = value_cols
    (code_col,) = code_cols

    timestamps = pd.to_datetime(raw['datetime'])
    values = pd.to_numeric(raw[value_col], errors='coerce')
    # Only the first character of the code is kept (assumed to be P or A).
    status = raw[code_col].str[0]

    cleaned = pd.DataFrame({
        'site': site,
        'datetime': timestamps,
        'parameter': pcode,
        'value': values,
        'approval_status': status,
    })

    # Year is derived so the writer can partition on it.
    cleaned['year'] = cleaned['datetime'].dt.year

    # Rows without a site, timestamp or numeric value are unusable.
    return cleaned.dropna(subset=['site', 'datetime', 'value'])
    



In [None]:
def write_to_datalake(df: pd.DataFrame, site: str, output_root: str) -> None:
    """
    Write transformed USGS IV data to partitioned parquet files in the datalake.

    One file is written per year found in ``df``, at
    ``<output_root>/timeseries_iv/site=<site>/year=<year>/data.parquet``.
    NOTE(review): every call targets the same ``data.parquet`` for a given
    site/year, so pass all parameters for a site in a single call
    (presumably DuckDB's COPY replaces an existing file -- verify).

    Parameters:
        df              : Transformed DataFrame with columns including ['site', 'datetime', 'parameter', 'value', 'approval_status', 'year']
        site            : USGS site number
        output_root     : Root directory for the datalake
    Raises:
        Exception: any error raised while casting or writing a year's group
            is logged and re-raised.
    """
    if df.empty:
        logging.warning(f"No data to write for site {site}.")
        return

    # Sort data by datetime for performance and compression
    df_sorted = df.sort_values(by='datetime')

    # Partition by year and site and write to parquet
    for year, group in df_sorted.groupby('year'):
        output_path = Path(output_root) / "timeseries_iv" / f"site={site}" / f"year={year}"
        output_path.mkdir(parents=True, exist_ok=True)
        file_path = output_path / "data.parquet"
        # The path is embedded in a SQL string literal, so double any single
        # quote to keep the statement well-formed.
        sql_path = str(file_path).replace("'", "''")
        try:
            # Work on a copy so the caller's frame is never modified.
            group = group.copy()

            # Strip timezone from datetime if present
            if isinstance(group["datetime"].dtype, pd.DatetimeTZDtype):
                group["datetime"] = group["datetime"].dt.tz_localize(None)

            # Ensure correct data types
            group = group.astype({
                "site": "string",
                "datetime": "datetime64[ns]",
                "parameter": "string",
                "value": "float64",
                "approval_status": "string",
                "year": "int64"
            })

            duckdb.register("temp_df", group)
            try:
                duckdb.sql(f"COPY temp_df TO '{sql_path}' (FORMAT PARQUET)")
            finally:
                # Always drop the temporary registration, even if COPY failed,
                # so a stale frame is not left attached to the connection.
                duckdb.unregister("temp_df")
            logging.info(f"{len(group)} rows → {file_path}")
        except Exception as e:
            logging.error(f"Error writing data for site {site} and year {year}: {e}")
            raise

In [None]:
# NWIS Data Capture for Streamflow and Water Quality - "Instantaneous Values"
# This script fetches data from the USGS NWIS database for specified sites and parameter codes,
# standardizes each parameter's records, and writes one parquet file per site and year.

# === CONFIGURATION ===
sites = usgs_sites
parameter_codes = ['00060', '00010', '62614']
start_date = '2022-01-01'
end_date = '2025-01-01'
service_code = 'iv'
output_root = 'data/data_lake'
# ======================


# Configure logging ------------------------------------------------
# One timestamped log file per run, under ./logs
os.makedirs('logs', exist_ok=True)
log_name = 'logs/' + datetime.now().strftime('%Y-%m-%d_%H-%M-%S') + '.log'
logging.basicConfig(filename=log_name,
                    level=logging.INFO,
                    format='%(asctime)s - %(levelname)s - %(message)s')
# -------------------------------------------------------------------

# Iterate through each site
for site in sites:
    print(f"Processing site: {site}")
    all_data = []

    # Iterate through each parameter code
    for pcode in parameter_codes:

        # Fetch data for the current site and parameter code from NWIS
        df_raw = fetch_data(site, pcode, start_date, end_date, service_code)
        if df_raw is None:
            continue

        # Clean and transform the data, standardizing column names and types.
        # A frame that cannot be standardized only costs this one parameter,
        # not the whole run.
        try:
            df_transformed = transform_data(df_raw, site, pcode)
        except ValueError as e:
            logging.error(f"Skipping site {site} and parameter {pcode}: {e}")
            continue
        if df_transformed.empty:
            continue

        all_data.append(df_transformed)

    if not all_data:
        continue

    # write_to_datalake targets a single data.parquet per site/year, so all
    # parameters for the site must go in together; writing them one at a
    # time would leave only the last parameter on disk.
    write_to_datalake(pd.concat(all_data, ignore_index=True), site, output_root)
        

In [None]:
import duckdb

# Spot-check query: the glob matches every site=*/year=* partition file under
# data/data_lake/timeseries_iv, then filters to one site, one parameter,
# a two-week window, and rows whose approval_status is 'A'.
query = """
SELECT *
FROM 'data/data_lake/timeseries_iv/site=*/year=*/data.parquet'
WHERE site = '09379900'
  AND parameter = '62614'
  AND datetime BETWEEN TIMESTAMP '2024-01-01' AND TIMESTAMP '2024-01-15'
  AND approval_status = 'A'
ORDER BY datetime
"""

# Run the query with DuckDB and pull the result into a pandas DataFrame.
df = duckdb.sql(query).df()


In [None]:
from pathlib import Path

# Multipurpose function to capture gage data
def unit_value_fetch_and_store_by_site_year(
        sites: list, 
        start_date: str, 
        end_date: str, 
        parameter_codes: list,
        service_code: str = "iv", 
        output_root: Path = "data/data_lake"
    ) -> None:
    """
    Fetch unit value (iv) data from NWIS for one or more gaging stations.
    Will also capture daily value optionally.
    Chunk the data by site and year and write to partitioned parquet files

    Parameters:
        sites: USGS site numbers to process, one request per site/parameter
        start_date: Forwarded as ``start`` to nwis.get_record()
        end_date: Forwarded as ``end`` to nwis.get_record()
        parameter_codes: Parameter codes to request for every site
        service_code: 'iv' (written under ``timeseries_iv``) or
            'dv' (written under ``daily_values``)
        output_root: Root directory of the data lake; a plain string is
            accepted and converted to a Path
    Raises:
        AssertionError: if service_code is neither 'iv' nor 'dv'.

    Fetch and write failures are reported with print() and skipped so the
    remaining sites/years are still processed.
    """
    assert service_code in ("iv", "dv"), "Service code must be 'iv' or 'dv'"
    output_root = Path(output_root)

    # Set top level folder based on service type
    table_dir = "timeseries_iv" if service_code == "iv" else "daily_values"

    for site in sites:
        print(f"Processing site: {site}")
        all_data = []

        for pcode in parameter_codes:
            try:
                df = nwis.get_record(
                    sites=site,
                    service=service_code,
                    start=start_date,
                    end=end_date,
                    parameterCd=pcode
                )
            except Exception as e:
                print(f"Error fetching data for {site}: {e}")
                continue

            if df.empty:
                print(f"No data returned for {site}")
                continue

            # The timestamp is the index of the raw frame; expose it as a column.
            df = df.reset_index()
            time_column = "datetime" if "datetime" in df.columns else "date"
            df['parameter'] = pcode
            # Index.difference returns the leftover labels sorted, so [0] is the
            # alphabetically first remaining column -- presumably the bare
            # parameter-code column rather than its '_cd' companion; verify.
            df['value'] = df[df.columns.difference(['site_no', time_column, 'parameter'])[0]]
            df = df[['site_no', time_column, 'parameter', 'value']]
            df = df.rename(columns={'site_no': 'site', time_column: 'datetime'})
            df['year'] = df['datetime'].dt.year

            all_data.append(df)

        if not all_data:
            continue

        df_all = pd.concat(all_data)
        df_all_sorted = df_all.sort_values(by=['datetime', 'parameter'])

        for year, group in df_all_sorted.groupby('year'):
            output_path = output_root / table_dir / f"site={site}" / f"year={year}"
            output_path.mkdir(parents=True, exist_ok=True)
            file = output_path / "data.parquet"
            # The path is embedded in a SQL string literal, so double any
            # single quote to keep the statement well-formed.
            sql_path = str(file).replace("'", "''")

            # Copy before assigning: the group is a slice of df_all_sorted and
            # writing into it directly triggers pandas' chained-assignment
            # warning.
            group = group.copy()

            # Ensure consistent data types
            group['site'] = group['site'].astype(str)

            try:
                # Write with duckdb to ensure consistent types
                duckdb.register("temp_df", group)
                try:
                    duckdb.sql(f"COPY temp_df TO '{sql_path}' (FORMAT PARQUET)")
                finally:
                    # Drop the temporary registration even when COPY fails.
                    duckdb.unregister("temp_df")
                print(f"   Wrote {len(group)} records to {file}")
            except Exception as e:
                print(f"Failed to write {file}: {e}")
            

In [None]:
# Display the one-element list built from the first configured site.
[usgs_sites[0]]

In [None]:
# Scratch cell: fetch and reshape 'iv' data for the first configured site only,
# leaving the combined, sorted frame in df_all_sorted. Nothing is written to disk.
for site in [usgs_sites[0]]:
        print(f"Processing site: {site}")
        all_data = []

        for pcode in ['00060', '00010']:
            try:
                df = nwis.get_record(
                    sites=site,
                    service='iv',
                    start="2024-01-01",
                    end="2025-12-31",
                    parameterCd=pcode
                )
            except Exception as e:
                # Report the failure and move on to the next parameter.
                print(f"Error fetching data for {site}: {e}")
                continue

            if df.empty:
                print(f"No data returned for {site}")
                continue

            # Move the index into a column, then reduce the frame to
            # site / datetime / parameter / value / year.
            df = df.reset_index()
            time_column = "datetime" if "datetime" in df.columns else "date"
            df['parameter'] = pcode
            # Index.difference returns the leftover labels sorted, so [0] is the
            # alphabetically first remaining column -- presumably the bare
            # parameter-code column; verify.
            df['value'] = df[df.columns.difference(['site_no', time_column, 'parameter'])[0]]
            df = df[['site_no', time_column, 'parameter', 'value']]
            df = df.rename(columns={'site_no': 'site', time_column: 'datetime'})
            df['year'] = df['datetime'].dt.year

            all_data.append(df)

        # Skip the site when no parameter produced any rows.
        if not all_data:
            continue
        
        df_all = pd.concat(all_data)
        df_all_sorted = df_all.sort_values(by=['datetime', 'parameter'])

        


In [None]:
# Trial run of the fetch-and-store function for the first configured site only.
unit_value_fetch_and_store_by_site_year(sites=[usgs_sites[0]], service_code="iv", start_date="2024-01-01", end_date="2025-12-31", parameter_codes=["00060", "00010"],)

In [None]:
# Open a single site/year partition file with DuckDB for inspection.
df = duckdb.read_parquet('data/data_lake/timeseries_iv/site=09180000/year=2024/data.parquet')

In [None]:
# Read a single site/year partition file with pyarrow and convert it to pandas.
table = pq.read_table('data/data_lake/timeseries_iv/site=09152500/year=2024/data.parquet')
df = table.to_pandas()

In [None]:
# NOTE(review): usgs_gage_data is not defined in any visible cell -- presumably
# a helper from an earlier version of this notebook; confirm it still exists.
ck = usgs_gage_data(sites=usgs_sites, service="dv", start="2000-01-01", end="2005-01-01", parameterCd="00060")

In [None]:
# Ad-hoc daily-value ('dv') request for the second configured site, with the
# index moved into a regular column for display.
nwis.get_record(
    sites=usgs_sites[1],
    service="dv",
    start="2025-01-01",
    end="2025-05-14",
    parameterCd='00060'
).reset_index()

In [None]:
# Print every configured site number.
for site in usgs_sites:
    print(site)

In [None]:
# Ad-hoc daily-value ('dv') request for two sites at once; only a start date is supplied.
nwis.get_record(sites=['03339000', '09180000'], service='dv', start='2017-12-31', parameterCd='00060')

In [None]:
# Insert-or-replace the rows of clean_sites into the `site` table of the
# waterdata_lakehouse.duckdb database file.
# NOTE(review): clean_sites is not defined in any visible cell -- confirm where
# it is built before running this.
with duckdb.connect("waterdata_lakehouse.duckdb") as conn:
    conn.register("clean_sites", clean_sites)
    conn.execute("INSERT OR REPLACE INTO site SELECT * FROM clean_sites")