In [1]:
from dask_jobqueue import PBSCluster, SLURMCluster
from distributed import Client
import xarray as xr
import dask.array as da

In [2]:
# Describe the PBS job that hosts the Dask workers.
# - `project` must be your own PBS project/account code (redacted here).
# - Per the dask-jobqueue docs, `cores` and `memory` are totals for one job and
#   `processes` is how many Dask worker processes that job is cut into.
#   processes=1 gives a single worker the whole job, which suits large files;
#   for small files it can be raised (the earlier note suggested at most 10).
# - `local_directory` is passed as the literal string '$TMPDIR' (presumably so
#   the compute node's own $TMPDIR is used - verify for your dask-jobqueue version).
cluster = PBSCluster(
    queue='regular',
    project='*****',
    walltime='00:30:00',
    cores=36,
    memory='109GB',
    processes=1,
    interface='ib0',
    local_directory='$TMPDIR',
)

In [5]:
# Request worker jobs in code rather than through the cluster widget, so a fresh
# "Restart & Run All" gets the same resources (widget state is not saved).
# With processes=1 each PBS job runs one worker; the earlier note treats a
# worker as one node. Use 2 or more jobs for large files.
N_JOBS = 2
cluster.scale(jobs=N_JOBS)
cluster

VBox(children=(HTML(value='<h2>PBSCluster</h2>'), HBox(children=(HTML(value='\n<div>\n  <style scoped>\n    .d…

In [6]:
# Connect a Dask client to the PBS cluster (it becomes the default scheduler
# for later .compute() / to_zarr calls); the bare name shows connection info.
client = Client(address=cluster)
client

0,1
Client  Scheduler: tcp://10.148.10.15:40993  Dashboard: https://jupyterhub.ucar.edu/ch/user/haiyingx/proxy/8787/status,Cluster  Workers: 2  Cores: 72  Memory: 134.16 GB


In [7]:
# Open the DYAMOND 3.75 km history file (Falko's run) as a dask-backed dataset,
# splitting the nCells dimension into chunks of 4194305 cells. The same
# chunk_dict is reused when writing the zarr store below.
filename = '/glade/scratch/fjudt/projects/ongoing/dyamond/dyamond_1/runs/3.75km/history.2016-08-01_00.00.00.nc'
chunk_dict = dict(nCells=4194305)
ds = xr.open_dataset(filename, chunks=chunk_dict)

In [9]:
def convert_zarr(ds, output_filename, output_dir, chunk_dict, error_bound=None):
    """Rechunk ``ds`` and write it to a consolidated zarr store.

    Parameters
    ----------
    ds : xarray.Dataset
        Source dataset.
    output_filename : str
        Stem of the output store name (no extension).
    output_dir : str
        Directory the store is written into.
    chunk_dict : dict
        Dimension name -> chunk size, passed to ``ds.chunk``. Choose sizes
        that fit your dataset's dimensions.
    error_bound : float, optional
        If given, float32 variables with two or more dimensions are compressed
        with ZFP in fixed-accuracy mode using this absolute tolerance.
        If omitted, those variables are compressed with zlib at level 5.

    Returns
    -------
    str
        Path of the written store: ``<output_dir>/<output_filename>.<label>.zarr``,
        where ``label`` is ``str(error_bound)`` or ``'zlib.5'``.
    """
    import os
    from numcodecs import Zlib

    ds1 = ds.chunk(chunks=chunk_dict)

    if error_bound is not None:
        # Imported only on this path so zlib mode still works without zfpy.
        from numcodecs.zfpy import _zfpy, ZFPY
        # ZFP mode "a" (fixed accuracy): the absolute tolerance is error_bound.
        compressor = ZFPY(mode=_zfpy.mode_fixed_accuracy, tolerance=error_bound)
        label = str(error_bound)
    else:
        # Set zlib level 5 explicitly. Previously this branch only changed the
        # label, and zarr silently used its own default compressor instead of zlib.
        compressor = Zlib(level=5)
        label = 'zlib.5'

    for name in ds1.data_vars:
        field = ds1[name]
        # Only float32 fields with >= 2 dims are targeted (ZFP needs floats);
        # other variables keep zarr's default compressor.
        if field.dtype == 'float32' and len(field.dims) >= 2:
            # ds1[name] shares its Variable (and encoding dict) with ds1,
            # so to_zarr below picks this compressor up.
            ds1[name].encoding['compressor'] = compressor

    store_path = os.path.join(output_dir, output_filename + '.' + label + '.zarr')
    ds1.to_zarr(store_path, mode='w', consolidated=True)
    return store_path

In [10]:
def write_to_netcdf(zarr_name):
    """Re-encode a zarr store as a netCDF file compressed with zlib level 5.

    Float32 variables with two or more dimensions are written with
    ``zlib=True, complevel=5``. All other variables use xarray's default
    netCDF encoding.

    Parameters
    ----------
    zarr_name : str
        Path to the zarr store, normally ending in ``.zarr``.

    Returns
    -------
    str
        Path of the written netCDF file: ``zarr_name`` with a trailing slash
        removed and a ``.zarr`` suffix (if present) replaced by ``.nc``.
    """
    import os

    store_path = zarr_name.rstrip('/')
    stem, ext = os.path.splitext(store_path)
    # Strip only a real ".zarr" suffix. Blindly slicing off 5 characters
    # mangled names like "out.zarr/" and stores without that extension.
    output_filename = (stem if ext == '.zarr' else store_path) + '.nc'

    comp = dict(zlib=True, complevel=5)
    # Context manager closes the store once the netCDF file is written.
    with xr.open_zarr(zarr_name) as ds:
        encoding = {
            var: comp
            for var in ds.data_vars
            if ds[var].dtype == 'float32' and len(ds[var].dims) >= 2
        }
        ds.to_netcdf(output_filename, encoding=encoding)
    return output_filename


In [11]:
%%time  
from os.path import basename

output_dir='/glade/scratch/haiyingx'
error_bound = 0.1
''' if passing argument error_bound, zfp compressor will be used, if not, zlib level 1 will be used '''
zarr_name=convert_zarr(ds, basename(filename)[:-3], output_dir, chunk_dict, error_bound)
zarr_name

CPU times: user 3.18 s, sys: 561 ms, total: 3.74 s
Wall time: 2min 28s


'/glade/scratch/haiyingx/history.2016-08-01_00.00.00.0.1.zarr'

In [None]:
# Optional: also re-encode the zarr store as zlib-compressed netCDF.
# This is very slow for this input file, so it is off by default; set to True to run.
WRITE_NETCDF = False
if WRITE_NETCDF:
    write_to_netcdf(zarr_name)

In [12]:
%%time
''' open the compressed data file to validate '''
ds2 = xr.open_zarr(zarr_name)
''' read variable w data '''
decomp_arr = ds2.w.data
orig_arr = ds.w.data
''' compare if decomp_arr and orig_arr are element-wise equal within a tolerance '''
''' I set atol=0.01 to check on compressed to 0.1 data, we will get false '''
''' if we set atol=0.1, we will get back true '''
validate = da.allclose(decomp_arr,orig_arr,rtol=0.0, atol=0.01)
''' if two arrays are equal, allclose will return true '''
validate.compute()

CPU times: user 587 ms, sys: 86.2 ms, total: 673 ms
Wall time: 30.6 s


False