# Batch analysis for sampling any 2d or 3d field as a function of cloud controlling factors

Note: we use the numpy implementation of digitize, which has proven easy to use and sufficiently fast.

Aiko Voigt, KIT, 15 Oct 2020

Preamble


In [1]:
import numpy as np
import matplotlib.pyplot as plt
import xarray as xr
import zarr
import shutil

In [2]:
import dask
dask.config.set({"array.slicing.split_large_chunks": True})

from dask.distributed import Client
client = Client()

In [3]:
client

0,1
Client  Scheduler: tcp://127.0.0.1:36789  Dashboard: http://127.0.0.1:8787/status,Cluster  Workers: 4  Cores: 8  Memory: 22.31 GB


In [4]:
import sys
sys.path.append('/pf/b/b380459/nawdex-hackathon/shared')
#print(sys.path)

import dict_nawdexsims
simdict = dict_nawdexsims.simdictionary()

import nawdexutils as nawut

## Ancillary functions for loading data

In [5]:
def load_griddata(gridres):
    path  = '/work/bb1018/icon_4_hackathon/'
    dict_gridres={'80km': 'R80000m', '40km': 'R40000m', '20km': 'R20000m',
                  '10km': 'R10000m', '5km': 'R5000m', '2km': 'R2500m'}
    fname = path+'/grids/icon-grid_nawdex_78w40e23n80n_'+dict_gridres[gridres]+'.nc'
    return xr.open_dataset(fname)['cell_area'].rename({'cell': 'ncells'}) 

def load_openoceanmask(expid):
    path  = '/work/bb1018/nawdex-hackathon_pp/'
    fname = path+'/openoceanmask/'+expid+'_openoceanmask.nc'
    return xr.open_dataset(fname)['mask_openocean']  

def load_ccf(expid, ccf):
    #path  = '/scratch/b/b380459/icon_4_hackathon/'
    #dict_stream = {'omega': 'rh_omega_DOM01_PL', 't_g': '2d_30min_DOM01_ML'}
    #fname = path+'/'+expid+'/'+expid+'_2016*_'+dict_stream[ccf]+'_0*.nc'
    #ds = ( xr.open_mfdataset(fname,combine='by_coords',parallel=True, 
    #                         engine='h5netcdf', chunks={'time': 1})
    #       [ccf].resample(time="1H").nearest(tolerance="5M") )
    #if ccf == 'omega':
    #    ds = ds.sel(lev=500e2)
    #return ds
    path  = '/work/bb1018/icon_4_hackathon/upper_tropo_stability'
    #path  = '/work/bb1018/nawdex-hackathon_pp/lower_tropo_stability_Elzina'
    fname = path+'/uts_'+expid+'.zarr'
    ds = ( xr.open_zarr(fname))
    if ccf == 'UTS':
        ds['UTS'].attrs['units'] = 'K/km'
    return ds

def load_var(expid, var):
    path  = '/work/bb1018/b380459/NAWDEX/ICON_OUTPUT_NWP'
    dict_stream = {'clct': '2d_30min', 'clch': '2d_30min',
                   'clcm': '2d_30min', 'clcl': '2d_30min',
                   'clc': '3dcloud'}
    fname = path+'/'+expid+'/'+expid+'_2016*_'+dict_stream[var]+'_DOM01_ML_0*.nc'
    return ( xr.open_mfdataset(fname,
                               combine='by_coords',parallel=True, 
                               engine='h5netcdf', chunks={'time': 1})
             [var].resample(time="1H").nearest(tolerance="5M").squeeze() )


def load_var_ddt_temp_rad_fromflux(expid, var):
    # open previously calculated radiative heating rates from zarr store
    zarr_store = '/work/bb1018/nawdex-hackathon_pp/ddttemp_rad-from-fluxes/'+expid+'_ddttemp_rad-from-fluxes_DOM01_ML.zarr'
    return ( xr.open_zarr(zarr_store)
             [var].resample(time="1H").nearest(tolerance="5M").squeeze() )

# Load cloud controlling factor, which is a 2d field (time x ncells)
def prepare_ccf_dataset(expid, resolution, ccf):
    
    # open ocean mask
    da_oom = load_openoceanmask(expid)
    index  = np.where(da_oom==1)[0]
    
    # load data and only keep cells over open ocean
    ds_grid = load_griddata(resolution).isel(ncells=index)
    ds_ccf  = load_ccf(expid, ccf).isel(ncells=index)
    
    return xr.merge([ds_grid, ds_ccf])

# Load variable that will be sampled on cloud controlling factor, this
# can be a 2d field (time x ncells) or a 3d field (time x height x ncells), 
# and it can also be the radiative heating rates provided in zarr stores
def prepare_var_dataset(expid, resolution, var):
    # open ocean mask
    da_oom = load_openoceanmask(expid)
    index  = np.where(da_oom==1)[0]
    
    # load data and only keep cells over open ocean
    ds_grid = load_griddata(resolution).isel(ncells=index)
    # special treament for diagnosed rad heating rates stored in zarr stores
    if var in ['ddt_temp_radlw_fromflux', 'ddt_temp_radlwclr_fromflux',
               'ddt_temp_radsw_fromflux', 'ddt_temp_radswclr_fromflux']:
        ds_var  = load_var_ddt_temp_rad_fromflux(expid, var).isel(ncells=index)
    else:
        ds_var  = load_var(expid, var).isel(ncells=index)
    
    
    return xr.merge([ds_grid, ds_var])

## Ancillary functions for indexing ccf data and for sampling variable on ccf

Function to index grid cells according to which bin in the ccf they belong to

In [6]:
def make_ccf_binindexing(ds_in, nbins, binrange):
    # input: dataset ds_in, assumed to contain the cloud controlling factor (ccf)
    # as well as the surface area of the grid cells (cell_area)
    # output: for each cell and time step, the function return the binning index
    # of the cloud controlling factor
    
    # define surface area weights, which we add to output dataset for later use
    weights = np.broadcast_to(ds_in['cell_area'], ds_in['ccf'].shape)
    # define binning edges based on 
    bins_edges = np.linspace(binrange[0], binrange[1], nbins+1)
    # calculate centre of bins from the edges of the bins
    bins = bins_edges[1:] - 0.5*np.abs(bins_edges[1]-bins_edges[0])
    # for each entry of omega, ccf_indices gives the bin index it belongs to
    ccf_indices  = np.digitize(ds_in['ccf'], bins)
    
    # make an output dataset that contains ccf_indices, weights, bins and bins_edges
    ds_out = ( xr.Dataset(
               {'ccf_indices': (['time','ncells'], ccf_indices),
                'weights': (['time','ncells'], weights),},
                coords={'time': (['time'], ds_in.time),
                        'bins': (['bins'], bins),
                        'bins_edges': (['bins_edges'], bins_edges),
                        'clon': (['ncells'], ds_in.clon),
                        'clat': (['ncells'], ds_in.clat)},
    ) )
    ds_out['time'].attrs = ds_in['time'].attrs
    ds_out['bins'].attrs['units'] = ds_in['ccf'].attrs['units']
    ds_out['bins_edges'].attrs['units'] = ds_in['ccf'].attrs['units']
    
    # return dataset
    return ds_out

Function to sampled variable based on cloud controlling factor

In [7]:
def make_var_ccfsampled_2dvar(ds_ccfindex, ds_var, varname):
    # resample data on ccf bins using numpy for variables with input dimension time x ncells
    
    import warnings 
    warnings.filterwarnings("ignore", category=RuntimeWarning) 
        
    # number of bins
    nbins = ds_ccfindex['bins'].size
    var_sampled = np.zeros(nbins)
    
    # convert to numpy arrays
    indices = ds_ccfindex['ccf_indices'].values
    weights = ds_ccfindex['weights'].values
    var_x_weights =  ds_var[varname].values * weights
    
    for n in range(0, nbins):
        #print(n)
        var_sampled[n] = ( np.nansum(var_x_weights[indices==n]) 
                           / np.nansum(weights[indices==n]) )
        
    return var_sampled

def make_var_ccfsampled_3dvar(ds_ccfindex, ds_var, varname, resolution, sim):
    # resample data on ccf bins using numpy for variables with input dimension time x height x ncells
    # - the 3d-data variable for the 2km simulations can be ~60GB when converted to a numpy array,
    #   leading dask to stop for memory reasons
    # - the solution is to loop over the height levels
    
    #import dask.array as da
    
    # number of bins
    nbins = ds_ccfindex['bins'].size
    var_sampled = np.zeros((nbins, ds_var.height.size))
    
    ### convert to numpy arrays
    indices = ds_ccfindex['ccf_indices'].values
    weights = ds_ccfindex['weights'].values
    #var_x_weights =  ds_var[varname] * weights
    
    #da_oom = load_openoceanmask(sim)
    #index  = np.where(da_oom==1)[0]
    
    #ds_ccfindex.argwhere()
    
    #for n in range(0, nbins):
    #    var_sampled[n,:] = ( da.nansum(var_x_weights[indices==n], axis=0) 
    #                             / da.nansum(weights[indices==n]) )
    
    #for n in range(0, nbins):
    #    var_sampled[n,:] = ( np.nansum(var_x_weights[indices==n], axis=0) 
    #                       / np.nansum(weights[indices==n]) )
        
    for k in range(ds_var.height.size):
        print('level='+str(k))
        var_levelk_x_weights =  ds_var[varname].isel({'height':k}).values * weights
        #var_levelk_x_weights =  ds_var[varname].isel({'height':k}) * weights
        for n in range(0, nbins):
            #print(n)
            #icells = d = load_griddata(resolution).isel(ncells=index)
            var_sampled[n,k] = ( np.nansum(var_levelk_x_weights[indices==n]) 
                                 / np.nansum(weights[indices==n]) )
    
    return var_sampled

def make_var_ccfsampled(ds_ccfindex, ds_var, varname, resolution, sim):
    if ds_var[varname].ndim == 2:    # 2d data
        var_sampled = make_var_ccfsampled_2dvar(ds_ccfindex, ds_var, varname)
    elif ds_var[varname].ndim == 3:  # 3d data
        var_sampled = make_var_ccfsampled_3dvar(ds_ccfindex, ds_var, varname, resolution, sim)  
    return var_sampled

## Batch analysis over simulations, different cloud controlling factors and different variables

Define cloud controlling factor and to-be-sampled variable


## Part I - binning 

First do the indexing of the grid cells according to the cloud controlling factor

In [None]:
#dict_binning={'omega':{'nbins':50, 'binrange': [-1.0, 1.0]},
#              't_g'  :{'nbins':74, 'binrange': [273.0, 310.0]}}

dict_binning={'UTS':{'nbins':24, 'binrange': [0.0, 12.0]}}


for ccf in ['UTS']: #, 't_g']:

    for sim in list(simdict.keys()): 
        gridres = (simdict[sim])['res']
        print('Working on ', ccf, 'of simulation', sim)
        ds_ccf = prepare_ccf_dataset(sim, gridres, ccf)
        ds_ccf = nawut.drop_first_day(ds_ccf)
        ds_ccfindex = make_ccf_binindexing(ds_ccf[[ccf,'cell_area']].rename({ccf:'ccf'}), 
                                           nbins=dict_binning[ccf]['nbins'],
                                           binrange=dict_binning[ccf]['binrange'])
        ds_ccfindex.attrs['ccf'] = ccf
        ds_ccfindex.attrs['simulation'] = sim
        # store to zarr store
        zarr_store = '/work/bb1018/nawdex-hackathon_pp/ccf/'+sim+'_ccf_indexing_'+ccf+'.zarr'
        # remove any zarr_store with same name that might have been created previously
        shutil.rmtree(zarr_store, ignore_errors=True)
        ds_ccfindex.to_zarr(zarr_store)    
        del ds_ccfindex
        

## Part II - sampling 

Now sample variable on cloud conrolling factor using previously calculated grid cell indices

In [8]:
def make_sampling(sim, ccf, var):
    gridres = (simdict[sim])['res']
    ds_var = prepare_var_dataset(sim, gridres, var)
    ds_var = nawut.drop_first_day(ds_var)
    # open previously calculated ccf indexing from zarr store
    zarr_store = '/work/bb1018/nawdex-hackathon_pp/ccf/'+sim+'_ccf_indexing_'+ccf+'.zarr'
    ds_ccfindex = xr.open_zarr(zarr_store)
    var_ccfsampled = make_var_ccfsampled(ds_ccfindex, ds_var, var, gridres, sim)
    # store to netcdf file   
    if var_ccfsampled.ndim==1: # 2-d field
        var_ccfsampled = xr.DataArray(var_ccfsampled, dims=['bins'], coords={'bins': ds_ccfindex.bins})
    if var_ccfsampled.ndim==2: # 3-dfield
        var_ccfsampled = xr.DataArray(var_ccfsampled, dims=['bins', 'height'], 
                                      coords={'bins': ds_ccfindex.bins, 'height': ds_var.height})
    var_ccfsampled.name = var+'_ccfsampled'
    var_ccfsampled.attrs['ccf'] = ccf
    var_ccfsampled.attrs['description'] = var+' sampled on '+ccf+', only over open ocean'
    var_ccfsampled.attrs['simulation'] = sim
    var_ccfsampled.to_netcdf('/work/bb1018/nawdex-hackathon_pp/ccf/'+sim+'_'+var+'_sampled_on_'+ccf+'.nc', mode='w')

Sampling 3D-vars

In [None]:
ccf_list = ['UTS', 'omega', 't_g']

var3d_list = ['clc', 'ddt_temp_radlw_fromflux', 'ddt_temp_radlwclr_fromflux',
               '      ddt_temp_radsw_fromflux', 'ddt_temp_radswclr_fromflux']

for ccf in ccf_list:
    for var in var3d_list:
        print('sampling '+str(var)+' on '+str(ccf))
        for sim in list(simdict.keys()): 
            print(sim)
            make_sampling(sim, ccf, var)

ddt_temp_radlw_fromflux
nawdexnwp-80km-mis-0001
level=0


  var_sampled[n,k] = ( np.nansum(var_levelk_x_weights[indices==n])


level=1


  var_sampled[n,k] = ( np.nansum(var_levelk_x_weights[indices==n])


level=2


  var_sampled[n,k] = ( np.nansum(var_levelk_x_weights[indices==n])


level=3


  var_sampled[n,k] = ( np.nansum(var_levelk_x_weights[indices==n])


level=4


  var_sampled[n,k] = ( np.nansum(var_levelk_x_weights[indices==n])


level=5


  var_sampled[n,k] = ( np.nansum(var_levelk_x_weights[indices==n])


level=6


  var_sampled[n,k] = ( np.nansum(var_levelk_x_weights[indices==n])


level=7


  var_sampled[n,k] = ( np.nansum(var_levelk_x_weights[indices==n])


level=8


  var_sampled[n,k] = ( np.nansum(var_levelk_x_weights[indices==n])


level=9


  var_sampled[n,k] = ( np.nansum(var_levelk_x_weights[indices==n])


level=10


  var_sampled[n,k] = ( np.nansum(var_levelk_x_weights[indices==n])


level=11


  var_sampled[n,k] = ( np.nansum(var_levelk_x_weights[indices==n])


level=12


  var_sampled[n,k] = ( np.nansum(var_levelk_x_weights[indices==n])


level=13


  var_sampled[n,k] = ( np.nansum(var_levelk_x_weights[indices==n])


level=14


  var_sampled[n,k] = ( np.nansum(var_levelk_x_weights[indices==n])
  var_sampled[n,k] = ( np.nansum(var_levelk_x_weights[indices==n])


level=15
level=16


  var_sampled[n,k] = ( np.nansum(var_levelk_x_weights[indices==n])


level=17


  var_sampled[n,k] = ( np.nansum(var_levelk_x_weights[indices==n])


level=18


  var_sampled[n,k] = ( np.nansum(var_levelk_x_weights[indices==n])


level=19


  var_sampled[n,k] = ( np.nansum(var_levelk_x_weights[indices==n])


level=20


  var_sampled[n,k] = ( np.nansum(var_levelk_x_weights[indices==n])


level=21


  var_sampled[n,k] = ( np.nansum(var_levelk_x_weights[indices==n])


level=22


  var_sampled[n,k] = ( np.nansum(var_levelk_x_weights[indices==n])


level=23


  var_sampled[n,k] = ( np.nansum(var_levelk_x_weights[indices==n])


level=24


  var_sampled[n,k] = ( np.nansum(var_levelk_x_weights[indices==n])


level=25


  var_sampled[n,k] = ( np.nansum(var_levelk_x_weights[indices==n])


level=26


  var_sampled[n,k] = ( np.nansum(var_levelk_x_weights[indices==n])


level=27


  var_sampled[n,k] = ( np.nansum(var_levelk_x_weights[indices==n])


level=28


  var_sampled[n,k] = ( np.nansum(var_levelk_x_weights[indices==n])


level=29


  var_sampled[n,k] = ( np.nansum(var_levelk_x_weights[indices==n])


level=30


  var_sampled[n,k] = ( np.nansum(var_levelk_x_weights[indices==n])


level=31


  var_sampled[n,k] = ( np.nansum(var_levelk_x_weights[indices==n])


level=32


  var_sampled[n,k] = ( np.nansum(var_levelk_x_weights[indices==n])


level=33


  var_sampled[n,k] = ( np.nansum(var_levelk_x_weights[indices==n])


level=34


  var_sampled[n,k] = ( np.nansum(var_levelk_x_weights[indices==n])


level=35


  var_sampled[n,k] = ( np.nansum(var_levelk_x_weights[indices==n])


level=36


  var_sampled[n,k] = ( np.nansum(var_levelk_x_weights[indices==n])


level=37


  var_sampled[n,k] = ( np.nansum(var_levelk_x_weights[indices==n])


level=38


  var_sampled[n,k] = ( np.nansum(var_levelk_x_weights[indices==n])


level=39


  var_sampled[n,k] = ( np.nansum(var_levelk_x_weights[indices==n])


level=40


  var_sampled[n,k] = ( np.nansum(var_levelk_x_weights[indices==n])


level=41


  var_sampled[n,k] = ( np.nansum(var_levelk_x_weights[indices==n])


level=42


  var_sampled[n,k] = ( np.nansum(var_levelk_x_weights[indices==n])


level=43


  var_sampled[n,k] = ( np.nansum(var_levelk_x_weights[indices==n])


level=44


  var_sampled[n,k] = ( np.nansum(var_levelk_x_weights[indices==n])


level=45


  var_sampled[n,k] = ( np.nansum(var_levelk_x_weights[indices==n])


level=46


  var_sampled[n,k] = ( np.nansum(var_levelk_x_weights[indices==n])


level=47


  var_sampled[n,k] = ( np.nansum(var_levelk_x_weights[indices==n])


level=48


  var_sampled[n,k] = ( np.nansum(var_levelk_x_weights[indices==n])


level=49


  var_sampled[n,k] = ( np.nansum(var_levelk_x_weights[indices==n])


level=50


  var_sampled[n,k] = ( np.nansum(var_levelk_x_weights[indices==n])


level=51


  var_sampled[n,k] = ( np.nansum(var_levelk_x_weights[indices==n])


level=52


  var_sampled[n,k] = ( np.nansum(var_levelk_x_weights[indices==n])


level=53


  var_sampled[n,k] = ( np.nansum(var_levelk_x_weights[indices==n])


level=54


  var_sampled[n,k] = ( np.nansum(var_levelk_x_weights[indices==n])


level=55


  var_sampled[n,k] = ( np.nansum(var_levelk_x_weights[indices==n])


level=56


  var_sampled[n,k] = ( np.nansum(var_levelk_x_weights[indices==n])


level=57


  var_sampled[n,k] = ( np.nansum(var_levelk_x_weights[indices==n])


level=58


  var_sampled[n,k] = ( np.nansum(var_levelk_x_weights[indices==n])


level=59


  var_sampled[n,k] = ( np.nansum(var_levelk_x_weights[indices==n])


level=60


  var_sampled[n,k] = ( np.nansum(var_levelk_x_weights[indices==n])


level=61


  var_sampled[n,k] = ( np.nansum(var_levelk_x_weights[indices==n])
  var_sampled[n,k] = ( np.nansum(var_levelk_x_weights[indices==n])


level=62
level=63


  var_sampled[n,k] = ( np.nansum(var_levelk_x_weights[indices==n])


level=64


  var_sampled[n,k] = ( np.nansum(var_levelk_x_weights[indices==n])


level=65


  var_sampled[n,k] = ( np.nansum(var_levelk_x_weights[indices==n])


level=66


  var_sampled[n,k] = ( np.nansum(var_levelk_x_weights[indices==n])


level=67
level=68


  var_sampled[n,k] = ( np.nansum(var_levelk_x_weights[indices==n])
  var_sampled[n,k] = ( np.nansum(var_levelk_x_weights[indices==n])


level=69


  var_sampled[n,k] = ( np.nansum(var_levelk_x_weights[indices==n])


level=70


  var_sampled[n,k] = ( np.nansum(var_levelk_x_weights[indices==n])


level=71


  var_sampled[n,k] = ( np.nansum(var_levelk_x_weights[indices==n])


level=72


  var_sampled[n,k] = ( np.nansum(var_levelk_x_weights[indices==n])


level=73


  var_sampled[n,k] = ( np.nansum(var_levelk_x_weights[indices==n])


level=74


  var_sampled[n,k] = ( np.nansum(var_levelk_x_weights[indices==n])


nawdexnwp-40km-mis-0001
level=0
level=1
level=2
level=3
level=4
level=5
level=6
level=7
level=8
level=9
level=10
level=11
level=12
level=13
level=14
level=15
level=16
level=17
level=18
level=19
level=20
level=21
level=22
level=23
level=24
level=25
level=26
level=27
level=28
level=29
level=30
level=31
level=32
level=33
level=34
level=35
level=36
level=37
level=38
level=39
level=40
level=41
level=42
level=43
level=44
level=45
level=46
level=47
level=48
level=49
level=50
level=51
level=52
level=53
level=54
level=55
level=56
level=57
level=58
level=59
level=60
level=61
level=62
level=63
level=64
level=65
level=66
level=67
level=68
level=69
level=70
level=71
level=72
level=73
level=74
nawdexnwp-20km-mis-0001
level=0
level=1
level=2
level=3
level=4
level=5
level=6
level=7
level=8
level=9
level=10
level=11
level=12
level=13
level=14
level=15
level=16
level=17
level=18
level=19
level=20
level=21
level=22
level=23
level=24
level=25
level=26
level=27
level=28
level=29
level=30
level=31
level=32


In [None]:
Sampling 2D-vars

In [None]:
ccf_list = ['UTS', 'omega', 't_g']

var2d_list = ['clct', 'clch', 'clcm', 'clcl']

for ccf in ccf_list:
    for var in var2d_list:
        print('sampling '+str(var)+' on '+str(ccf))
        for sim in list(simdict.keys()): 
            print(sim)
            make_sampling(sim, ccf, var)

## Cleaning up before leaving

In [None]:
client.shutdown()
client.close()