# Generate a kerchunk sidecar file for Surface Air Temperature data in NetCDF format (freq: daily and monthly)

In [1]:
import glob
import re
import matplotlib.pyplot as plt
import numpy as np
# import scipy as sp
import xarray as xr
import fsspec
from kerchunk.hdf import SingleHdf5ToZarr
from pathlib import Path
import ujson
import intake_esm
import intake
import fsspec
import kerchunk.hdf
from kerchunk.combine import MultiZarrToZarr

In [2]:
import dask
from dask.distributed import Client, performance_report
from dask_jobqueue import PBSCluster

In [3]:
import os
import ujson
from fsspec.implementations.local import LocalFileSystem
import kerchunk.hdf
import pandas as pd

In [4]:
# Root locations on the RDA GPFS collection (site-specific absolute paths).
rda_scratch = "/gpfs/csfs1/collections/rda/scratch/harshah"
rda_data = "/gpfs/csfs1/collections/rda/data/"
# Derived locations: rda_data already ends with '/', rda_scratch does not.
myrda_data = f"{rda_data}harshah/"
era5_path = f"{rda_data}ds633.0/e5.oper.an.sfc/"
zarr_path = f"{rda_scratch}/tas_zarr/"

In [5]:
def gen_json(file_url, write_json=False):
    """Build kerchunk references for a single HDF5/NetCDF4 file.

    Relies on two notebook-level globals: ``fs`` (the filesystem object used
    to open both the input and the optional output) and ``so`` (keyword
    arguments forwarded to ``fs.open`` for the input file).

    Args:
        file_url (str): Path or URL of the file to scan.
        write_json (bool): If True, also write the references to
            ``<basename of file_url>.json`` through ``fs.open``.

    Returns:
        dict: The reference set returned by ``SingleHdf5ToZarr.translate()``.
    """
    # NOTE(review): `fs` must exist before this is called; the only assignment
    # visible in this notebook (`fs = LocalFileSystem()`) is commented out.
    print(f'generating {file_url}')
    with fs.open(file_url, **so) as infile:
        h5chunks = kerchunk.hdf.SingleHdf5ToZarr(infile, file_url, inline_threshold=366)
        # Translate once and reuse the result for both writing and returning;
        # the file is not scanned a second time.
        refs = h5chunks.translate()

    if write_json:
        outfile = f'{os.path.basename(file_url)}.json'
        with fs.open(outfile, 'wb') as f:
            print(f'writing {outfile}')
            f.write(ujson.dumps(refs).encode())
    return refs

In [6]:
# Recursively collect the NetCDF files holding tas (surface air temperature at 2 m)
# below era5_path, matched by the 128_167_2t file-name pattern.
tas_pattern = f"{era5_path}**/e5.oper.an.sfc.128_167_2t.*.nc"
tas_ncfiles = glob.glob(tas_pattern, recursive=True)

In [7]:
# Create a PBS cluster object; each job is configured with one core,
# one worker process and 8GiB of memory.
cluster = PBSCluster(
    job_name='dask-wk24-hpc',
    cores=1,
    processes=1,
    memory='8GiB',
    resource_spec='select=1:ncpus=1:mem=8GB',
    queue='casper',
    walltime='2:00:00',
    # Spill directory and log directory both sit under the scratch area.
    local_directory=f'{rda_scratch}/dask/spill',
    log_directory=f'{rda_scratch}/dask/',
    # Alternative network interface that was tried: 'ib0'
    interface='ext',
)

Perhaps you already have a cluster running?
Hosting the HTTP server on port 37103 instead
Task exception was never retrieved
future: <Task finished name='Task-114904' coro=<Client._gather.<locals>.wait() done, defined at /glade/work/harshah/conda-envs/arco_experiments/lib/python3.12/site-packages/distributed/client.py:2278> exception=AllExit()>
Traceback (most recent call last):
  File "/glade/work/harshah/conda-envs/arco_experiments/lib/python3.12/site-packages/distributed/client.py", line 2287, in wait
    raise AllExit()
distributed.client.AllExit
Task exception was never retrieved
future: <Task finished name='Task-114762' coro=<Client._gather.<locals>.wait() done, defined at /glade/work/harshah/conda-envs/arco_experiments/lib/python3.12/site-packages/distributed/client.py:2278> exception=AllExit()>
Traceback (most recent call last):
  File "/glade/work/harshah/conda-envs/arco_experiments/lib/python3.12/site-packages/distributed/client.py", line 2287, in wait
    raise AllExit()
dis

In [8]:
# Attach a distributed Client to the PBS cluster created above.
client = Client(cluster)
# Bare expression so the notebook displays the client summary.
client

0,1
Connection method: Cluster object,Cluster type: dask_jobqueue.PBSCluster
Dashboard: https://jupyterhub.hpc.ucar.edu/stable/user/harshah/proxy/37103/status,

0,1
Dashboard: https://jupyterhub.hpc.ucar.edu/stable/user/harshah/proxy/37103/status,Workers: 0
Total threads: 0,Total memory: 0 B

0,1
Comm: tcp://128.117.208.96:44203,Workers: 0
Dashboard: https://jupyterhub.hpc.ucar.edu/stable/user/harshah/proxy/37103/status,Total threads: 0
Started: Just now,Total memory: 0 B


In [9]:
# Ask the cluster to scale to 10 workers.
cluster.scale(10)

### Open intake catalog and extract paths

In [10]:
# Open the ERA5 intake-esm catalog (HTTPS variant) stored under the scratch area.
catalog = intake.open_esm_datastore(f'{rda_scratch}/intake_catalogs/https/era5_catalog_https.json')
# Bare expression so the notebook displays the catalog summary.
catalog

  df = pd.read_csv(


Unnamed: 0,unique
era_id,1
datatype,2
level_type,1
step_type,7
table_code,4
param_code,164
variable,212
long_name,212
units,33
year,85


In [11]:
# Narrow the catalog to entries whose `variable` column equals 'VAR_2T'.
cat_temp = catalog.search(variable ='VAR_2T')
# Bare expression so the notebook displays the filtered catalog summary.
cat_temp

Unnamed: 0,unique
era_id,1
datatype,1
level_type,0
step_type,1
table_code,1
param_code,1
variable,1
long_name,1
units,1
year,85


In [12]:
# Preview the first few entries of the catalog's 'path' column.
cat_temp.df['path'].head()

0    https://data.rda.ucar.edu/ds633.0/e5.oper.an.s...
1    https://data.rda.ucar.edu/ds633.0/e5.oper.an.s...
2    https://data.rda.ucar.edu/ds633.0/e5.oper.an.s...
3    https://data.rda.ucar.edu/ds633.0/e5.oper.an.s...
4    https://data.rda.ucar.edu/ds633.0/e5.oper.an.s...
Name: path, dtype: object

### Read these NetCDF files and rewrite them with a uniform chunk size

In [13]:
# fs = LocalFileSystem()
# Keyword arguments that gen_json forwards to fs.open when reading an input file.
so = {
    'mode': 'rb',
    'default_fill_cache': False,
    'default_cache_type': 'first',
}

In [14]:
def rechunk_to_zarr(file_url, zarr_directory, chunk_size, overwrite=False):
    """Rechunk one NetCDF file and save it as a Zarr store with a matching name.

    The output name is the basename of ``file_url`` with any '#mode=bytes'
    fragment removed and a trailing '.nc' extension swapped for '.zarr'.

    Args:
        file_url (str): URL or path to the input NetCDF file.
        zarr_directory (str): Directory where the Zarr store will be saved.
        chunk_size (dict): Dictionary specifying the chunk sizes.
        overwrite (bool): Whether to overwrite an existing store. Default is False.

    Returns:
        str: Path of the Zarr store (whether it was written or skipped).
    """
    file_name = os.path.basename(file_url).replace('#mode=bytes', '')
    # Swap only the trailing extension; a blanket str.replace would also
    # rewrite a '.nc' that happens to occur earlier in the file name.
    if file_name.endswith('.nc'):
        file_name = file_name[:-len('.nc')] + '.zarr'

    zarr_store = os.path.join(zarr_directory, file_name)

    # Skip the work when the store is already there and overwriting is off.
    if os.path.exists(zarr_store) and not overwrite:
        print(f'Zarr file {zarr_store} already exists and overwrite is set to False. Skipping.')
        return zarr_store

    # The context manager closes the source dataset once the store is written,
    # so handles do not accumulate when many files are processed in a loop.
    with xr.open_dataset(file_url) as ds:
        ds.chunk(chunks=chunk_size).to_zarr(zarr_store, mode='w')
    print(f'Saved rechunked file to {zarr_store}')
    return zarr_store

def process_and_rechunk_to_zarr(file_urls, zarr_directory, chunk_size, overwrite=False):
    """Rechunk every file in ``file_urls`` into its own Zarr store.

    Args:
        file_urls (list): URLs or paths of the input NetCDF files.
        zarr_directory (str): Directory receiving the Zarr stores; created if missing.
        chunk_size (dict): Chunk sizes handed to ``rechunk_to_zarr``.
        overwrite (bool): Whether existing stores are overwritten. Default is False.

    Returns:
        list: One Zarr path per input, in input order.
    """
    os.makedirs(zarr_directory, exist_ok=True)
    return [
        rechunk_to_zarr(url, zarr_directory, chunk_size, overwrite)
        for url in file_urls
    ]

In [15]:
def zarr_to_netcdf(zarr_path, netcdf_directory, overwrite=False):
    """Convert one Zarr store to a NetCDF file with a matching name.

    The output name is the basename of ``zarr_path`` with a trailing '.zarr'
    extension swapped for '.nc'.

    Args:
        zarr_path (str): Path to the Zarr store (a trailing '/' is tolerated).
        netcdf_directory (str): Directory where the NetCDF file will be saved.
        overwrite (bool): Whether to overwrite an existing file. Default is False.

    Returns:
        str: Path of the NetCDF file (whether it was written or skipped).
    """
    # A store path may end with '/', which would leave basename() empty.
    file_name = os.path.basename(zarr_path.rstrip('/'))
    # Swap only the trailing extension; a blanket str.replace would also
    # rewrite a '.zarr' that happens to occur earlier in the name.
    if file_name.endswith('.zarr'):
        file_name = file_name[:-len('.zarr')] + '.nc'

    netcdf_path = os.path.join(netcdf_directory, file_name)

    # Skip the work when the file is already there and overwriting is off.
    if os.path.exists(netcdf_path) and not overwrite:
        print(f'NetCDF file {netcdf_path} already exists and overwrite is set to False. Skipping.')
        return netcdf_path

    # The context manager closes the store once the NetCDF file is written.
    with xr.open_zarr(zarr_path) as ds:
        ds.to_netcdf(netcdf_path, mode='w')
    print(f'Saved NetCDF file to {netcdf_path}')
    return netcdf_path

def process_zarr_to_netcdf(zarr_paths, netcdf_directory, overwrite=False):
    """Convert every Zarr store in ``zarr_paths`` to a NetCDF file.

    Args:
        zarr_paths (list): Paths of the Zarr stores.
        netcdf_directory (str): Directory receiving the NetCDF files; created if missing.
        overwrite (bool): Whether existing files are overwritten. Default is False.

    Returns:
        list: One NetCDF path per input, in input order.
    """
    os.makedirs(netcdf_directory, exist_ok=True)
    return [
        zarr_to_netcdf(store, netcdf_directory, overwrite)
        for store in zarr_paths
    ]

In [17]:
def create_new_file_paths(file_urls, output_directory):
    """Map each input URL/path to a same-named file inside ``output_directory``.

    The name is the basename of the input with any '#mode=bytes' text removed.
    """
    return [
        os.path.join(output_directory, os.path.basename(url).replace('#mode=bytes', ''))
        for url in file_urls
    ]

In [18]:
def gen_individual_json(file_url, output_directory='.', inline_threshold=366):
    """Generate individual Kerchunk JSON sidecar file for a given file URL.

    Args:
        file_url (str): URL or path of the HDF5/NetCDF4 file to scan.
        output_directory (str): Directory receiving ``<basename>.json``;
            created if it does not exist yet.
        inline_threshold (int): Forwarded to ``SingleHdf5ToZarr``.

    Returns:
        str: Path of the JSON sidecar file that was written.
    """
    # HTTP(S) sources are opened with just the mode; other sources also get
    # the cache-related options.
    if file_url.startswith("http"):
        open_kwargs = dict(mode='rb')
    else:
        open_kwargs = dict(mode='rb', default_fill_cache=False, default_cache_type='first')

    # Without this, writing into a not-yet-existing sidecar directory fails
    # only after the (potentially slow) scan of the input file.
    os.makedirs(output_directory, exist_ok=True)
    outfile = os.path.join(output_directory, f'{os.path.basename(file_url)}.json')

    with fsspec.open(file_url, **open_kwargs) as infile:
        h5chunks = kerchunk.hdf.SingleHdf5ToZarr(infile, file_url, inline_threshold=inline_threshold)
        # Translate while the input file is still open.
        refs = h5chunks.translate()

    with open(outfile, 'w') as f:
        ujson.dump(refs, f)
    return outfile

def process_files(file_paths, output_directory='.'):
    """Write one JSON sidecar per entry of ``file_paths`` and return their paths in order."""
    return [gen_individual_json(path, output_directory) for path in file_paths]

def combine_json_files(json_files, output_path, concat_dim='time'):
    """Combine individual JSON sidecar files into a single aggregate JSON sidecar file.

    Args:
        json_files (list): Paths of the per-file JSON sidecars.
        output_path (str): Path of the combined JSON sidecar to write.
        concat_dim (str): Dimension passed to ``MultiZarrToZarr`` as the
            single entry of ``concat_dims``.
    """
    # Load each sidecar through a context manager so its handle is closed
    # right away instead of lingering until garbage collection.
    refs = []
    for json_file in json_files:
        with open(json_file) as f:
            refs.append(ujson.load(f))

    print('Combining references into a single JSON sidecar file')
    mzz = MultiZarrToZarr(
        path=json_files,
        indicts=refs,
        concat_dims=[concat_dim],
    )
    combined = mzz.translate()

    # Make sure the destination directory exists before writing.
    parent = os.path.dirname(output_path)
    if parent:
        os.makedirs(parent, exist_ok=True)
    with open(output_path, 'w') as f:
        ujson.dump(combined, f)
    print(f'Combined JSON sidecar file written to {output_path}')

In [23]:
%%time
output_directory = myrda_data +'era5_tas'  # Update to your actual path
chunk_size = {"time": 27, "latitude": 139, "longitude": 277}  # Update with your desired chunk sizes
# Rechunk the files and save them to the new directory
file_urls = cat_temp.df['path'][:12].tolist()

zarr_directory   = output_directory + '/zarr'  
netcdf_directory = output_directory + '/netcdf'  

zarr_files = process_and_rechunk_to_zarr(file_urls, zarr_directory, chunk_size, overwrite=False)

# rechunked_files = process_and_rechunk_files(file_urls, output_directory, chunk_size)

Zarr file /gpfs/csfs1/collections/rda/data/harshah/era5_tas/zarr/e5.oper.an.sfc.128_167_2t.ll025sc.1940010100_1940013123.zarr already exists and overwrite is set to False. Skipping.
Zarr file /gpfs/csfs1/collections/rda/data/harshah/era5_tas/zarr/e5.oper.an.sfc.128_167_2t.ll025sc.1940020100_1940022923.zarr already exists and overwrite is set to False. Skipping.
Zarr file /gpfs/csfs1/collections/rda/data/harshah/era5_tas/zarr/e5.oper.an.sfc.128_167_2t.ll025sc.1940030100_1940033123.zarr already exists and overwrite is set to False. Skipping.
Saved rechunked file to /gpfs/csfs1/collections/rda/data/harshah/era5_tas/zarr/e5.oper.an.sfc.128_167_2t.ll025sc.1940040100_1940043023.zarr
Saved rechunked file to /gpfs/csfs1/collections/rda/data/harshah/era5_tas/zarr/e5.oper.an.sfc.128_167_2t.ll025sc.1940050100_1940053123.zarr
Saved rechunked file to /gpfs/csfs1/collections/rda/data/harshah/era5_tas/zarr/e5.oper.an.sfc.128_167_2t.ll025sc.1940060100_1940063023.zarr
Saved rechunked file to /gpfs/csfs

In [20]:
%%time
# Convert each rechunked Zarr store to a NetCDF file in netcdf_directory;
# files that already exist are skipped because overwrite=False.
netcdf_files = process_zarr_to_netcdf(zarr_files, netcdf_directory, overwrite=False)

NetCDF file /gpfs/csfs1/collections/rda/data/harshah/era5_tas/netcdf/e5.oper.an.sfc.128_167_2t.ll025sc.1940010100_1940013123.nc already exists and overwrite is set to False. Skipping.
NetCDF file /gpfs/csfs1/collections/rda/data/harshah/era5_tas/netcdf/e5.oper.an.sfc.128_167_2t.ll025sc.1940020100_1940022923.nc already exists and overwrite is set to False. Skipping.



KeyboardInterrupt



In [21]:
# Build the paths of the rechunked NetCDF copies with create_new_file_paths,
# which is defined in an earlier cell of this notebook (it is not redefined here).
new_file_paths = create_new_file_paths(file_urls, netcdf_directory)
print(new_file_paths)

['/gpfs/csfs1/collections/rda/data/harshah/era5_tas/netcdf/e5.oper.an.sfc.128_167_2t.ll025sc.1940010100_1940013123.nc', '/gpfs/csfs1/collections/rda/data/harshah/era5_tas/netcdf/e5.oper.an.sfc.128_167_2t.ll025sc.1940020100_1940022923.nc', '/gpfs/csfs1/collections/rda/data/harshah/era5_tas/netcdf/e5.oper.an.sfc.128_167_2t.ll025sc.1940030100_1940033123.nc', '/gpfs/csfs1/collections/rda/data/harshah/era5_tas/netcdf/e5.oper.an.sfc.128_167_2t.ll025sc.1940040100_1940043023.nc', '/gpfs/csfs1/collections/rda/data/harshah/era5_tas/netcdf/e5.oper.an.sfc.128_167_2t.ll025sc.1940050100_1940053123.nc', '/gpfs/csfs1/collections/rda/data/harshah/era5_tas/netcdf/e5.oper.an.sfc.128_167_2t.ll025sc.1940060100_1940063023.nc', '/gpfs/csfs1/collections/rda/data/harshah/era5_tas/netcdf/e5.oper.an.sfc.128_167_2t.ll025sc.1940070100_1940073123.nc', '/gpfs/csfs1/collections/rda/data/harshah/era5_tas/netcdf/e5.oper.an.sfc.128_167_2t.ll025sc.1940080100_1940083123.nc', '/gpfs/csfs1/collections/rda/data/harshah/era5_

In [21]:
%%time
sidecar_directory = myrda_data + 'sidecar/era5/'
sidecar_path      = sidecar_directory + 'era5_2t_https.json'  # Update to your desired output directory
# Generate individual JSON sidecar files
json_files = process_files(new_file_paths, sidecar_directory)


KeyboardInterrupt



In [None]:
%%time
# Combine individual JSON sidecar files into a single aggregate JSON sidecar file
# written to sidecar_path (concatenated along the default 'time' dimension).
combine_json_files(json_files, sidecar_path)

## Test chunking of NetCDF files

In [None]:
# Open the second rechunked NetCDF file and select its VAR_2T variable for inspection.
tas_nc = xr.open_mfdataset(new_file_paths[1],engine='netcdf4').VAR_2T 
# Bare expression so the notebook displays the variable's summary.
tas_nc

In [None]:
# Open the second Zarr store and select its VAR_2T variable, for comparison with tas_nc.
tas_zarr = xr.open_zarr(zarr_files[1]).VAR_2T
# Bare expression so the notebook displays the variable's summary.
tas_zarr