# ZARR2: Join JSON refs for NetCDF3 files using MultiZarrToZarr

In [1]:
# Selects which Dask cluster backend to start. Only 'Coiled' is handled here;
# for any other value neither `cluster` nor `client` is defined by this cell.
cluster_type = 'Coiled'

if cluster_type == 'Coiled':
    import coiled
    # Request 30 workers x 2 threads each (60 worker threads in total).
    cluster = coiled.Cluster(
        region="us-east-1",
        arm=True,   # run on ARM to save energy & cost
        worker_vm_types=["t4g.large"],  # cheap, small ARM instances: 2 vCPUs, 8 GiB RAM
        worker_options={'nthreads':2},
        n_workers=30,
        wait_for_workers=False,
        compute_purchase_option="spot_with_fallback",
        name='fvcom-cluster',   # Dask cluster name
        software='pangeo-worker',  # Conda environment name
        workspace='esip-lab',
        timeout=180   # NOTE(review): Coiled documents `timeout` as the seconds to wait for cluster start-up; keeping an idle cluster alive is `idle_timeout` -- confirm which was intended
    )

    # Dask client connected to the new cluster.
    client = cluster.get_client()



Output()

Could not get token from client GCP session. This is not a concern unless you're planning to use forwarded GCP credentials on your cluster. The error was: Could not automatically determine credentials. Please set GOOGLE_APPLICATION_CREDENTIALS or explicitly create credentials and re-run the application. For more information, please see https://cloud.google.com/docs/authentication/getting-started


In [2]:
import fsspec
import kerchunk
from kerchunk.combine import MultiZarrToZarr
from fsspec.implementations.reference import LazyReferenceMapper
from pathlib import Path
import numpy as np
import ujson
kerchunk.__version__

'0.2.7'

In [3]:
import zarr
print(zarr.__version__)

2.18.7


In [4]:
# Storage options shared by the S3 filesystem created below (anon=True).
so = {'anon': True}

In [5]:
# S3 filesystem built from the `so` storage options; used to list the reference JSONs.
fs = fsspec.filesystem('s3', **so)

In [6]:
# S3 prefix that is globbed for the per-file reference JSONs to be combined.
json_dir = 's3://umassd-fvcom/gom3/hindcast/individual_jsons'

In [7]:
# List every reference JSON under json_dir, then show the count and the
# first/last entries as a quick sanity check.
ref_list = fs.glob(f'{json_dir}/*.json')
print(len(ref_list))
print(ref_list[0], ref_list[-1], sep='\n')

468
umassd-fvcom/gom3/hindcast/individual_jsons/gom3_197801.json
umassd-fvcom/gom3/hindcast/individual_jsons/gom3_201612.json


In [8]:
import xarray as xr

In [9]:
# Variable names removed from every input reference set before combining
# (passed to kerchunk.combine.drop when the MultiZarrToZarr kwargs are built).
drop_vars = [
    'Itime',
    'Itime2',
    'Times',
    'file_date',
    'iint',
    'nprocs',
]

In [10]:
# Variables handed to MultiZarrToZarr as `identical_dims`.
# 'nprocs' is intentionally NOT listed: it is in `drop_vars`, so it is removed
# by the preprocess step before combining and listing it here was contradictory.
identical_dims = [
    'partition',
    'x', 'y', 'lon', 'lat',
    'xc', 'yc', 'lonc', 'latc',
    'siglay', 'siglev', 'h',
    'nv', 'nbe', 'ntsn', 'nbsn', 'ntve', 'nbve',
    'a1u', 'a2u',
    'aw0', 'awx', 'awy',
    'art2', 'art1',
]

In [11]:
# fs.glob() returns paths without the protocol, so put 's3://' back.
# The startswith guard keeps this cell idempotent: re-running it no longer
# produces 's3://s3://...' URLs.
ref_list = [ref if ref.startswith('s3://') else f's3://{ref}' for ref in ref_list]

In [12]:
from kerchunk.combine import MultiZarrToZarr, auto_dask, JustLoad

In [13]:
# Keyword arguments passed to auto_dask as `mzz_kwargs` for the combine step.
mzz_kwargs = {
    'target_options': {'anon': True},
    'concat_dims': ['time'],
    # strip the unwanted variables from each input before combining
    'preprocess': kerchunk.combine.drop(tuple(drop_vars)),
    'identical_dims': identical_dims,
}

In [17]:
# Number of batches passed to auto_dask. 60 equals the cluster's
# 30 workers x 2 threads, i.e. one batch per worker thread.
n_batches = 60

In [18]:
%%time
# Combine all reference JSONs with Dask: each file is read with JustLoad and
# the files are combined in `n_batches` groups using `mzz_kwargs`.
# `d` is the combined reference set that is serialised to JSON further down.
d = auto_dask(
    ref_list,
    single_driver=JustLoad,
    single_kwargs={"storage_options": {"anon": True}},
    mzz_kwargs=mzz_kwargs,
    n_batches=n_batches,   # 60 batches = one per worker thread (30 workers x 2 threads), not one per worker
    remote_protocol="s3",
    remote_options={"anon": True}
)

CPU times: user 7.07 s, sys: 1.73 s, total: 8.8 s
Wall time: 6min 54s


In [19]:
# Local filesystem handle, used to write the combined references to disk.
fs_local = fsspec.filesystem('file')

In [20]:
# Serialise the combined reference set `d` to a local JSON file.
with fs_local.open('combined.json', mode='wb') as json_out:
    json_out.write(ujson.dumps(d).encode())

In [21]:
from kerchunk import df

In [22]:
# Convert the JSON reference file into a parquet reference store
# (`df` here is the kerchunk.df module, not a DataFrame).
df.refs_to_dataframe(fo='combined.json', url='combined.parq')

In [None]:
# S3 filesystem using the named profile 'umassd-fvcom' (presumably credentials
# with write access to the bucket -- confirm); used for the upload below.
fs_s3 = fsspec.filesystem('s3', profile='umassd-fvcom')

In [None]:
# Recursively upload the local parquet reference directory to the bucket.
# Assigning to `_` keeps the return value out of the cell output.
_ = fs_s3.upload(
    'combined.parq',
    's3://umassd-fvcom/gom3/hindcast/parquet/',
    recursive=True,
)