In [15]:
!pip install s3fs --quiet
!pip install zarr --quiet
!pip install openpyxl --quiet

In [2]:
import os
import fsspec
import pandas as pd
import xarray as xr
import dask
from dask.distributed import Client
from dask.distributed import progress

In [3]:
# Create the Dask client only once per kernel session: if `client` already
# exists (this cell was run before), reuse it instead of starting new workers.
# Probing the name itself is what can raise NameError; catching only NameError
# keeps genuine errors from Client(...) visible instead of swallowing them.
try:
    client
except NameError:
    # The client should be customized to your workstation resources.
    client = Client(n_workers=12, memory_limit='2GB')  # memory_limit is per worker





In [4]:
# The NWM v2.1 retrospective bucket can be listed without credentials,
# so an anonymous S3 filesystem is enough.
NWM_BUCKET = 'noaa-nwm-retrospective-2-1-zarr-pds/'
fs = fsspec.filesystem('s3', anon=True)

# Show the Zarr stores (and index page) at the top level of the bucket.
fs.glob(NWM_BUCKET)

['noaa-nwm-retrospective-2-1-zarr-pds/chrtout.zarr',
 'noaa-nwm-retrospective-2-1-zarr-pds/gwout.zarr',
 'noaa-nwm-retrospective-2-1-zarr-pds/index.html',
 'noaa-nwm-retrospective-2-1-zarr-pds/lakeout.zarr',
 'noaa-nwm-retrospective-2-1-zarr-pds/ldasout.zarr',
 'noaa-nwm-retrospective-2-1-zarr-pds/precip.zarr',
 'noaa-nwm-retrospective-2-1-zarr-pds/rtout.zarr']

In [5]:
%%time

file = fs.glob('noaa-nwm-retrospective-2-1-zarr-pds/chrtout.zarr')
ds = xr.open_dataset(fs.get_mapper(file[0]), engine='zarr', backend_kwargs={'consolidated': True})

CPU times: user 2.4 s, sys: 179 ms, total: 2.58 s
Wall time: 7.9 s


In [7]:
# Inspect the chunk layout right after opening: Frozen({}) means no variable
# is dask-chunked yet (chunking is applied explicitly in the next cell).
ds.chunks

Frozen({})

In [8]:
%%time
# compare with the situation where only time:1 is used. Also, 
# explore what is the optimal combination.

dim_chunk_sizes = {'feature_id': 1, 'time': len(ds.time)}
ds = ds.chunk(chunks=dim_chunk_sizes)


CPU times: user 8.14 s, sys: 616 ms, total: 8.76 s
Wall time: 8.77 s


In [9]:
# Confirm the rechunk: one chunk per feature_id along that dimension
# (the printed tuple is extremely long for this dataset).
ds.chunks

Frozen({'feature_id': (1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1,

In [10]:
%%time 
# takes longer compared to situation where chunking is uesd for time=1

year=2010
start_time = f'{year}-01-01 00:00'
end_time = f'{year}-01-10 00:00'

# isolate the desired time period of our data
ds_subset = ds.sortby('time').sel(time=slice(start_time, end_time))

print(f'The dataset contains {len(ds_subset.time)} timesteps')

The dataset contains 217 timesteps
CPU times: user 14 s, sys: 1.11 s, total: 15.1 s
Wall time: 15.1 s


In [11]:
# Display the lazy time subset: every variable is still split into one dask
# chunk per feature_id, so nothing has been read from S3 yet.
ds_subset

Unnamed: 0,Array,Chunk
Bytes,10.59 MiB,4 B
Shape,"(2776738,)","(1,)"
Dask graph,2776738 chunks in 2 graph layers,2776738 chunks in 2 graph layers
Data type,float32 numpy.ndarray,float32 numpy.ndarray
"Array Chunk Bytes 10.59 MiB 4 B Shape (2776738,) (1,) Dask graph 2776738 chunks in 2 graph layers Data type float32 numpy.ndarray",2776738  1,

Unnamed: 0,Array,Chunk
Bytes,10.59 MiB,4 B
Shape,"(2776738,)","(1,)"
Dask graph,2776738 chunks in 2 graph layers,2776738 chunks in 2 graph layers
Data type,float32 numpy.ndarray,float32 numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,39.72 MiB,15 B
Shape,"(2776738,)","(1,)"
Dask graph,2776738 chunks in 2 graph layers,2776738 chunks in 2 graph layers
Data type,|S15 numpy.ndarray,|S15 numpy.ndarray
"Array Chunk Bytes 39.72 MiB 15 B Shape (2776738,) (1,) Dask graph 2776738 chunks in 2 graph layers Data type |S15 numpy.ndarray",2776738  1,

Unnamed: 0,Array,Chunk
Bytes,39.72 MiB,15 B
Shape,"(2776738,)","(1,)"
Dask graph,2776738 chunks in 2 graph layers,2776738 chunks in 2 graph layers
Data type,|S15 numpy.ndarray,|S15 numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,10.59 MiB,4 B
Shape,"(2776738,)","(1,)"
Dask graph,2776738 chunks in 2 graph layers,2776738 chunks in 2 graph layers
Data type,float32 numpy.ndarray,float32 numpy.ndarray
"Array Chunk Bytes 10.59 MiB 4 B Shape (2776738,) (1,) Dask graph 2776738 chunks in 2 graph layers Data type float32 numpy.ndarray",2776738  1,

Unnamed: 0,Array,Chunk
Bytes,10.59 MiB,4 B
Shape,"(2776738,)","(1,)"
Dask graph,2776738 chunks in 2 graph layers,2776738 chunks in 2 graph layers
Data type,float32 numpy.ndarray,float32 numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,10.59 MiB,4 B
Shape,"(2776738,)","(1,)"
Dask graph,2776738 chunks in 2 graph layers,2776738 chunks in 2 graph layers
Data type,float32 numpy.ndarray,float32 numpy.ndarray
"Array Chunk Bytes 10.59 MiB 4 B Shape (2776738,) (1,) Dask graph 2776738 chunks in 2 graph layers Data type float32 numpy.ndarray",2776738  1,

Unnamed: 0,Array,Chunk
Bytes,10.59 MiB,4 B
Shape,"(2776738,)","(1,)"
Dask graph,2776738 chunks in 2 graph layers,2776738 chunks in 2 graph layers
Data type,float32 numpy.ndarray,float32 numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,10.59 MiB,4 B
Shape,"(2776738,)","(1,)"
Dask graph,2776738 chunks in 2 graph layers,2776738 chunks in 2 graph layers
Data type,int32 numpy.ndarray,int32 numpy.ndarray
"Array Chunk Bytes 10.59 MiB 4 B Shape (2776738,) (1,) Dask graph 2776738 chunks in 2 graph layers Data type int32 numpy.ndarray",2776738  1,

Unnamed: 0,Array,Chunk
Bytes,10.59 MiB,4 B
Shape,"(2776738,)","(1,)"
Dask graph,2776738 chunks in 2 graph layers,2776738 chunks in 2 graph layers
Data type,int32 numpy.ndarray,int32 numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,4.49 GiB,1.70 kiB
Shape,"(217, 2776738)","(217, 1)"
Dask graph,2776738 chunks in 3 graph layers,2776738 chunks in 3 graph layers
Data type,float64 numpy.ndarray,float64 numpy.ndarray
"Array Chunk Bytes 4.49 GiB 1.70 kiB Shape (217, 2776738) (217, 1) Dask graph 2776738 chunks in 3 graph layers Data type float64 numpy.ndarray",2776738  217,

Unnamed: 0,Array,Chunk
Bytes,4.49 GiB,1.70 kiB
Shape,"(217, 2776738)","(217, 1)"
Dask graph,2776738 chunks in 3 graph layers,2776738 chunks in 3 graph layers
Data type,float64 numpy.ndarray,float64 numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,4.49 GiB,1.70 kiB
Shape,"(217, 2776738)","(217, 1)"
Dask graph,2776738 chunks in 3 graph layers,2776738 chunks in 3 graph layers
Data type,float64 numpy.ndarray,float64 numpy.ndarray
"Array Chunk Bytes 4.49 GiB 1.70 kiB Shape (217, 2776738) (217, 1) Dask graph 2776738 chunks in 3 graph layers Data type float64 numpy.ndarray",2776738  217,

Unnamed: 0,Array,Chunk
Bytes,4.49 GiB,1.70 kiB
Shape,"(217, 2776738)","(217, 1)"
Dask graph,2776738 chunks in 3 graph layers,2776738 chunks in 3 graph layers
Data type,float64 numpy.ndarray,float64 numpy.ndarray


In [17]:
usgs_stations = pd.read_excel('./NWM_USGS_Natural_Flow.xlsx')

# Normalize USGS_ID to zero-padded strings of at least 8 characters so it can
# be compared against string site codes such as '10168000'.
# NOTE(review): presumably the sheet stores ids as numbers (losing leading zeros) — confirm.
usgs_stations['USGS_ID'] = usgs_stations['USGS_ID'].map('{:0>8}'.format)

usgs_stations.columns

Index(['OBJECTID', 'NWM_ID', 'USGS_ID', 'Hydro_code', 'Feature Type',
       'Drainage Area', 'Lal', 'Long', 'Altitude', 'Name',
       ...
       'November Mean Monthly Flow from Gage Adjustment in Cubic Feet per Second',
       'November Mean Monthly Velocity from Gage Adjustment in Feet per Second',
       'December Mean Monthly Flow from Runoff in Cubic Feet per Second',
       'December Mean Monthly Velocity for Mean Monthly Flow in Feet per Second',
       'December Mean Monthly Flow with Reference Gage Regression in Cubic Feet per Second',
       'December Mean Monthly Velocity for Mean Monthly Flow with Reference Gage Regression in Feet per Second',
       'December Mean Monthly Flow from Gage Adjustment in Cubic Feet per Second',
       'December Mean Monthly Velocity from Gage Adjustment in Feet per Second',
       'Pop-up Title', 'Pop-up Subtitle'],
      dtype='object', length=112)

In [22]:
usgs_sites = ['10168000', '10170490', '10170500', '10171000', '10172200', '10172630']

# Keep only the requested USGS sites that are present in the station table.
# The result follows the table's row order, not the order of usgs_sites.
site_mask = usgs_stations['USGS_ID'].isin(usgs_sites)
selected_values = usgs_stations.loc[site_mask, 'USGS_ID'].tolist()
selected_values

['10171000', '10168000']

In [69]:
# Look up the NWM reach id (NWM_ID, used as `feature_id` in the dataset) for
# each matched USGS site, in the station table's row order.
matches_selected = usgs_stations['USGS_ID'].isin(selected_values)
Feature_id_list = list(usgs_stations.loc[matches_selected, 'NWM_ID'])
Feature_id_list

[10390290, 10389562]

In [64]:
# Sanity check: the gage_id recorded in the NWM data for reach 10389562
# should be the USGS site it was matched from (10168000).
gage_id_raw = ds_subset.gage_id.sel(feature_id=10389562).values
gage_id_raw.astype(int)

array(10168000)

In [73]:
# Narrow the (still lazy) subset down to the reaches matched to the selected gages.
ds_subset = ds_subset.sel({'feature_id': Feature_id_list})

In [74]:
# After selecting the reaches, only one chunk per selected feature_id remains,
# so the compute below only has a handful of tasks.
ds_subset.chunks

Frozen({'feature_id': (1, 1), 'time': (217,)})

In [None]:
%%time 
ds_subset = ds_subset.compute()

In [None]:
# Export simulated streamflow for every selected NWM reach to ./<feature_id>.csv
# (time index, one discharge column), and collect all reaches into `df`
# (one column per feature_id) for plotting.
# Iterate over Feature_id_list instead of hard-coding an id: ds_subset holds
# only those reaches, so any other feature_id (e.g. 24001093) raises KeyError.
discharge_label = f"Discharge {ds.streamflow.units}"
reach_series = {}
for feature_id in Feature_id_list:
    reach_df = (
        ds_subset['streamflow']
        .sel(feature_id=feature_id)
        .to_series()
        # Label the column explicitly instead of renaming whatever default
        # name the converted Series happens to carry.
        .to_frame(name=discharge_label)
    )
    reach_df.to_csv(os.path.join('./', f'{feature_id}.csv'))
    reach_series[feature_id] = reach_df[discharge_label]
df = pd.DataFrame(reach_series)

In [None]:
# Plot the discharge time series with a title and labelled axes so the figure
# stands alone; the trailing ';' suppresses the Axes repr in the output.
ax = df.plot(title='NWM v2.1 retrospective streamflow')
ax.set_xlabel('time')
ax.set_ylabel(f"Discharge {ds.streamflow.units}");