In [None]:
import xarray as xr
import pandas as pd
import numpy as np
import dask
from dask.distributed import LocalCluster, Client
import fsspec
import warnings
warnings.filterwarnings('ignore', category=RuntimeWarning)

In [None]:
# 1) Dask cluster: 4 local workers, each capped at 32 GB (memory_limit is a
#    per-worker limit, so the total budget is ~128 GB); the diagnostics
#    dashboard is served on port 8787.
cluster = LocalCluster(n_workers=4, memory_limit='32GB', dashboard_address=":8787")
# Registering the client makes it the default scheduler for later .persist()/.compute()
client = Client(cluster)

In [None]:
# 2) Open the NWM v3.0 retrospective Zarr output from S3 (anonymous access)
#    and subset it to the COMIDs listed in TX_ids.csv (full time range kept).
url = 's3://noaa-nwm-retrospective-3-0-pds/CONUS/zarr'
fs = fsspec.filesystem('s3', anon=True)
outputlist = fs.ls(url)
# Select the store by name instead of by list position: listing order is not
# a contract. `streamflow` is read below, so the channel-output (CHRTOUT)
# store is the one wanted. Falls back to the previously hard-coded position
# if no entry matches. NOTE(review): confirm the store name in the bucket.
chrtout_stores = [p for p in outputlist if p.rstrip('/').lower().endswith('chrtout.zarr')]
store_path = chrtout_stores[0] if chrtout_stores else outputlist[1]
ds = xr.open_dataset(fs.get_mapper(store_path), engine='zarr', backend_kwargs={'consolidated': True})

# 2b) Read TX IDs (COMIDs matched against ds.feature_id)
ids_df = pd.read_csv('TX_ids.csv', low_memory=False)
# Blank rows can never match and duplicates only slow down isin()
ids_list = ids_df['COMID'].dropna().unique().tolist()
idx = ds.feature_id.isin(ids_list)

# Fail loudly instead of silently building an empty subset
n_matched = int(idx.sum())
if n_matched == 0:
    raise ValueError("None of the COMIDs in TX_ids.csv match ds.feature_id")
print(f"Matched {n_matched} of {len(ids_list)} COMIDs")

# Boolean-mask selection keeps the store's feature order and ignores IDs
# missing from the dataset; chunk splitting is disabled during the slice.
with dask.config.set(**{'array.slicing.split_large_chunks': False}):
    ds_sub = ds[['streamflow']].sel(feature_id=idx)

ds_sub = ds_sub.chunk({"feature_id": 1000})

In [None]:
# 3) Daily flow: average all time steps that fall within each calendar day.
#    The previous `resample(time="D").interpolate("linear")` does not
#    aggregate when going to a coarser frequency — it only samples the series
#    at each day's 00:00 timestamp, i.e. returns the midnight value.
#    NOTE(review): input is presumably hourly CHRTOUT data — confirm.
ds_daily = ds_sub.resample(time="D").mean()

# Convert to float32 to reduce memory usage (after averaging, so the mean
# itself is computed at the source precision)
ds_daily = ds_daily.astype(np.float32)
# Keep the daily result in cluster memory so the per-year export does not
# recompute from the S3 source on every iteration.
ds_daily = ds_daily.persist()

In [None]:
# 4) Loop over each year, pivot, and write CSV
# Iterate only over years actually present in the time index: a gap year
# would make .sel(time="YYYY") raise KeyError. The time coordinate is an
# in-memory index, so no dask .compute() is needed.
years = np.unique(ds_daily.time.dt.year.values)

for year in years:
    ds_year = ds_daily.sel(time=str(year))

    # Wide table: rows = time, columns = feature_id.
    # transpose() pins the axis order explicitly, and to_pandas() builds the
    # index/columns straight from the coordinates — no reset_index needed
    # (its behaviour on dimension coordinates differs across xarray versions).
    df_year = (
        ds_year["streamflow"]
        .transpose("time", "feature_id")
        .to_pandas()
        .rename_axis(index=None, columns=None)  # keep the unnamed CSV header of earlier outputs
    )

    # Write to CSV
    csv_out = f"dailyflow_{year}.csv"
    df_year.to_csv(csv_out)
    print(f"Wrote {csv_out}")