## Creating STAC Items for a legacgy files referenced by Virtual Icechunk Store

There is a virtual icechunk store that is publicly available at: s3://nasa-waterinsight/virtual-zarr-store/NLDAS-3-icechunk/

This notebook goes through the current thinking for how you would set up STAC items that point to the legacy file formats referenced by the virtual store. 

In [15]:
import icechunk
import zarr
import xstac

import numpy as np
import xarray as xr

Zarr can emit a lot of warnings about Numcodecs not being including in the Zarr version 3 specification yet -- let's suppress those.

In [2]:
import warnings

warnings.filterwarnings(
    "ignore",
    message="Numcodecs codecs are not in the Zarr version 3 specification*",
    category=UserWarning,
)

These are the PRs that need to land before you can open the virtual icechunk store with zarr directly:

- https://github.com/zarr-developers/zarr-python/pull/3369
- https://github.com/earth-mover/icechunk/pull/1161

Until then:

In [3]:
storage = icechunk.s3_storage(
    bucket="nasa-waterinsight",
    prefix="virtual-zarr-store/NLDAS-3-icechunk/",
    region="us-west-2",
    anonymous=True,
)

config = icechunk.RepositoryConfig.default()
config.set_virtual_chunk_container(
    icechunk.VirtualChunkContainer(
        "s3://nasa-waterinsight/NLDAS3/forcing/daily/",
        icechunk.s3_store(region="us-west-2")
    )
)
virtual_credentials = icechunk.containers_credentials(
    {
        "s3://nasa-waterinsight/NLDAS3/forcing/daily/": icechunk.s3_anonymous_credentials()
    }
)

repo = icechunk.Repository.open(
    storage=storage,
    config=config,
    authorize_virtual_chunk_access=virtual_credentials,
)

session = repo.readonly_session('main')
ds = xr.open_zarr(session.store, consolidated=False, zarr_format=3)

From this virtual icechunk store we can get the locations of all the chunks. This technically only accesses a single snapshot, so to get them all you might have to recurse back through time (https://github.com/earth-mover/icechunk/issues/1194). In this case though 

Note that this takes some time.

In [4]:
%%time

chunk_locations = session.all_virtual_chunk_locations()

CPU times: user 24.1 s, sys: 3.28 s, total: 27.4 s
Wall time: 48.9 s


In [5]:
len(chunk_locations)

14194310

That gives the locations of every chunk. So we need to deduplicate it to get a list of files

In [6]:
legacy_files = sorted(list(set(chunk_locations)))

In [7]:
legacy_files[:5]

['s3://nasa-waterinsight/NLDAS3/forcing/daily/200101/NLDAS_FOR0010_D.A20010101.030.beta.nc',
 's3://nasa-waterinsight/NLDAS3/forcing/daily/200101/NLDAS_FOR0010_D.A20010102.030.beta.nc',
 's3://nasa-waterinsight/NLDAS3/forcing/daily/200101/NLDAS_FOR0010_D.A20010103.030.beta.nc',
 's3://nasa-waterinsight/NLDAS3/forcing/daily/200101/NLDAS_FOR0010_D.A20010104.030.beta.nc',
 's3://nasa-waterinsight/NLDAS3/forcing/daily/200101/NLDAS_FOR0010_D.A20010105.030.beta.nc']

As a sanity check we can just confirm that we have the same number of files as we do timesteps.

In [8]:
assert len(legacy_files) == len(ds.time)

For instance here is the chunkgrid for `ds.PSurf`

In [9]:
ds.PSurf.data.numblocks

(8399, 13, 13)

You can multiply all those numbers together to get the total number of chunks

In [10]:
numchunks = 1 
for n in ds.PSurf.data.numblocks:
    numchunks *= n
numchunks

1419431

That number matches the number in the dask array representation.

In [11]:
ds.PSurf.data

Unnamed: 0,Array,Chunk
Bytes,4.65 TiB,3.43 MiB
Shape,"(8399, 6500, 11700)","(1, 500, 900)"
Dask graph,1419431 chunks in 2 graph layers,1419431 chunks in 2 graph layers
Data type,float64 numpy.ndarray,float64 numpy.ndarray
"Array Chunk Bytes 4.65 TiB 3.43 MiB Shape (8399, 6500, 11700) (1, 500, 900) Dask graph 1419431 chunks in 2 graph layers Data type float64 numpy.ndarray",11700  6500  8399,

Unnamed: 0,Array,Chunk
Bytes,4.65 TiB,3.43 MiB
Shape,"(8399, 6500, 11700)","(1, 500, 900)"
Dask graph,1419431 chunks in 2 graph layers,1419431 chunks in 2 graph layers
Data type,float64 numpy.ndarray,float64 numpy.ndarray


But we can't really know what the flat list of `chunk_locations` means in the context  of that chunk grid. What does this mean?

In [12]:
chunk_locations[0]

's3://nasa-waterinsight/NLDAS3/forcing/daily/200101/NLDAS_FOR0010_D.A20010101.030.beta.nc'

Ok so you can't really get that information out of icechunk right now as far as I can tell. So what about the other piece. 

First let's take a slice of the array that represents just one point in time. As long as all the variables have the same shape it shouldn't matter which one you pick.

In [13]:
a = ds.PSurf.data[0, :, :]
a

Unnamed: 0,Array,Chunk
Bytes,580.22 MiB,3.43 MiB
Shape,"(6500, 11700)","(500, 900)"
Dask graph,169 chunks in 3 graph layers,169 chunks in 3 graph layers
Data type,float64 numpy.ndarray,float64 numpy.ndarray
"Array Chunk Bytes 580.22 MiB 3.43 MiB Shape (6500, 11700) (500, 900) Dask graph 169 chunks in 3 graph layers Data type float64 numpy.ndarray",11700  6500,

Unnamed: 0,Array,Chunk
Bytes,580.22 MiB,3.43 MiB
Shape,"(6500, 11700)","(500, 900)"
Dask graph,169 chunks in 3 graph layers,169 chunks in 3 graph layers
Data type,float64 numpy.ndarray,float64 numpy.ndarray


In [16]:
lat_chunk_indices = np.insert(np.array(a.chunks[0]).cumsum() - 1, 0, 0)
lat_chunk_bounds = ds.lat.data[lat_chunk_indices]
lat_chunk_bounds

array([ 7.00500011, 11.99499989, 16.99499893, 21.99499893, 26.99499893,
       31.99499893, 36.99499893, 41.99499893, 46.99499893, 51.99499893,
       56.99499893, 61.99499893, 66.99499512, 71.99499512])

In [17]:
lon_chunk_indices = np.insert(np.array(a.chunks[1]).cumsum() - 1, 0, 0)
lon_chunk_bounds = ds.lon.data[lon_chunk_indices]
lon_chunk_bounds

array([-168.99499512, -160.00498962, -151.00498962, -142.00498962,
       -133.00500488, -124.00499725, -115.00499725, -106.00499725,
        -97.00499725,  -88.00499725,  -79.00499725,  -70.00499725,
        -61.00499725,  -52.00499725])

So those are the min and max of each chunk. We can construct the bounding box for each chunk by stepping along the lats and lons and taking two items at a time. We'll start with an array of zeros that has the same shape as the chunk grid except the last dimension is size 4 so that it can contain the bboxes.

In [18]:
ds.PSurf.dims

('time', 'lat', 'lon')

In [19]:
bboxes = np.zeros((a.numblocks[0], a.numblocks[1], 4))

for n in range(a.numblocks[0]):
    for m in range(a.numblocks[1]):
        bboxes[n, m] = [lon_chunk_bounds[m], lat_chunk_bounds[n], lon_chunk_bounds[m + 1], lat_chunk_bounds[n + 1]]

In [20]:
bboxes[0, 0]

array([-168.99499512,    7.00500011, -160.00498962,   11.99499989])

In practice we probably don't need the bounding boxes for each chunk. We just need the bounding boxes for the legacy files which we can assume contain contiguous chunks.

In [21]:
ds.rio.bounds()

(-168.9999951170962, 7.000000114825381, -51.99999725350927, 71.99999511680303)

## Create STAC items

In order to construct a STAC Item for each of these legacy files we need to have the have the temporal and spatial bounds for each file. Unlike the STAC Collection we want the metadata in these to be pretty minimal. 

Just as a quick note we are using `pystac` here for construction and using `rustac` for reading/writing to stac-geoparquet.

In [22]:
import json
import datetime as dt

import pystac
import rustac

First we will read in the collection that we had created in the previous notebook:

In [23]:
collection = pystac.Collection.from_file("./collection.json")

We'll use some hard-coded values that are pulled straight from the `xr.Dataset`. For instance we know that the bounding box for each of these netcdf file is exactly the same they just each represent a different timestep.

In [24]:
from shapely.geometry import box
from shapely import to_geojson

bbox = ds.rio.bounds()
bbox = tuple(round(b, 4) for b in bbox)  # it might be wrong to round this, but it looked sooo close to round.
bbox_polygon = box(*bbox)

begin_time = ds.time.begin_time
end_time = ds.time.end_time

The item-specific values we will deduce from the filename for now, but ideally we would get this information in a more systematic way from the chunk information. Let's take a look at those legacy filepaths again:

In [25]:
legacy_files[0]

's3://nasa-waterinsight/NLDAS3/forcing/daily/200101/NLDAS_FOR0010_D.A20010101.030.beta.nc'

So - and this is kind of hacky - we can see that the whole date is actually encoded in the filepath. We just need to grab it out of there and parse it. Let's create a function that creates an item that basically just has the spatial and temporal extents set and points to one particular legacy file on s3.

In [26]:
def create_item(legacy_filepath):
    filename = legacy_filepath.split("/")[-1]
    date = filename.split(".030.beta.")[0][-8:]
    
    datetime = dt.datetime.strptime(date, "%Y%m%d")
    start_datetime = dt.datetime.strptime(f"{date} {begin_time}", "%Y%m%d %H%M%S")
    end_datetime = dt.datetime.strptime(f"{date} {end_time}", "%Y%m%d %H%M%S")
    
    return pystac.Item(
        filename,
        geometry=json.loads(to_geojson(bbox_polygon)),
        properties={
            "storage:schemes": collection.extra_fields["storage:schemes"]
        },
        bbox=bbox,
        datetime=datetime,
        start_datetime=start_datetime,
        end_datetime=end_datetime,
        collection=collection,
        stac_extensions=[
            "https://stac-extensions.github.io/storage/v2.0.0/schema.json",
        ],
        assets={
            "data": pystac.Asset(
                href=legacy_filepath,
                media_type="application/x-netcdf",
                roles=["data"],
                extra_fields={
                    "storage:refs": [
                        "aws-s3-nasa-waterinsight"
                    ],
                }
            )
        }
    )

We'll run it on one file to make sure it works and see how long it takes. 

In [27]:
%%time

item = create_item(legacy_files[0])
item

CPU times: user 1.03 ms, sys: 1.01 ms, total: 2.04 ms
Wall time: 14 ms


Validate the item:

In [28]:
item.validate()

['https://schemas.stacspec.org/v1.1.0/item-spec/json-schema/item.json',
 'https://stac-extensions.github.io/storage/v2.0.0/schema.json']

Now instead of just making one of these we'll want to make a whole bunch (8399 to be exact) so instead of storing each as its own json blob I'll save them as an item collection in stac-geoparquet.

In [29]:
%%time

items = [
    create_item(legacy_file).to_dict(include_self_link=False, transform_hrefs=False)
    for legacy_file in legacy_files
]

CPU times: user 631 ms, sys: 1.2 ms, total: 632 ms
Wall time: 632 ms


In [30]:
await rustac.write("items.parquet", items)

{'e_tag': '2867927-63e5ff3d79810-53b1f', 'version': None}

Read it back in just to prove that we can:

In [31]:
item_collection = await rustac.read("items.parquet")
item_collection["features"][0]

{'type': 'Feature',
 'stac_version': '1.1.0',
 'stac_extensions': ['https://stac-extensions.github.io/storage/v2.0.0/schema.json'],
 'id': 'NLDAS_FOR0010_D.A20010101.030.beta.nc',
 'geometry': {'type': 'Polygon',
  'coordinates': [[[-52.0, 7.0],
    [-52.0, 72.0],
    [-169.0, 72.0],
    [-169.0, 7.0],
    [-52.0, 7.0]]]},
 'bbox': (-169.0, 7.0, -52.0, 72.0),
 'properties': {'datetime': '2001-01-01T00:00:00Z',
  'start_datetime': '2001-01-01T00:00:00Z',
  'end_datetime': '2001-01-01T23:59:59Z',
  'storage:schemes': {'aws-s3-nasa-waterinsight': {'type': 'aws-s3',
    'platform': 'https://{bucket}.s3.{region}.amazonaws.com',
    'bucket': 'nasa-waterinsight',
    'region': 'us-west-2',
    'anonymous': True}}},
 'links': [{'href': '/home/jsignell/NASA/dse-virtual-zarr-workshop/docs/examples/collection.json',
   'rel': 'collection',
   'type': 'application/json',
   'title': 'NLDAS Forcing Data L4 Daily 0.01 x 0.01 degree V3.0 - *BETA*'}],
 'assets': {'data': {'href': 's3://nasa-waterinsi