# Explore the National Water Model Reanalysis
Use [Xarray](http://xarray.pydata.org/en/stable/), [Dask](https://dask.org) and [hvPlot](https://hvplot.holoviz.org) from the [HoloViz](https://holoviz.org) tool suite to explore the National Water Model Reanalysis Version 2.  We read from a cloud-optimized [Zarr](https://zarr.readthedocs.io/en/stable/) dataset that is part of the [AWS Open Data Program](https://aws.amazon.com/opendata/), and we use a Dask cluster to parallelize computation and reading of data chunks.

In [1]:
import xarray as xr
import fsspec
import numpy as np

In [2]:
import hvplot.pandas
import hvplot.xarray
import geoviews as gv
from holoviews.operation.datashader import rasterize
import cartopy.crs as ccrs
import dask

### Start a Dask cluster
This is not required, but it speeds up computations. One can start a local cluster with:
```
from dask.distributed import Client
client = Client()
```
but there are [many other ways to set up Dask clusters](https://docs.dask.org/en/latest/setup.html) that can scale larger than this. 

On a [Qhub](https://www.quansight.com/post/announcing-qhub) deployment of JupyterHub with a Dask Gateway running on Kubernetes, one can start a Dask cluster with a specified environment and worker profile, scale it, and connect to it. The cell below simply starts a local cluster and connects to it:

In [3]:
from dask.distributed import Client
client = Client()

Open Zarr datasets in Xarray using a mapper from fsspec.  We use `anon=True` for free-access public buckets like the AWS Open Data Program, and `requester_pays=True` for requester-pays public buckets. 

In [4]:
url = 's3://noaa-nwm-retro-v2-zarr-pds'

In [5]:
%%time
ds = xr.open_zarr(fsspec.get_mapper(url, anon=True), consolidated=True, chunks={})

CPU times: user 1.28 s, sys: 113 ms, total: 1.39 s
Wall time: 3.16 s
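
The choice between `anon=True` and `requester_pays=True` described above can be captured in a small helper. This is only a sketch: `s3_storage_options` is a hypothetical name, not part of fsspec or s3fs, and it does nothing more than build the keyword arguments. The commented line shows how they might be passed on to `fsspec.get_mapper`.

```python
def s3_storage_options(requester_pays: bool = False) -> dict:
    """Return keyword arguments for an S3 mapper (hypothetical helper).

    anon=True skips credential lookup for free-access public buckets.
    requester_pays=True bills the caller's AWS account, so credentials are needed.
    """
    if requester_pays:
        return {"requester_pays": True}
    return {"anon": True}


public_opts = s3_storage_options()
paid_opts = s3_storage_options(requester_pays=True)
print(public_opts, paid_opts)

# Assumed usage with the bucket from this notebook (needs network access):
# mapper = fsspec.get_mapper('s3://noaa-nwm-retro-v2-zarr-pds', **public_opts)
```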


In [6]:
var='streamflow'

In [7]:
ds[var]

`streamflow` (Dask array):

| | Array | Chunk |
|---|---|---|
| Bytes | 4.53 TiB | 153.81 MiB |
| Shape | (227904, 2729077) | (672, 30000) |
| Dask graph | 30940 chunks in 2 graph layers | |
| Data type | float64 numpy.ndarray | |

Coordinates `latitude` and `longitude` (each, Dask array):

| | Array | Chunk |
|---|---|---|
| Bytes | 10.41 MiB | 10.41 MiB |
| Shape | (2729077,) | (2729077,) |
| Dask graph | 1 chunks in 2 graph layers | |
| Data type | float32 numpy.ndarray | |


In [8]:
ds.coords

Coordinates:
  * feature_id  (feature_id) int32 11MB 101 179 181 ... 1180001803 1180001804
    latitude    (feature_id) float32 11MB dask.array<chunksize=(2729077,), meta=np.ndarray>
    longitude   (feature_id) float32 11MB dask.array<chunksize=(2729077,), meta=np.ndarray>
  * time        (time) datetime64[ns] 2MB 1993-01-01 ... 2018-12-31T23:00:00

In [9]:
print(f'Variable size: {ds[var].nbytes/1e12:.1f} TB')

Variable size: 5.0 TB
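
The sizes reported above can be cross-checked by hand from the array shape, the chunk shape, and the 8-byte `float64` item size shown in the repr. The sketch below shows that 4.53 TiB and 5.0 TB are the same quantity in binary and decimal units, and that the chunk count follows from dividing each dimension by its chunk length and rounding up.

```python
import math

shape = (227904, 2729077)   # (time, feature_id), from the repr above
chunks = (672, 30000)       # chunk shape, from the repr above
itemsize = 8                # bytes per float64 value

nbytes = shape[0] * shape[1] * itemsize
chunk_bytes = chunks[0] * chunks[1] * itemsize
n_chunks = math.ceil(shape[0] / chunks[0]) * math.ceil(shape[1] / chunks[1])

print(f"{nbytes / 1e12:.1f} TB")                   # decimal terabytes
print(f"{nbytes / 2**40:.2f} TiB")                 # binary tebibytes
print(f"{chunk_bytes / 2**20:.2f} MiB per chunk")
print(f"{n_chunks} chunks")
```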


### Find the site with the largest streamflow on June 1, 2017

In [10]:
%%time
# imax = ds[var].sel(time='2017-06-01 00:00:00').argmax().values

CPU times: user 3 µs, sys: 0 ns, total: 3 µs
Wall time: 5.25 µs


In [11]:
%%time
selected_data = ds[var].sel(
    time=slice('2017-01-01 00:00:00', '2017-01-02 00:00:00'),  # Replace with your time range
)
print(f'selected_data Variable size: {selected_data.nbytes} B')

selected_data Variable size: 545815400 B
CPU times: user 11.3 ms, sys: 33 µs, total: 11.3 ms
Wall time: 10.3 ms
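
The reported size corresponds to 25 hourly steps rather than 24, because label-based slices include both endpoints. The sketch below illustrates this with a plain pandas series standing in for the dataset's time coordinate (xarray's `.sel` relies on pandas indexes for label lookups); the two-day range is a toy assumption.

```python
import pandas as pd

# Two days of hourly timestamps, standing in for the dataset's time coordinate.
times = pd.date_range("2017-01-01", periods=48, freq=pd.Timedelta(hours=1))
series = pd.Series(range(len(times)), index=times)

# Label-based slices include both the start and the end label.
window = series.loc[pd.Timestamp("2017-01-01 00:00:00"):pd.Timestamp("2017-01-02 00:00:00")]
n_steps = len(window)

n_features = 2729077                        # length of feature_id, from the repr above
expected_bytes = n_steps * n_features * 8   # 8 bytes per float64 value
print(n_steps, expected_bytes)
```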


### Time how long it takes to write a NetCDF file when the data is loaded into memory

In [13]:
%%time
import cProfile

# Load the data into memory
in_memory_data = selected_data.compute()

# Create a new dataset with the loaded data
new_ds = xr.Dataset({'streamflow': in_memory_data}, attrs=ds.attrs)

# Profile only the NetCDF write and save the stats for later inspection
profiler = cProfile.Profile()
profiler.enable()
new_ds.to_netcdf('./from_memory_subset.nc', engine='netcdf4')
profiler.disable()
profiler.dump_stats('./profile_from_memory.prof')

CPU times: user 395 ms, sys: 1.11 s, total: 1.51 s
Wall time: 8.48 s


### Time how long it takes to write a NetCDF file without loading the data into memory

In [12]:
%%time
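import cProfile

# Wrap the lazy (Dask-backed) selection in a dataset; nothing is loaded yet
new_ds = xr.Dataset({'streamflow': selected_data}, attrs=ds.attrs)

# Profile the write, which now also reads the chunks from S3 as it goes
profiler = cProfile.Profile()
profiler.enable()
new_ds.to_netcdf('./selected_data_subset.nc', engine='netcdf4')
profiler.disable()
profiler.dump_stats('./profile_lazy.prof')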

CPU times: user 1.53 s, sys: 243 ms, total: 1.78 s
Wall time: 29.5 s
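
The cells above dump cProfile results to a `.prof` file but do not show how to read one back. A minimal sketch using the standard-library `pstats` module follows. It profiles a stand-in function (`busy_work`, a hypothetical name) and writes to a temporary file, so it runs without the dataset; pointing `pstats.Stats` at one of the notebook's own dump files should work the same way.

```python
import cProfile
import io
import os
import pstats
import tempfile


def busy_work(n: int = 50_000) -> int:
    """Stand-in workload (hypothetical); the notebook profiles to_netcdf instead."""
    return sum(i * i for i in range(n))


profiler = cProfile.Profile()
profiler.enable()
busy_work()
profiler.disable()

# Dump to a temporary file, mirroring profiler.dump_stats(...) in the notebook.
prof_path = os.path.join(tempfile.mkdtemp(), "example.prof")
profiler.dump_stats(prof_path)

# Load the dump and print the ten entries with the largest cumulative time.
report = io.StringIO()
stats = pstats.Stats(prof_path, stream=report)
stats.strip_dirs().sort_stats("cumulative").print_stats(10)
print(report.getvalue())
```

Sorting by `"cumulative"` surfaces the calls that dominate wall time including their callees, which is usually the quickest way to see whether time goes to reading chunks or to the write itself.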


In [17]:
%%time
from datetime import datetime, timedelta

start_date = datetime(2017, 1, 1)
end_date = datetime(2017, 2, 1)
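# Loop over each day from January 1 through February 1 (inclusive, because of `<=`)
while start_date <= end_date:
    # Define the start and end timestamps for the current day
    day_start = start_date
    day_end = start_date + timedelta(days=1)

    # Extract data for the current day; label slices include both endpoints,
    # so each selection holds 25 hourly steps (00:00 through 00:00 the next day)
    selected_data = ds[var].sel(time=slice(day_start, day_end))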

    # Load the data into memory
    selected_data = selected_data.compute()

    # Create a new dataset with the extracted data
    new_ds = xr.Dataset({'streamflow': selected_data}, attrs=ds.attrs)

    # Generate the file name for the current day
    file_name = f"./selected_data_{start_date.strftime('%Y-%m-%d')}.nc"

    # Save the new dataset to a netCDF file
    new_ds.to_netcdf(file_name, engine='netcdf4')

    # Move to the next day
    start_date += timedelta(days=1)



CPU times: user 40.3 s, sys: 7.68 s, total: 48 s
Wall time: 14min 11s
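
Because label slices include both endpoints and the loop condition is `<=`, the loop above writes 32 files of 25 hourly steps each, and every midnight value lands in two files. If non-overlapping calendar days of January are what is wanted (an assumption about the intent), the windows could be built with pandas as sketched below. The `windows` list is illustrative, and the selection itself is left as a comment because it needs the remote dataset.

```python
import pandas as pd

# One entry per calendar day of January 2017 (31 days; February 1 is excluded).
days = pd.date_range("2017-01-01", "2017-01-31", freq="D")

windows = []
for day in days:
    first = day                            # 00:00 of this day
    last = day + pd.Timedelta(hours=23)    # 23:00 of the same day
    file_name = f"./selected_data_{day:%Y-%m-%d}.nc"
    windows.append((first, last, file_name))

print(len(windows), windows[0], windows[-1])

# Inside the loop, the notebook's selection would then become (not run here):
# ds[var].sel(time=slice(first, last))
```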


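Let's plot the whole hindcast time series at the location with the largest streamflow on June 1, 2017.

In [None]:
%%time
# `imax` is only defined by the argmax line that is commented out above, so compute it here
imax = int(ds[var].sel(time='2017-06-01 00:00:00').argmax().values)
ds[var][:, imax].hvplot(grid=True)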

### Compute mean discharge during April 2010 on all rivers

In [None]:
streamflow_April_2010 = ds[var].sel(time=slice('2010-04-01 00:00','2010-04-30 23:00'))

In [None]:
print(f'Variable size: {streamflow_April_2010.nbytes/1e9:.1f} GB')

In [None]:
streamflow_April_2010.to_netcdf('./streamflow_April_2010_subset.nc', engine='netcdf4')

In [None]:
%%time
var_mean = streamflow_April_2010.mean(dim='time').compute()
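
A mean over time can be computed without holding the whole month in memory because it reduces block by block: each chunk contributes a partial sum and a count, and only those are combined. The numpy sketch below illustrates the idea on toy sizes. It is a conceptual illustration, not Dask's actual reduction, which schedules chunks in parallel and combines them in a tree.

```python
import numpy as np

rng = np.random.default_rng(0)
data = rng.random((720, 50))   # toy (time, feature_id) array: 720 hours, 50 reaches
time_chunk = 96                # toy chunk length along the time axis

total = np.zeros(data.shape[1])
count = 0
for start in range(0, data.shape[0], time_chunk):
    block = data[start:start + time_chunk]   # only this block is needed at once
    total += block.sum(axis=0)               # partial sum per reach
    count += block.shape[0]                  # number of time steps seen so far

blockwise_mean = total / count
print(blockwise_mean.shape)
```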

### Visualize the mean discharge with hvplot
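Convert the Xarray result to a Pandas dataframe so we can use `hvplot.points` for visualization.

In [None]:
# Name the column explicitly so it does not depend on how to_pandas() names the series
df = var_mean.to_pandas().to_frame(name='transport')

The dataframe has only the mean streamflow (named `transport` here), so add longitude and latitude as columns.

In [None]:
# Use .values so the coordinates are assigned by position as plain numpy arrays
df = df.assign(latitude=ds['latitude'].values)
df = df.assign(longitude=ds['longitude'].values)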

In [None]:
p = df.hvplot.points('longitude', 'latitude', crs=ccrs.PlateCarree(),
                     c='transport', colorbar=True, size=14)

We don't want to plot all the 2.7M points individually, so aggregate to 0.02 degree resolution and rasterize with datashader.  Use a log scale for visualization since there is a large dynamic range in streamflow. 

In [None]:
g = rasterize(p, aggregator='mean', x_sampling=0.02, y_sampling=0.02, width=500).opts(tools=['hover'], 
                aspect='equal', logz=True, cmap='viridis', clim=(1e-2, np.nan))
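
To make the aggregation step concrete, the sketch below averages point values within 0.02 degree cells using plain numpy and then takes a base-10 logarithm, which is roughly what a mean aggregator followed by a log color scale amounts to. It is a conceptual illustration only, not datashader's implementation (the real grid depends on the plot's viewport), and `bin_mean` is a hypothetical helper name.

```python
import numpy as np


def bin_mean(lon, lat, values, cell=0.02):
    """Average `values` within square lon/lat cells of width `cell` degrees.

    Returns a (ny, nx) grid with NaN in cells that contain no points.
    """
    ix = np.floor((lon - lon.min()) / cell).astype(int)   # column index per point
    iy = np.floor((lat - lat.min()) / cell).astype(int)   # row index per point
    nx, ny = ix.max() + 1, iy.max() + 1
    flat = iy * nx + ix                                   # flattened cell index
    sums = np.bincount(flat, weights=values, minlength=nx * ny)
    counts = np.bincount(flat, minlength=nx * ny)
    mean = np.full(nx * ny, np.nan)
    filled = counts > 0
    mean[filled] = sums[filled] / counts[filled]
    return mean.reshape(ny, nx)


# Three toy points: the first two share a cell, the third falls in the next cell.
lon = np.array([0.001, 0.005, 0.031])
lat = np.array([0.001, 0.002, 0.001])
values = np.array([1.0, 3.0, 10.0])

grid = bin_mean(lon, lat, values)
log_grid = np.log10(grid)   # a log scale compresses the large dynamic range
print(grid, log_grid)
```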

Plot the rasterized streamflow data on an OpenStreetMap tile service basemap

In [None]:
g * gv.tile_sources.OSM

In [None]:
# client.close()  # no separate `cluster` object exists when using a bare Client()