## Scratch notebook for exploring Icechunk


In [1]:
import icechunk
import icechunk.storage
import icechunk.xarray
import numpy as np
import pandas as pd
import xarray as xr

In [2]:
# cluster = coiled.Cluster(
# n_workers=2,
# worker_vm_types=["t4g.2xlarge"],
# scheduler_vm_types=["m8g.2xlarge"],
# region="us-west-2",
# spot_policy="spot_with_fallback")


# cluster.adapt(minimum=2, maximum=20)

# client = cluster.get_client()

In [3]:
def gen_wget_strings(
    start_year: int,
    end_year: int,
    *,
    base_url: str = "https://data.chc.ucsb.edu/people/cascade/UHE-daily/wbgtmax",
) -> list[str]:
    """
    Build download URLs for the daily UHE WBGTmax GeoTIFFs.

    The UHE-daily gridded data product formed the basis for Tuholske et al.
    (2021). This function only builds URL strings; it performs no network
    access. It produces one URL per calendar day, from January 1 of
    ``start_year`` through December 31 of ``end_year`` inclusive, in
    chronological order.

    Parameters
    ----------
    start_year, end_year : int
        First and last year (inclusive) to cover. If ``start_year`` is after
        ``end_year``, the result is empty.
    base_url : str, optional
        Directory that holds the per-year subfolders. Defaults to the CHC
        server used originally. A trailing slash is ignored.

    Returns
    -------
    list[str]
        URLs of the form ``{base_url}/{YYYY}/wbgtmax.{YYYY.MM.DD}.tif``.
    """
    days = pd.date_range(f"{start_year}-01-01", f"{end_year}-12-31")
    base = base_url.rstrip("/")
    return [
        f"{base}/{day.strftime('%Y')}/wbgtmax.{day.strftime('%Y.%m.%d')}.tif"
        for day in days
    ]


def parse_ds(
    ds: xr.Dataset,
    *,
    chunks: dict[str, int] | None = None,
) -> xr.Dataset:
    """
    Normalize one daily GeoTIFF into a single-timestep WBGT dataset.

    Expects the layout produced by ``xr.open_dataset(url, engine="rasterio")``:
    a ``band_data`` variable with ``band``/``x``/``y`` dims and a
    ``spatial_ref`` variable. It also expects ``ds.encoding["source"]`` to end
    in ``<name>.YYYY.MM.DD.<ext>``.

    Steps:
    1. Add a length-1 ``time`` dim whose value is parsed from the source filename.
    2. Squeeze away ``band``.
    3. Drop ``spatial_ref``.
    4. Rename ``band_data``/``x``/``y`` to ``WBGT``/``lon``/``lat``.
    5. Sort by ascending ``lat``.
    6. Rechunk.

    Parameters
    ----------
    ds : xr.Dataset
        Dataset opened from a single daily GeoTIFF.
    chunks : dict, optional
        Chunk sizes passed to ``Dataset.chunk``. Defaults to
        ``{"time": 1, "lat": 2600, "lon": 7200}``.

    Returns
    -------
    xr.Dataset
        The normalized, chunked dataset.

    Raises
    ------
    ValueError
        If the source filename does not contain the expected date fields.
    """
    if chunks is None:
        chunks = {"time": 1, "lat": 2600, "lon": 7200}

    # The three dot-separated fields just before the file extension are the date.
    year, month, day = ds.encoding["source"].split(".")[-4:-1]
    timestamp = np.datetime64(f"{year}-{month}-{day}")

    ds = (
        ds.expand_dims(time=[timestamp])
        .squeeze(dim=["band"], drop=True)
        # Dataset.drop is deprecated; drop_vars is the supported replacement.
        .drop_vars("spatial_ref")
        .rename({"band_data": "WBGT", "x": "lon", "y": "lat"})
        # GeoTIFF rows are typically stored north-to-south; make lat ascending.
        .sortby("lat")
    )
    return ds.chunk(chunks)

In [5]:
# Icechunk repository location: s3://carbonplan-scratch/uhe_daily in us-west-2.
ic_storage = icechunk.storage.s3_storage(
    region="us-west-2",
    bucket="carbonplan-scratch",
    prefix="uhe_daily",
)
# Open the repository at that location, creating it if it does not exist yet.
repo = icechunk.Repository.open_or_create(storage=ic_storage)

In [None]:
session = repo.writable_session("main")
store = session.store
# NOTE(review): `template` is not defined anywhere in this notebook. Build the
# empty target dataset first, then enable exactly one of the writes below.
# template.to_zarr(store, zarr_format=3, compute=False, mode="w")
# icechunk.xarray.to_icechunk(template, store=store,compute=False, mode='w')

# With both writes commented out, the session holds no changes, and icechunk
# rejects committing an empty session. Only commit if something was written.
if session.has_uncommitted_changes:
    session.commit("Add template")

In [None]:
from dask.distributed import Client

# Local Dask cluster: one worker running a single thread.
client = Client(threads_per_worker=1, n_workers=1)
client  # last expression in the cell -> notebook shows the cluster summary

In [None]:
import icechunk.session


def delayed_write_region(write_region_task: tuple) -> icechunk.Session:
    """
    Write one daily GeoTIFF into its region of the Icechunk store.

    The whole ``(url, session)`` pair is taken as one tuple so the function
    can be used directly with ``client.map`` over a list of pairs.

    Parameters
    ----------
    write_region_task : tuple
        ``(url, session)``: the GeoTIFF URL to read, and the writable Icechunk
        session whose store receives the data.

    Returns
    -------
    icechunk.Session
        The same session object, now carrying this region's uncommitted
        changes, so the caller can merge it and commit.
    """
    url, session = write_region_task

    # Use a context manager so the raster handle is closed once the write is done.
    with xr.open_dataset(url, engine="rasterio") as raw:
        ds = parse_ds(raw)
        # region="auto" lets xarray infer this day's slice from the
        # coordinates; region writes expect the target arrays to already
        # exist (e.g. from a template write).
        # The original call passed a misspelled `consoldated=` keyword, which
        # is not a valid argument. Icechunk stores are always Zarr v3, so
        # neither `consolidated` nor `zarr_format` is passed here.
        # NOTE(review): newer icechunk releases take the session itself
        # (`session=`) rather than `store=`; confirm against the installed version.
        icechunk.xarray.to_icechunk(ds, store=session.store, region="auto")

    return session


# One writable session on "main". The commented-out fan-out below would pair
# it with every URL, map `delayed_write_region` over the pairs on the Dask
# client, and gather the returned sessions for merging and committing.
# NOTE(review): `uhe_urls` is not defined in this notebook. Presumably it is
# `gen_wget_strings(start_year, end_year)`; confirm.
# NOTE(review): recent icechunk releases require an explicit opt-in before a
# writable session can be pickled to workers (e.g. `session.allow_pickling()`
# or `session.fork()`, depending on version). Check the installed release.
session = repo.writable_session("main")
# write_region_task = [(url, session) for url in uhe_urls]

# map_result = client.map(delayed_write_region, write_region_task)
# worker_changes = client.gather(map_result)

In [23]:
# for worker_session in worker_changes:
#     session.merge(worker_session)
# commit_res = session.commit("distributed commit")

In [None]:
# ds = xr.open_zarr(session.store)