# Save NWM to Parquet in parallel using Dask Bags

In [1]:
import json
import os
import shutil
from os import makedirs
from os.path import basename, join

import dask.bag as db
import dask.dataframe as dd
import fsspec
import s3fs
import xarray as xr
from dask.distributed import Client
from dask_gateway import Gateway
from rechunker import rechunk

%matplotlib inline

def get_json(uri):
    """Open ``uri`` through fsspec and return the parsed JSON document it contains."""
    with fsspec.open(uri) as json_file:
        parsed = json.load(json_file)
    return parsed

In [2]:
# Connect to an existing Dask Gateway cluster using its `cluster.name`.

# This constant needs to be set!
cluster_name = ''
if not cluster_name:
    # Fail fast with an actionable message instead of an opaque error from the gateway.
    raise ValueError(
        "cluster_name is empty: set it to the name of a running cluster before running this cell")
gateway = Gateway()
cluster = gateway.connect(cluster_name)
# Client bound to that cluster; the dask computations in the cells below are expected to use it.
client = cluster.get_client()

In [7]:
# One-year subset of the data, stored as a Zarr store on S3 (input of this notebook).
zarr_uri = 's3://azavea-noaa-hydro-data/esip-experiments/datasets/reanalysis-chrtout/zarr/07-06-2022b/nwm-subset.zarr/'
# S3 prefix under which the per-segment Parquet stores are written, and from which they are read back.
out_root_uri = 's3://azavea-noaa-hydro-data/esip-experiments/datasets/reanalysis-chrtout/parquet/lf/07-11-2022a'

# Open the Zarr store as an xarray Dataset; the last expression renders its repr
# (the output below lists each variable with its shape and chunking).
zarr_ds = xr.open_zarr(zarr_uri)
zarr_ds

Unnamed: 0,Array,Chunk
Bytes,477.56 kiB,3.91 kiB
Shape,"(122256,)","(1000,)"
Count,124 Tasks,123 Chunks
Type,float32,numpy.ndarray
"Array Chunk Bytes 477.56 kiB 3.91 kiB Shape (122256,) (1000,) Count 124 Tasks 123 Chunks Type float32 numpy.ndarray",122256  1,

Unnamed: 0,Array,Chunk
Bytes,477.56 kiB,3.91 kiB
Shape,"(122256,)","(1000,)"
Count,124 Tasks,123 Chunks
Type,float32,numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,1.75 MiB,14.65 kiB
Shape,"(122256,)","(1000,)"
Count,124 Tasks,123 Chunks
Type,|S15,numpy.ndarray
"Array Chunk Bytes 1.75 MiB 14.65 kiB Shape (122256,) (1000,) Count 124 Tasks 123 Chunks Type |S15 numpy.ndarray",122256  1,

Unnamed: 0,Array,Chunk
Bytes,1.75 MiB,14.65 kiB
Shape,"(122256,)","(1000,)"
Count,124 Tasks,123 Chunks
Type,|S15,numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,477.56 kiB,3.91 kiB
Shape,"(122256,)","(1000,)"
Count,124 Tasks,123 Chunks
Type,float32,numpy.ndarray
"Array Chunk Bytes 477.56 kiB 3.91 kiB Shape (122256,) (1000,) Count 124 Tasks 123 Chunks Type float32 numpy.ndarray",122256  1,

Unnamed: 0,Array,Chunk
Bytes,477.56 kiB,3.91 kiB
Shape,"(122256,)","(1000,)"
Count,124 Tasks,123 Chunks
Type,float32,numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,477.56 kiB,3.91 kiB
Shape,"(122256,)","(1000,)"
Count,124 Tasks,123 Chunks
Type,float32,numpy.ndarray
"Array Chunk Bytes 477.56 kiB 3.91 kiB Shape (122256,) (1000,) Count 124 Tasks 123 Chunks Type float32 numpy.ndarray",122256  1,

Unnamed: 0,Array,Chunk
Bytes,477.56 kiB,3.91 kiB
Shape,"(122256,)","(1000,)"
Count,124 Tasks,123 Chunks
Type,float32,numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,477.56 kiB,3.91 kiB
Shape,"(122256,)","(1000,)"
Count,124 Tasks,123 Chunks
Type,int32,numpy.ndarray
"Array Chunk Bytes 477.56 kiB 3.91 kiB Shape (122256,) (1000,) Count 124 Tasks 123 Chunks Type int32 numpy.ndarray",122256  1,

Unnamed: 0,Array,Chunk
Bytes,477.56 kiB,3.91 kiB
Shape,"(122256,)","(1000,)"
Count,124 Tasks,123 Chunks
Type,int32,numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,8.00 GiB,7.63 MiB
Shape,"(8784, 122256)","(1000, 1000)"
Count,1108 Tasks,1107 Chunks
Type,float64,numpy.ndarray
"Array Chunk Bytes 8.00 GiB 7.63 MiB Shape (8784, 122256) (1000, 1000) Count 1108 Tasks 1107 Chunks Type float64 numpy.ndarray",122256  8784,

Unnamed: 0,Array,Chunk
Bytes,8.00 GiB,7.63 MiB
Shape,"(8784, 122256)","(1000, 1000)"
Count,1108 Tasks,1107 Chunks
Type,float64,numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,8.00 GiB,7.63 MiB
Shape,"(8784, 122256)","(1000, 1000)"
Count,1108 Tasks,1107 Chunks
Type,float64,numpy.ndarray
"Array Chunk Bytes 8.00 GiB 7.63 MiB Shape (8784, 122256) (1000, 1000) Count 1108 Tasks 1107 Chunks Type float64 numpy.ndarray",122256  8784,

Unnamed: 0,Array,Chunk
Bytes,8.00 GiB,7.63 MiB
Shape,"(8784, 122256)","(1000, 1000)"
Count,1108 Tasks,1107 Chunks
Type,float64,numpy.ndarray


In [8]:
def save_parquet_segment(bounds):
    """Write one contiguous block of features from ``zarr_ds`` to its own Parquet store.

    Args:
        bounds: ``(start_ind, end_ind)`` pair of positional, half-open indices along
            the ``feature_id`` dimension of the notebook-level ``zarr_ds``.

    Returns:
        None. The segment is written to
        ``<out_root_uri>/nwm-<start_ind>-<end_ind>.parquet``.
    """
    start_ind, end_ind = bounds
    # The bounds are positions, not labels, so slice positionally with `isel` rather
    # than converting them back to feature ids and looking those up by value with `sel`.
    sub_ds = zarr_ds.isel(feature_id=slice(start_ind, end_ind))
    # Using `to_dask_dataframe` works, but seems to get bottlenecked running almost everything on a single worker.
    # Using `to_dataframe` runs much faster, but fails when mapping > 32 items in the cells below.
    df = sub_ds.to_dask_dataframe()
    # Build the destination with an explicit '/': `os.path.join` targets local filesystem
    # paths and uses the OS separator, which is not guaranteed to be the URI separator.
    out_uri = f"{out_root_uri.rstrip('/')}/nwm-{start_ind}-{end_ind}.parquet"
    df.to_parquet(out_uri, engine='pyarrow')
    # TODO: delete stuff explicitly

In [None]:
# Number of features written to each Parquet store.
features_per_store = 2000
# Half-open (start, end) index pairs that tile the feature_id axis in steps of
# `features_per_store`; the final pair is clipped to the number of features.
n_features = zarr_ds.feature_id.shape[0]
segment_starts = range(0, n_features, features_per_store)
feature_bounds = [
    (lo, min(lo + features_per_store, n_features))
    for lo in segment_starts
]

In [10]:
# One bag partition per (start, end) pair, so each segment becomes its own task.
bounds_bag = db.from_sequence(feature_bounds, npartitions=len(feature_bounds))
# Lazily attach the writer to every pair; nothing is written until `compute()` runs.
b = bounds_bag.map(save_parquet_segment)
# Trigger the writes; the collected result holds one return value per segment.
results_bag = b.compute()

In [22]:
# Read everything written under `out_root_uri` back as a single dask dataframe.
# NOTE(review): the `dir0` column in the output below presumably comes from the
# per-segment `nwm-*.parquet` sub-directories being treated as partitions — confirm.
ddf = dd.read_parquet(out_root_uri)

In [23]:
# Show the dataframe's structure: column names, dtypes and number of partitions.
ddf

Unnamed: 0_level_0,feature_id,time,elevation,gage_id,latitude,longitude,order,crs,streamflow,velocity,dir0
npartitions=1107,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1
,int32,datetime64[ns],float32,object,float32,float32,int32,object,float64,float64,category[known]
,...,...,...,...,...,...,...,...,...,...,...
...,...,...,...,...,...,...,...,...,...,...,...
,...,...,...,...,...,...,...,...,...,...,...
,...,...,...,...,...,...,...,...,...,...,...
