# Kerchunk and Pangeo-Forge



## Overview

In this tutorial, we use Kerchunk to create reference files for a dataset. 
These references let us read the entire dataset as if it were a single Zarr store instead of a collection of NetCDF files. 
With Kerchunk, we don't have to copy the data. Instead, we create a collection of reference files so that the original data files can be read as if they were Zarr.
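
To make the idea of a reference file concrete, the sketch below builds a tiny reference set by hand in the Kerchunk version 1 JSON layout, where each Zarr key maps either to inline metadata or to a `[url, offset, length]` triple. The file URL follows the pattern used later in this notebook, but the chunk key, offset and length are made-up placeholders, and `byte_range_header` is a hypothetical helper rather than part of Kerchunk.

```python
import json

# Hand-written reference set in the Kerchunk "version 1" layout.
# The offset and length are illustrative placeholders, not real values.
references = {
    "version": 1,
    "refs": {
        # Small metadata documents are stored inline as JSON strings.
        ".zgroup": json.dumps({"zarr_format": 2}),
        # Chunk keys point at a byte range inside an original NetCDF file.
        "burning_index_g/0.0.0": [
            "http://www.northwestknowledge.net/metdata/data/bi_1979.nc",
            8192,   # byte offset of the chunk within the file (assumed)
            45000,  # length of the chunk in bytes (assumed)
        ],
    },
}


def byte_range_header(ref):
    """Return the URL and HTTP Range header that would fetch one chunk."""
    url, offset, length = ref
    # HTTP byte ranges are inclusive on both ends.
    return url, {"Range": f"bytes={offset}-{offset + length - 1}"}


url, headers = byte_range_header(references["refs"]["burning_index_g/0.0.0"])
print(url)
print(headers)
```

Because only these small pointers are stored, the reference file stays tiny while the data itself remains in the original files.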


This notebook shares some similarities with the [Multi-File Datasets with Kerchunk](../case_studies/ARG_Weather.ipynb) notebook, as both create references from NetCDF files. However, this notebook uses `Pangeo-Forge` as the runner to create the reference files.



## Prerequisites
| Concepts | Importance | Notes |
| --- | --- | --- |
| [Kerchunk Basics](../foundations/kerchunk_basics) | Required | Core |
| [Multiple Files and Kerchunk](../foundations/kerchunk_multi_file) | Required | Core |
| [Kerchunk and Dask](../foundations/kerchunk_dask) | Required | Core |
| [Multi-File Datasets with Kerchunk](../case_studies/ARG_Weather.ipynb) | Required | IO/Visualization |

- **Time to learn**: 45 minutes
---

## Motivation

### Why Kerchunk

Many traditional data processing pipelines start by downloading a large number of files to a local computer and then subsetting them for later analysis. Kerchunk gives us two large advantages: 
1. A massive reduction in used disk space.
2. Performance improvements through parallel, chunk-specific access to the dataset. 

In addition to these speedups, once the consolidated Kerchunk reference file has been created, it can be easily shared for other users to access the dataset. 



## Pangeo-Forge & Kerchunk

Pangeo-Forge is a community project to build reproducible, cloud-native ARCO (Analysis-Ready, Cloud-Optimized) datasets. Its Python library, `pangeo-forge-recipes`, is the ETL pipeline that processes these datasets through "recipes". While the majority of recipes convert a legacy format such as NetCDF to Zarr stores, `pangeo-forge-recipes` can also use Kerchunk under the hood to create reference recipes. 

It is important to note that `Kerchunk` can be used independently of `pangeo-forge-recipes` and in this example, `pangeo-forge-recipes` is acting as the runner for `Kerchunk`. 

### Why Pangeo-Forge & Kerchunk

While you can use `Kerchunk` without `pangeo-forge`, we hope that `pangeo-forge` can be another tool to create sharable ARCO datasets using `Kerchunk`. 
A few potential benefits of creating `Kerchunk` based reference recipes with `pangeo-forge` may include:
- Recipe processing pipelines may be more standardized than case-by-case custom Kerchunk processing functions.
- Recipe processing can be scaled through `pangeo-forge-cloud` for large datasets.
- The infrastructure of `pangeo-forge` in GitHub may allow more community feedback on recipes.
- Additional features such as appending to datasets as new data is generated may be available in future releases of `pangeo-forge`.


## Getting to Know The Data

`gridMET` is a high-resolution daily meteorological dataset covering CONUS from 1979 to 2023. It is produced by the Climatology Lab at UC Merced. In this example, we are going to create a virtual Zarr dataset of a derived variable, Burn Index. 

### Examine a Single File

In [None]:
import xarray as xr

ds = xr.open_dataset(
    "http://thredds.northwestknowledge.net:8080/thredds/dodsC/MET/bi/bi_2021.nc"
)

#### Plot the Dataset

In [None]:
ds.sel(day="2021-08-01").burning_index_g.plot()

## Create a File Pattern

To build our `pangeo-forge` pipeline, we need to create a `FilePattern` object, which is composed of all of our input URLs. This dataset ranges from 1979 through 2023 and is stored as one file per year. 
 
To speed up our example, we will `prune` our recipe to select the first two entries in the `FilePattern`.

In [None]:
from pangeo_forge_recipes.patterns import ConcatDim, FilePattern

years = list(range(1979, 2022 + 1))


time_dim = ConcatDim("time", keys=years)


def format_function(time):
    return f"http://www.northwestknowledge.net/metdata/data/bi_{time}.nc"


pattern = FilePattern(format_function, time_dim, file_type="netcdf4")


pattern = pattern.prune()

pattern
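
As a rough mental model, the file pattern expands to one URL per year, and pruning keeps only the first entries. The sketch below reproduces that expansion in plain Python without `pangeo-forge-recipes`. It assumes, as the text above states, that pruning keeps the first two entries. The names `all_urls` and `pruned_urls` are illustrative only.

```python
# Years covered by the pattern in this notebook (1979 through 2022).
years = list(range(1979, 2022 + 1))


def format_function(time):
    """Build the download URL for one yearly file."""
    return f"http://www.northwestknowledge.net/metdata/data/bi_{time}.nc"


# One URL per key of the concat dimension.
all_urls = [format_function(year) for year in years]

# Keep only the first two entries, mirroring the pruned pattern (assumed).
pruned_urls = all_urls[:2]

print(len(all_urls), "files in the full pattern")
for url in pruned_urls:
    print(url)
```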

## Create a Location For Output
For this example, we are creating a temporary directory to write the data to. If we wanted to persist this Kerchunk reference, we could write to local disk or cloud storage. 

In [None]:
import os
from tempfile import TemporaryDirectory

td = TemporaryDirectory()
# Write the references into the temporary directory
target_root = td.name
store_name = "PF_kerchunk"
target_store = os.path.join(target_root, store_name)

## Build the Pangeo-Forge Beam Pipeline

Next, we chain together several transforms to create a Pangeo-Forge / Apache Beam pipeline. 
Processing steps are chained together with the pipe operator (`|`). Once the pipeline is built, it can be run in the following cell. 

The steps are as follows:
1. Create a starting collection from our input file pattern.
2. Pass the file pattern items to `OpenWithKerchunk`, which creates references for each file.
3. Combine the reference files into a single reference file with `CombineReferences`.
4. Write the combined reference file.

Note: You can add additional processing steps in this pipeline. 
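
For readers new to this style, the toy example below shows how chaining with `|` can work in plain Python through operator overloading. It is only an illustration of the idea and not how Apache Beam is implemented. The `Step` class and the four stand-in stages are hypothetical.

```python
class Step:
    """A toy processing step that can be chained with `|`."""

    def __init__(self, name, func):
        self.name = name
        self.func = func

    def __or__(self, other):
        # `a | b` builds a new step that runs `a`, then feeds its output to `b`.
        return Step(
            f"{self.name} | {other.name}",
            lambda data: other.func(self.func(data)),
        )

    def __call__(self, data):
        return self.func(data)


# Hypothetical stand-ins for the four stages listed above.
create = Step("Create", lambda years: [f"bi_{year}.nc" for year in years])
open_refs = Step("Open", lambda files: [{"file": name} for name in files])
combine = Step("Combine", lambda refs: {"files": [ref["file"] for ref in refs]})
write = Step(
    "Write",
    lambda combined: f"wrote references for {len(combined['files'])} files",
)

pipeline = create | open_refs | combine | write
print(pipeline.name)
print(pipeline([1979, 1980]))
```

An additional processing step would simply be one more `Step` placed between two existing ones in the chain.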


In [None]:
import apache_beam as beam
from pangeo_forge_recipes.transforms import (
    CombineReferences,
    OpenWithKerchunk,
    WriteCombinedReference,
)

transforms = (
    # Create a beam PCollection from our input file pattern
    beam.Create(pattern.items())
    # Open with Kerchunk and create references for each file
    | OpenWithKerchunk(file_type=pattern.file_type)
    # Use Kerchunk's `MultiZarrToZarr` functionality to combine the reference files into a single
    # reference file. *Note*: Setting the correct concat_dims and identical_dims is important.
    | CombineReferences(
        concat_dims=["day"],
        identical_dims=["lat", "lon", "crs"],
    )
    # Write the combined Kerchunk reference to file.
    | WriteCombinedReference(target_root=target_root, store_name=store_name)
)
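
The role of `concat_dims` and `identical_dims` can be illustrated with a toy combiner in plain Python. This is a simplified sketch, not how `CombineReferences` or Kerchunk's `MultiZarrToZarr` is implemented: `combine_references` is a hypothetical helper, it works on variable names rather than dimension names, it ignores Zarr metadata keys, and the file names, offsets and lengths are made up.

```python
def combine_references(per_file_refs, concat_vars, identical_vars):
    """Toy merge of per-file chunk references into one mapping.

    Variables in `concat_vars` have their leading chunk index shifted so
    that each file follows the previous one. Variables in `identical_vars`
    are assumed to match across files, so only the first copy is kept.
    """
    combined = {}
    chunks_placed = 0  # leading-axis chunks contributed by earlier files
    for file_index, refs in enumerate(per_file_refs):
        chunks_in_file = 0
        for key, ref in refs.items():
            variable, _, chunk_id = key.partition("/")
            if variable in identical_vars:
                if file_index == 0:
                    combined[key] = ref
            elif variable in concat_vars:
                leading, *rest = chunk_id.split(".")
                shifted = ".".join([str(int(leading) + chunks_placed), *rest])
                combined[f"{variable}/{shifted}"] = ref
                chunks_in_file = max(chunks_in_file, int(leading) + 1)
        chunks_placed += chunks_in_file
    return combined


# Two made-up yearly files, each with two chunks along the time axis.
per_file = [
    {
        "lat/0": [f"bi_{year}.nc", 100, 40],
        "burning_index_g/0.0.0": [f"bi_{year}.nc", 1000, 500],
        "burning_index_g/1.0.0": [f"bi_{year}.nc", 1500, 500],
    }
    for year in (1979, 1980)
]

combined = combine_references(
    per_file, concat_vars={"burning_index_g"}, identical_vars={"lat"}
)
for key, ref in combined.items():
    print(key, ref)
```

If a variable that changes between files were listed as identical, the toy would silently keep only the first file's copy, which is why choosing these arguments correctly matters.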

In [None]:
%%time

with beam.Pipeline() as p:
    p | transforms

In [None]:
import os

import fsspec

# Size of the combined reference file in megabytes
full_path = os.path.join(target_root, store_name, "reference.json")
print(f"{os.path.getsize(full_path) / 1e6:.1f} MB")

Our reference `.json` file is about 1 MB, instead of 108 GB. That is quite the storage savings! 
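
One way to see where the savings come from is to add up the byte lengths that the references point to. The sketch below does this for a small made-up mapping. `load_refs` and `summarize_refs` are hypothetical helpers, the sample keys and sizes are placeholders, and the assumption that the written file nests its mapping under a `"refs"` key follows the Kerchunk version 1 layout.

```python
import json
from collections import defaultdict


def load_refs(path):
    """Load a reference JSON file and return its key-to-reference mapping."""
    with open(path) as f:
        loaded = json.load(f)
    # Version 1 files nest the mapping under "refs"; fall back to a flat dict.
    return loaded.get("refs", loaded)


def summarize_refs(refs):
    """Count chunk references and total referenced bytes per variable."""
    summary = defaultdict(lambda: {"chunks": 0, "bytes": 0})
    for key, value in refs.items():
        # [url, offset, length] entries point into the original files.
        # Strings hold inline metadata or data and are skipped here.
        if isinstance(value, list) and len(value) == 3:
            variable = key.split("/")[0]
            summary[variable]["chunks"] += 1
            summary[variable]["bytes"] += value[2]
    return dict(summary)


# Made-up sample standing in for load_refs(full_path).
sample_refs = {
    ".zgroup": '{"zarr_format": 2}',
    "burning_index_g/0.0.0": ["bi_1979.nc", 8192, 45000],
    "burning_index_g/1.0.0": ["bi_1979.nc", 53192, 47000],
    "lat/0": ["bi_1979.nc", 1024, 4680],
}

summary = summarize_refs(sample_refs)
for variable, stats in summary.items():
    print(variable, stats)
```

Calling `summarize_refs(load_refs(full_path))` on the file written above should give the same kind of per-variable breakdown for the real references.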

## Examine the Result

In [None]:
mapper = fsspec.get_mapper(
    "reference://",
    fo=full_path,
    remote_protocol="http",
)
ds = xr.open_dataset(
    mapper, engine="zarr", decode_coords="all", backend_kwargs={"consolidated": False}
)

In [None]:
ds

In [None]:
ds.isel(day=220).burning_index_g.plot()

## Access Speed Benchmark - Kerchunk vs NetCDF

In the access test below, the `Kerchunk` reference dataset was almost 3x faster to access than the NetCDF file collection. This is a modest speedup, and results will vary considerably with chunking scheme, access pattern, and so on.

| Access method | Time (s)    |
| ------------- | ----------- |
| Kerchunk      | 10          |
| Cloud NetCDF  | 28          |


### Kerchunk

In [None]:
import fsspec
import xarray as xr

In [None]:
%%time

kerchunk_path = os.path.join(target_root, store_name, "reference.json")

mapper = fsspec.get_mapper(
    "reference://",
    fo=kerchunk_path,
    remote_protocol="http",
)
kerchunk_ds = xr.open_dataset(
    mapper, engine="zarr", decode_coords="all", backend_kwargs={"consolidated": False}
)
kerchunk_ds.sel(lat=slice(48, 47), lon=slice(-123, -122)).burning_index_g.max().values

In [None]:
kerchunk_ds

That took almost 10 seconds.

### NetCDF Cloud Access

In [None]:
# prepare urls


def url_gen(year):
    return (
        f"http://thredds.northwestknowledge.net:8080/thredds/dodsC/MET/bi/bi_{year}.nc"
    )


urls_list = [url_gen(year) for year in years]

In [None]:
%%time
netcdf_ds = xr.open_mfdataset(urls_list, engine="netcdf4")
netcdf_ds.sel(lat=slice(48, 47), lon=slice(-123, -122)).burning_index_g.max().values

That took about 28 seconds. 
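
The two timings above rely on the `%%time` cell magic, which only works inside Jupyter. For scripts, a small context manager built on `time.perf_counter` can record the same kind of wall-clock measurement. The helper names `timed` and `speedup` are illustrative, and the 28 s and 10 s inputs are simply the values reported in this notebook.

```python
import time
from contextlib import contextmanager


@contextmanager
def timed(label, results):
    """Record the wall-clock duration of the enclosed block in `results`."""
    start = time.perf_counter()
    try:
        yield
    finally:
        results[label] = time.perf_counter() - start


def speedup(baseline_seconds, candidate_seconds):
    """Return how many times faster the candidate is than the baseline."""
    return baseline_seconds / candidate_seconds


results = {}
with timed("demo", results):
    sum(range(100_000))  # placeholder workload

print(f"demo took {results['demo']:.4f} s")
print(f"{speedup(28, 10):.1f}x")
```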

In [None]:
netcdf_ds

## Storage Benchmark - Kerchunk vs NetCDF 
### 5200x Storage Savings

| Storage       | Size (MB)   |
| ------------- | ----------- |
| Kerchunk      | 10          |
| Cloud NetCDF  | 52122       |


In [None]:
# Kerchunk Reference File
import os

print(f"{round(os.path.getsize(kerchunk_path) / 1e6, 1)} MB")

In [None]:
# NetCDF Files
print(f"{round(netcdf_ds.nbytes / 1e6, 1)} MB")
print("or")
print(f"{round(netcdf_ds.nbytes / 1e9, 1)} GB")
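
As a quick check of the headline figure, the sketch below formats byte counts with decimal units and computes the ratio between the two sizes in the table. `format_size` is a hypothetical helper, and the byte counts are the table values (10 MB and 52122 MB) converted to bytes.

```python
def format_size(num_bytes):
    """Format a byte count using decimal (powers of 1000) units."""
    for unit in ("B", "kB", "MB", "GB", "TB"):
        if num_bytes < 1000 or unit == "TB":
            return f"{num_bytes:.1f} {unit}"
        num_bytes /= 1000


kerchunk_bytes = 10 * 1_000_000    # 10 MB reference file, from the table
netcdf_bytes = 52_122 * 1_000_000  # 52122 MB of NetCDF data, from the table

print(format_size(kerchunk_bytes))
print(format_size(netcdf_bytes))
print(f"{netcdf_bytes / kerchunk_bytes:.0f}x smaller")
```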