In [None]:
import dask
import geopandas as gpd
import xarray as xr
from dask.distributed import Client

# Dask's `temporary_directory` setting (scratch / spill-to-disk location).
# It is set before the Client below is created so the local workers pick it up.
DASK_TEMP_DIR = "/home/jovyan/shared/pan_arctic_data"
dask.config.set({"temporary_directory": DASK_TEMP_DIR})

In [None]:
# RADR outlet points. Their unique `OutCOMID` values are the reach IDs that the
# discharge cells below select with `ds.sel(reach=cat_ids)`.
# (Plain string: the path has no placeholders, so no f-string is needed.)
radr_pt_shp = "~/shared/pan_arctic_data/merit_hydro_basins/radr_outlets_merged.shp"
radr_pt_gdf = gpd.read_file(radr_pt_shp)
cat_ids = radr_pt_gdf["OutCOMID"].unique()
# Sort in place so the selected reaches come out in ascending ID order.
cat_ids.sort()

## 1. Without Dask

Takes about 17 minutes. The cell below is commented out; uncomment it to reproduce this baseline.

In [None]:
# %%time
# Baseline run of the same pipeline without Dask (about 17 min). This cell is
# disabled on purpose. To reproduce the timing, uncomment the lines in this cell
# and keep `%%time` as the very first line of the cell, or the magic will fail.
# NOTE(review): recent pandas versions deprecate the "M" resample alias in
# favour of "ME"; confirm against the installed pandas/xarray versions.

# radr_path = r"/home/jovyan/shared/pan_arctic_data/discharge/RADR_v1.0.0.nc"
# radr_monthly_path = r"/home/jovyan/shared/pan_arctic_data/discharge/RADR_v1_monthly_no_dask.nc"

# ds = xr.open_dataset(radr_path)
# ds = ds.sel(reach=cat_ids)
# ds = ds.resample(time="M").mean()
# ds.to_netcdf(radr_monthly_path)
# ds

In [None]:
# Companion to the commented-out baseline cell above: uncomment it together
# with that cell to close the `ds` it creates.
#ds.close()

## 2. With Dask

Takes about 6 minutes with 10 worker processes (1 thread and a 1.5 GB memory limit per worker).

In [None]:
# Local Dask cluster sizing. `memory_limit` applies to each worker separately,
# not to the cluster as a whole: e.g. n_workers=2 with memory_limit="6GB" means
# 12 GB in total (here: 10 workers x 1.5 GB = 15 GB).
N_WORKERS = 10
WORKER_MEMORY_LIMIT = "1.5GB"

client = Client(
    n_workers=N_WORKERS,
    threads_per_worker=1,
    processes=True,
    memory_limit=WORKER_MEMORY_LIMIT,
)
client

In [None]:
%%time

radr_path = r"/home/jovyan/shared/pan_arctic_data/discharge/RADR_v1.0.0.nc"
radr_monthly_path = r"/home/jovyan/shared/pan_arctic_data/discharge/RADR_v1_monthly_dask.nc"
ds = xr.open_dataset(radr_path)

# Subset first without Dask, then chunk
ds = ds.sel(reach=cat_ids)
ds = ds.chunk(chunks='auto')
ds = ds.resample(time="M").mean()
ds.to_netcdf(radr_monthly_path)
ds

In [None]:
# Close the monthly Dataset `ds` produced by the Dask cell above.
ds.close()