# Proof of concept to build Zarr aggregations on the fly

I need better tools to work with the layered (nested?) grib2 NODD datasets (HRRR, GFS, GEFS).

Concepts
1. Map grib layers to zarr/xarray hierarchy
2. Separate hierarchy/variable metadata from data variable kerchunk references
3. Build the metadata structure once based on a single GRIB2 file per forecast product
4. Store the kerchunk references in a database
5. Build a method to allow flexibly reconstructing particular subsets of the hierarchy on the fly

This is different than the existing kerchunk tools (parquet storage - [refs_to_dataframe](https://github.com/fsspec/kerchunk/blob/3c4e9fc960e159875e8f258ccd20fdbc565513df/kerchunk/df.py#L101C5-L101C22)) because it works with the layered grib2 datasets and because it makes operational usage much easier. Ingestion of a new Grib2 file is trivial - just insert the kerchunks into your favorite database. You don't need to reload or aggregate anything as new files arrive. At read time the world is a little more complicated, but not much. You read the metadata for the hierachy and combine it withe the slice of kerchunk variables you are interested in (layers, varnames, time ranges).

The proof of concept add_chunks_to_zarr method is particularly a hack - it needs more work to allow all the FMRC slices to be constructed on the fly. This may require making assumptions about the database as infering the intent from the chunks is a bit grotesque and adding enough hints to the arguments begs for nasty error conditions if the chunks don't match.

Much to improve, but I hope this is enough generate some discussion. Feed back welcome!



MIT License
Copyright (c) 2023 Camus Energy

Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.


In [1]:
%load_ext autoreload
%autoreload 2

In [20]:
import logging
import io
import xarray as xr
import numpy as np
import pandas as pd
import fsspec
import zarr
import datatree
import importlib
import pprint

from ingestion.noaa_nwp import data_extractor

importlib.reload(logging)
logging.basicConfig(
    format="%(asctime)s.%(msecs)03dZ %(processName)s %(threadName)s %(levelname)s:%(name)s:%(message)s",
    datefmt="%Y-%m-%dT%H:%M:%S",
    level=logging.INFO,
)


In [4]:
hrrr_subhf = [
    "gs://high-resolution-rapid-refresh/hrrr.20230928/conus/hrrr.t00z.wrfsubhf00.grib2",
    "gs://high-resolution-rapid-refresh/hrrr.20230928/conus/hrrr.t00z.wrfsubhf01.grib2",
    "gs://high-resolution-rapid-refresh/hrrr.20230928/conus/hrrr.t00z.wrfsubhf02.grib2",
    "gs://high-resolution-rapid-refresh/hrrr.20230928/conus/hrrr.t00z.wrfsubhf03.grib2",
]

hrrr_sfcf = [
    "gs://high-resolution-rapid-refresh/hrrr.20230928/conus/hrrr.t00z.wrfsfcf00.grib2",
    "gs://high-resolution-rapid-refresh/hrrr.20230928/conus/hrrr.t00z.wrfsfcf01.grib2",
    "gs://high-resolution-rapid-refresh/hrrr.20230928/conus/hrrr.t00z.wrfsfcf02.grib2",
    "gs://high-resolution-rapid-refresh/hrrr.20230928/conus/hrrr.t00z.wrfsfcf03.grib2",
]


gefs = [
    "gs://gfs-ensemble-forecast-system/gefs.20230928/00/atmos/pgrb2sp25/geavg.t00z.pgrb2s.0p25.f000",
    "gs://gfs-ensemble-forecast-system/gefs.20230928/00/atmos/pgrb2sp25/geavg.t00z.pgrb2s.0p25.f003",
    "gs://gfs-ensemble-forecast-system/gefs.20230928/00/atmos/pgrb2sp25/geavg.t00z.pgrb2s.0p25.f006",
    "gs://gfs-ensemble-forecast-system/gefs.20230928/00/atmos/pgrb2sp25/geavg.t00z.pgrb2s.0p25.f009",

]


In [5]:
# Use when only interested in getting the chunk dataframe for several grib files
def scan_blobs(blobs) -> list[tuple[dict, dict]]:
  return [partial_chunk for blob in blobs for partial_chunk in data_extractor.scan_grib_to_groups(blob) ]


## HRRR SFCF
New grib files can be scanned and the chunks extracted any time - you only create the zarr metadata shell once for a NODD grib product and then just extract new chunk references to add.

In [6]:
# This method separates the kerchunk refernce data from the zarr datastructure
scanned_hrrr_sfcf = scan_blobs(hrrr_sfcf)

In [9]:
# this method can be run on each grib file as it arrives
hrrr_sfcf_chunks_df = data_extractor.extract_chunks(
    scanned_hrrr_sfcf,
    layers=['stepType', 'typeOfLevel'],
    forecast_valid_time='valid_time',
    forecast_run_time='time',
    model_horizon="step",
    )
hrrr_sfcf_chunks_df.head(10)

Found 114 unknown variables


Unnamed: 0,name,forecast_valid_time,forecast_run_time,model_horizon,zchunk,uri,offset,length,stepType,typeOfLevel,layer
0,refc,2023-09-28,2023-09-28,0 days,0.0,gs://high-resolution-rapid-refresh/hrrr.202309...,0,260388,instant,atmosphere,0.0
1,veril,2023-09-28,2023-09-28,0 days,0.0,gs://high-resolution-rapid-refresh/hrrr.202309...,704886,213150,instant,atmosphere,0.0
2,vis,2023-09-28,2023-09-28,0 days,0.0,gs://high-resolution-rapid-refresh/hrrr.202309...,918036,1330561,instant,surface,0.0
3,refd,2023-09-28,2023-09-28,0 days,0.0,gs://high-resolution-rapid-refresh/hrrr.202309...,2248597,158496,instant,heightAboveGround,1000.0
4,refd,2023-09-28,2023-09-28,0 days,0.0,gs://high-resolution-rapid-refresh/hrrr.202309...,2407093,130282,instant,heightAboveGround,4000.0
5,refd,2023-09-28,2023-09-28,0 days,0.0,gs://high-resolution-rapid-refresh/hrrr.202309...,2537375,135862,instant,isothermal,263.0
6,gust,2023-09-28,2023-09-28,0 days,0.0,gs://high-resolution-rapid-refresh/hrrr.202309...,2673237,1115618,instant,surface,0.0
7,u,2023-09-28,2023-09-28,0 days,0.0,gs://high-resolution-rapid-refresh/hrrr.202309...,3788855,775080,instant,isobaricInhPa,250.0
8,v,2023-09-28,2023-09-28,0 days,0.0,gs://high-resolution-rapid-refresh/hrrr.202309...,4563935,773215,instant,isobaricInhPa,250.0
9,u,2023-09-28,2023-09-28,0 days,0.0,gs://high-resolution-rapid-refresh/hrrr.202309...,5337150,756348,instant,isobaricInhPa,300.0


In [10]:
# This method need only be run once - just pick any GRIB file for the product to
# build the metadata structure and persist a copy of the coordinates for easy reference
metadata_path = "gs://dev.camus-infra.camus.store/davetest/hrrr_sfcf"
with io.BytesIO() as bio:
  zarr_store = data_extractor.treeify_groups(
      scanned_hrrr_sfcf, # Better to give just one file but this is fine too...
      layers=['stepType', 'typeOfLevel'],
      metadata_path=metadata_path,
      coords_file=bio,
      forecast_valid_time='valid_time',
      forecast_run_time='time',
      model_horizon="step",
      select_valid_time=np.datetime64("2023-09-28T00:00:00") # Just create var metadata for a single timestep
    )

  data_extractor.write_coords(metadata_path, bio)

data_extractor.write_store(metadata_path, zarr_store)

Group <zarr.hierarchy.Group '/instant/heightAboveGround'> already contains an array refd
Group <zarr.hierarchy.Group '/instant/isobaricInhPa'> already contains an array u
Group <zarr.hierarchy.Group '/instant/isobaricInhPa'> already contains an array v
Group <zarr.hierarchy.Group '/instant/isobaricInhPa'> already contains an array u
Group <zarr.hierarchy.Group '/instant/isobaricInhPa'> already contains an array v
Group <zarr.hierarchy.Group '/instant/isobaricInhPa'> already contains an array gh
Group <zarr.hierarchy.Group '/instant/isobaricInhPa'> already contains an array t
Group <zarr.hierarchy.Group '/instant/isobaricInhPa'> already contains an array dpt
Group <zarr.hierarchy.Group '/instant/isobaricInhPa'> already contains an array u
Group <zarr.hierarchy.Group '/instant/isobaricInhPa'> already contains an array v
Group <zarr.hierarchy.Group '/instant/isobaricInhPa'> already contains an array gh
Group <zarr.hierarchy.Group '/instant/isobaricInhPa'> already contains an array t
Group

### Play with the metadata zarr store...
You can already opne the zarr hierarchy as a zarr tree or datatree.
You can read the coordinate variable data, but not the data variables

In [13]:
zarr_spec = data_extractor.read_store(metadata_path)
kspec = {"refs": zarr_spec, "version": 1}

fs = fsspec.filesystem("reference", fo=kspec)
xd = datatree.open_datatree(fs.get_mapper(""), engine="zarr")
xd

1. Consolidating metadata in this existing store with zarr.consolidate_metadata().
2. Explicitly setting consolidated=False, to avoid trying to read consolidate metadata, or
3. Explicitly setting consolidated=True, to raise an error in this case instead of falling back to try reading non-consolidated metadata.
  ds = open_dataset(store, engine="zarr", **kwargs)
1. Consolidating metadata in this existing store with zarr.consolidate_metadata().
2. Explicitly setting consolidated=False, to avoid trying to read consolidate metadata, or
3. Explicitly setting consolidated=True, to raise an error in this case instead of falling back to try reading non-consolidated metadata.
  subgroup_ds = open_dataset(store, engine="zarr", group=path, **kwargs)


### Reattach the data varaibles for a subset

Now we can recombine the the metadata structure withe particular slices of the kerchunk dataframe to build a valid datatree

In [17]:
# You can safely reindex to a longer range than you have chunks for - we need to fix the missing value
dtindex = pd.date_range("2023-09-28T00:00:00", "2023-09-28T05:00:00", freq="H", name="forecast_valid_time")
layers = ['stepType', 'typeOfLevel']

In [15]:
# Pick two variables to try it with...
short_chunk = hrrr_sfcf_chunks_df.loc[(hrrr_sfcf_chunks_df.name == "dswrf") | (hrrr_sfcf_chunks_df.name ==  "vbdsf"), :]
short_chunk

Unnamed: 0,name,forecast_valid_time,forecast_run_time,model_horizon,zchunk,uri,offset,length,stepType,typeOfLevel,layer
98,dswrf,2023-09-28 00:00:00,2023-09-28,0 days 00:00:00,0.0,gs://high-resolution-rapid-refresh/hrrr.202309...,74987340,1079420,instant,surface,0.0
103,vbdsf,2023-09-28 00:00:00,2023-09-28,0 days 00:00:00,0.0,gs://high-resolution-rapid-refresh/hrrr.202309...,80633071,1297401,instant,surface,0.0
240,dswrf,2023-09-28 01:00:00,2023-09-28,0 days 01:00:00,0.0,gs://high-resolution-rapid-refresh/hrrr.202309...,85207538,651160,instant,surface,0.0
245,vbdsf,2023-09-28 01:00:00,2023-09-28,0 days 01:00:00,0.0,gs://high-resolution-rapid-refresh/hrrr.202309...,89950857,391826,instant,surface,0.0
384,dswrf,2023-09-28 02:00:00,2023-09-28,0 days 02:00:00,0.0,gs://high-resolution-rapid-refresh/hrrr.202309...,86716213,151315,instant,surface,0.0
389,vbdsf,2023-09-28 02:00:00,2023-09-28,0 days 02:00:00,0.0,gs://high-resolution-rapid-refresh/hrrr.202309...,90547904,74509,instant,surface,0.0
528,dswrf,2023-09-28 03:00:00,2023-09-28,0 days 03:00:00,0.0,gs://high-resolution-rapid-refresh/hrrr.202309...,87778932,188,instant,surface,0.0
533,vbdsf,2023-09-28 03:00:00,2023-09-28,0 days 03:00:00,0.0,gs://high-resolution-rapid-refresh/hrrr.202309...,91427324,188,instant,surface,0.0


In [21]:
zinflated = data_extractor.add_chunks_to_zarr(
    short_chunk,
    zarr_spec,
    layers,
    'forecast_run_time',
    [dtindex]
)
pprint.pprint(zinflated)

{'.zgroup': b'{\n    "zarr_format": 2\n}',
 'instant/.zattrs': b'{\n    "stepType": "instant"\n}',
 'instant/.zgroup': b'{\n    "zarr_format": 2\n}',
 'instant/surface/.zattrs': b'{\n    "GRIB_centre": "kwbc",\n    "GRIB_centr'
                            b'eDescription": "US National Weather Service - NC'
                            b'EP ",\n    "GRIB_edition": 2,\n    "GRIB_subCe'
                            b'ntre": 0,\n    "coordinates": "surface latitude l'
                            b'ongitude step time valid_time",\n    "institution'
                            b'": "US National Weather Service - NCEP ",\n    "t'
                            b'ypeOfLevel": "surface"\n}',
 'instant/surface/.zgroup': b'{\n    "zarr_format": 2\n}',
 'instant/surface/dswrf/.zarray': '{"chunks":[1,1059,1799],"compressor":null,"dtype":"<f8","fill_value":null,"filters":[{"dtype":"float64","id":"grib","var":"dswrf"}],"order":"C","shape":[6,1059,1799],"zarr_format":2}',
 'instant/surface/dswrf/.zattrs': 

In [26]:
kspec = {"refs": zinflated, "version": 1}
#kspec = {"refs": zarr_spec, "version": 1}

fs = fsspec.filesystem("reference", fo=kspec)
xd = datatree.open_datatree(fs.get_mapper(""), engine="zarr")
xd

1. Consolidating metadata in this existing store with zarr.consolidate_metadata().
2. Explicitly setting consolidated=False, to avoid trying to read consolidate metadata, or
3. Explicitly setting consolidated=True, to raise an error in this case instead of falling back to try reading non-consolidated metadata.
  ds = open_dataset(store, engine="zarr", **kwargs)
1. Consolidating metadata in this existing store with zarr.consolidate_metadata().
2. Explicitly setting consolidated=False, to avoid trying to read consolidate metadata, or
3. Explicitly setting consolidated=True, to raise an error in this case instead of falling back to try reading non-consolidated metadata.
  subgroup_ds = open_dataset(store, engine="zarr", group=path, **kwargs)


In [25]:
xd.instant.surface.vbdsf.values[:,900,100]


array([497.2, 270. , 140. ,   0. ,   0. ,   0. ])

## HRRR SUBHF

In [28]:
# This method separates the kerchunk refernce data from the zarr datastructure
scanned_hrrr_subhf = scan_blobs(hrrr_subhf)
# eccodes errors are expected for these grib files

ECCODES ERROR   :  unable to convert endStep in stepUnits
Ignoring coordinate 'step' for varname 'dswrf', raises: eccodes.WrongStepUnitError(Wrong units for step (step must be integer))
ECCODES ERROR   :  unable to convert endStep in stepUnits
Ignoring coordinate 'step' for varname 'vbdsf', raises: eccodes.WrongStepUnitError(Wrong units for step (step must be integer))
ECCODES ERROR   :  unable to convert endStep in stepUnits
Ignoring coordinate 'step' for varname 'tp', raises: eccodes.WrongStepUnitError(Wrong units for step (step must be integer))
ECCODES ERROR   :  unable to convert endStep in stepUnits
Ignoring coordinate 'step' for varname 'sdwe', raises: eccodes.WrongStepUnitError(Wrong units for step (step must be integer))
ECCODES ERROR   :  unable to convert endStep in stepUnits
Ignoring coordinate 'step' for varname 'unknown', raises: eccodes.WrongStepUnitError(Wrong units for step (step must be integer))
ECCODES ERROR   :  unable to convert endStep in stepUnits
Ignoring coord

In [29]:
# this method can be run on each grib file as it arrives
hrrr_subhf_chunks_df = data_extractor.extract_chunks(
    scanned_hrrr_subhf,
    layers=['stepType', 'typeOfLevel'],
    forecast_valid_time='valid_time',
    forecast_run_time='time',
    model_horizon="step",
    )
hrrr_subhf_chunks_df.head(10)

Found 65 unknown variables


Unnamed: 0,name,forecast_valid_time,forecast_run_time,model_horizon,zchunk,uri,offset,length,stepType,typeOfLevel,layer
0,refc,2023-09-28,2023-09-28,0 days,0.0,gs://high-resolution-rapid-refresh/hrrr.202309...,0,260388,instant,atmosphere,0.0
1,veril,2023-09-28,2023-09-28,0 days,0.0,gs://high-resolution-rapid-refresh/hrrr.202309...,704886,213150,instant,atmosphere,0.0
2,vis,2023-09-28,2023-09-28,0 days,0.0,gs://high-resolution-rapid-refresh/hrrr.202309...,918036,1330561,instant,surface,0.0
3,refd,2023-09-28,2023-09-28,0 days,0.0,gs://high-resolution-rapid-refresh/hrrr.202309...,2248597,158496,instant,heightAboveGround,1000.0
4,refd,2023-09-28,2023-09-28,0 days,0.0,gs://high-resolution-rapid-refresh/hrrr.202309...,2407093,130282,instant,heightAboveGround,4000.0
5,gust,2023-09-28,2023-09-28,0 days,0.0,gs://high-resolution-rapid-refresh/hrrr.202309...,2537375,1115618,instant,surface,0.0
6,uphl,2023-09-28,2023-09-28,0 days,0.0,gs://high-resolution-rapid-refresh/hrrr.202309...,3652993,11176,instant,heightAboveGroundLayer,
7,u,2023-09-28,2023-09-28,0 days,0.0,gs://high-resolution-rapid-refresh/hrrr.202309...,3664169,1085584,instant,heightAboveGround,80.0
8,v,2023-09-28,2023-09-28,0 days,0.0,gs://high-resolution-rapid-refresh/hrrr.202309...,4749753,1045036,instant,heightAboveGround,80.0
9,sp,2023-09-28,2023-09-28,0 days,0.0,gs://high-resolution-rapid-refresh/hrrr.202309...,5794789,1506529,instant,surface,0.0


In [30]:
# This method need only be run once - just pick any GRIB file for the product to
# build the metadata structure and persist a copy of the coordinates for easy reference
metadata_path = "gs://dev.camus-infra.camus.store/davetest/hrrr_subhf"
with io.BytesIO() as bio:
  zarr_store = data_extractor.treeify_groups(
      scanned_hrrr_subhf, # Better to give just one file but this is fine too...
      layers=['stepType', 'typeOfLevel'],
      metadata_path=metadata_path,
      coords_file=bio,
      forecast_valid_time='valid_time',
      forecast_run_time='time',
      model_horizon="step",
      select_valid_time=np.datetime64("2023-09-28T00:00:00") # Just create var metadata for a single timestep
    )

  data_extractor.write_coords(metadata_path, bio)

data_extractor.write_store(metadata_path, zarr_store)

Group <zarr.hierarchy.Group '/instant/heightAboveGround'> already contains an array refd


### Play with the metadata zarr store...
You can already opne the zarr hierarchy as a zarr tree or datatree.
You can read the coordinate variable data, but not the data variables

In [31]:
zarr_spec = data_extractor.read_store(metadata_path)
kspec = {"refs": zarr_spec, "version": 1}

fs = fsspec.filesystem("reference", fo=kspec)
xd = datatree.open_datatree(fs.get_mapper(""), engine="zarr")
xd

1. Consolidating metadata in this existing store with zarr.consolidate_metadata().
2. Explicitly setting consolidated=False, to avoid trying to read consolidate metadata, or
3. Explicitly setting consolidated=True, to raise an error in this case instead of falling back to try reading non-consolidated metadata.
  ds = open_dataset(store, engine="zarr", **kwargs)
1. Consolidating metadata in this existing store with zarr.consolidate_metadata().
2. Explicitly setting consolidated=False, to avoid trying to read consolidate metadata, or
3. Explicitly setting consolidated=True, to raise an error in this case instead of falling back to try reading non-consolidated metadata.
  subgroup_ds = open_dataset(store, engine="zarr", group=path, **kwargs)


### Reattach the data varaibles for a subset

Now we can recombine the the metadata structure withe particular slices of the kerchunk dataframe to build a valid datatree

In [32]:
# You can safely reindex to a longer range than you have chunks for - we need to fix the missing value
dtindex = pd.date_range("2023-09-28T00:00:00", "2023-09-28T05:00:00", freq="H", name="forecast_valid_time")
layers = ['stepType', 'typeOfLevel']

In [34]:
# Pick two variables to try it with...
short_chunk = hrrr_subhf_chunks_df.loc[(hrrr_subhf_chunks_df.name == "dswrf") | (hrrr_subhf_chunks_df.name ==  "vbdsf"), :]
short_chunk

Unnamed: 0,name,forecast_valid_time,forecast_run_time,model_horizon,zchunk,uri,offset,length,stepType,typeOfLevel,layer
19,dswrf,2023-09-28 00:00:00,2023-09-28,0 days 00:00:00,0.0,gs://high-resolution-rapid-refresh/hrrr.202309...,23048850,705786,avg,surface,0.0
20,vbdsf,2023-09-28 00:00:00,2023-09-28,0 days 00:00:00,0.0,gs://high-resolution-rapid-refresh/hrrr.202309...,23754636,909920,avg,surface,0.0
33,dswrf,2023-09-28 00:00:00,2023-09-28,0 days 00:00:00,0.0,gs://high-resolution-rapid-refresh/hrrr.202309...,31491331,1079420,instant,surface,0.0
37,vbdsf,2023-09-28 00:00:00,2023-09-28,0 days 00:00:00,0.0,gs://high-resolution-rapid-refresh/hrrr.202309...,37130445,1297401,instant,surface,0.0
63,dswrf,2023-09-28 00:15:00,2023-09-28,0 days 00:15:00,0.0,gs://high-resolution-rapid-refresh/hrrr.202309...,23105979,689955,avg,surface,0.0
64,vbdsf,2023-09-28 00:15:00,2023-09-28,0 days 00:15:00,0.0,gs://high-resolution-rapid-refresh/hrrr.202309...,23795934,929883,avg,surface,0.0
77,dswrf,2023-09-28 00:15:00,2023-09-28,0 days 15:00:00,0.0,gs://high-resolution-rapid-refresh/hrrr.202309...,31915535,1001750,instant,surface,0.0
81,vbdsf,2023-09-28 00:15:00,2023-09-28,0 days 15:00:00,0.0,gs://high-resolution-rapid-refresh/hrrr.202309...,37393178,913551,instant,surface,0.0
107,dswrf,2023-09-28 00:30:00,2023-09-28,1 days 06:00:00,0.0,gs://high-resolution-rapid-refresh/hrrr.202309...,69476252,575750,avg,surface,0.0
108,vbdsf,2023-09-28 00:30:00,2023-09-28,1 days 06:00:00,0.0,gs://high-resolution-rapid-refresh/hrrr.202309...,70052002,810936,avg,surface,0.0


In [35]:
zinflated = data_extractor.add_chunks_to_zarr(
    short_chunk,
    zarr_spec,
    layers,
    'forecast_run_time',
    [dtindex]
)
pprint.pprint(zinflated)

{'.zgroup': b'{\n    "zarr_format": 2\n}',
 'avg/.zattrs': b'{\n    "stepType": "avg"\n}',
 'avg/.zgroup': b'{\n    "zarr_format": 2\n}',
 'avg/surface/.zattrs': b'{\n    "GRIB_centre": "kwbc",\n    "GRIB_centreDes'
                        b'cription": "US National Weather Service - NCEP ",\n  '
                        b'  "GRIB_edition": 2,\n    "GRIB_subCentre": 0,\n  '
                        b'  "coordinates": "surface latitude longitude step ti'
                        b'me valid_time",\n    "institution": "US National Weat'
                        b'her Service - NCEP ",\n    "typeOfLevel": "surfac'
                        b'e"\n}',
 'avg/surface/.zgroup': b'{\n    "zarr_format": 2\n}',
 'avg/surface/dswrf/.zarray': '{"chunks":[1,1059,1799],"compressor":null,"dtype":"<f8","fill_value":null,"filters":[{"dtype":"float64","id":"grib","var":"dswrf"}],"order":"C","shape":[6,1059,1799],"zarr_format":2}',
 'avg/surface/dswrf/.zattrs': '{"GRIB_DxInMetres":3000.0,"GRIB_DyInMetres":3000.0,

In [36]:
kspec = {"refs": zinflated, "version": 1}
#kspec = {"refs": zarr_spec, "version": 1}

fs = fsspec.filesystem("reference", fo=kspec)
xd = datatree.open_datatree(fs.get_mapper(""), engine="zarr")
xd

1. Consolidating metadata in this existing store with zarr.consolidate_metadata().
2. Explicitly setting consolidated=False, to avoid trying to read consolidate metadata, or
3. Explicitly setting consolidated=True, to raise an error in this case instead of falling back to try reading non-consolidated metadata.
  ds = open_dataset(store, engine="zarr", **kwargs)
1. Consolidating metadata in this existing store with zarr.consolidate_metadata().
2. Explicitly setting consolidated=False, to avoid trying to read consolidate metadata, or
3. Explicitly setting consolidated=True, to raise an error in this case instead of falling back to try reading non-consolidated metadata.
  subgroup_ds = open_dataset(store, engine="zarr", group=path, **kwargs)


In [38]:
xd.avg.surface.vbdsf.values[:,900,100]


array([497., 274., 190.,   0.,   0.,   0.])

## GEFS
Not working correctly yet - but close!

In [40]:
# This method separates the kerchunk refernce data from the zarr datastructure
scanned_gefs = scan_blobs(gefs)

In [42]:
# this method can be run on each grib file as it arrives
gefs_chunks_df = data_extractor.extract_chunks(
    scanned_gefs,
    layers=['stepType', 'typeOfLevel'],
    forecast_valid_time='valid_time',
    forecast_run_time='time',
    model_horizon="step",
    )
gefs_chunks_df.head(10)

2023-11-13T02:14:38.107Z MainProcess MainThread INFO:ingestion.noaa_nwp.data_extractor:Found 140 chunks for 140 known variables


Unnamed: 0,name,forecast_valid_time,forecast_run_time,model_horizon,zchunk,uri,offset,length,stepType,typeOfLevel,layer
0,vis,2023-09-28,2023-09-28,0 days,0.0,gs://gfs-ensemble-forecast-system/gefs.2023092...,0,490988,instant,surface,0.0
1,gust,2023-09-28,2023-09-28,0 days,0.0,gs://gfs-ensemble-forecast-system/gefs.2023092...,490988,505628,instant,surface,0.0
2,mslet,2023-09-28,2023-09-28,0 days,0.0,gs://gfs-ensemble-forecast-system/gefs.2023092...,996616,897660,instant,meanSea,0.0
3,sp,2023-09-28,2023-09-28,0 days,0.0,gs://gfs-ensemble-forecast-system/gefs.2023092...,1894276,789233,instant,surface,0.0
4,orog,2023-09-28,2023-09-28,0 days,0.0,gs://gfs-ensemble-forecast-system/gefs.2023092...,2683509,477574,instant,surface,0.0
5,st,2023-09-28,2023-09-28,0 days,0.0,gs://gfs-ensemble-forecast-system/gefs.2023092...,3161083,366814,instant,depthBelowLandLayer,
6,soilw,2023-09-28,2023-09-28,0 days,0.0,gs://gfs-ensemble-forecast-system/gefs.2023092...,3527897,365458,instant,depthBelowLandLayer,
7,sdwe,2023-09-28,2023-09-28,0 days,0.0,gs://gfs-ensemble-forecast-system/gefs.2023092...,3893355,259506,instant,surface,0.0
8,sde,2023-09-28,2023-09-28,0 days,0.0,gs://gfs-ensemble-forecast-system/gefs.2023092...,4152861,241324,instant,surface,0.0
9,icetk,2023-09-28,2023-09-28,0 days,0.0,gs://gfs-ensemble-forecast-system/gefs.2023092...,4394185,84874,instant,surface,0.0


In [43]:
# This method need only be run once - just pick any GRIB file for the product to
# build the metadata structure and persist a copy of the coordinates for easy reference
metadata_path = "gs://dev.camus-infra.camus.store/davetest/gefs"
with io.BytesIO() as bio:
  zarr_store = data_extractor.treeify_groups(
      scanned_gefs, # Better to give just one file but this is fine too...
      layers=['stepType', 'typeOfLevel'],
      metadata_path=metadata_path,
      coords_file=bio,
      forecast_valid_time='valid_time',
      forecast_run_time='time',
      model_horizon="step",
      select_valid_time=np.datetime64("2023-09-28T00:00:00") # Just create var metadata for a single timestep
    )

  data_extractor.write_coords(metadata_path, bio)

data_extractor.write_store(metadata_path, zarr_store)

2023-11-13T02:15:04.375Z MainProcess MainThread INFO:ingestion.noaa_nwp.data_extractor:copied vis to dset: <zarr.core.Array '/instant/surface/vis' (721, 1440) float64>
2023-11-13T02:15:04.378Z MainProcess MainThread INFO:ingestion.noaa_nwp.data_extractor:copied latitude to dset: <zarr.core.Array '/instant/surface/latitude' (721,) float64>
2023-11-13T02:15:04.379Z MainProcess MainThread INFO:ingestion.noaa_nwp.data_extractor:copied longitude to dset: <zarr.core.Array '/instant/surface/longitude' (1440,) float64>
2023-11-13T02:15:04.380Z MainProcess MainThread INFO:ingestion.noaa_nwp.data_extractor:copied surface to dset: <zarr.core.Array '/instant/surface/surface' () float64>
2023-11-13T02:15:04.387Z MainProcess MainThread INFO:ingestion.noaa_nwp.data_extractor:copied gust to dset: <zarr.core.Array '/instant/surface/gust' (721, 1440) float64>
2023-11-13T02:15:04.394Z MainProcess MainThread INFO:ingestion.noaa_nwp.data_extractor:copied mslet to dset: <zarr.core.Array '/instant/meanSea/ms

### Play with the metadata zarr store...
You can already opne the zarr hierarchy as a zarr tree or datatree.
You can read the coordinate variable data, but not the data variables

In [49]:
zarr_spec = data_extractor.read_store(metadata_path)
kspec = {"refs": zarr_spec, "version": 1}

fs = fsspec.filesystem("reference", fo=kspec)
xd = datatree.open_datatree(fs.get_mapper(""), engine="zarr")
xd

2023-11-13T02:16:51.947Z MainProcess MainThread INFO:ingestion.noaa_nwp.data_extractor:Read 57879 bytes to gs://dev.camus-infra.camus.store/davetest/gefs/zarr_tree_store.msgpack
1. Consolidating metadata in this existing store with zarr.consolidate_metadata().
2. Explicitly setting consolidated=False, to avoid trying to read consolidate metadata, or
3. Explicitly setting consolidated=True, to raise an error in this case instead of falling back to try reading non-consolidated metadata.
  ds = open_dataset(store, engine="zarr", **kwargs)
1. Consolidating metadata in this existing store with zarr.consolidate_metadata().
2. Explicitly setting consolidated=False, to avoid trying to read consolidate metadata, or
3. Explicitly setting consolidated=True, to raise an error in this case instead of falling back to try reading non-consolidated metadata.
  subgroup_ds = open_dataset(store, engine="zarr", group=path, **kwargs)


### Reattach the data varaibles for a subset

Now we can recombine the the metadata structure withe particular slices of the kerchunk dataframe to build a valid datatree

In [50]:
# You can safely reindex to a longer range than you have chunks for - we need to fix the missing value
dtindex = pd.date_range("2023-09-28T00:00:00", "2023-09-28T05:00:00", freq="H", name="forecast_valid_time")
layers = ['stepType', 'typeOfLevel']

In [51]:
# Pick two variables to try it with...
short_chunk = gefs_chunks_df.loc[(gefs_chunks_df.name == "gust") | (gefs_chunks_df.name ==  "vis"), :]
short_chunk

Unnamed: 0,name,forecast_valid_time,forecast_run_time,model_horizon,zchunk,uri,offset,length,stepType,typeOfLevel,layer
0,vis,2023-09-28 00:00:00,2023-09-28,0 days 00:00:00,0.0,gs://gfs-ensemble-forecast-system/gefs.2023092...,0,490988,instant,surface,0.0
1,gust,2023-09-28 00:00:00,2023-09-28,0 days 00:00:00,0.0,gs://gfs-ensemble-forecast-system/gefs.2023092...,490988,505628,instant,surface,0.0
26,vis,2023-09-28 03:00:00,2023-09-28,0 days 03:00:00,0.0,gs://gfs-ensemble-forecast-system/gefs.2023092...,0,444136,instant,surface,0.0
27,gust,2023-09-28 03:00:00,2023-09-28,0 days 03:00:00,0.0,gs://gfs-ensemble-forecast-system/gefs.2023092...,444136,487740,instant,surface,0.0
64,vis,2023-09-28 06:00:00,2023-09-28,0 days 06:00:00,0.0,gs://gfs-ensemble-forecast-system/gefs.2023092...,0,433983,instant,surface,0.0
65,gust,2023-09-28 06:00:00,2023-09-28,0 days 06:00:00,0.0,gs://gfs-ensemble-forecast-system/gefs.2023092...,433983,470763,instant,surface,0.0
102,vis,2023-09-28 09:00:00,2023-09-28,0 days 09:00:00,0.0,gs://gfs-ensemble-forecast-system/gefs.2023092...,0,424160,instant,surface,0.0
103,gust,2023-09-28 09:00:00,2023-09-28,0 days 09:00:00,0.0,gs://gfs-ensemble-forecast-system/gefs.2023092...,424160,459285,instant,surface,0.0


In [52]:
zinflated = data_extractor.add_chunks_to_zarr(
    short_chunk,
    zarr_spec,
    layers,
    'forecast_run_time',
    [dtindex]
)
pprint.pprint(zinflated)

KeyError: ignored

In [None]:
kspec = {"refs": zinflated, "version": 1}
#kspec = {"refs": zarr_spec, "version": 1}

fs = fsspec.filesystem("reference", fo=kspec)
xd = datatree.open_datatree(fs.get_mapper(""), engine="zarr")
xd

In [None]:
xd.avg.surface.vbdsf.values[:,900,100]
