In [1]:
import json
import fsspec
import os
import fsspec_reference_maker.hdf as fshdf

In [2]:
import dask.bag as db
import dask
from dask.diagnostics import ProgressBar

In [3]:
# NOTE(review): absolute path tied to this SageMaker instance; adjust on other machines.
input_dir = '/home/ec2-user/SageMaker/nc2zarr/inputs/argo/1901126/converted'
fs = fsspec.filesystem('file')
# Sort so the file order (and therefore bag partitioning / result order) is deterministic.
all_files = sorted(fs.glob(input_dir + '/*.nc'))
assert all_files, f'no .nc files found in {input_dir}'
len(all_files)

346

In [4]:
from fsspec_reference_maker.hdf import SingleHdf5ToZarr
import os
# Directory that receives one JSON reference file per input NetCDF file;
# created up front (no error if it already exists).
refdir = "/home/ec2-user/SageMaker/nc2zarr/outputs/argo/refs/1901126/"
os.makedirs(name=refdir, exist_ok=True)

In [5]:
def save_refs(s3_fn, out_dir=None):
    """Build and cache the Zarr-style reference set for one HDF5/NetCDF file.

    The output of ``SingleHdf5ToZarr(...).translate()`` is written as indented
    JSON to ``<out_dir>/<input stem>.json``. If that file already exists the
    input is not opened at all, so re-running only processes missing files.

    Parameters
    ----------
    s3_fn : str
        Path or URL of the input file, as accepted by ``fsspec.open``. Despite
        the name, the inputs globbed in this notebook are local paths.
    out_dir : str, optional
        Directory for the JSON output. Defaults to the notebook-level ``refdir``.

    Returns
    -------
    str
        Path of the JSON reference file (pre-existing or newly written).
    """
    if out_dir is None:
        out_dir = refdir
    # splitext only strips the final extension, unlike str.replace('.nc', ...),
    # which would also rewrite '.nc' appearing elsewhere in the basename.
    stem = os.path.splitext(os.path.basename(s3_fn))[0]
    ref_path = os.path.join(out_dir, stem + '.json')

    if os.path.exists(ref_path):
        return ref_path

    # NOTE(review): anon/cache options target remote (S3) access; presumably
    # ignored for local paths — confirm before relying on them.
    with fsspec.open(s3_fn,
                     anon=True,
                     mode='rb',
                     default_fill_cache=False,
                     default_cache_type='none') as f_in:
        # Third positional argument kept exactly as before; its meaning depends on
        # the installed fsspec_reference_maker version. TODO: pass it by keyword.
        refs = SingleHdf5ToZarr(f_in, s3_fn, True).translate()

    # Write to a temporary file and rename it into place, so an interrupted run
    # never leaves a truncated JSON that the exists() check would treat as done.
    tmp_path = ref_path + '.tmp'
    with open(tmp_path, 'wt') as f_out:
        json.dump(refs, f_out, indent=4)
    os.replace(tmp_path, ref_path)

    return ref_path

In [6]:
# Lazy dask bag mapping save_refs over every input file; evaluated when .compute() is called.
b = db.from_sequence(seq=all_files, npartitions=200)
references = b.map(save_refs)

In [7]:
# Profilers record diagnostics for the visualization in the next cell.
from dask.diagnostics import Profiler, ResourceProfiler, CacheProfiler

# Run the bag on the threaded scheduler while recording task timings,
# resource usage and cache activity.
with ProgressBar(), Profiler() as prof, ResourceProfiler() as rprof, CacheProfiler() as cprof:
    # Keep the list of JSON reference paths instead of discarding it.
    ref_paths = references.compute(scheduler='threads')

assert len(ref_paths) == len(all_files)

[########################################] | 100% Completed |  0.1s


In [8]:
from dask.diagnostics import visualize

# Plot the task, resource and cache profiles captured during the compute above.
profilers = [prof, rprof, cprof]
visualize(profilers)