# Benchmarking kerchunk improvements

- JSON reference file
- Parquet reference file generated using `refs_to_dataframe` in [kerchunk=0.1.0](https://github.com/fsspec/kerchunk/blob/6609486399626fc3120ae264b0aaaad4490da597/kerchunk/df.py#L28)
    - use environment `kerchunk-0.1.0`
- Parquet reference file generated using `refs_to_dataframe` in my local clone of [this PR](https://github.com/agoodm/kerchunk/blob/7ec1855c86fee1f4bc2122eefd274ad11d96ce45/kerchunk/df.py#L101) which builds on [this PR](https://github.com/fsspec/kerchunk/pull/298) (and uses [this fsspec PR](https://github.com/fsspec/filesystem_spec/pull/1188)). My clone includes edit(s) to get this function to run.
    - use environment `kerchunk-PR298`

In [1]:
%cd /g/data/tm70/ds0092/projects/dev_data_querying/cosima_intake/kerchunk_benchmark

/g/data/tm70/ds0092/projects/dev_data_querying/cosima_intake/kerchunk_benchmark


In [2]:
import os
import sys
import time
import glob
import shutil
import datetime

import pandas as pd
import numpy as np
import xarray as xr

import fsspec
import fsspec.implementations.reference
from kerchunk import df

# Write the Parquet reference from a JSON reference

In [3]:
filename = "ocean_month"
json_ref = (
    "/g/data/tm70/ds0092/projects/dev_data_querying/cosima_intake/"
    f"kerchunk_025deg_jra55_iaf_omip2/025deg_jra55_iaf_omip2/{filename}.json"
)

conda_env = sys.executable.split(os.sep)[-3]

In [4]:
if conda_env == "kerchunk-PR298":
    row_group_size = 1000
    kwargs = dict(
        consolidated=True,
        row_group_size=row_group_size
    )
    parquet_file = f"./{filename}.{conda_env}.{row_group_size}.parq"
elif conda_env == "kerchunk-0.1.0":
    kwargs = dict(
        partition=True
    )
    parquet_file = f"./{filename}.{conda_env}.parq"
else:
    raise ValueError("Unrecognised conda environment")

In [5]:
%%time

# Start from an empty output directory (isdir already implies the path exists)
if os.path.isdir(parquet_file):
    shutil.rmtree(parquet_file)
os.mkdir(parquet_file)

df.refs_to_dataframe(json_ref, parquet_file, **kwargs)

CPU times: user 2min 49s, sys: 5.58 s, total: 2min 55s
Wall time: 2min 56s


# Somewhat strenuous test calculation: calculate the total kinetic energy

These cells were run on an entire `hugemembw` node (28 cores, 1020 GB of memory).

In [5]:
from distributed import Client

client = Client(threads_per_worker=1)
client.dashboard_link

'/proxy/8787/status'

In [6]:
def TKE(u, v):
    """ Calculate the total kinetic energy """
    
    depth_dim = "st_ocean"
    KE = 0.5*(u**2 + v**2)
    # Approximate layer thicknesses from the depth coordinate carried by `u`,
    # rather than relying on a global `ds`
    dz = xr.DataArray(
        np.gradient(u[depth_dim]),
        coords={depth_dim: u[depth_dim]}
    )
    return (KE * dz).mean('time').sum(depth_dim)
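
As a quick check of how the layer thicknesses in `TKE` are obtained, here is a minimal sketch on made-up depths (the values are illustrative assumptions, not the model's `st_ocean` levels). Called with a single array, `np.gradient` uses central differences in the interior and one-sided differences at the two ends, so the result only approximates the true layer thicknesses.

```python
import numpy as np

# Hypothetical cell-centre depths in metres (not the real st_ocean levels)
depth = np.array([5.0, 15.0, 30.0, 50.0])

# Central differences in the interior, one-sided differences at the ends
dz = np.gradient(depth)

# Uniform test velocities, so KE = 0.5 everywhere
u = np.ones_like(depth)
v = np.zeros_like(depth)
ke = 0.5 * (u**2 + v**2)

# Depth-weighted sum, mirroring `(KE * dz).sum("st_ocean")` in TKE
column_ke = float((ke * dz).sum())
print(dz, column_ke)
```

If the model output provides actual cell thicknesses, those would be a more accurate weight than this coordinate-based estimate.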

In [7]:
# Process everything using the same chunking
chunks = {
    "cycle": 1, 
    "time": 183, 
    "st_ocean": 25, # netcdf chunking
    "yu_ocean": 108, # netcdf chunking
    "xu_ocean": 120, # netcdf chunking
}
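
For a rough sense of scale, the memory footprint of one chunk can be estimated from the chunk shape. This is a sketch only: the 4-byte item size is an assumption (float32), and the real value should be checked with `ds["u"].dtype`.

```python
import math

# Chunk shape copied from the `chunks` dict above
chunk_shape = {
    "cycle": 1,
    "time": 183,
    "st_ocean": 25,
    "yu_ocean": 108,
    "xu_ocean": 120,
}

# Assumption: 4-byte (float32) values; check ds["u"].dtype for the real itemsize
itemsize = 4

n_elements = math.prod(chunk_shape.values())
chunk_mib = n_elements * itemsize / 2**20
print(f"{n_elements} elements per chunk, about {chunk_mib:.0f} MiB")
```

Under that assumption each chunk of `u` or `v` is roughly 226 MiB, which helps put the per-worker memory figures reported below in context.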

## `kerchunk-0.1.0`: open using `open_mfdataset`
- 741617 tasks
- Each worker uses up to ~10 GB (managed), but usage varies considerably across workers and over the course of the run; it typically sits around 4 GB.
- Quite a few transfers in the task stream

**Open mfdataset took 0:00:56.892310 (h:m:s)** \
**Compute took 0:55:08.833007 (h:m:s)**

In [8]:
def open_025deg_jra55_iaf_omip2(filename):
    """ Open all cycles of 025deg_jra55_iaf_omip2 using xarray open_mfdataset """
    
    exp_root = "/g/data/ik11/outputs/access-om2-025/025deg_jra55_iaf_omip2_cycle?"
    files = []
    for cycle in sorted(glob.glob(exp_root)):
        files.append(sorted(glob.glob(f"{cycle}/output*/ocean/{filename}.nc")))

    ds = []
    for f in files:
        ds.append(
            xr.open_mfdataset(
                f,
                chunks=chunks,
                parallel=True,
                use_cftime=True,
                data_vars="minimal", 
                coords="minimal", 
                compat="override",
                drop_variables=["average_T1", "average_T2"]
            )
        )
    return xr.concat(
        ds, dim=pd.Index(range(1, len(files)+1), name="cycle")
    ).chunk(chunks)

In [9]:
if conda_env == "kerchunk-0.1.0":
    tic = time.perf_counter()
    ds = open_025deg_jra55_iaf_omip2(filename)
    toc = time.perf_counter()
    print(f"Open mfdataset took {str(datetime.timedelta(seconds=toc-tic))} (h:m:s)")
    
    tic = time.perf_counter()
    tke = TKE(ds["u"], ds["v"]).compute()
    toc = time.perf_counter()
    print(f"Compute took {str(datetime.timedelta(seconds=toc-tic))} (h:m:s)")

Open mfdataset took 0:00:56.892310 (h:m:s)
Compute took 0:55:08.833007 (h:m:s)
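
The `tic`/`toc` pattern is repeated in every benchmark cell. One way to reduce the repetition is a small context manager; the sketch below uses a hypothetical helper name, `timed`, which is not part of this notebook or of any library, and prints in the same format as the cells.

```python
import datetime
import time
from contextlib import contextmanager


@contextmanager
def timed(label):
    """Print how long the enclosed block took, formatted as h:m:s."""
    tic = time.perf_counter()
    try:
        yield
    finally:
        toc = time.perf_counter()
        print(f"{label} took {datetime.timedelta(seconds=toc - tic)} (h:m:s)")


# Stand-in workload; in the notebook this would be the open or compute call
with timed("Sleep"):
    time.sleep(0.01)
```

Because the timing is printed in a `finally` clause, the elapsed time is still reported if the enclosed block raises, which is useful for runs that end in a worker failure.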


## `kerchunk-0.1.0`: open from JSON reference

- 9367 tasks
- Getting the mapper uses 25 GB of memory
- Each worker uses ~19 GB (unmanaged)
- Lots of transfers in the task stream
- Stalls fairly regularly for no obvious reason
- Throws many worrying `OSError: Timed out during handshake while connecting to tcp://127.0.0.1:37595 after 30 s` errors

**Getting mapper took 0:02:28.607415 (h:m:s)** \
**Open dataset took 0:00:01.042502 (h:m:s)** \
**Compute took 1:47:11.828907 (h:m:s)**

In [8]:
if conda_env == "kerchunk-0.1.0":
    tic = time.perf_counter()
    m = fsspec.implementations.reference.ReferenceFileSystem(
        json_ref
    ).get_mapper()
    toc = time.perf_counter()
    print(f"Getting mapper took {str(datetime.timedelta(seconds=toc-tic))} (h:m:s)")
    
    tic = time.perf_counter()
    ds = xr.open_dataset(
        m,
        engine='zarr', 
        backend_kwargs={"consolidated": True},
        use_cftime=True,
        drop_variables=["average_T1", "average_T2"], # Need to work out what's wrong with these
        chunks=chunks,
        inline_array=True,
    )
    toc = time.perf_counter()
    print(f"Open dataset took {str(datetime.timedelta(seconds=toc-tic))} (h:m:s)")
    
    tic = time.perf_counter()
    tke = TKE(ds["u"], ds["v"]).compute()
    toc = time.perf_counter()
    print(f"Compute took {str(datetime.timedelta(seconds=toc-tic))} (h:m:s)")

Getting mapper took 0:02:28.607415 (h:m:s)




Open dataset took 0:00:01.042502 (h:m:s)


2023-03-09 09:42:32,583 - distributed.worker - ERROR - Worker stream died during communication: tcp://127.0.0.1:35697
Traceback (most recent call last):
  File "/g/data/tm70/ds0092/software/mambaforge/envs/kerchunk-0.1.0/lib/python3.11/asyncio/tasks.py", line 490, in wait_for
    return fut.result()
           ^^^^^^^^^^^^
  File "/g/data/tm70/ds0092/software/mambaforge/envs/kerchunk-0.1.0/lib/python3.11/site-packages/distributed/comm/tcp.py", line 225, in read
    frames_nbytes = await stream.read_bytes(fmt_size)
                    ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
asyncio.exceptions.CancelledError

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/g/data/tm70/ds0092/software/mambaforge/envs/kerchunk-0.1.0/lib/python3.11/site-packages/distributed/comm/core.py", line 328, in connect
    handshake = await asyncio.wait_for(comm.read(), time_left())
                ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/g/

Compute took 1:47:11.828907 (h:m:s)


## `kerchunk-0.1.0`: open from parquet reference

- 9367 tasks
- Getting the mapper uses 3 GB of memory
- Each worker uses ~2 GB (unmanaged)
- Quite a few transfers in the task stream

**Getting mapper took 0:00:09.013903 (h:m:s)** \
**Open dataset took 0:04:15.378189 (h:m:s)** \
**Compute took 0:44:57.622551 (h:m:s)**


In [8]:
if conda_env == "kerchunk-0.1.0":
    tic = time.perf_counter()
    m = fsspec.implementations.reference.DFReferenceFileSystem(
        f"./{filename}.kerchunk-0.1.0.parq", lazy=True
    ).get_mapper()
    toc = time.perf_counter()
    print(f"Getting mapper took {str(datetime.timedelta(seconds=toc-tic))} (h:m:s)")
    
    tic = time.perf_counter()
    ds = xr.open_dataset(
        m,
        engine='zarr', 
        backend_kwargs={"consolidated": False},
        use_cftime=True,
        drop_variables=["average_T1", "average_T2"], # Need to work out what's wrong with these
        chunks=chunks,
    )
    toc = time.perf_counter()
    print(f"Open dataset took {str(datetime.timedelta(seconds=toc-tic))} (h:m:s)")
    
    tic = time.perf_counter()
    tke = TKE(ds["u"], ds["v"]).compute()
    toc = time.perf_counter()
    print(f"Compute took {str(datetime.timedelta(seconds=toc-tic))} (h:m:s)")

Getting mapper took 0:00:09.013903 (h:m:s)




Open dataset took 0:04:15.378189 (h:m:s)
Compute took 0:44:57.622551 (h:m:s)
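
The three `kerchunk-0.1.0` runs can be compared end to end by summing their reported timings. The sketch below copies the numbers from the cell outputs above; `parse_hms` is a hypothetical helper written for this comparison.

```python
import datetime


def parse_hms(text):
    """Parse an 'h:mm:ss.ffffff' string, as printed by str(timedelta)."""
    hours, minutes, seconds = text.split(":")
    return datetime.timedelta(
        hours=int(hours), minutes=int(minutes), seconds=float(seconds)
    )


# Timings copied from the cell outputs above
timings = {
    "open_mfdataset": {"open": "0:00:56.892310", "compute": "0:55:08.833007"},
    "JSON reference": {
        "mapper": "0:02:28.607415",
        "open": "0:00:01.042502",
        "compute": "1:47:11.828907",
    },
    "parquet reference": {
        "mapper": "0:00:09.013903",
        "open": "0:04:15.378189",
        "compute": "0:44:57.622551",
    },
}

# Total wall time per approach (mapper + open + compute)
totals = {
    name: sum((parse_hms(t) for t in steps.values()), datetime.timedelta())
    for name, steps in timings.items()
}

baseline = totals["open_mfdataset"]
for name, total in totals.items():
    print(f"{name}: {total} total, {total / baseline:.2f}x open_mfdataset")
```

On these single runs the parquet reference has the shortest total, followed by `open_mfdataset`, with the JSON reference slowest. Each configuration was timed once, so the ratios are indicative only.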


## `kerchunk-PR298`: open from parquet reference with row-group size = 10000

- 9367 tasks
- Getting the mapper uses 3 GB of memory
- Each worker uses ~2 GB (unmanaged)
- Quite a few transfers in the task stream

In [8]:
if conda_env == "kerchunk-PR298":
    tic = time.perf_counter()
    m = fsspec.implementations.reference.ReferenceFileSystem(
        f"./{filename}.kerchunk-PR298.10000.parq"
    ).get_mapper()
    toc = time.perf_counter()
    print(f"Getting mapper took {str(datetime.timedelta(seconds=toc-tic))} (h:m:s)")
    
    tic = time.perf_counter()
    ds = xr.open_dataset(
        m,
        engine='zarr', 
        backend_kwargs={"consolidated": True},
        use_cftime=True,
        drop_variables=["average_T1", "average_T2"], # Need to work out what's wrong with these
        chunks=chunks,
    )
    toc = time.perf_counter()
    print(f"Open dataset took {str(datetime.timedelta(seconds=toc-tic))} (h:m:s)")
    
    tic = time.perf_counter()
    tke = TKE(ds["u"], ds["v"]).compute()
    toc = time.perf_counter()
    print(f"Compute took {str(datetime.timedelta(seconds=toc-tic))} (h:m:s)")

Getting mapper took 0:24:27.564863 (h:m:s)




Open dataset took 0:00:01.827059 (h:m:s)




KilledWorker: Attempted to run task original-open_dataset-179e98143533bd0c1eca249e765da64du-074efa37680861a858c62702d591a8db on 3 different workers, but all those workers died while running it. The last worker that attempt to run the task was tcp://127.0.0.1:41997. Inspecting worker logs is often a good next step to diagnose what went wrong. For more information see https://distributed.dask.org/en/stable/killed.html.



## `kerchunk-PR298`: open from parquet reference with row-group size = 1000

In [None]:
if conda_env == "kerchunk-PR298":
    tic = time.perf_counter()
    m = fsspec.implementations.reference.ReferenceFileSystem(
        f"./{filename}.kerchunk-PR298.1000.parq"
    ).get_mapper()
    toc = time.perf_counter()
    print(f"Getting mapper took {str(datetime.timedelta(seconds=toc-tic))} (h:m:s)")
    
    tic = time.perf_counter()
    ds = xr.open_dataset(
        m,
        engine='zarr', 
        backend_kwargs={"consolidated": True},
        use_cftime=True,
        drop_variables=["average_T1", "average_T2"], # Need to work out what's wrong with these
        chunks=chunks,
    )
    toc = time.perf_counter()
    print(f"Open dataset took {str(datetime.timedelta(seconds=toc-tic))} (h:m:s)")
    
    tic = time.perf_counter()
    tke = TKE(ds["u"], ds["v"]).compute()
    toc = time.perf_counter()
    print(f"Compute took {str(datetime.timedelta(seconds=toc-tic))} (h:m:s)")