async task errors with GRIB2 kerchunk references #562

@JackReevesEyre-NOAA

Description

I am using kerchunk to accelerate reading of arbitrary sets of GFS GRIB2 files. My program has two main parts:

  1. Given a list of GRIB2 files, check for existing json reference files.
    a. If they don't exist, create them
    b. Return the list of reference files
  2. Return an xarray dataset (lazy loaded at this point) from the list of reference files.
    The dataset is then used to select time series at a number of "station" locations (which is when the computation actually happens).

The problem:
The first time I run this for a given set of GRIB2 files, kerchunk creates the reference files as expected, but loading the dataset throws a number of errors about asyncio tasks attached to different loops (example traceback below).

But on the second run, the script uses the JSON reference files created on the first pass, and the dataset loads as expected. My question is: why doesn't it get to this point on the first pass?

I guess this could be an issue with zarr or fsspec, but from the sequence described above, my impression is that it's related to kerchunk.
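
If the clash really is between event loops, one workaround I'm considering (untested, just a sketch) is to run the reference-creation step in a child process, so that any event loop or filesystem state scan_grib sets up can't leak into the process that later opens the dataset. The hypothetical helper below reuses the kerchunk_grib function defined in the example further down:

from concurrent.futures import ProcessPoolExecutor

def create_refs_isolated(grib_file_list):
    # Hypothetical helper: each worker process gets its own asyncio loop and
    # its own fsspec instance cache, so nothing created during scan_grib
    # survives into the parent process.
    with ProcessPoolExecutor(max_workers=1) as pool:
        ref_lists = list(pool.map(kerchunk_grib, grib_file_list))
    # Flatten the per-file lists of reference file names.
    return [ref for sublist in ref_lists for ref in sublist]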

Below are a self-contained example, error traceback, and my environment.

import xarray as xr
import datetime as dt
import pandas as pd
import pathlib
import fsspec
import ujson
import zarr
from kerchunk.grib2 import scan_grib
from kerchunk.combine import MultiZarrToZarr


def main():
    # ----- Get list of GFS grib files.
    fs_read = fsspec.filesystem('s3', anon=True, skip_instance_cache=True)
    init_datetime = dt.datetime(2025,1,15,0,0)
    glob_str = f"s3://noaa-gfs-bdp-pds/gfs.{init_datetime.strftime('%Y%m%d')}/{init_datetime.strftime('%H')}/atmos/gfs.t{init_datetime.strftime('%H')}z.sfluxgrbf*.grib2"
    # We just pick two, for speed, and skip the zeroth one because it has a slightly different format.
    grib_file_list = fs_read.glob(glob_str)[1:3]
    
    # ----- Create/get the reference files.
    ref_files = []
    for filename in grib_file_list:
        ltrefs = kerchunk_grib(filename)
        ref_files = ref_files + ltrefs

    # ----- Load the dataset.
    ds = open_ds_kerchunk_ref(ref_files)
    
    # ----- Extract data at locations.
    print("Subsetting data...")
    df_stations = pd.DataFrame(
        data={
            'station':['8720218', '8720357', '8447930'],
            'latitude':[30 + (23.9/60.0), 30 + (11.5/60.0), 41 + (31.4/60.0)],
            'longitude':[360.0 - 81 - (25.7/60.0), 360.0 - 81 - (41.3/60.0), 360.0 -70 - (40.3/60.0)]
        }
    )
    ds_stations = df_stations.set_index('station').to_xarray()
    df = ds.sel(
        latitude=ds_stations.latitude,
        longitude=ds_stations.longitude,
        method='nearest'
    ).to_dataframe().reset_index().set_index(['station','time'])
    return df

def kerchunk_grib(grib_filename):
    """Create (if needed) and return list of kerchunk reference jsons."""
    # File system to write to: currently local (even if from AWS).
    fs_write = fsspec.filesystem('')
    
    # Create directory to save reference files to.
    json_dir = pathlib.Path('./kerchunk_refs') / pathlib.Path(grib_filename).parents[0]
    json_dir.mkdir(parents=True, exist_ok=True)
    
    # Get file name root for creating json file names.
    # Just the filename without the ".grib2" suffix.
    json_name_root = pathlib.Path(grib_filename).stem

    # Create variable filter (hardcoded for now).
    var_filter = {'cfVarName': ['u10', 'v10', 'sp']}
    storage_opts = {"anon": True}

    # Check if there are existing reference files.
    # We could do some fancy checks for variable names here, but 
    # let's do that another day.
    existing_refs = fs_write.glob(str(json_dir / json_name_root) 
                                  + '_message*.json')
    if len(existing_refs) < len(var_filter['cfVarName']):
        print("Creating reference files...")
        # Create the reference files, and keep a list of their names.
        out = scan_grib('s3://' + grib_filename, 
                        storage_options=storage_opts, 
                        filter=var_filter)
        ref_filename_list = []
        # Loop over outputs (scan_grib returns a list with one reference dict per GRIB message/variable).
        for i, message in enumerate(out):
            out_file_name = f'{json_dir / json_name_root}_message{i}.json'
            ref_filename_list.append(out_file_name)
            print(f'writing json file {out_file_name}')
            with fs_write.open(out_file_name, "w") as f: 
                f.write(ujson.dumps(message))  # write references to file
    else:
        ref_filename_list = existing_refs
    
    # Return the reference file name list.
    return ref_filename_list


def open_ds_kerchunk_ref(filelist):
    """Open a dataset from a list of kerchunk reference files."""
    print('Opening dataset from reference files...')
    mzz = MultiZarrToZarr(
        filelist,
        concat_dims=['valid_time'],
        identical_dims=['latitude', 'longitude', 
                        'heightAboveGround', 'surface', 
                        'step']
    )
    mzz_trans = mzz.translate()
    # Open dataset as zarr object using fsspec reference file system and xarray
    fs = fsspec.filesystem("reference", fo=mzz_trans, 
                           remote_protocol='s3', 
                           remote_options={'anon':True},
                           target_options={})
    store = zarr.storage.FsspecStore(fs)
    ds = xr.open_dataset(store, engine="zarr", 
                         backend_kwargs=dict(consolidated=False), 
                         chunks={'valid_time':1}, 
                         decode_timedelta=True)
    return ds.drop_vars(['step', 'surface', 'heightAboveGround'], 
                        errors='ignore')


if __name__ == '__main__':
    df = main()
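
For completeness: the kerchunk docs also show a more compact way to open the same references, letting xarray build the reference filesystem itself via a "reference://" URL. I haven't checked whether it changes the behaviour here (a sketch only, with the same anonymous-S3 assumptions as above):

# Sketch: open the combined references without building fs/store by hand.
# mzz_trans is the dict returned by MultiZarrToZarr.translate() above.
ds = xr.open_dataset(
    "reference://",
    engine="zarr",
    backend_kwargs={
        "consolidated": False,
        "storage_options": {
            "fo": mzz_trans,
            "remote_protocol": "s3",
            "remote_options": {"anon": True},
        },
    },
)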

Subset of the errors raised on the first pass of the script:

future: <Future finished exception=RuntimeError("Task <Task pending name='Task-4708' coro=<TCPConnector._resolve_host_with_throttle() running at /home/Jack.Reeveseyre/seanode/.venv/lib/python3.12/site-packages/aiohttp/connector.py:1026>> got Future <Future pending cb=[_chain_future.<locals>._call_check_cancel() at /home/Jack.Reeveseyre/conda/envs/py312/lib/python3.12/asyncio/futures.py:389]> attached to a different loop")>
Traceback (most recent call last):
  File "/home/Jack.Reeveseyre/seanode/.venv/lib/python3.12/site-packages/aiobotocore/httpsession.py", line 222, in send
    response = await session.request(
               ^^^^^^^^^^^^^^^^^^^^^^
  File "/home/Jack.Reeveseyre/seanode/.venv/lib/python3.12/site-packages/aiohttp/client.py", line 703, in _request
    conn = await self._connector.connect(
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/Jack.Reeveseyre/seanode/.venv/lib/python3.12/site-packages/aiohttp/connector.py", line 548, in connect
    proto = await self._create_connection(req, traces, timeout)
            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/Jack.Reeveseyre/seanode/.venv/lib/python3.12/site-packages/aiohttp/connector.py", line 1056, in _create_connection
    _, proto = await self._create_direct_connection(req, traces, timeout)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/Jack.Reeveseyre/seanode/.venv/lib/python3.12/site-packages/aiohttp/connector.py", line 1357, in _create_direct_connection
    hosts = await self._resolve_host(host, port, traces=traces)
            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/Jack.Reeveseyre/seanode/.venv/lib/python3.12/site-packages/aiohttp/connector.py", line 995, in _resolve_host
    return await asyncio.shield(resolved_host_task)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/Jack.Reeveseyre/seanode/.venv/lib/python3.12/site-packages/aiohttp/connector.py", line 1026, in _resolve_host_with_throttle
    addrs = await self._resolver.resolve(host, port, family=self._family)
            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/Jack.Reeveseyre/seanode/.venv/lib/python3.12/site-packages/aiohttp/resolver.py", line 39, in resolve
    infos = await self._loop.getaddrinfo(
            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/Jack.Reeveseyre/conda/envs/py312/lib/python3.12/asyncio/base_events.py", line 905, in getaddrinfo
    return await self.run_in_executor(
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^
RuntimeError: Task <Task pending name='Task-4708' coro=<TCPConnector._resolve_host_with_throttle() running at /home/Jack.Reeveseyre/seanode/.venv/lib/python3.12/site-packages/aiohttp/connector.py:1026>> got Future <Future pending cb=[_chain_future.<locals>._call_check_cancel() at /home/Jack.Reeveseyre/conda/envs/py312/lib/python3.12/asyncio/futures.py:389]> attached to a different loop
ERROR:asyncio:Future exception was never retrieved
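
One detail that may be relevant: fsspec caches filesystem instances by default, so the S3 filesystem that scan_grib creates internally could be the same cached object that the reference filesystem re-uses later, still bound to whichever event loop first used it. A quick diagnostic sketch (not a fix) showing the caching behaviour:

import fsspec

# Identical constructor kwargs return the same cached instance...
fs_a = fsspec.filesystem("s3", anon=True)
fs_b = fsspec.filesystem("s3", anon=True)
print(fs_a is fs_b)  # True: same cached object

# ...while skip_instance_cache=True (as used for fs_read above) forces a
# fresh instance.
fs_c = fsspec.filesystem("s3", anon=True, skip_instance_cache=True)
print(fs_a is fs_c)  # False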

Output from xarray.show_versions() plus kerchunk.__version__:

kerchunk.__version__  # '0.2.8'

INSTALLED VERSIONS
------------------
commit: None
python: 3.12.8 | packaged by conda-forge | (main, Dec  5 2024, 14:24:40) [GCC 13.3.0]
python-bits: 64
OS: Linux
OS-release: 6.6.87.2-microsoft-standard-WSL2
machine: x86_64
processor: x86_64
byteorder: little
LC_ALL: None
LANG: C.UTF-8
LOCALE: ('C', 'UTF-8')
libhdf5: 1.14.2
libnetcdf: 4.9.4-development

xarray: 2025.4.0
pandas: 2.2.3
numpy: 2.2.5
scipy: 1.15.3
netCDF4: 1.7.2
pydap: 3.5.5
h5netcdf: 1.6.1
h5py: 3.13.0
zarr: 3.0.8
cftime: 1.6.4.post1
nc_time_axis: None
iris: None
bottleneck: 1.4.2
dask: 2025.4.1
distributed: None
matplotlib: None
cartopy: None
seaborn: None
numbagg: 0.9.0
fsspec: 2025.3.2
cupy: None
pint: None
sparse: None
flox: None
numpy_groupies: None
setuptools: None
pip: 24.3.1
conda: None
pytest: None
mypy: None
IPython: None
sphinx: None
