# Icechunk Performance

This notebook uses data from the [NCAR ERA5 AWS Public Dataset](https://nsf-ncar-era5.s3.amazonaws.com/index.html).

In [1]:
import xarray as xr
import zarr
import dask
import fsspec
from dask.diagnostics import ProgressBar

import icechunk
from icechunk import IcechunkStore, StorageConfig

print('xarray:  ', xr.__version__)
print('dask:    ', dask.__version__)
print('zarr:    ', zarr.__version__)
print('icechunk:', icechunk.__version__)

xarray:   0.9.7.dev3734+g26081d4f
dask:     2024.9.1+8.g70f56e28
zarr:     3.0.0b0
icechunk: 0.1.0-alpha.1


In [2]:
zarr.config.set(
    {
        'threading.max_workers': 16,
        'async.concurrency': 128
    }
)

<donfig.config_obj.ConfigSet at 0x7f9ea9f36690>

In [3]:
url = "https://nsf-ncar-era5.s3.amazonaws.com/e5.oper.an.pl/194106/e5.oper.an.pl.128_060_pv.ll025sc.1941060100_1941060123.nc"
%time ds = xr.open_dataset(fsspec.open(url).open(), engine="h5netcdf", chunks={"time": 1})
ds = ds.drop_encoding()

CPU times: user 246 ms, sys: 51.8 ms, total: 297 ms
Wall time: 2.22 s




In [4]:
print(ds)

<xarray.Dataset> Size: 4GB
Dimensions:    (time: 24, level: 37, latitude: 721, longitude: 1440)
Coordinates:
  * latitude   (latitude) float64 6kB 90.0 89.75 89.5 ... -89.5 -89.75 -90.0
  * level      (level) float64 296B 1.0 2.0 3.0 5.0 ... 925.0 950.0 975.0 1e+03
  * longitude  (longitude) float64 12kB 0.0 0.25 0.5 0.75 ... 359.2 359.5 359.8
  * time       (time) datetime64[ns] 192B 1941-06-01 ... 1941-06-01T23:00:00
Data variables:
    PV         (time, level, latitude, longitude) float32 4GB dask.array<chunksize=(1, 37, 721, 1440), meta=np.ndarray>
    utc_date   (time) int32 96B dask.array<chunksize=(1,), meta=np.ndarray>
Attributes:
    DATA_SOURCE:          ECMWF: https://cds.climate.copernicus.eu, Copernicu...
    NETCDF_CONVERSION:    CISL RDA: Conversion from ECMWF GRIB 1 data to netC...
    NETCDF_VERSION:       4.8.1
    CONVERSION_PLATFORM:  Linux r1i4n4 4.12.14-95.51-default #1 SMP Fri Apr 1...
    CONVERSION_DATE:      Wed May 10 06:33:49 MDT 2023
    Conventions:       

### Load Data from HDF5 File

This illustrates how loading directly from HDF5 files on S3 can be slow, even with Dask. Here the roughly 4GB file takes about 54 s to load.

In [5]:
with ProgressBar():
    dsl = ds.load()

[########################################] | 100% Completed | 53.73 s


### Initialize Icechunk Repo

In [6]:
prefix = "ryan/icechunk-tests-era5-999"
store = IcechunkStore.create(
    storage=StorageConfig.s3_from_env(
        bucket="icechunk-test",
        prefix=prefix
    ),
    mode="w"
)
store

<icechunk.IcechunkStore at 0x7f9eb84402c0>

In [7]:
store.branch, store.snapshot_id

('main', 'B8ZZN2YZS6NQKM17X68G')

### Store Data To Icechunk

We specify encoding to set both compression and chunk size.

In [8]:
encoding = {
    "PV": {
        "codecs": [zarr.codecs.BytesCodec(), zarr.codecs.ZstdCodec()],
        "chunks": (1, 1, 721, 1440)
    }
}
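
To make the effect of this chunking concrete, the following sketch works out the chunk count and uncompressed chunk size implied by the encoding above. It uses only the dimension sizes printed for the dataset and the 4-byte `float32` item size; the variable names are illustrative and not part of any library API.

```python
from math import prod

# Dimension sizes as printed in the dataset repr above.
shape = {"time": 24, "level": 37, "latitude": 721, "longitude": 1440}
# Chunk shape from the encoding: one full lat/lon field per time step and level.
chunks = (1, 1, 721, 1440)
itemsize = 4  # bytes per float32 value

# Uncompressed size of a single chunk.
chunk_bytes = prod(chunks) * itemsize
# Number of chunks: ceiling division along every dimension.
n_chunks = prod(-(-n // c) for n, c in zip(shape.values(), chunks))
# Uncompressed size of the whole PV array.
total_bytes = prod(shape.values()) * itemsize

print(f"chunk size:  {chunk_bytes / 1e6:.2f} MB (uncompressed)")
print(f"chunk count: {n_chunks}")
print(f"array size:  {total_bytes / 1e9:.2f} GB (uncompressed)")
```

This gives 888 chunks of about 4.15 MB each before compression, so every chunk is a separate object that can be written and read independently.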

Note that Dask is not required to obtain good performance when reading and writing. Zarr and Icechunk use multithreading and asyncio internally.

In [9]:
%time dsl.to_zarr(store, zarr_format=3, consolidated=False, encoding=encoding)

CPU times: user 54 s, sys: 1.56 s, total: 55.5 s
Wall time: 18.9 s


<xarray.backends.zarr.ZarrStore at 0x7f9ea8781ec0>
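
The note above says that Zarr and Icechunk rely on multithreading and asyncio internally. The sketch below is not their implementation; it only illustrates the general pattern of issuing many I/O requests concurrently while capping how many are in flight, which is the kind of limit a setting such as `async.concurrency` suggests. `fetch_chunk` and `fetch_all` are hypothetical helpers, and the request is simulated with `asyncio.sleep`.

```python
import asyncio


async def fetch_chunk(index, limiter, stats):
    """Hypothetical stand-in for one chunk request; sleeps instead of doing I/O."""
    async with limiter:  # wait here if too many requests are already in flight
        stats["in_flight"] += 1
        stats["peak"] = max(stats["peak"], stats["in_flight"])
        await asyncio.sleep(0.001)  # simulated network latency
        stats["in_flight"] -= 1
        return index


async def fetch_all(n_chunks, concurrency):
    """Schedule all requests at once, bounded by a semaphore."""
    limiter = asyncio.Semaphore(concurrency)
    stats = {"in_flight": 0, "peak": 0}
    results = await asyncio.gather(
        *(fetch_chunk(i, limiter, stats) for i in range(n_chunks))
    )
    return results, stats["peak"]


# 888 chunks and a limit of 128 mirror the numbers used in this notebook.
results, peak = asyncio.run(fetch_all(n_chunks=888, concurrency=128))
print(len(results), "chunks fetched; peak concurrent requests:", peak)
```

Because the waiting happens inside the event loop rather than in one thread per request, a single process can keep many requests outstanding at once, which is why a Dask cluster is not needed to saturate the connection in this example.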

In [43]:
# Optional alternative: rechunk and write through Dask instead (disabled here).
# with ProgressBar():
#     (dsl
#      .chunk({"time": 1, "level": 10})
#      .to_zarr(store, zarr_format=3, consolidated=False, encoding=encoding)
#     )

[########################################] | 100% Completed | 18.02 s


In [10]:
store.commit("wrote data")

'AS64P9SQ7NY1P22P8GS0'

### Read Data Back

In [11]:
store = IcechunkStore.open_existing(
    storage=StorageConfig.s3_from_env(
        bucket="icechunk-test",
        prefix=prefix
    ),
    mode="r"
)

In [12]:
%time dsic = xr.open_dataset(store, consolidated=False, engine="zarr")

CPU times: user 16.8 ms, sys: 2.45 ms, total: 19.2 ms
Wall time: 97.4 ms


In [13]:
print(dsic)

<xarray.Dataset> Size: 4GB
Dimensions:    (level: 37, latitude: 721, longitude: 1440, time: 24)
Coordinates:
  * level      (level) float64 296B 1.0 2.0 3.0 5.0 ... 925.0 950.0 975.0 1e+03
  * latitude   (latitude) float64 6kB 90.0 89.75 89.5 ... -89.5 -89.75 -90.0
  * longitude  (longitude) float64 12kB 0.0 0.25 0.5 0.75 ... 359.2 359.5 359.8
  * time       (time) datetime64[ns] 192B 1941-06-01 ... 1941-06-01T23:00:00
Data variables:
    PV         (time, level, latitude, longitude) float32 4GB ...
    utc_date   (time) int32 96B ...
Attributes:
    CONVERSION_DATE:      Wed May 10 06:33:49 MDT 2023
    CONVERSION_PLATFORM:  Linux r1i4n4 4.12.14-95.51-default #1 SMP Fri Apr 1...
    Conventions:          CF-1.6
    DATA_SOURCE:          ECMWF: https://cds.climate.copernicus.eu, Copernicu...
    NCO:                  netCDF Operators version 5.0.3 (Homepage = http://n...
    NETCDF_COMPRESSION:   NCO: Precision-preserving compression to netCDF4/HD...
    NETCDF_CONVERSION:    CISL RDA:

In [14]:
%time dsic.PV[0, 0, 0, 0].values

CPU times: user 16.8 ms, sys: 78 μs, total: 16.8 ms
Wall time: 102 ms


array(0.00710905, dtype=float32)

As with writing, Dask is not required for performant reading of the data.
In this example we can load the entire dataset (nearly 4GB) in about 2 s.

In [15]:
%time _ = dsic.compute()

CPU times: user 11 s, sys: 3.67 s, total: 14.7 s
Wall time: 2.03 s
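
As a rough comparison, the sketch below converts the two wall times recorded in this notebook (53.73 s for the HDF5 load and 2.03 s for the Icechunk read) into effective throughput. It assumes the uncompressed size of `PV` computed from the printed dimensions and ignores the tiny `utc_date` variable. These are single runs on one machine, so the result is an indication and not a benchmark.

```python
# Uncompressed size of PV: time * level * latitude * longitude * 4 bytes (float32).
total_bytes = 24 * 37 * 721 * 1440 * 4

# Wall times copied from the cell outputs in this notebook.
hdf5_seconds = 53.73      # ds.load() from the HDF5 file on S3
icechunk_seconds = 2.03   # dsic.compute() from the Icechunk store


def throughput_mb_s(n_bytes, seconds):
    """Effective throughput in MB/s of uncompressed data delivered."""
    return n_bytes / seconds / 1e6


hdf5_rate = throughput_mb_s(total_bytes, hdf5_seconds)
icechunk_rate = throughput_mb_s(total_bytes, icechunk_seconds)
speedup = hdf5_seconds / icechunk_seconds

print(f"HDF5 on S3: {hdf5_rate:.0f} MB/s")
print(f"Icechunk:   {icechunk_rate:.0f} MB/s")
print(f"speedup:    {speedup:.1f}x")
```

With these timings the read works out to roughly 69 MB/s for the HDF5 file versus roughly 1.8 GB/s for Icechunk. The figures count uncompressed bytes delivered to memory, so the actual bytes transferred over the network are lower in both cases.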


In [16]:
xr.testing.assert_identical(_, ds)

In [17]:
dsicc = dsic.chunk({"time": 1, "level": 10})

In [19]:
from dask.diagnostics import ProgressBar
with ProgressBar():
    _ = dsicc.compute()

[########################################] | 100% Completed | 2.13 s


In [45]:
actual = _
actual

In [46]:
xr.testing.assert_identical(actual, dsl)