# Problem: Loading 4D variables and writing output to zarr


In [None]:
pip install xmip

## Import packages

In [1]:
import numpy as np
import xarray as xr
import dask
import s3fs
import zarr
import warnings
import xmip.preprocessing as xmip
warnings.filterwarnings('ignore')

## Create a new Dask cluster with the Dask Gateway

In [2]:
from dask_gateway import Gateway
gateway = Gateway()

In [3]:
##A line of trick to clean your dask cluster before you start your computation
from dask.distributed import Client
clusters=gateway.list_clusters()
print(clusters )
for cluster in clusters :
    cluster= gateway.connect(cluster.name)
    print(cluster)
    client = Client(cluster)
    client.close()
    cluster.shutdown()

[ClusterReport<name=daskhub.194e6c0ce04c4be8aad52ac325f7c5a7, status=RUNNING>]
GatewayCluster<daskhub.194e6c0ce04c4be8aad52ac325f7c5a7, status=running>


In [3]:
cluster = gateway.new_cluster(worker_memory=8, worker_cores=2)

cluster.scale(8)
cluster

VBox(children=(HTML(value='<h2>GatewayCluster</h2>'), HBox(children=(HTML(value='\n<div>\n<style scoped>\n    …

## Get a client from the Dask Gateway Cluster

As stated above, creating a Dask `Client` is mandatory in order to perform following Daks computations on your Dask Cluster.

In [4]:
from distributed import Client

if cluster:
    client = Client(cluster) # create a dask Gateway cluster
else:
    client = Client()   # create a local dask cluster on the machine.
client

0,1
Connection method: Cluster object,Cluster type: dask_gateway.GatewayCluster
Dashboard: /jupyterhub/services/dask-gateway/clusters/daskhub.3a415940807a41b0be2838ab4c0c15ff/status,


## Install xmip to dask client ( you'll need to do this each time you start dask, if your dask client use xmip)

In [5]:
from distributed.diagnostics.plugin import PipInstall
extra_packages=['xmip']
plugin=PipInstall(extra_packages,restart=True)


In [6]:
client.register_worker_plugin(plugin)

{}

## Open dictionary of OMIP2 simulations
With file lists taken from the ESGF Search Catalog with the following search requirements:
1. On the native grid
2. Have the varaibles:`umo`, `vmo`, `so`, `thetao`, `zos`, `mlotst`, `siconc`, `deptho`, `areacello`
3. At monthly time steps
4. The last 61 years of the simulation

In [7]:
# Load in file of model names and fnames
model_fnames_dict = np.load("models.npy", allow_pickle=True).item()


## Function for writing zarr file to bucket

In [8]:
def write_dataset_to_zarr(ds,sim_name):
    # set path on bucket
    path='WAFFLES/OMIP2/'
    s3_prefix = "s3://" + path
    print(s3_prefix)
    
    # get storage keys
    access_key = !aws configure get aws_access_key_id
    access_key = access_key[0]
    secret_key = !aws configure get aws_secret_access_key
    secret_key = secret_key[0]
    
    # set storage target
    client_kwargs = {'endpoint_url': 'https://object-store.cloud.muni.cz'}
    target = s3fs.S3FileSystem(anon=False,client_kwargs=client_kwargs)
    
    # make file name for variable in simulation
    zarr_file_name = sim_name
    uri = f"{s3_prefix}/{zarr_file_name}"
    
    # get store argument for zarr
    store = zarr.storage.FSStore(uri,client_kwargs=client_kwargs,
                                 key=access_key, secret=secret_key)
    
    # write variable to zarr
    %time ds.to_zarr(store=store,mode='w',consolidated=True)
    
    return

## Function for reading from zarr bucket

In [9]:
def read_dataset_from_zarr(sim_name):
    # set path on bucket
    path='WAFFLES/OMIP2/'
    s3_prefix = "s3://" + path
    print(s3_prefix)
    
    # get storage keys
    access_key = !aws configure get aws_access_key_id
    access_key = access_key[0]
    secret_key = !aws configure get aws_secret_access_key
    secret_key = secret_key[0]
    
    # set storage target
    client_kwargs={'endpoint_url': 'https://object-store.cloud.muni.cz'}
    target = s3fs.S3FileSystem(anon=False,client_kwargs=client_kwargs)
    
    # file name for simulation
    zarr_file_name = sim_name
    uri = f"{s3_prefix}/{zarr_file_name}"
    
    # get store argument for zarr
    store = zarr.storage.FSStore(uri,client_kwargs=client_kwargs,
                                 key=access_key, secret=secret_key)
    
    # read variable from zarr
    ds=xr.open_zarr(store=store)
    
    return ds

## Using kerchunk for loading CMIP6 files.
### Function for transforming each http path of NetCDF file into Kerchunk, and save them in S3 bucket.  


In [29]:
#setting for accessing s3

# get storage keys
access_key = !aws configure get aws_access_key_id
access_key = access_key[0]
secret_key = !aws configure get aws_secret_access_key
secret_key = secret_key[0]

# set storage target
client_kwargs = {'endpoint_url': 'https://object-store.cloud.muni.cz'}

import dask
@dask.delayed
def path_to_kerchunk(httppath,modelname,client_kwargs,access_key,secret_key):
    import kerchunk.hdf
    import fsspec
    import json
    with fsspec.open(httppath) as inf:
        info = kerchunk.hdf.SingleHdf5ToZarr(inf, httppath, inline_threshold=100).translate()
    jsonname=httppath.rsplit('/')[-1].rsplit('.nc')[0]+'.json'
    path='WAFFLES/kerchunk/'+modelname
    path='tmp/kerchunk/'+modelname
    s3_prefix = "s3://" + path
    jsonfile = f"{s3_prefix}/{jsonname}"
    print(jsonfile)
    target = s3fs.S3FileSystem(anon=False,client_kwargs=client_kwargs,key=access_key,secret=secret_key)
    with target.open(jsonfile, mode='w') as f:
        json.dump(info,f)
    return jsonfile



### Verifying how many models that Waffle need from CMPI6 server

In [109]:
for a in model_fnames_dict.keys():
    print(len(model_fnames_dict.get(a)),a)

368 EC-Earth3
44 NorESM2-LM
8 MRI-ESM2-0
8 CMCC-CM2-SR5
107 CNRM-CM6-1
8 FGOALS-f3-L
28 CMCC-CM2-HR4
34 MIROC6
8 TaiESM1-TIMCOM2
107 CNRM-CM6-1-HR
260 ACCESS-OM2-025
260 ACCESS-OM2
8 TaiESM1-TIMCOM


### Run the transformation using dask but, one model by one model since the server is not stable...
See the working notebook 'Create_kerchunk_fromlist.ipynb

In [None]:
modelnames=[a for a in model_fnames_dict.keys()]
d={}
for modelname in [modelnames[10]]:
    print(modelname)
    fpaths=model_fnames_dict.get(modelname)
    httppaths=[fpath.replace('dodsC', 'fileServer') for fpath in fpaths]
    jsons=[ path_to_kerchunk(httppath,modelname,client_kwargs,access_key,secret_key) for httppath in httppaths]
    %time
    ok=dask.compute(*jsons)
    d[modelname]=ok
d

### List the transformed kerchunk files

In [12]:
#setting for accessing s3

# get storage keys
access_key = !aws configure get aws_access_key_id
access_key = access_key[0]
secret_key = !aws configure get aws_secret_access_key
secret_key = secret_key[0]

# set storage target
client_kwargs = {'endpoint_url': 'https://object-store.cloud.muni.cz'}

def dict_kerchunk(client_kwargs,access_key,secret_key):
    import kerchunk.hdf
    import fsspec

    path='WAFFLES/kerchunk/'
    #path='tmp/kerchunk/'


    target = s3fs.S3FileSystem(anon=False,client_kwargs=client_kwargs,key=access_key,secret=secret_key)
    modelnames=target.ls(path)
    modelnames=[name.rsplit('/')[-1] for name in modelnames]
    d={}
    for name in modelnames :
        d[name]=target.ls(path+name)
        d[name]=['s3://'+n for n in d[name]]
#        print(path+name)
    return d
d=dict_kerchunk(client_kwargs,access_key,secret_key)


for a in model_fnames_dict.keys():
    computed=len(d.get(a)) if (a in d) else 'None'
    print(a, len(model_fnames_dict.get(a)), 'transformed done' ,computed)

EC-Earth3 368 transformed done 368
NorESM2-LM 44 transformed done 44
MRI-ESM2-0 8 transformed done None
CMCC-CM2-SR5 8 transformed done 8
CNRM-CM6-1 107 transformed done 107
FGOALS-f3-L 8 transformed done None
CMCC-CM2-HR4 28 transformed done 28
MIROC6 34 transformed done 1
TaiESM1-TIMCOM2 8 transformed done 8
CNRM-CM6-1-HR 107 transformed done 107
ACCESS-OM2-025 260 transformed done 260
ACCESS-OM2 260 transformed done 260
TaiESM1-TIMCOM 8 transformed done 8


In [17]:
target_options={
            "anon":False,
            "client_kwargs":client_kwargs,
            "key":access_key, 
            "secret":secret_key}


In [13]:
model = 'CMCC-CM2-SR5'
variables = ['umo', 'vmo', 'so']#, 'thetao', 'zos', 'mlotst', 'siconc', 'deptho', 'areacello']
model_fnames_dict=d


In [14]:
#def ds_from_esgf(model,model_fnames_dict,variables,flg_onefile=False,testing=False,rechunk=False,target_options):    


## Generate filename from model_fnames_dict
fnames_i = model_fnames_dict[model]
#print(fnames_i.replace('dodsC','fileServer'))

print(fnames_i)
dss = {}
print('Going through the variables...')

['s3://WAFFLES/kerchunk/CMCC-CM2-SR5/areacello_Ofx_CMCC-CM2-SR5_omip1_r1i1p1f1_gn.json', 's3://WAFFLES/kerchunk/CMCC-CM2-SR5/deptho_Ofx_CMCC-CM2-SR5_omip1_r1i1p1f1_gn.json', 's3://WAFFLES/kerchunk/CMCC-CM2-SR5/mlotst_Omon_CMCC-CM2-SR5_omip2_r1i1p1f1_gn_165301-201812.json', 's3://WAFFLES/kerchunk/CMCC-CM2-SR5/so_Omon_CMCC-CM2-SR5_omip2_r1i1p1f1_gn_195801-201812.json', 's3://WAFFLES/kerchunk/CMCC-CM2-SR5/thetao_Omon_CMCC-CM2-SR5_omip2_r1i1p1f1_gn_195801-201812.json', 's3://WAFFLES/kerchunk/CMCC-CM2-SR5/umo_Omon_CMCC-CM2-SR5_omip2_r1i1p1f1_gn_195801-201812.json', 's3://WAFFLES/kerchunk/CMCC-CM2-SR5/vmo_Omon_CMCC-CM2-SR5_omip2_r1i1p1f1_gn_195801-201812.json', 's3://WAFFLES/kerchunk/CMCC-CM2-SR5/zos_Omon_CMCC-CM2-SR5_omip2_r1i1p1f1_gn_165301-201812.json']
Going through the variables...


In [19]:
def model_preproc_new(ds):
    return (
        ds.drop_vars(["bnds", "vertex"], errors="ignore")
        #fix naming 
        .pipe(xmip.rename_cmip6)
        # reindex y if lat is decreasing
        .pipe(reindex_lat)
        # promote empty dims to actual coordinates
        .pipe(xmip.promote_empty_dims)
        # demote coordinates from data_variables
        .pipe(xmip.correct_coordinates)
        # broadcast lon/lat
        .pipe(xmip.broadcast_lonlat)
        # shift all lons to consistent 0-360
        .pipe(xmip.correct_lon)
        # fix the units
        .pipe(xmip.correct_units)
        # rename the `bounds` according to their style (bound or vertex)
        .pipe(xmip.parse_lon_lat_bounds)
        # sort verticies in a consistent manner
        .pipe(xmip.sort_vertex_order)
        # convert vertex into bounds and vice versa, so both are available
        .pipe(xmip.maybe_convert_bounds_to_vertex)
        .pipe(xmip.maybe_convert_vertex_to_bounds)
        .pipe(xmip.fix_metadata)
        .drop_vars(["bnds", "vertex"], errors="ignore")
        )
def reindex_lat(ds):
    # check if lat is decreasing
    if ds.lat.isel(x=0,y=0) > 0:
        ds = ds.reindex(y=list(reversed(ds.y))).assign_coords(y=ds.y)
    
    return ds 

In [20]:
raw_names={
    v: [f for f in fnames_i if v+'_' in f]
    for v in variables
}

names={key: value for key,value in raw_names.items() if value }
names

dss={var: [xr.open_dataset(
                    "reference://", engine="zarr",
                    backend_kwargs={
                        "storage_options": {
                            "fo":f,
                            "target_options":target_options
                        },
                        "consolidated": False
                    } ,chunks={'lev':-1}                    
                ).pipe(model_preproc_new)
           for f in urls] for var, urls in names.items()}
dss_concat = {
    var: xr.concat(values,dim='time',coords='minimal',data_vars='minimal',compat='override') 
    for var, values in dss.items()}

    
chunks={'time':-1,'lev':-1,'x':25,'y':25}

dsnow = (
    xr.merge([ds.chunk({name: chunksize for name, chunksize in chunks.items() if name in ds.dims}) for ds in dss_concat.values()], compat='override')
    .assign_coords(lat= lambda ds : ds.lat.compute(),lon= lambda ds : ds.lon.compute())
    .where(lambda ds: ds['lat']>=50 ,drop=True)
    )
dsnow
    

Unnamed: 0,Array,Chunk
Bytes,800 B,800 B
Shape,"(50, 2)","(50, 2)"
Count,4 Graph Layers,1 Chunks
Type,float64,numpy.ndarray
"Array Chunk Bytes 800 B 800 B Shape (50, 2) (50, 2) Count 4 Graph Layers 1 Chunks Type float64 numpy.ndarray",2  50,

Unnamed: 0,Array,Chunk
Bytes,800 B,800 B
Shape,"(50, 2)","(50, 2)"
Count,4 Graph Layers,1 Chunks
Type,float64,numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,11.44 kiB,11.44 kiB
Shape,"(732, 2)","(732, 2)"
Count,4 Graph Layers,1 Chunks
Type,object,numpy.ndarray
"Array Chunk Bytes 11.44 kiB 11.44 kiB Shape (732, 2) (732, 2) Count 4 Graph Layers 1 Chunks Type object numpy.ndarray",2  732,

Unnamed: 0,Array,Chunk
Bytes,11.44 kiB,11.44 kiB
Shape,"(732, 2)","(732, 2)"
Count,4 Graph Layers,1 Chunks
Type,object,numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,807.75 kiB,9.77 kiB
Shape,"(72, 359, 4)","(25, 25, 2)"
Count,6 Graph Layers,180 Chunks
Type,float64,numpy.ndarray
"Array Chunk Bytes 807.75 kiB 9.77 kiB Shape (72, 359, 4) (25, 25, 2) Count 6 Graph Layers 180 Chunks Type float64 numpy.ndarray",4  359  72,

Unnamed: 0,Array,Chunk
Bytes,807.75 kiB,9.77 kiB
Shape,"(72, 359, 4)","(25, 25, 2)"
Count,6 Graph Layers,180 Chunks
Type,float64,numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,807.75 kiB,9.77 kiB
Shape,"(72, 359, 4)","(25, 25, 2)"
Count,9 Graph Layers,180 Chunks
Type,float64,numpy.ndarray
"Array Chunk Bytes 807.75 kiB 9.77 kiB Shape (72, 359, 4) (25, 25, 2) Count 9 Graph Layers 180 Chunks Type float64 numpy.ndarray",4  359  72,

Unnamed: 0,Array,Chunk
Bytes,807.75 kiB,9.77 kiB
Shape,"(72, 359, 4)","(25, 25, 2)"
Count,9 Graph Layers,180 Chunks
Type,float64,numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,403.88 kiB,4.88 kiB
Shape,"(2, 72, 359)","(1, 25, 25)"
Count,18 Graph Layers,120 Chunks
Type,float64,numpy.ndarray
"Array Chunk Bytes 403.88 kiB 4.88 kiB Shape (2, 72, 359) (1, 25, 25) Count 18 Graph Layers 120 Chunks Type float64 numpy.ndarray",359  72  2,

Unnamed: 0,Array,Chunk
Bytes,403.88 kiB,4.88 kiB
Shape,"(2, 72, 359)","(1, 25, 25)"
Count,18 Graph Layers,120 Chunks
Type,float64,numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,403.88 kiB,4.88 kiB
Shape,"(2, 72, 359)","(1, 25, 25)"
Count,15 Graph Layers,120 Chunks
Type,float64,numpy.ndarray
"Array Chunk Bytes 403.88 kiB 4.88 kiB Shape (2, 72, 359) (1, 25, 25) Count 15 Graph Layers 120 Chunks Type float64 numpy.ndarray",359  72  2,

Unnamed: 0,Array,Chunk
Bytes,403.88 kiB,4.88 kiB
Shape,"(2, 72, 359)","(1, 25, 25)"
Count,15 Graph Layers,120 Chunks
Type,float64,numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,3.52 GiB,87.26 MiB
Shape,"(732, 50, 72, 359)","(732, 50, 25, 25)"
Count,9 Graph Layers,60 Chunks
Type,float32,numpy.ndarray
"Array Chunk Bytes 3.52 GiB 87.26 MiB Shape (732, 50, 72, 359) (732, 50, 25, 25) Count 9 Graph Layers 60 Chunks Type float32 numpy.ndarray",732  1  359  72  50,

Unnamed: 0,Array,Chunk
Bytes,3.52 GiB,87.26 MiB
Shape,"(732, 50, 72, 359)","(732, 50, 25, 25)"
Count,9 Graph Layers,60 Chunks
Type,float32,numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,3.52 GiB,87.26 MiB
Shape,"(732, 50, 72, 359)","(732, 50, 25, 25)"
Count,9 Graph Layers,60 Chunks
Type,float32,numpy.ndarray
"Array Chunk Bytes 3.52 GiB 87.26 MiB Shape (732, 50, 72, 359) (732, 50, 25, 25) Count 9 Graph Layers 60 Chunks Type float32 numpy.ndarray",732  1  359  72  50,

Unnamed: 0,Array,Chunk
Bytes,3.52 GiB,87.26 MiB
Shape,"(732, 50, 72, 359)","(732, 50, 25, 25)"
Count,9 Graph Layers,60 Chunks
Type,float32,numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,3.52 GiB,87.26 MiB
Shape,"(732, 50, 72, 359)","(732, 50, 25, 25)"
Count,9 Graph Layers,60 Chunks
Type,float32,numpy.ndarray
"Array Chunk Bytes 3.52 GiB 87.26 MiB Shape (732, 50, 72, 359) (732, 50, 25, 25) Count 9 Graph Layers 60 Chunks Type float32 numpy.ndarray",732  1  359  72  50,

Unnamed: 0,Array,Chunk
Bytes,3.52 GiB,87.26 MiB
Shape,"(732, 50, 72, 359)","(732, 50, 25, 25)"
Count,9 Graph Layers,60 Chunks
Type,float32,numpy.ndarray


In [None]:
write_dataset_to_zarr(dsnow,'tinatest1')

s3://WAFFLES/OMIP2/


## Demonstrate loading error
This error is really inconsistent. It fails on different files when I run this and only fails when I'm trying to read in multiple variables at the same time. Also, doesn't always fail...

In [None]:
sim_name1 = 'CMCC-CM2-SR5'
#variables = ['vmo','thetao','so','umo','zos','mlotst','areacello','deptho']
variables = ['vmo']

In [None]:
error1#.persist()

In [None]:
error1.chunk('auto')

In [None]:
write_dataset_to_zarr(error1,'tinatest1')

## Demonstrate writting error
Basically every model has this problem and this happens when trying to save one of the 4D variables ('vmo','thetao','so','umo') and only happens for some of the models when trying to save 3D variables ('zos','mlotst'). This error basically never happens when saving 2D variables ('areacello','deptho').

In [None]:
sim_name2 = 'TaiESM1-TIMCOM2'
variables = ['thetao']

In [None]:
%%time
error2 = ds_from_esgf(sim_name2,model_fnames_dict,variables)

In [None]:
error2

In [None]:
write_dataset_to_zarr(error2,sim_name2)

In [None]:
test = read_dataset_from_zarr(sim_name2)

In [None]:
test

In [None]:
test.thetao.isel(time=0,lev=0).plot(robust=True)