## 4. Kerchunk input data and the kerchunk API

This series does not explain how kerchunking works. For now, it is enough to know that kerchunk brings two zarr benefits to existing files: datasets open with small memory requirements, and consolidated metadata allows virtual aggregation.
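
For orientation only, the following sketch illustrates the idea behind a reference set: small metadata entries are stored inline, while each chunk entry is just a pointer (file, offset, length) into an existing file. It uses only the standard library and a hypothetical temporary file `data.bin`. It mimics the layout of kerchunk's JSON references but is not the kerchunk or fsspec implementation.

```python
import json
import os
import tempfile

# Hypothetical binary file standing in for an existing archive file.
tmpdir = tempfile.mkdtemp()
data_path = os.path.join(tmpdir, "data.bin")
with open(data_path, "wb") as f:
    f.write(b"HEADERchunk-0chunk-1")

# A reference set: metadata is stored inline as strings,
# chunks are stored as [path, offset, length] pointers.
refs = {
    "version": 1,
    "refs": {
        ".zgroup": json.dumps({"zarr_format": 2}),
        "var/0": [data_path, 6, 7],
        "var/1": [data_path, 13, 7],
    },
}


def read_ref(refs, key):
    """Resolve one key: return inline content or read the referenced byte range."""
    entry = refs["refs"][key]
    if isinstance(entry, str):
        return entry.encode()
    path, offset, length = entry
    with open(path, "rb") as f:
        f.seek(offset)
        return f.read(length)


print(read_ref(refs, ".zgroup"))
print(read_ref(refs, "var/1"))
```

Opening such a reference set only requires reading the small `refs` mapping; the chunk bytes are fetched on demand, which is why opening is cheap.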

We now design the script such that it 
- opens *kerchunk* references instead of files
- enables access through the kerchunk API

With the kerchunk API, we do not necessarily need a dask cluster anymore (but without a dask cluster, the dask API will not work).

**Differences to the first example**:

- we open data through the so-called *lazy reference* mapper with
    ```python
    fsspec.get_mapper(
        lazy=True,
        )
    ```
    which we pass to xarray afterwards. This only works for kerchunked input data.
- we add a *dict* of fsspec mappers to the kerchunk plugin by setting `kp.mapper_dict`
    Xpublish will recognize the

In [None]:
%%writefile xpublish_references.py

ssl_keyfile="/work/bm0021/k204210/cloudify/workshop/key.pem"
ssl_certfile="/work/bm0021/k204210/cloudify/workshop/cert.pem"

from cloudify.plugins.stacer import *
from cloudify.utils.daskhelper import *
from cloudify.plugins.kerchunk import *
import fsspec  # imported explicitly; get_mapper is used below
import xarray as xr
import xpublish as xp
import asyncio
#import nest_asyncio
import sys
import os
import socket

def is_port_free(port, host="localhost"):
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
        return s.connect_ex((host, port)) != 0  # Returns True if the port is free

def find_free_port(start=5000, end=5100, host="localhost"):
    for port in range(start, end + 1):
        if is_port_free(port, host):
            return port
    return None  # No free ports found

port = find_free_port(9000,9100)
if not port:
    raise ValueError("Could not find a free port for service")

SO=dict(
    remote_protocol="slk",
    remote_options=dict(
        slk_cache="/scratch/k/k202134/INTAKE_CACHE"
    ),
    lazy=True,
    cache_size=0
)
#nest_asyncio.apply()


if __name__ == "__main__":  # This avoids infinite subprocess creation
    #import dask
    #zarrcluster = asyncio.get_event_loop().run_until_complete(get_dask_cluster())
    #os.environ["ZARR_ADDRESS"]=zarrcluster.scheduler._address
    
    glob_inp=sys.argv[1:]

    dsdict={}
    mapper_dict={}
    for g in glob_inp:
        dsname=g.split('/')[-1]
        source="reference::/"+g
        print(source)
        fsmap = fsspec.get_mapper(
                source,
                **SO
                )
        ds=xr.open_dataset(
                fsmap,
                engine="zarr",
                chunks="auto",
                consolidated=False
                )
        mapper_dict[source]=fsmap
        ds=ds.drop_encoding()
        ds.encoding["source"]=source
        dsdict[dsname]=ds

    kp = KerchunkPass()
    kp.mapper_dict = mapper_dict    
    
    collection = xp.Rest(dsdict)
    collection.register_plugin(Stac())
    collection.register_plugin(kp)
    collection.serve(
        host="0.0.0.0",
        port=port,
        ssl_keyfile=ssl_keyfile,
        ssl_certfile=ssl_certfile
    )

We run this app with ERA5 data:

```
dsname="era5"
glob_inp="/work/bm1344/DKRZ/kerchunks_single/testera/E5_sf_an_1D.parquet"
```

by applying:

In [None]:
%%bash --bg
source activate /work/bm0021/conda-envs/cloudify
python xpublish_references.py /work/bm1344/DKRZ/kerchunks_single/testera/E5_sf_an_1D.parquet
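
The script derives the dataset name and the fsspec source string from each command-line argument. The sketch below replays those two lines of `xpublish_references.py` for the path used above; `derive_names` is a hypothetical helper name. As written, the script uses the file name as the dataset name rather than a separate label such as `era5`. This is read off the script, not checked against the running app.

```python
def derive_names(path):
    """Mirror the naming logic of xpublish_references.py for one input path."""
    dsname = path.split("/")[-1]     # last path component becomes the dataset name
    source = "reference::/" + path   # source string handed to fsspec.get_mapper
    return dsname, source


path = "/work/bm1344/DKRZ/kerchunks_single/testera/E5_sf_an_1D.parquet"
dsname, source = derive_names(path)
print(dsname)
print(source)
```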

If something goes wrong, you can check for *cloudify* processes and *kill* them by process ID.

In [None]:
!ps -ef | grep k204210

In [None]:
!kill 198487
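
Searching by script name can be more targeted than searching by user ID. The following sketch parses output in the format of `ps -eo pid,args` and returns the matching process IDs; `find_pids` is a hypothetical helper and the sample PIDs are made up.

```python
def find_pids(ps_output, needle):
    """Return PIDs of processes whose command line contains `needle`."""
    pids = []
    for line in ps_output.splitlines()[1:]:   # skip the header line
        parts = line.split(None, 1)           # split into PID and command line
        if len(parts) == 2 and needle in parts[1]:
            pids.append(int(parts[0]))
    return pids


# Sample output in the format of `ps -eo pid,args` (PIDs are made up)
sample = """  PID COMMAND
  101 python xpublish_references.py /path/to/refs.parquet
  202 bash
"""
print(find_pids(sample, "xpublish_references.py"))

# On a live system (assumes a POSIX `ps`):
# import subprocess
# out = subprocess.run(["ps", "-eo", "pid,args"], capture_output=True, text=True).stdout
# print(find_pids(out, "xpublish_references.py"))
```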

**Data access via the kerchunk API**

You can build the host URL from the hostname of the Levante node you work on and the port that the app uses:

In [None]:
port=9010
hostname=!echo $HOSTNAME
hosturl="https://"+hostname[0]+":"+str(port)
print(hosturl)
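
The script picks the first free port between 9000 and 9100, so the port set by hand above may not match the one the app actually uses. The following sketch lists the ports in a range that accept a TCP connection; `ports_in_use` is a hypothetical helper, and it assumes the notebook runs on the same node as the app.

```python
import socket


def ports_in_use(start, end, host="localhost"):
    """Return the ports in [start, end] that accept a TCP connection on `host`."""
    busy = []
    for port in range(start, end + 1):
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
            s.settimeout(0.2)
            if s.connect_ex((host, port)) == 0:   # 0 means something is listening
                busy.append(port)
    return busy


# Demonstration with a throwaway listener on an OS-assigned port
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as server:
    server.bind(("127.0.0.1", 0))
    server.listen(1)
    demo_port = server.getsockname()[1]
    print(demo_port in ports_in_use(demo_port, demo_port, host="127.0.0.1"))
```

Calling `ports_in_use(9000, 9100)` then shows candidate ports for the app; other services in that range show up as well, so the result is a hint rather than proof.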

We have to tell the Python clients not to verify SSL certificates for our purposes:

In [None]:
from aiohttp import ClientTimeout
storage_options=dict(
    verify_ssl=False,
    client_kwargs=dict(
        timeout=ClientTimeout(total=5)
    )
)
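
For comparison, the same relaxation can be expressed with the standard library's `ssl` module. This is a stdlib analogue for illustration, not a description of what fsspec does internally, and it is only appropriate for trusted setups with self-signed certificates such as this workshop.

```python
import ssl

# Build an SSL context that accepts self-signed certificates.
ctx = ssl.create_default_context()
ctx.check_hostname = False        # must be disabled before changing verify_mode
ctx.verify_mode = ssl.CERT_NONE   # skip certificate verification

print(ctx.verify_mode == ssl.CERT_NONE)

# Usage with urllib (hosturl as defined above; requires the app to be running):
# import urllib.request
# with urllib.request.urlopen(hosturl + "/intake.yaml", context=ctx, timeout=5) as r:
#     print(r.status)
```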

**Xarray**

Our ERA5 dataset is available via both the *zarr* API **and** the *kerchunk* API.
The two URLs are named similarly:

In [None]:
dsname="era5"
zarr_url='/'.join([hosturl,"datasets",dsname,"zarr"])
kerchunk_url='/'.join([hosturl,"datasets",dsname,"kerchunk"])
print(kerchunk_url)
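
The URL pattern used in this notebook can be wrapped in a small helper. The sketch below is a convenience only; `endpoint_url` and the example host are hypothetical names for illustration.

```python
def endpoint_url(hosturl, dsname, api):
    """Join host URL, dataset name and API name into an endpoint URL."""
    return "/".join([hosturl.rstrip("/"), "datasets", dsname, api])


# Hypothetical host URL for illustration
example_host = "https://localhost:9010"
for api in ("zarr", "kerchunk", "stac"):
    print(endpoint_url(example_host, "era5", api))
```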

In [None]:
import xarray as xr
kerchunk_url="https://l40000.lvt.dkrz.de:9000/datasets/ngc4008_P1D_4_slk.parq/kerchunk"
#kerchunk_url="http://l40243.lvt.dkrz.de:9000/ngc4008_P1D_4.zarr"
ds=xr.open_zarr(
    kerchunk_url,
    consolidated=True,
    storage_options=storage_options
)

In [None]:
ds

In [None]:
subset=ds.isel(time=slice(0,2000))

In [None]:
subset.nbytes/1024**3
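
`nbytes` reports the uncompressed in-memory size, so the value above is in GiB. The arithmetic for a single variable can be sketched as follows; the shape and data type are hypothetical and not taken from the dataset.

```python
import math


def size_gib(shape, itemsize):
    """Uncompressed size in GiB of an array with `shape` and `itemsize` bytes per element."""
    return math.prod(shape) * itemsize / 1024**3


# Hypothetical variable: 2000 time steps on a 1024 x 1024 grid, float32 (4 bytes)
print(size_gib((2000, 1024, 1024), 4))  # → 7.8125
```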

In [None]:
subset=subset[
    list(
        set(ds.data_vars)-set(
            [
                "ocean_fraction_depth_half",
                "ocean_fraction_surface",
                "ocean_fraction_depth_full",
                "dzghalf",
                "zghalf",
                "zg"
            ]
        )
    )
]
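
The set difference above does not preserve the order of the variables. If the order matters, a list comprehension is one alternative; `keep_vars` and the variable names in this sketch are hypothetical.

```python
def keep_vars(all_vars, excluded):
    """Return the variables not in `excluded`, preserving the original order."""
    excluded = set(excluded)
    return [v for v in all_vars if v not in excluded]


# Hypothetical variable names for illustration
all_vars = ["tas", "zg", "pr", "zghalf"]
print(keep_vars(all_vars, ["zg", "zghalf", "dzghalf"]))
```

With the dataset from this notebook, the result of `keep_vars(ds.data_vars, [...])` can be used to index `subset` in the same way as the list above.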

In [None]:
%%time
subset.load()

**Intake**

The default **method** for intake datasets is *kerchunk*, i.e. the datasets are loaded through the kerchunk API by default.

In [None]:
intake_url='/'.join([hosturl,"intake.yaml"])
print(intake_url)

In [None]:
import intake
cat=intake.open_catalog(
    intake_url,
    storage_options=storage_options
)
list(cat)

In [None]:
cat[dsname](storage_options=storage_options).to_dask()

In [None]:
stac_url=zarr_url.replace('/zarr','/stac')

In [None]:
import pystac
import fsspec
import json
pystac.item.Item.from_dict(
    json.load(fsspec.open(stac_url,**storage_options).open())
)