## NWM Retrospective Streamflow Visualization

This example demonstrates how to collect and visualize NWM retrospective streamflow predictions using Python.

NWM Retrospective data is located at:

https://registry.opendata.aws/nwm-archive/




**Requirements**

This notebook was developed using the following software and operating system versions.

OS: macOS Ventura 13.0.1  
Python: 3.10.0  
Zarr: 2.13.2  
NetCDF4: 1.6.1  
xarray: 0.17.0  
fsspec: 0.8.7  
dask: 2021.3.0  
hvplot: 0.7.1  
holoviews: 1.14.2  
pynhd: 0.10.1  
nest-asyncio: 1.5.6


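The following commands should help you set up these dependencies. Activate the new environment before installing so the packages land in `nwm-env` rather than the base environment:
```
$ conda create -n nwm-env python=3.10.0

$ conda activate nwm-env

$ conda install -y -c pyviz -c conda-forge pynhd folium s3fs hvplot dask distributed zarr
```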
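Because the versions listed above are pinned to one development machine, it can help to confirm what is actually installed in the active environment. The following is a minimal sketch using only the standard library; the helper name `installed_versions` is hypothetical, and the distribution names are assumed to correspond to the requirements list.

```python
from importlib.metadata import PackageNotFoundError, version


def installed_versions(packages):
    """Return {package: version string}, or 'not installed' when missing."""
    report = {}
    for name in packages:
        try:
            report[name] = version(name)
        except PackageNotFoundError:
            report[name] = "not installed"
    return report


# Distribution names assumed to match the requirements list above.
packages = ["zarr", "netCDF4", "xarray", "fsspec", "s3fs", "dask",
            "hvplot", "holoviews", "pynhd", "folium", "nest-asyncio"]

for name, found in installed_versions(packages).items():
    print(f"{name:>14}: {found}")
```

Comparing this report against the list above is a quick way to spot version drift before running the rest of the notebook.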

In [3]:
!pip install pynhd


In [1]:
import os
import pandas
import xarray
import s3fs
import hvplot.xarray

from dask.distributed import Client
client = Client()
client

Client output (condensed):

Connection method: Cluster object, Cluster type: distributed.LocalCluster
Dashboard: /user/castronova/proxy/8787/status
Status: running, Using processes: True
Workers: 4, Total threads: 4, Total memory: 15.00 GiB
Scheduler comm: tcp://127.0.0.1:33327
Worker comms (1 thread and 3.75 GiB each): tcp://127.0.0.1:38965, tcp://127.0.0.1:44975, tcp://127.0.0.1:35007, tcp://127.0.0.1:35113


## Create Map for Watershed for USGS Station ID

The following code uses `pynhd` and `folium` to create an interactive map of a watershed from a USGS gauge ID.

In [8]:
import folium
from folium.features import DivIcon
from folium.plugins import MousePosition
from pynhd import NLDI, WaterData, NHDPlusHR, GeoConnex
import pynhd

Define the watershed outlet using a USGS station ID, and create the NLDI client that will be used to look up its basin. The map object itself is created further below.

In [6]:
nldi = NLDI()
station_id = "04289000"
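The station ID is kept as a string because its leading zero is significant; an ID that has passed through an integer type loses it. The sketch below shows one way to guard against that. Both helper names are hypothetical, the 8-digit padding is an assumption about stream gauge IDs, and the `USGS-` prefixed form is shown only for illustration since the notebook passes the bare ID to `get_basins`.

```python
def normalize_station_id(station_id):
    """Return a USGS station ID as a digit string, zero-padded to 8 characters.

    Assumption: stream gauge IDs are at least 8 digits, so shorter inputs
    have lost leading zeros (for example after being parsed as integers).
    """
    text = str(station_id).strip()
    if not text.isdigit():
        raise ValueError(f"station ID must contain only digits: {station_id!r}")
    return text.zfill(8)


def nldi_feature_id(station_id):
    """Build the 'USGS-<id>' form of the identifier (hypothetical helper)."""
    return f"USGS-{normalize_station_id(station_id)}"


print(normalize_station_id(4289000))   # integer input lost its leading zero
print(nldi_feature_id("04289000"))
```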

Collect watershed and reach vectors using the `pynhd` module.

In [13]:
nldi = NLDI()

print('Collecting basins...', end='')
basin = nldi.get_basins(station_id)
print('done')

print('Collecting NHD...', end='')
mr = WaterData("nhdflowline_network")
# use positional access; the basin is indexed by station identifier, not by integer
nhd = mr.bybox(basin.geometry.iloc[0].bounds)
print('done')

print('Collecting gauge locations...', end='')
gages = pynhd.GeoConnex(item="gages")
gages = gages.bygeometry(basin.geometry.iloc[0]).to_crs(epsg='4326')
print('done')

Collecting basins...done
Collecting NHD...done
Collecting gauge locations...done
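The flowline query above is driven by a bounding box in `(minx, miny, maxx, maxy)` order, which is what a geometry's `.bounds` provides. Because the box is a rectangle drawn around the basin, it can contain locations that are outside the watershed itself, so some returned reaches may fall beyond the boundary. A library-free sketch with made-up coordinates (the helper names are hypothetical):

```python
def bounds(coords):
    """Return (minx, miny, maxx, maxy) for a sequence of (x, y) pairs."""
    xs = [x for x, _ in coords]
    ys = [y for _, y in coords]
    return (min(xs), min(ys), max(xs), max(ys))


def in_bbox(point, bbox):
    """True when an (x, y) point falls inside or on the bounding box."""
    x, y = point
    minx, miny, maxx, maxy = bbox
    return minx <= x <= maxx and miny <= y <= maxy


# Made-up triangular "basin" in (longitude, latitude) order.
triangle = [(-73.2, 44.4), (-72.8, 44.4), (-73.0, 44.8)]
bbox = bounds(triangle)
print(bbox)

# This point lies in the box's upper-left corner, outside the triangle itself.
print(in_bbox((-73.19, 44.79), bbox))
```

If only the reaches inside the watershed are wanted, a spatial filter against the basin polygon could be applied afterwards; that step is not part of the original notebook.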


Create an interactive map to display all of these data.

In [21]:
# create map
m = folium.Map(tiles='OpenStreetMap', zoom_start=11)
_ = MousePosition().add_to(m)

# add data to the map
print('Building map...', end='')

# watershed boundary
watershed_json = basin.to_crs(epsg='4326').to_json()
w = folium.features.GeoJson(data=watershed_json, style_function=lambda x: {'color':'red', 'fillColor':'#00000000'})
m.add_child(w)

# river vectors
nhd['comid'] = nhd['comid'].astype(str)
nhd_json = nhd.to_crs(epsg='4326').to_json()
w = folium.features.GeoJson(data=nhd_json,
                            popup=folium.features.GeoJsonPopup(fields=['comid',
                                                                       'gnis_id',
                                                                       'gnis_name']),
                            highlight_function=lambda feature: {"fillColor": "green", "color": "green"},
                           )
m.add_child(w)


# gauge points
for idx, row in gages.iterrows():
    html = f"""
           <html>
            <b>Site Name: </b>{row['name']}<br>
            <b>Site ID: </b>{row['provider_id']}<br>
            <b>Service: </b>NWISDV<br>
           </html>
           """
    popup = folium.Popup(folium.Html(html, script=True), max_width=2650)
    folium.Marker(location=[row.geometry.y, row.geometry.x], 
                  icon=folium.Icon(color='blue'),
                  popup=popup,
                 ).add_to(m)
print('done')

# Set the map extent (bounds) to the extent of all layers added above
m.fit_bounds(m.get_bounds())
m

Building map...done
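Two coordinate conventions are at play in the map code: geometries store points as `(x, y)`, meaning longitude then latitude, while folium markers take `[latitude, longitude]`, and `fit_bounds` expects `[[south, west], [north, east]]`. The sketch below makes that conversion explicit for a set of made-up gauge locations; `leaflet_bounds` is a hypothetical helper, not part of folium.

```python
def leaflet_bounds(points_xy):
    """Convert (x, y) = (lon, lat) points to [[south, west], [north, east]]."""
    lons = [x for x, _ in points_xy]
    lats = [y for _, y in points_xy]
    return [[min(lats), min(lons)], [max(lats), max(lons)]]


# Made-up gauge locations in (longitude, latitude) order.
gauges_xy = [(-73.14, 44.48), (-72.97, 44.53), (-73.05, 44.61)]
print(leaflet_bounds(gauges_xy))
```

Passing a result like this to `m.fit_bounds(...)` would zoom to the gauges only, whereas `m.fit_bounds(m.get_bounds())` uses everything that has been added to the map.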


## Visualize NWM Retrospective Streamflow

NWM streamflow will be collected from the public datastore located on AWS. The data are stored in Zarr format, which we open lazily with `xarray` through `s3fs`, so only the chunks needed for a given selection are downloaded.

In [22]:
s3_path = 's3://noaa-nwm-retro-v2-zarr-pds'

In [23]:
# Connect to S3
s3 = s3fs.S3FileSystem(anon=True)
store = s3fs.S3Map(root=s3_path, s3=s3, check=False)

In [24]:
%%time

# load the dataset
ds = xarray.open_zarr(store=store, consolidated=True)

CPU times: user 3.89 s, sys: 356 ms, total: 4.24 s
Wall time: 9.11 s
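With `consolidated=True`, the metadata for every array is read from a single `.zmetadata` object at the root of the store instead of from one small object per array, which helps explain why a multi-terabyte dataset opens in seconds. As a sketch, the location of that object can be written as a URL; the helper name is hypothetical, and the virtual-hosted HTTPS form is an assumption about how this public bucket is exposed.

```python
def s3_to_https(s3_uri, key=""):
    """Map 's3://bucket/prefix' to a virtual-hosted-style HTTPS URL."""
    if not s3_uri.startswith("s3://"):
        raise ValueError(f"not an S3 URI: {s3_uri!r}")
    bucket, _, prefix = s3_uri[len("s3://"):].partition("/")
    parts = [p for p in (prefix.strip("/"), key.strip("/")) if p]
    return f"https://{bucket}.s3.amazonaws.com/" + "/".join(parts)


s3_path = "s3://noaa-nwm-retro-v2-zarr-pds"
# '.zmetadata' is the key Zarr v2 uses for consolidated metadata.
print(s3_to_https(s3_path, ".zmetadata"))
```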


In [9]:
# preview the dataset
ds

Dataset preview (HTML output condensed). Every array is a Dask-backed `numpy.ndarray`:

| Arrays | Shape | Chunk shape | Chunks | Data type | Bytes (array) | Bytes (chunk) |
|---|---|---|---|---|---|---|
| 2 | (2729077,) | (2729077,) | 1 | float32 | 10.41 MiB | 10.41 MiB |
| 1 | (227904, 2729077) | (672, 30000) | 30940 | float32 | 2.26 TiB | 76.90 MiB |
| 1 | (227904, 2729077) | (672, 30000) | 30940 | int32 | 2.26 TiB | 76.90 MiB |
| 6 | (227904, 2729077) | (672, 30000) | 30940 | float64 | 4.53 TiB | 153.81 MiB |
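The chunk counts and sizes reported in the preview follow directly from the array shape, the chunk shape, and the number of bytes per value. A short worked check, using only the numbers shown in the preview (the helper names are hypothetical):

```python
import math

shape = (227904, 2729077)   # array shape reported in the preview
chunks = (672, 30000)       # chunk shape reported in the preview


def chunk_count(shape, chunks):
    """Number of chunks needed to tile an array of the given shape."""
    return math.prod(math.ceil(n / c) for n, c in zip(shape, chunks))


def size_bytes(shape, itemsize):
    """Uncompressed size in bytes for the given shape and bytes per value."""
    return math.prod(shape) * itemsize


n_chunks = chunk_count(shape, chunks)
print(n_chunks)                                        # 340 * 91 chunks
print(round(size_bytes(chunks, 4) / 2**20, 2), "MiB")  # one float32 chunk
print(round(size_bytes(shape, 4) / 2**40, 2), "TiB")   # whole float32 array
print(round(size_bytes(shape, 8) / 2**40, 2), "TiB")   # whole float64 array
```

These are uncompressed, in-memory sizes. Nothing of that size is downloaded when the dataset is opened; data are only read once a selection is computed.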


In [10]:
# slice all data using a specific reach identifier and time range

reach_id = 4576576  # COMID from map above
timerange = slice('2007-01-01', '2008-01-01')
dat = ds.sel(feature_id=reach_id,
             time=timerange).streamflow.persist() 
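Label-based slices on a time index include both endpoints, and a date-only end label covers that entire day, so the selection above runs through the last hour of 2008-01-01 rather than stopping at the end of 2007. The sketch below illustrates this with pandas on a synthetic hourly series (xarray delegates time-label slicing to a pandas index); the values are made up.

```python
import numpy as np
import pandas as pd

# Synthetic hourly series that starts before and ends after the slice.
index = pd.date_range("2006-12-30", "2008-01-03 23:00", freq=pd.Timedelta(hours=1))
flow = pd.Series(np.arange(len(index), dtype=float), index=index)

subset = flow.loc["2007-01-01":"2008-01-01"]

print(subset.index[0])    # first timestamp kept
print(subset.index[-1])   # last timestamp kept
print(len(subset))        # 366 days * 24 hourly values
```

To select exactly one calendar year, the end label could be `'2007-12-31'` instead.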

In [11]:
# resample the hourly streamflow to daily means and plot.
# This step can take a bit longer because plotting needs the actual values,
# so it waits for the persisted data to finish loading.
q_daily_ave = dat.resample(time='1D').mean()
q_daily_ave.hvplot()
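Conceptually, the daily resampling step groups the hourly values by calendar day and averages each group. A library-free sketch of that grouping, using made-up values, might look like the following; it is only meant to show the logic, not to replace the xarray call.

```python
from collections import defaultdict
from datetime import datetime, timedelta


def daily_mean(times, values):
    """Average values by calendar day; returns {date: mean} in date order."""
    groups = defaultdict(list)
    for t, v in zip(times, values):
        groups[t.date()].append(v)
    return {day: sum(vals) / len(vals) for day, vals in sorted(groups.items())}


# Two days of made-up hourly flows: 0..23 on day one, 24..47 on day two.
start = datetime(2007, 1, 1)
times = [start + timedelta(hours=h) for h in range(48)]
values = [float(h) for h in range(48)]

for day, mean in daily_mean(times, values).items():
    print(day, mean)
```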