In [1]:
import dataclasses
import datetime

@dataclasses.dataclass
class ZarrId:
    run_hour: datetime.datetime
    level_type: str
    var_level: str
    var_name: str
    model_type: str
        
    def format_chunk_id(self, chunk_id):
        if self.model_type == "fcst": 
            # Extra id part since forecasts have an additional (time) dimension
            return "0." + str(chunk_id)
        else:
            return chunk_id

In [2]:
level_type = "sfc"
model_type = "fcst"
run_hour = datetime.datetime(2021, 1, 1, 7)

def create_hrrr_zarr_explorer_url(level_type, model_type, run_hour):
    url = "https://hrrrzarr.s3.amazonaws.com/index.html"
    url += run_hour.strftime(
        f"#{level_type}/%Y%m%d/%Y%m%d_%Hz_{model_type}.zarr/")
    return url

print(create_hrrr_zarr_explorer_url(level_type, model_type, run_hour))
print(create_hrrr_zarr_explorer_url("prs", "anl", run_hour))

https://hrrrzarr.s3.amazonaws.com/index.html#sfc/20210101/20210101_07z_fcst.zarr/
https://hrrrzarr.s3.amazonaws.com/index.html#prs/20210101/20210101_07z_anl.zarr/


In [3]:
zarr_id = ZarrId(
                run_hour=datetime.datetime(2020, 8, 1, 0), # Aug 1, 0Z
                level_type="sfc",
                var_level="1000mb",
                var_name="TMP",
                model_type="anl"
                )
chunk_id = "4.3"

def create_https_chunk_url(zarr_id, chunk_id):
    url = "https://hrrrzarr.s3.amazonaws.com"
    url += zarr_id.run_hour.strftime(
        f"/{zarr_id.level_type}/%Y%m%d/%Y%m%d_%Hz_{zarr_id.model_type}.zarr/")
    url += f"{zarr_id.var_level}/{zarr_id.var_name}/{zarr_id.var_level}/{zarr_id.var_name}"
    url += f"/{zarr_id.format_chunk_id(chunk_id)}"
    return url

create_https_chunk_url(zarr_id, chunk_id)

'https://hrrrzarr.s3.amazonaws.com/sfc/20200801/20200801_00z_anl.zarr/1000mb/TMP/1000mb/TMP/4.3'

In [4]:
def create_s3_group_url(zarr_id, prefix=True):
    url = "s3://hrrrzarr/" if prefix else "" # Skip when using boto3
    url += zarr_id.run_hour.strftime(
        f"{zarr_id.level_type}/%Y%m%d/%Y%m%d_%Hz_{zarr_id.model_type}.zarr/")
    url += f"{zarr_id.var_level}/{zarr_id.var_name}"
    return url

create_s3_group_url(zarr_id)

's3://hrrrzarr/sfc/20200801/20200801_00z_anl.zarr/1000mb/TMP'

In [5]:
def create_s3_subgroup_url(zarr_id, prefix=True):
    url = create_s3_group_url(zarr_id, prefix)
    url += f"/{zarr_id.var_level}"
    return url

create_s3_subgroup_url(zarr_id)

's3://hrrrzarr/sfc/20200801/20200801_00z_anl.zarr/1000mb/TMP/1000mb'

In [6]:
def create_s3_chunk_url(zarr_id, chunk_id, prefix=False):
    url = create_s3_subgroup_url(zarr_id, prefix)
    url += f"/{zarr_id.var_name}/{zarr_id.format_chunk_id(chunk_id)}"
    return url

create_s3_chunk_url(zarr_id, chunk_id)

'sfc/20200801/20200801_00z_anl.zarr/1000mb/TMP/1000mb/TMP/4.3'

In [7]:
import cartopy.crs as ccrs

projection = ccrs.LambertConformal(central_longitude=262.5, 
                                   central_latitude=38.5, 
                                   standard_parallels=(38.5, 38.5),
                                    globe=ccrs.Globe(semimajor_axis=6371229,
                                                     semiminor_axis=6371229))

In [8]:
import xarray as xr 
import s3fs 
import metpy 
fs = s3fs.S3FileSystem(anon=True) 

In [9]:
chunk_index = xr.open_zarr(s3fs.S3Map("s3://hrrrzarr/grid/HRRR_chunk_index.zarr", s3=fs))
chunk_index

Unnamed: 0,Array,Chunk
Bytes,14.54 MiB,465.82 kiB
Shape,"(1799, 1059)","(225, 265)"
Dask graph,32 chunks in 2 graph layers,32 chunks in 2 graph layers
Data type,object numpy.ndarray,object numpy.ndarray
"Array Chunk Bytes 14.54 MiB 465.82 kiB Shape (1799, 1059) (225, 265) Dask graph 32 chunks in 2 graph layers Data type object numpy.ndarray",1059  1799,

Unnamed: 0,Array,Chunk
Bytes,14.54 MiB,465.82 kiB
Shape,"(1799, 1059)","(225, 265)"
Dask graph,32 chunks in 2 graph layers,32 chunks in 2 graph layers
Data type,object numpy.ndarray,object numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,7.03 kiB,7.03 kiB
Shape,"(1799,)","(1799,)"
Dask graph,1 chunks in 2 graph layers,1 chunks in 2 graph layers
Data type,int32 numpy.ndarray,int32 numpy.ndarray
"Array Chunk Bytes 7.03 kiB 7.03 kiB Shape (1799,) (1799,) Dask graph 1 chunks in 2 graph layers Data type int32 numpy.ndarray",1799  1,

Unnamed: 0,Array,Chunk
Bytes,7.03 kiB,7.03 kiB
Shape,"(1799,)","(1799,)"
Dask graph,1 chunks in 2 graph layers,1 chunks in 2 graph layers
Data type,int32 numpy.ndarray,int32 numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,4.14 kiB,4.14 kiB
Shape,"(1059,)","(1059,)"
Dask graph,1 chunks in 2 graph layers,1 chunks in 2 graph layers
Data type,int32 numpy.ndarray,int32 numpy.ndarray
"Array Chunk Bytes 4.14 kiB 4.14 kiB Shape (1059,) (1059,) Dask graph 1 chunks in 2 graph layers Data type int32 numpy.ndarray",1059  1,

Unnamed: 0,Array,Chunk
Bytes,4.14 kiB,4.14 kiB
Shape,"(1059,)","(1059,)"
Dask graph,1 chunks in 2 graph layers,1 chunks in 2 graph layers
Data type,int32 numpy.ndarray,int32 numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,7.03 kiB,7.03 kiB
Shape,"(1799,)","(1799,)"
Dask graph,1 chunks in 2 graph layers,1 chunks in 2 graph layers
Data type,int32 numpy.ndarray,int32 numpy.ndarray
"Array Chunk Bytes 7.03 kiB 7.03 kiB Shape (1799,) (1799,) Dask graph 1 chunks in 2 graph layers Data type int32 numpy.ndarray",1799  1,

Unnamed: 0,Array,Chunk
Bytes,7.03 kiB,7.03 kiB
Shape,"(1799,)","(1799,)"
Dask graph,1 chunks in 2 graph layers,1 chunks in 2 graph layers
Data type,int32 numpy.ndarray,int32 numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,4.14 kiB,4.14 kiB
Shape,"(1059,)","(1059,)"
Dask graph,1 chunks in 2 graph layers,1 chunks in 2 graph layers
Data type,int32 numpy.ndarray,int32 numpy.ndarray
"Array Chunk Bytes 4.14 kiB 4.14 kiB Shape (1059,) (1059,) Dask graph 1 chunks in 2 graph layers Data type int32 numpy.ndarray",1059  1,

Unnamed: 0,Array,Chunk
Bytes,4.14 kiB,4.14 kiB
Shape,"(1059,)","(1059,)"
Dask graph,1 chunks in 2 graph layers,1 chunks in 2 graph layers
Data type,int32 numpy.ndarray,int32 numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,7.03 kiB,7.03 kiB
Shape,"(1799,)","(1799,)"
Dask graph,1 chunks in 2 graph layers,1 chunks in 2 graph layers
Data type,int32 numpy.ndarray,int32 numpy.ndarray
"Array Chunk Bytes 7.03 kiB 7.03 kiB Shape (1799,) (1799,) Dask graph 1 chunks in 2 graph layers Data type int32 numpy.ndarray",1799  1,

Unnamed: 0,Array,Chunk
Bytes,7.03 kiB,7.03 kiB
Shape,"(1799,)","(1799,)"
Dask graph,1 chunks in 2 graph layers,1 chunks in 2 graph layers
Data type,int32 numpy.ndarray,int32 numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,4.14 kiB,4.14 kiB
Shape,"(1059,)","(1059,)"
Dask graph,1 chunks in 2 graph layers,1 chunks in 2 graph layers
Data type,int32 numpy.ndarray,int32 numpy.ndarray
"Array Chunk Bytes 4.14 kiB 4.14 kiB Shape (1059,) (1059,) Dask graph 1 chunks in 2 graph layers Data type int32 numpy.ndarray",1059  1,

Unnamed: 0,Array,Chunk
Bytes,4.14 kiB,4.14 kiB
Shape,"(1059,)","(1059,)"
Dask graph,1 chunks in 2 graph layers,1 chunks in 2 graph layers
Data type,int32 numpy.ndarray,int32 numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,14.54 MiB,467.58 kiB
Shape,"(1059, 1799)","(133, 450)"
Dask graph,32 chunks in 2 graph layers,32 chunks in 2 graph layers
Data type,float64 numpy.ndarray,float64 numpy.ndarray
"Array Chunk Bytes 14.54 MiB 467.58 kiB Shape (1059, 1799) (133, 450) Dask graph 32 chunks in 2 graph layers Data type float64 numpy.ndarray",1799  1059,

Unnamed: 0,Array,Chunk
Bytes,14.54 MiB,467.58 kiB
Shape,"(1059, 1799)","(133, 450)"
Dask graph,32 chunks in 2 graph layers,32 chunks in 2 graph layers
Data type,float64 numpy.ndarray,float64 numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,14.54 MiB,467.58 kiB
Shape,"(1059, 1799)","(133, 450)"
Dask graph,32 chunks in 2 graph layers,32 chunks in 2 graph layers
Data type,float64 numpy.ndarray,float64 numpy.ndarray
"Array Chunk Bytes 14.54 MiB 467.58 kiB Shape (1059, 1799) (133, 450) Dask graph 32 chunks in 2 graph layers Data type float64 numpy.ndarray",1799  1059,

Unnamed: 0,Array,Chunk
Bytes,14.54 MiB,467.58 kiB
Shape,"(1059, 1799)","(133, 450)"
Dask graph,32 chunks in 2 graph layers,32 chunks in 2 graph layers
Data type,float64 numpy.ndarray,float64 numpy.ndarray


In [10]:
def get_nearest_point(projection, chunk_index, longitude, latitude):
    x, y = projection.transform_point(longitude, latitude, ccrs.PlateCarree())
    return chunk_index.sel(x=x, y=y, method="nearest")

# lon, lat = -111.8910, 40.7608
lon, lat = -109.5287, 40.4555
nearest_point = get_nearest_point(projection, chunk_index, lon, lat)
chunk_id = nearest_point.chunk_id.values
print(chunk_id)

4.3


In [11]:
import boto3
from botocore import UNSIGNED
from botocore.config import Config

# Don't recreate this resource in a loop! That caused a 3-4x slowdown for me.
s3 = boto3.resource(service_name='s3', region_name='us-west-1', config=Config(signature_version=UNSIGNED))

def retrieve_object(s3, s3_url):
    obj = s3.Object('hrrrzarr', s3_url)
    return obj.get()['Body'].read()

zarr_id = ZarrId(
                # run_hour=datetime.datetime(2019, 1, 1, 12),
                run_hour = datetime.datetime(2023,11,7,0,0,0),
                level_type="sfc",
                var_level="2m_above_ground",
                var_name="TMP",
                model_type="anl"
                )

compressed_data = retrieve_object(s3, create_s3_chunk_url(zarr_id, chunk_id))

In [12]:
import numcodecs as ncd
import numpy as np

def decompress_chunk(zarr_id, compressed_data):
    buffer = ncd.blosc.decompress(compressed_data)
    
    dtype = "<f2"
    if zarr_id.var_level == "surface" and zarr_id.var_name == "PRES":
        dtype = "<f4"
        
    chunk = np.frombuffer(buffer, dtype=dtype)
    
    if zarr_id.model_type == "anl":
        data_array = np.reshape(chunk, (150, 150))
    else:
        entry_size = 22500
        data_array = np.reshape(chunk, (len(chunk)//entry_size, 150, 150))
        
    return data_array

chunk_data = decompress_chunk(zarr_id, compressed_data)

In [13]:
chunk_data[nearest_point.in_chunk_y.values, nearest_point.in_chunk_x.values]


287.0

In [14]:
def get_value(zarr_id, chunk_id, nearest_point):
    compressed_data = retrieve_object(s3, create_s3_chunk_url(zarr_id, chunk_id))
    chunk_data = decompress_chunk(zarr_id, compressed_data)
    if zarr_id.model_type == "fcst":
        return chunk_data[:, nearest_point.in_chunk_y.values, nearest_point.in_chunk_x.values]
    else:
        return chunk_data[nearest_point.in_chunk_y.values, nearest_point.in_chunk_x.values]
    
zarr_ids = [dataclasses.replace(zarr_id, run_hour=zarr_id.run_hour + datetime.timedelta(hours=time_delta))
           for time_delta in range(12)]
print(np.array([get_value(zid, chunk_id, nearest_point) for zid in zarr_ids]) - 273.15)

[13.75 10.75 10.75  9.    7.75  7.5   7.5   6.25  4.75  4.25  4.25  4.  ]


In [15]:
# forecast example
fcst_zarr_id = ZarrId(
                run_hour=datetime.datetime(2023, 11, 7, 20),
                level_type="sfc",
                var_level="surface",
                var_name="PRES",
                model_type="fcst"
                )
zarr_ids = [dataclasses.replace(fcst_zarr_id, run_hour=zarr_id.run_hour + datetime.timedelta(hours=time_delta))
           for time_delta in range(10)]
[get_value(zid, chunk_id, nearest_point) for zid in zarr_ids]

[array([83150., 83130., 83150., 83180., 83140., 83180., 83190., 83210.,
        83230., 83240., 83240., 83250., 83270., 83290., 83310., 83320.,
        83360., 83330., 83290., 83260., 83210., 83200., 83180., 83210.,
        83260., 83280., 83290., 83290., 83360., 83550., 83650., 83690.,
        83740., 83800., 83830., 83910., 84010., 84080., 84160., 84200.,
        84240., 84250., 84230., 84240., 84250., 84280., 84290., 84300.],
       dtype=float32),
 array([83160., 83100., 83140., 83140., 83200., 83160., 83200., 83250.,
        83270., 83270., 83250., 83260., 83280., 83320., 83380., 83350.,
        83280., 83230.], dtype=float32),
 array([83140., 83140., 83120., 83190., 83180., 83220., 83270., 83270.,
        83260., 83260., 83290., 83290., 83290., 83310., 83320., 83240.,
        83220., 83200.], dtype=float32),
 array([83110., 83100., 83210., 83240., 83260., 83260., 83270., 83250.,
        83290., 83290., 83290., 83290., 83270., 83300., 83290., 83240.,
        83190., 83200.], dtype

In [16]:
lat_top = 39
lat_bottom = 34
lon_top = -107
lon_bottom = -110 # Four Corners region

def check_boundaries(data):
    return (lat_bottom < data.latitude) & (data.latitude < lat_top) & (
        lon_bottom < data.longitude) & (data.longitude < lon_top)

area = chunk_index.where(check_boundaries, drop=True)
area

KeyError: 'Indexing with a boolean dask array is not allowed. This will result in a dask array of unknown shape. Such arrays are unsupported by Xarray.Please compute the indexer first using .compute()'

In [None]:
def get_unique(data):
    # We have to implement our own "unique" logic since missing values are NaN (a float) and the rest are string
    data = data.fillna(None).values.flatten()
    data = data[data != None]
    return np.unique(data)

chunk_ids = get_unique(area.chunk_id)

In [None]:
def get_chunk(zarr_id, chunk_id):
    # retrieve data as before
    compressed_data = retrieve_object(s3, create_s3_chunk_url(zarr_id, chunk_id))
    chunk_data = decompress_chunk(zarr_id, compressed_data)
    
    # combine retrieved data with the chunk grid
    chunk_xarray = chunk_index.where(lambda x: x.chunk_id == chunk_id, drop=True)
    dimensions = ("y", "x") if zarr_id.model_type == "anl" else ("time", "y", "x")
    chunk_xarray[zarr_id.var_name] = (dimensions, chunk_data)
    return chunk_xarray

def get_chunks_combined(zarr_id, chunk_ids):
    chunks = [get_chunk(zarr_id, chunk_id) for chunk_id in chunk_ids]
    return xr.merge(chunks)

    
data = get_chunks_combined(zarr_id, chunk_ids)
data

In [None]:
data.where(check_boundaries, drop=True)


In [None]:
data.TMP.plot()


In [None]:
start = datetime.datetime(2018, 1, 1, 0)
times = [start + datetime.timedelta(weeks=week_delta) for week_delta in range(2)]

zarr_ids = [dataclasses.replace(zarr_id, run_hour=time) for time in times]

def get_data(zarr_ids, chunk_ids, is_forecast):
    datasets = []
    for zarr_id in zarr_ids:
        data = get_chunks_combined(zarr_id, chunk_ids)
        new_time_dimension = "run_time" if is_forecast else "time"
        data[new_time_dimension] = zarr_id.run_hour
        datasets.append(data)
    ds = xr.concat(datasets, dim=new_time_dimension, combine_attrs="override")
    return ds
    
get_data(zarr_ids, chunk_ids, False)