In [36]:
# kerchunk ECCO v4r4 netcdfs stored on podaac s3 
# to single jsons and multi-zarr jsons

# -- Ian Fenty
# -- 2024-05-29, 5-30, 6-10, 6-11, 6-12

# executed mainly on a g4dn.8xlarge with 32 cpus and 128 Gb ram

# performance ranges from about 0.1-1 second per granule
# depending on granule size (number of fields x number of levels)

# total processing time of all ECCO v4r4 granules is about 2 days


# Fastest Way to Crunch Through the Granules
############################################
# BEST: dask with "ncpu" workers as "processes" each with 1 thread.
# much slower: is using 8 workers each with 4 threads 
# much slower: is using "threadpool" multiprocessing

# Why?
# I think the reason is that the kerchunk calculations are
# "embarassingly parallel" and involves zero memory sharing. 
# We have 32 cores available that can work in parallel 
# to read one netcdf file from s3 and make the kerchunk json
# When tell dask to have 8 workers each with 4 threads,
# I think they often have to wait for the release of the
# python 'global interpreter lock' GIL.
# See https://dask.discourse.group/t/understanding-how-dask-is-executing-processes-vs-threads/666/4
# 
# Also see: https://stackoverflow.com/questions/51099685/best-practices-in-setting-number-of-dask-workers
# "Typically one decides between these choices based on the workload. 
# The difference here is due to Python's Global Interpreter Lock, 
# which limits parallelism for some kinds of data. If you are working
# mostly with Numpy, Pandas, Scikit-Learn, or other numerical programming 
# libraries in Python then you don't need to worry about the GIL, and you 
# probably want to prefer few processes with many threads each. This helps 
# because it allows data to move freely between your cores because it all 
# lives in the same process. However, if you're doing mostly Pure Python 
# programming, like dealing with text data, dictionaries/lists/sets, and 
# doing most of your computation in tight Python for loops then you'll 
# want to prefer having many processes with few threads each. 
# This incurs extra communication costs, but lets you bypass the GIL."

# Also: https://docs.dask.org/en/latest/best-practices.html
#If you’re doing mostly numeric work with Numpy, pandas, Scikit-learn,
# Numba, and other libraries that release the GIL, then use mostly threads.
# If you’re doing work on text data or Python collections like lists
# and dicts then use mostly processes.
#If you’re on larger machines with a high thread count (greater than 10),
# then you should probably split things up into at least a few processes 
# regardless. Python can be highly productive with 10 threads per process with
# numeric work, but not 50 threads.



import boto3
from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import wait
from concurrent.futures import ProcessPoolExecutor
from concurrent.futures import as_completed

import dask
from dask.distributed import Client, LocalCluster
from datetime import datetime
import fsspec
from glob import glob
from http.cookiejar import CookieJar
from kerchunk.hdf import SingleHdf5ToZarr 
from kerchunk.combine import MultiZarrToZarr
import logging
import matplotlib.pyplot as plt
from netrc import netrc
import numpy as np
import os
from os.path import basename, isfile, isdir, join, expanduser
import pandas as pd
import pathlib
from pathlib import Path
from platform import system
from pprint import pprint
import shutil
import subprocess
import requests
import s3fs
import time as time
from tqdm import tqdm
import ujson
from urllib import request
import warnings
import xarray as xr

#https://pypi.org/project/python-cmr/
from cmr import CollectionQuery, GranuleQuery, ToolQuery, ServiceQuery, VariableQuery

warnings.filterwarnings("ignore")

# Dask often throws probably harmless warnings when the client/cluster leave
# the context manager about "stream closed" connectionPool heartbeat_worker etc.
# Silencing the warning with the following as per:
# https://github.com/dask/distributed/issues/7105
warnings.simplefilter("ignore", distributed.comm.core.CommClosedError)

NameError: name 'distributed' is not defined

<dask.config.set at 0x7f5943aa30d0>

In [9]:
dtmp =  Path('/tmp/dask-scratch-space');

if dtmp.exists():
    try:
        shutil.rmtree(dtmp)
    except: 
        print('could not delete dask scratch space')
else:
    print(f'{dtmp} does not exist')

# from the following:
# https://dask.discourse.group/t/dask-workers-killed-because-of-heartbeat-fail/856
from dask import config as cfg 
cfg.set({'distributed.scheduler.worker-ttl': None})
cfg.set({'heartbeat': '90s', 'scheduler-info-interval': '90s'})
cfg.set(temporary_directory=str('/tmp'))

/tmp/dask-scratch-space does not exist


<dask.config.set at 0x7f5a8778d150>

# subroutines

In [13]:
## Define Helper Subroutines
### Helper subroutine to log into and access files on NASA EarthData

# not pretty but it works
def setup_earthdata_login_auth(url: str='urs.earthdata.nasa.gov'):
    # look for the netrc file and use the login/password
    try:
        username, _, password = netrc(file=_netrc).authenticators(url)

    # if the file is not found, prompt the user for the login/password
    except (FileNotFoundError, TypeError):
        print('Please provide Earthdata Login credentials for access.')
        username, password = input('Username: '), getpass('Password: ')
    
    manager = request.HTTPPasswordMgrWithDefaultRealm()
    manager.add_password(None, url, username, password)
    auth = request.HTTPBasicAuthHandler(manager)
    jar = CookieJar()
    processor = request.HTTPCookieProcessor(jar)
    opener = request.build_opener(auth, processor)
    request.install_opener(opener)


def init_S3FileSystem():
    """
    This routine automatically pulls your EDL crediential from .netrc file and use it to obtain an AWS S3 credential 
    through a PO.DAAC service accessible at https://archive.podaac.earthdata.nasa.gov/s3credentials.
    From the PO.DAAC Github (https://podaac.github.io/tutorials/external/July_2022_Earthdata_Webinar.html).
    
    Returns:
    =======
    
    s3: an AWS S3 filesystem
    """
  
    import requests,s3fs
    credentials = requests.get('https://archive.podaac.earthdata.nasa.gov/s3credentials').json()
    s3 = s3fs.S3FileSystem(anon=False,
                           key=credentials['accessKeyId'],
                           secret=credentials['secretAccessKey'], 
                           token=credentials['sessionToken'])
    return s3, credentials


# Uses https://pypi.org/project/python-cmr/
def get_granule_urls(ShortName):
    gq = GranuleQuery()
    gq.short_name(ShortName)
    tmp = gq.get_all()
    
    s3_files_list = []
    for t in tmp:
        s3_files_list.append(t['links'][0]['href'])

    s3_urls = [x.split('s3://')[1] for x in s3_files_list]
    
    return s3_urls, s3_files_list
    
# Uses https://pypi.org/project/python-cmr/
def get_dataset_names():
    cq = CollectionQuery()
    ds_names = [];
    tmp = cq.keyword('ECCO*V4R4').get_all()
    for t in tmp:
        ds_names.append(t['short_name'])
    return ds_names


In [14]:
# recursive delete directory (found somewhere on the internet)
def rm_tree(pth: Path):
    try:
        for child in pth.iterdir():
            if child.is_file():
                child.unlink()
            else:
                rm_tree(child)
        pth.rmdir()
    except:
        print('could not delete ', pth)
        

In [15]:
def make_output_dir(output_dir, make_clean=False):
    # prepare output directory

    if type(output_dir)!=pathlib.PosixPath:
        output_dir = Path(output_dir)
        
    # ... delete output directory if it already exists
    if make_clean:
        try:
            print(f'deleting {output_dir}')
            rm_tree(output_dir)
        except:
            print(f'could not delete {output_dir}')
            
    # ... make new output directory
    try:
        print(f'making {output_dir}')
        
        output_dir.mkdir(exist_ok=True, parents=True)
    except:
        print(f'could not make {output_dir}')

In [16]:
# loads a dataset from the 'kerchunk' json where the
# netcdf files are stored on s3
def load_dataset_from_json_s3(json_file_path, credentials, time_chunk=20):
    
    fs = fsspec.filesystem(
        "reference", 
        fo=json_file_path, 
        remote_protocol="s3", 
        remote_options={'anon':False, 
                        'key':credentials['accessKeyId'], 
                        'secret':credentials['secretAccessKey'],
                        'token':credentials['sessionToken']},
        skip_instance_cache=True
    )
    m = fs.get_mapper("")
    if time_chunk > 0:
        ds = xr.open_dataset(m, engine='zarr', consolidated=False, chunks={'time':time_chunk})
    else:
        ds = xr.open_dataset(m, engine='zarr', consolidated=False)
    
    ds.close()
    
    return ds

In [17]:
# loads a dataset from the 'kerchunk' json where the
# netcdf files are stored on a local disk
def load_dataset_from_json_local(json_file_path, s3_options):
    fs = fsspec.filesystem(
        "reference", 
        fo=json_file_path, 
        skip_instance_cache=True
    )
    m = fs.get_mapper("")
    ds = xr.open_dataset(m, engine='zarr', consolidated=False)
    print(ds)
    ds.close()
    return ds

In [38]:

def update_credential(credentials, force=False):
    now=np.datetime64(datetime.now())
    
    # expiration time of current credential
    exp=np.datetime64(credentials['expiration'][:-6])
    # current time
    
    # how much time is left [seconds]
    td_sec = np.double(exp-now)/1e6

    # if < 1800 seconds left before credential expires, renew it
    if (td_sec < 1800) or (force==True):
        print(f'... updating credentials, {td_sec}s remaining')
        s3, credentials = init_S3FileSystem()
        exp=np.datetime64(credentials['expiration'][:-6])
        td_sec = np.double(exp-now)/1e6
        print(f'... after credential update, {td_sec}s remaining')
    else:
        print(f'... not updating credentials, {td_sec}s remaining')
        
    return credentials
    

In [19]:
# use Kerchunk to make a local json corresponding to
# a single netcdf file stored on s3 

# inputs:
# url: netcdf url on s3
# output_dir : where to save this bad boy
# credentials: podaac aws credentials
# inline treshold, see below

def gen_json(url, output_dir, credentials, inline_threshold=10000):
    
    # not surethe implication of using nonzero inline threshold
    # ... "inline_threshold – int Size below which binary blocks are included directly in the output"
    # ... "inline_threshold – Byte size below which an array will be embedded in the output. Use 0 to disable inlining."
    # from https://fsspec.github.io/kerchunk/reference.html
 
    so = dict(
        mode="rb", anon=False, 
        default_fill_cache=False,
        default_cache_type="readahead", 
        key=credentials['accessKeyId'], 
        secret=credentials['secretAccessKey'],
        token=credentials['sessionToken'],
    )
    with fsspec.open(url, **so) as inf:
        # h5chunks translate operation takes the vast majority of the time
        h5chunks = SingleHdf5ToZarr(inf, url, inline_threshold=inline_threshold).translate()

    # in contrast, ujson.dumps and write to disk are super fast
    with open(f"{output_dir}/{url.split('/')[-1]}.json", 'wb') as outf:
        outf.write(ujson.dumps(h5chunks).encode())

In [20]:
import concurrent.futures
import urllib.request

def single_kerchunk_driver_pool(ShortName, 
                                output_base_dir = Path('/home/jpluser/efs-mount-point/ifenty/ecco_v4r4'),
                                inline_threshold=10000,
                                debug=True):
    
    print('\n***************************************')
    print('\nSelected dataset ', ShortName)
    print('\n***************************************')
    
    print('locating urls:')
    s3_urls, full_urls = get_granule_urls(ShortName)   
    print(f'\n{ShortName} has {len(s3_urls)} files')

    # make output dir
    output_dir = output_base_dir / ShortName
    make_output_dir(output_dir, make_clean=True)

    urls_to_process = full_urls
    if debug:
        urls_to_process = full_urls[:ncpus*2]
    n_urls = len(urls_to_process)
    print(f'processing urls {n_urls}')

    # split into batches because of podaac credential expiration
    # each batch would process 128 granules
    nmzz_jobs = int(n_urls/(312))+1

    # divide into batches
    url_split = np.array_split(np.array(urls_to_process), nmzz_jobs) 
    url_split = [list(x) for x in url_split]
    print(f'splitting urls into {nmzz_jobs} batches')

    # begin processing granules
    start_time=time.time()

    url_split_count = 0
    for url_subset in url_split:
        # update credentials if necessary
        update_credential(credentials)
        
        url_split_count =  url_split_count +1
        print(f'... url_subset {url_split_count}, n={len(url_subset)}')

        with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
            start_time_b=time.time()

            futures = []
            for url in url_subset:
                futures.append(executor.submit(gen_json, url=url, output_dir=output_dir, credentials=credentials))
    
            n_returned = 0
            for future in concurrent.futures.as_completed(futures):
                n_returned = n_returned + 1
                if n_returned % 100 ==0:
                    print(' ', n_returned)

            end_time_b = time.time()
            tt = end_time_b-start_time_b
            tpg = tt/len(url_subset)
            
        print(f'** subset {url_split_count} finished processing {len(url_subset)} in {tt:.2f}s, time per granule {tpg:.2f}s')
        
    end_time = time.time()
    tt = end_time-start_time
    tpg = tt/n_urls
    print(f'\n** SINGLE KERCHUNK finished processing {n_urls} in {tt:.2f}s, time per granule {tpg:.2f}s')
    
    
    return output_dir
    

In [29]:
# Makes a kerchunk json for all granules in the podaac dataset with ShortName
# dask slightly speeds up the processing
# the lag is in the .translate command of the kerchunk
# it doesn't seem to parallelize. may be related to fsspec

def single_kerchunk_driver_dask(ShortName, 
                                output_base_dir = Path('/home/jpluser/efs-mount-point/ifenty/ecco_v4r4'),
                                debug=True):

    # spin up a local cluster
    with LocalCluster(n_workers=32,
                      processes=True,
                      threads_per_worker=1,
                      memory_limit='auto',
                      dashboard_address=':8787') as cluster, Client(cluster) as client:
            
        print(f'dask dashboard at : {cluster.dashboard_link}')
        ncores = client.ncores()
        num_dask_threads= np.sum([ client.ncores()[key] for key in list(ncores.keys())])
        
        print('\n***************************************')
        print('Selected dataset ', ShortName)
        print('***************************************')
    
        # get url list
        s3_urls, full_urls = get_granule_urls(ShortName)    
        print(f'\n{ShortName} has {len(s3_urls)} files')
    
        # make output dir
        output_dir = output_base_dir / ShortName
        make_output_dir(output_dir)
        
        # process nc files at urls to single kerchunk json files
        if debug:
            urls_to_process = full_urls[:128]
        else:
            urls_to_process = full_urls
    
        n_urls=len(urls_to_process)
        print(f'\nProcessing {n_urls} files!')
                
        # split into batches because of podaac credential expiration
        nmzz_jobs = int(n_urls/(312))+1
        print(f'** splitting url master list into {nmzz_jobs} jobs')
    
        # divide into batches
        url_split = np.array_split(np.array(urls_to_process), nmzz_jobs) 
        url_split = [list(x) for x in url_split]
    
        # process in batches
        start_time=time.time()
        url_split_count = 1
    
        print("** starting single kerchunk processing at ", datetime.now())
    
        # ... loop through batches
        for url_subset in url_split:
            # update credentials if necessary
            update_credential(credentials)
            
            print(f'... starting url_subset {url_split_count} of {nmzz_jobs}')
            stb = time.time()
    
            # loop through urls in this batch, append dask delayed job to "results" list
            results = []
            for url in url_subset:
                results.append(dask.delayed(gen_json)(url=url, output_dir=output_dir, credentials=credentials))
            # compute all of the delayed gen_json commands
            d = dask.compute(*results)
            
            # ... stats
            etb = time.time(); ttb = etb-stb; tpgb = ttb/len(url_subset)
            print(f'... url_subset finished processing {len(url_subset)} in {ttb:.2f}s, time per granule {tpgb:.2f}s')
            
            # increment counter
            url_split_count = url_split_count + 1
        
        # status
        end_time = time.time();tt = end_time-start_time;tpg = tt/n_urls
        print(f'** SINGLE KERCHUNK finished processing {n_urls} in {tt:.2f}s, time per granule {tpg:.2f}s')
        
        client.close(timeout=360)
        cluster.close(timeout=360)
        client=None; cluster=None
    
    return str(output_dir)

In [30]:
# Multi-Zarr to Zarr.  Converts the many individual jsons (one for each file) to a single mega json
# very important to specify identical dims so the routine doesn't add
# time dimensions to non-time coordinates and dimensions (like XC, YC)

# this version just returns the translated jsons so I can 
# use dask distributed to parallelize this step
def multi_multizarr(params, inline_threshold=10000):
    json_list = params[0]
    credentials = params[1]
    time_invariant_dims_and_coords= params[2]
    
    mzz = MultiZarrToZarr(
        json_list,
        remote_protocol="s3",
        remote_options={'anon':False, 
                        'key':credentials['accessKeyId'], 
                        'secret':credentials['secretAccessKey'],
                        'token':credentials['sessionToken']},
        concat_dims='time',
        inline_threshold=inline_threshold,
        identical_dims=time_invariant_dims_and_coords
    )
    
    return mzz.translate()
    

In [31]:
def mzz_driver(ShortName, 
               credentials, 
               single_json_output_dir, 
               mzz_output_dir, 
               use_dask =False, debug=True):
    
    ##### inputs:
    # ShortName: podaac shortname
    # credentials: dictionary of aws credentials
    # single_json_output_dir: directory with all the single kerchunk json files for this ShortName dataset
    # mzz_output_dir : directory where we will save the final single kerchunk json file for the dataset
    # use_dask : boolean
    # debug : boolean
    ####

    ##### outputs:
    # final_json_fname: the full path to the new single kerchunk json file
    
    ## MAKE LIST OF ALL SINGLE JSON KERCHUNK FILES
    print(single_json_output_dir, type(single_json_output_dir))
    json_list = np.sort(list(single_json_output_dir.glob('*json')))
    json_list = [str(p) for p in json_list]

    ## MAKE OUTPUT DIRECTORY FOR THE NEW MZZ FILE
    make_output_dir(mzz_output_dir)
    
    # MAKE THE NAME OF THE NEW MZZ FILE
    # ... pass the list of dimensions that are time invariant
    final_json_fname = f'{mzz_output_dir}/{ShortName}.json'
    print(f'\n... final reference output json filename: {final_json_fname}')

    # FIND DIMENSIONS AND COORDS OF THIS DATASET THAT ARE DO NOT VARY IN TIME
    # ... load one example json file (the first one)
    print('\nfinding time invariant dims:')
    example_dataset = load_dataset_from_json_s3(str(json_list[0]), credentials)

    if 'time' not in example_dataset.dims:
        return
        
    # ... then find all coords and dims without 'time' in the name    
    time_invariant_dims_and_coords = find_time_invariant_dims_and_coords(example_dataset)
        
    start_time=time.time()
    n_jsons = len(json_list)
    print(f'found {n_jsons} json files in {single_json_output_dir}')

    print("** starting mzz processing at ", datetime.now())
    if use_dask: # IF WE USE DASK
        # # split up the MZZ in n parallel jobs
        # # inspired by this https://gist.github.com/peterm790/5f901453ed7ac75ac28ed21a7138dcf8

        # spin up a cluster
        with LocalCluster(n_workers=8,
                          processes=True,
                          threads_per_worker=1,
                          memory_limit='auto',
                          dashboard_address=':8787') as cluster, Client(cluster) as client:

            print(f'dask dashboard at : {cluster.dashboard_link}')
            ncores = client.ncores()
            num_dask_threads= np.sum([ client.ncores()[key] for key in list(ncores.keys())])
            #print(ncores, num_dask_threads)

            # default, process all jsons, use all threads
            # split into batches because of podaac credential expiration
            
            jsons_to_process = json_list
            n_jsons = len(jsons_to_process)
            
            nmzz_jobs = int(n_jsons/(32))+1
            print(f'** splitting url master list into {nmzz_jobs} jobs')
        
            # split the long list of jsons into nmzz_jobs shorter lists
            json_split = np.array_split(np.array(jsons_to_process), nmzz_jobs) 
            json_split = [list(x) for x in json_split]    
    
            print(f'** using dask we split jsons_to_process into {len(json_split)} batches')

            credentials = update_credential(credentials)
            # using dask, we send each shorter list ot multi_multizarr
            # for parallel processing
            intermediate_jsons = []
            for json_sublist in json_split:
                params = [json_sublist, credentials, time_invariant_dims_and_coords]
                x= dask.delayed(multi_multizarr)(params)
                intermediate_jsons.append(x)

            print('... making second level dask delayed')
            # this is the trippy bit, we don't compute the intermediate jsons yet
            # we send the *intermediate* jsons to dask delayed and compute afterwards
            # ... some kind of recursive json merging
            params = [intermediate_jsons, credentials, time_invariant_dims_and_coords]
            d = dask.delayed(multi_multizarr)(params)
            print('... ordering dask compute')
            d = d.compute()
        
        
    else: # IF WE DON'T USE DASK
        if debug:
            n_jsons = 6

        jsons_to_process = json_list[:n_jsons]
        d = multi_multizarr([jsons_to_process, credentials, time_invariant_dims_and_coords])

    ## FINISHED PROCESSING ... SAVE TO DISK
    print('... writing to disk ', final_json_fname)
    with open(final_json_fname, 'wb') as f:
        f.write(ujson.dumps(d).encode())

    ## STATS
    end_time = time.time()
    tt = end_time-start_time
    tpg = tt/n_jsons
    
    print(f'** MZZ finished processing {n_jsons} in {tt:.2f}s')
    print(f'** MZZ time per jsons_file {tpg:.2f}s')

    return final_json_fname


In [32]:
# find the set of time invariant dimensions and coordinates for this dataset
# ... the only coords and dimensions that vary with time have the name time in them!
# ... this is needed for MZZ
# ... so we don't duplicate time-invariant dims and coords in the multi-zarr json

def find_time_invariant_dims_and_coords(example_dataset):
    #... use set instead of list to avoid duplicates
    time_invariant_dims_and_coords = set()
    for c in example_dataset.coords:
        if 'time' not in c:
            time_invariant_dims_and_coords.add(c)
    for d in example_dataset.dims:
        if 'time' not in d:
            time_invariant_dims_and_coords.add(d)
    
    #... convert set to tuple
    time_invariant_dims_and_coords = tuple(time_invariant_dims_and_coords)
    print(f'\ntime_invariant_dims_and_coords: {time_invariant_dims_and_coords}')
   
    return time_invariant_dims_and_coords
    

# Run from here

In [33]:
_netrc = join(expanduser('~'), "_netrc" if system()=="Windows" else ".netrc")
print(_netrc, type(_netrc))

/home/jpluser/.netrc <class 'str'>


In [34]:
import multiprocessing
ncpus = multiprocessing.cpu_count()
print(ncpus)

32


In [25]:
## time varying

#globs = []
#globs = ['GEOMETRY', 'MIX_COEFFS', '05DEG_MONTHLY', 'LLC0090GRID_MONTHLY']

#globs = ['05DEG_MONTHLY', 'LLC0090GRID_MONTHLY']
globs = [ '05DEG_DAILY', 'LLC0090GRID_DAILY', 'GRID_SNAP',
          'GEO', 'MIX_COEFFS']

ds_names_all = get_dataset_names()

ds_names_dict = dict()
for glob in globs:
    ds_names_dict[glob] = []
    
for ds_name in ds_names_all:
    for glob in globs:

        if glob in ds_name:
            ds_names_dict[glob].append(ds_name)

for glob in globs:  
    print(f'=============== {glob} ===========')
    print(len(ds_names_dict[glob]))
    pprint(ds_names_dict[glob])

13
['ECCO_L4_ATM_STATE_05DEG_DAILY_V4R4',
 'ECCO_L4_BOLUS_05DEG_DAILY_V4R4',
 'ECCO_L4_DENS_STRAT_PRESS_05DEG_DAILY_V4R4',
 'ECCO_L4_FRESH_FLUX_05DEG_DAILY_V4R4',
 'ECCO_L4_HEAT_FLUX_05DEG_DAILY_V4R4',
 'ECCO_L4_MIXED_LAYER_DEPTH_05DEG_DAILY_V4R4',
 'ECCO_L4_OBP_05DEG_DAILY_V4R4',
 'ECCO_L4_OCEAN_VEL_05DEG_DAILY_V4R4',
 'ECCO_L4_SEA_ICE_CONC_THICKNESS_05DEG_DAILY_V4R4',
 'ECCO_L4_SEA_ICE_VELOCITY_05DEG_DAILY_V4R4',
 'ECCO_L4_SSH_05DEG_DAILY_V4R4',
 'ECCO_L4_STRESS_05DEG_DAILY_V4R4',
 'ECCO_L4_TEMP_SALINITY_05DEG_DAILY_V4R4']
20
['ECCO_L4_ATM_STATE_LLC0090GRID_DAILY_V4R4',
 'ECCO_L4_BOLUS_LLC0090GRID_DAILY_V4R4',
 'ECCO_L4_DENS_STRAT_PRESS_LLC0090GRID_DAILY_V4R4',
 'ECCO_L4_FRESH_FLUX_LLC0090GRID_DAILY_V4R4',
 'ECCO_L4_HEAT_FLUX_LLC0090GRID_DAILY_V4R4',
 'ECCO_L4_MIXED_LAYER_DEPTH_LLC0090GRID_DAILY_V4R4',
 'ECCO_L4_OBP_LLC0090GRID_DAILY_V4R4',
 'ECCO_L4_OCEAN_3D_MOMENTUM_TEND_LLC0090GRID_DAILY_V4R4',
 'ECCO_L4_OCEAN_3D_SALINITY_FLUX_LLC0090GRID_DAILY_V4R4',
 'ECCO_L4_OCEAN_3D_TEMPERATUR

In [40]:
cfg.set({'distributed.scheduler.worker-ttl': None})

<dask.config.set at 0x7f578cc00290>

In [41]:
# loop through each dataset type
debug=False
use_dask=True

s3, credentials = init_S3FileSystem()
for glob in globs:
    if debug:
        output_base_dir = Path(f'/tmp/ecco_v4r4/MZZ_{glob}/')
    else:
        output_base_dir = Path(f'/home/jpluser/efs-mount-point/ifenty/ecco_v4r4/MZZ_{glob}/')
    
    ds_names_to_process = ds_names_dict[glob]

    # loop through each dataset of this type and convert
    for ShortName in ds_names_to_process:
        print('\n**** Processing ', ShortName)
        
        credentials = update_credential(credentials)

        test_path = output_base_dir / f'{ShortName}.json'
        if test_path.exists():
            print(f'found {ShortName}.json!, skipping')
            continue

        if use_dask:
            try:
                output_dir = single_kerchunk_driver_dask(ShortName, 
                                                 output_base_dir=output_base_dir, 
                                                 debug=debug)
            except Exception as ex:
                print('skdd failed with ', ex)
                
        else:
            output_dir = single_kerchunk_driver_pool(ShortName, 
                                                         output_base_dir=output_base_dir, 
                                                         debug=debug)

                                    
        # update credentials before mzz
        credentials = update_credential(credentials, force=True)
        print('mzz ', output_dir)
        final_mzz_fname    = mzz_driver(ShortName, 
                                credentials=credentials,
                                single_json_output_dir=Path(output_dir), 
                                mzz_output_dir=output_base_dir, 
                                use_dask =use_dask, debug=debug)

        print(f'** finished processing {ShortName}\n\n')


**** Processing  ECCO_L4_ATM_STATE_05DEG_DAILY_V4R4
... not updating credentials, 3599.179394s remaining
found ECCO_L4_ATM_STATE_05DEG_DAILY_V4R4.json!, skipping

**** Processing  ECCO_L4_BOLUS_05DEG_DAILY_V4R4
... not updating credentials, 3599.178401s remaining
found ECCO_L4_BOLUS_05DEG_DAILY_V4R4.json!, skipping

**** Processing  ECCO_L4_DENS_STRAT_PRESS_05DEG_DAILY_V4R4
... not updating credentials, 3599.177916s remaining
found ECCO_L4_DENS_STRAT_PRESS_05DEG_DAILY_V4R4.json!, skipping

**** Processing  ECCO_L4_FRESH_FLUX_05DEG_DAILY_V4R4
... not updating credentials, 3599.177501s remaining
found ECCO_L4_FRESH_FLUX_05DEG_DAILY_V4R4.json!, skipping

**** Processing  ECCO_L4_HEAT_FLUX_05DEG_DAILY_V4R4
... not updating credentials, 3599.177089s remaining
found ECCO_L4_HEAT_FLUX_05DEG_DAILY_V4R4.json!, skipping

**** Processing  ECCO_L4_MIXED_LAYER_DEPTH_05DEG_DAILY_V4R4
... not updating credentials, 3599.176707s remaining
found ECCO_L4_MIXED_LAYER_DEPTH_05DEG_DAILY_V4R4.json!, skippin