In [1]:
import numpy as np
import xarray as xr
from netCDF4 import Dataset
from datetime import datetime, timedelta, date
import os

In [2]:
import dask
from distributed import Client

dask.config.set({'distributed.dashboard.link':'https://jupyter.alcf.anl.gov/theta/user/{USER}/proxy/{port}/status'})


client = Client(n_workers=10,threads_per_worker=4)

print(client.dashboard_link)

Perhaps you already have a cluster running?
Hosting the HTTP server on port 43736 instead


https://jupyter.alcf.anl.gov/theta/user/bwallace/proxy/43736/status


In [3]:
rain_ds=xr.open_mfdataset('/eagle/climate_severe/bwallace_scratch/temp/big_rain.zarr',engine='zarr')
pwat_ds=xr.open_mfdataset('/eagle/climate_severe/bwallace_scratch/temp/big_pwat.zarr',engine='zarr')
qg_ds=xr.open_mfdataset('/eagle/climate_severe/bwallace_scratch/temp/big_qg.zarr',engine='zarr')

1. Consolidating metadata in this existing store with zarr.consolidate_metadata().
2. Explicitly setting consolidated=False, to avoid trying to read consolidate metadata, or
3. Explicitly setting consolidated=True, to raise an error in this case instead of falling back to try reading non-consolidated metadata.
  datasets = [open_(p, **open_kwargs) for p in paths]
1. Consolidating metadata in this existing store with zarr.consolidate_metadata().
2. Explicitly setting consolidated=False, to avoid trying to read consolidate metadata, or
3. Explicitly setting consolidated=True, to raise an error in this case instead of falling back to try reading non-consolidated metadata.
  datasets = [open_(p, **open_kwargs) for p in paths]
1. Consolidating metadata in this existing store with zarr.consolidate_metadata().
2. Explicitly setting consolidated=False, to avoid trying to read consolidate metadata, or
3. Explicitly setting consolidated=True, to raise an error in this case instead of falling bac

### Print array information

Each array is little over 100 GB in size and is composed of 270 chunks across the Time dimension. Attempts at reorienting these chunks (chunked in time vs space) yielded little performance gains.

In [4]:
rain_ds

Unnamed: 0,Array,Chunk
Bytes,101.20 GiB,383.82 MiB
Shape,"(21600, 899, 1399)","(80, 899, 1399)"
Dask graph,270 chunks in 2 graph layers,270 chunks in 2 graph layers
Data type,float32 numpy.ndarray,float32 numpy.ndarray
"Array Chunk Bytes 101.20 GiB 383.82 MiB Shape (21600, 899, 1399) (80, 899, 1399) Dask graph 270 chunks in 2 graph layers Data type float32 numpy.ndarray",1399  899  21600,

Unnamed: 0,Array,Chunk
Bytes,101.20 GiB,383.82 MiB
Shape,"(21600, 899, 1399)","(80, 899, 1399)"
Dask graph,270 chunks in 2 graph layers,270 chunks in 2 graph layers
Data type,float32 numpy.ndarray,float32 numpy.ndarray


In [5]:
pwat_ds

Unnamed: 0,Array,Chunk
Bytes,101.20 GiB,383.82 MiB
Shape,"(21600, 899, 1399)","(80, 899, 1399)"
Dask graph,270 chunks in 2 graph layers,270 chunks in 2 graph layers
Data type,float32 numpy.ndarray,float32 numpy.ndarray
"Array Chunk Bytes 101.20 GiB 383.82 MiB Shape (21600, 899, 1399) (80, 899, 1399) Dask graph 270 chunks in 2 graph layers Data type float32 numpy.ndarray",1399  899  21600,

Unnamed: 0,Array,Chunk
Bytes,101.20 GiB,383.82 MiB
Shape,"(21600, 899, 1399)","(80, 899, 1399)"
Dask graph,270 chunks in 2 graph layers,270 chunks in 2 graph layers
Data type,float32 numpy.ndarray,float32 numpy.ndarray


In [6]:
qg_ds

Unnamed: 0,Array,Chunk
Bytes,101.20 GiB,383.82 MiB
Shape,"(21600, 899, 1399)","(80, 899, 1399)"
Dask graph,270 chunks in 2 graph layers,270 chunks in 2 graph layers
Data type,float32 numpy.ndarray,float32 numpy.ndarray
"Array Chunk Bytes 101.20 GiB 383.82 MiB Shape (21600, 899, 1399) (80, 899, 1399) Dask graph 270 chunks in 2 graph layers Data type float32 numpy.ndarray",1399  899  21600,

Unnamed: 0,Array,Chunk
Bytes,101.20 GiB,383.82 MiB
Shape,"(21600, 899, 1399)","(80, 899, 1399)"
Dask graph,270 chunks in 2 graph layers,270 chunks in 2 graph layers
Data type,float32 numpy.ndarray,float32 numpy.ndarray


Create 33 bins for precipitable water & 30 bins for QG-OMEGA.

In [7]:
pwat_bins=np.arange(0,102,3)
qg_bins=np.arange(-150,160,10)

In [8]:
#create an empty xarray to fill in with the quantities as we calculate 

dims=['pwat_bin','qg_bin','south_north','west_east']

empty_arr=xr.DataArray(coords=(range(len(pwat_bins)-1),
                     range(len(qg_bins)-1),
                     range(len(rain_ds.south_north)),
                     range(len(rain_ds.west_east))),
            dims=dims)

empty_ds=xr.Dataset(
                data_vars=dict(
                    total_rain=(dims,empty_arr.data),
                    num_total_events=(dims,empty_arr.data),
                    probability_rain=(dims,empty_arr.data),
                    mean_intensity=(dims,empty_arr.data)
                ),
            )

In [9]:
#loop through each bin
for i in range(len(pwat_bins)):
    for j in range(len(qg_bins)):
        
        #skip the last bin because [i+1],[j+1] will return an error
        if i<len(pwat_bins)-1 and j<len(qg_bins)-1:
            
            now=datetime.now()
            
            #isolate rain based on bins
            rain_sel=rain_ds.AFWA_TOTPRECIP.where((pwat_ds.pwat>pwat_bins[i])&(pwat_ds.pwat<pwat_bins[i+1])&
                                (qg_ds.QG_OMEGA>qg_bins[j])&(qg_ds.QG_OMEGA<qg_bins[j+1]))
            
            #need to do a second .where() to filter low intensity values
            rain_sel_gt_thresh=rain_sel.where(rain_sel>0.5)
            
            #calculate quantities
            total_rain=rain_sel.sum(dim='Time').compute()
            num_total_events=rain_sel.count(dim='Time').compute()
            probability_rain=rain_sel_gt_thresh.count(dim='Time').compute()/num_total_events
            mean_intensity=rain_sel_gt_thresh.mean(dim='Time').compute()
            
            
            empty_ds['total_rain'][i,j]=total_rain
            empty_ds['num_total_events'][i,j]=num_total_events
            empty_ds['probability_rain'][i,j]=probability_rain
            empty_ds['mean_intensity'][i,j]=mean_intensity
            
            print('Timing for loop: i=',i,',j=',j,':',datetime.now()-now)
            
        break
    break
            

Timing for loop: i= 0 ,j= 0 : 0:04:17.761788


This timing isn't actually that bad for masking across a ~100 GB array. Unfortunately, scaling this up across 900 loops gives a total time of about 60 hours. There might be a better way to speed this up?

#### Dask Task Graph

The number of total tasks looks fine here (270 corresponding to the total number of chunks). 

<img src="img/dask_tasks_masking.png" alt="Dask tasks for one operation"/>