In [1]:
%pip install icechunk "xarray>=2024.11.0"

Collecting icechunk
  Downloading icechunk-0.1.0a12-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (11 kB)
Collecting xarray>=2024.11.0
  Downloading xarray-2025.1.1-py3-none-any.whl.metadata (11 kB)
Collecting zarr>=3 (from icechunk)
  Downloading zarr-3.0.1-py3-none-any.whl.metadata (9.5 kB)
Collecting numcodecs>=0.14 (from numcodecs[crc32c]>=0.14->zarr>=3->icechunk)
  Downloading numcodecs-0.15.0-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (2.9 kB)
Collecting deprecated (from numcodecs>=0.14->numcodecs[crc32c]>=0.14->zarr>=3->icechunk)
  Downloading Deprecated-1.2.15-py2.py3-none-any.whl.metadata (5.5 kB)
Collecting crc32c>=2.7 (from numcodecs[crc32c]>=0.14->zarr>=3->icechunk)
  Downloading crc32c-2.7.1-cp312-cp312-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (7.3 kB)
Downloading icechunk-0.1.0a12-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (11.3 MB)
[2K   [90m━━━━━━━━━━━━

In [1]:
from datetime import datetime, timedelta
from dateutil.relativedelta import relativedelta

import earthaccess
import odc.stac
import pyproj
import pystac
import pystac_client
import xarray as xr
from dask.diagnostics import ProgressBar
from icechunk import Repository, Storage
from rasterio.enums import Resampling
from shapely.geometry import box
from shapely.ops import transform

In [2]:
CRS_STRING = "epsg:5070"
EPSG = pyproj.CRS.from_string(CRS_STRING).to_epsg()

START_DATE = datetime(year=2024, month=1, day=1)
END_DATE = datetime(year=2024, month=12, day=31, hour=23)
HLS_NODATA = -9999
HLS_COLLECTION_IDS = ["HLSL30_2.0", "HLSS30_2.0"]

URL_PREFIX = "https://data.lpdaac.earthdatacloud.nasa.gov/"

def convert_to_s3_uri(url: str) -> str:
    return url.replace(
        URL_PREFIX,
        "s3://"
    )

# 100 km bounding box
xmin, ymin = 150000, 2650000
bbox = (xmin, ymin, xmin + 200000, ymin + 200000)
aoi = box(*bbox)

# STAC items store bounding box info in epsg:4326
transformer_4326 = pyproj.Transformer.from_crs(
    crs_from=CRS_STRING,
    crs_to="epsg:4326",
    always_xy=True,
)

bbox_4326 = transform(transformer_4326.transform, aoi).bounds

In [3]:
auth = earthaccess.login(strategy="interactive", persist=True)
s3_creds = auth.get_s3_credentials("LPDAAC")

odc.stac.configure_rio(
    cloud_defaults=True,
    aws=dict(
        aws_access_key_id=s3_creds["accessKeyId"],
        aws_secret_access_key=s3_creds["secretAccessKey"],
        aws_session_token=s3_creds["sessionToken"],
        region_name="us-west-2",
    ),
)

In [4]:
item_collection_json = "hls_items.json"

load_from_json = True

if load_from_json:
    stac_items = pystac.ItemCollection.from_file(item_collection_json)
else:
    catalog = pystac_client.Client.open("https://cmr.earthdata.nasa.gov/stac/LPCLOUD")
    
    all_items = []
    current = START_DATE
    while current <= END_DATE:
        print(current)
        # Calculate the end of the current month
        # Move to first of next month and subtract one second
        next_month = (current + relativedelta(months=1))
        month_end = next_month - timedelta(seconds=1)
        
        # If this is the last interval and it exceeds end_date,
        # use end_date as the interval end
        if month_end > END_DATE:
            month_end = END_DATE
            
        monthly_stac_items = catalog.search(
            collections=HLS_COLLECTION_IDS,
            bbox=bbox_4326,
            datetime=[current, month_end],
            limit=1000,
        ).item_collection()
    
        all_items.extend(monthly_stac_items.items)
        
        current = next_month
    
    stac_items = pystac.ItemCollection(all_items)
    
    stac_items.save_object(item_collection_json)

print(len(stac_items))

1750


In [14]:
for item in stac_items:
    for asset in item.assets.values():
        if asset.href.startswith(URL_PREFIX):
            asset.href = convert_to_s3_uri(asset.href)

def group_by_sensor_and_date(
    item: pystac.Item,
    parsed: odc.stac.ParsedItem,
    idx: int,
) -> str:
    id_split = item.id.split(".")
    sensor = id_split[1]
    day = id_split[3][:7]

    return f"{sensor}_{day}"

HLS_ODC_STAC_CONFIG = {
    "HLSL30_2.0": {
        "assets": {
            "*": {
                "nodata": HLS_NODATA,
                "data_type": "int16",
            },
            "Fmask": {
                "nodata": 0,
                "data_type": "uint8",
            },
        },
        "aliases": {
            "coastal_aerosol": "B01",
            "blue": "B02",
            "green": "B03",
            "red": "B04",
            "nir_narrow": "B05",
            "swir_1": "B06",
            "swir_2": "B07",
            "cirrus": "B09",
            "thermal_infrared_1": "B10",
            "thermal": "B11",
        },
    },
    "HLSS30_2.0": {
        "assets": {
            "*": {
                "nodata": HLS_NODATA,
                "dtype": "int16",
            },
            "Fmask": {
                "nodata": 0,
                "dtype": "uint8",
            },
        },
        "aliases": {
            "coastal_aerosol": "B01",
            "blue": "B02",
            "green": "B03",
            "red": "B04",
            "red_edge_1": "B05",
            "red_edge_2": "B06",
            "red_edge_3": "B07",
            "nir_broad": "B08",
            "nir_narrow": "B8A",
            "water_vapor": "B09",
            "cirrus": "B10",
            "swir_1": "B11",
            "swir_2": "B12",
        },
    },
}

# these are the ones that we are going to use
BANDS = ["red", "green", "blue", "Fmask"]

stack = odc.stac.load(
    stac_items,
    stac_cfg=HLS_ODC_STAC_CONFIG,
    bands=BANDS,
    crs=CRS_STRING,
    resolution=30,
    chunks={"time": 1, "x": 500, "y": 500},
    groupby=group_by_sensor_and_date,
    x=(bbox[0], bbox[2]),
    y=(bbox[1], bbox[3]),
    patch_url=convert_to_s3_uri,
).sortby("time")

display(stack)

Unnamed: 0,Array,Chunk
Bytes,28.48 GiB,488.28 kiB
Shape,"(344, 6667, 6667)","(1, 500, 500)"
Dask graph,67424 chunks in 4 graph layers,67424 chunks in 4 graph layers
Data type,int16 numpy.ndarray,int16 numpy.ndarray
"Array Chunk Bytes 28.48 GiB 488.28 kiB Shape (344, 6667, 6667) (1, 500, 500) Dask graph 67424 chunks in 4 graph layers Data type int16 numpy.ndarray",6667  6667  344,

Unnamed: 0,Array,Chunk
Bytes,28.48 GiB,488.28 kiB
Shape,"(344, 6667, 6667)","(1, 500, 500)"
Dask graph,67424 chunks in 4 graph layers,67424 chunks in 4 graph layers
Data type,int16 numpy.ndarray,int16 numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,28.48 GiB,488.28 kiB
Shape,"(344, 6667, 6667)","(1, 500, 500)"
Dask graph,67424 chunks in 4 graph layers,67424 chunks in 4 graph layers
Data type,int16 numpy.ndarray,int16 numpy.ndarray
"Array Chunk Bytes 28.48 GiB 488.28 kiB Shape (344, 6667, 6667) (1, 500, 500) Dask graph 67424 chunks in 4 graph layers Data type int16 numpy.ndarray",6667  6667  344,

Unnamed: 0,Array,Chunk
Bytes,28.48 GiB,488.28 kiB
Shape,"(344, 6667, 6667)","(1, 500, 500)"
Dask graph,67424 chunks in 4 graph layers,67424 chunks in 4 graph layers
Data type,int16 numpy.ndarray,int16 numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,28.48 GiB,488.28 kiB
Shape,"(344, 6667, 6667)","(1, 500, 500)"
Dask graph,67424 chunks in 4 graph layers,67424 chunks in 4 graph layers
Data type,int16 numpy.ndarray,int16 numpy.ndarray
"Array Chunk Bytes 28.48 GiB 488.28 kiB Shape (344, 6667, 6667) (1, 500, 500) Dask graph 67424 chunks in 4 graph layers Data type int16 numpy.ndarray",6667  6667  344,

Unnamed: 0,Array,Chunk
Bytes,28.48 GiB,488.28 kiB
Shape,"(344, 6667, 6667)","(1, 500, 500)"
Dask graph,67424 chunks in 4 graph layers,67424 chunks in 4 graph layers
Data type,int16 numpy.ndarray,int16 numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,14.24 GiB,244.14 kiB
Shape,"(344, 6667, 6667)","(1, 500, 500)"
Dask graph,67424 chunks in 4 graph layers,67424 chunks in 4 graph layers
Data type,uint8 numpy.ndarray,uint8 numpy.ndarray
"Array Chunk Bytes 14.24 GiB 244.14 kiB Shape (344, 6667, 6667) (1, 500, 500) Dask graph 67424 chunks in 4 graph layers Data type uint8 numpy.ndarray",6667  6667  344,

Unnamed: 0,Array,Chunk
Bytes,14.24 GiB,244.14 kiB
Shape,"(344, 6667, 6667)","(1, 500, 500)"
Dask graph,67424 chunks in 4 graph layers,67424 chunks in 4 graph layers
Data type,uint8 numpy.ndarray,uint8 numpy.ndarray


In [15]:
hls_mask_bitfields = [1, 2, 3]  # cloud shadow, adjacent to cloud shadow, cloud
hls_bitmask = 0
for field in hls_mask_bitfields:
    hls_bitmask |= 1 << field

fmask = stack["Fmask"].astype("uint16")
mask = fmask & hls_bitmask

nodata_mask = stack["red"] == HLS_NODATA

cloud_free = stack.where(mask == 0).where(nodata_mask == 0)

monthly_cloud_free = cloud_free[["red", "green", "blue"]].resample(time="ME").median(skipna=True)
monthly_cloud_free

Unnamed: 0,Array,Chunk
Bytes,1.99 GiB,0.95 MiB
Shape,"(12, 6667, 6667)","(1, 500, 500)"
Dask graph,2352 chunks in 76 graph layers,2352 chunks in 76 graph layers
Data type,float32 numpy.ndarray,float32 numpy.ndarray
"Array Chunk Bytes 1.99 GiB 0.95 MiB Shape (12, 6667, 6667) (1, 500, 500) Dask graph 2352 chunks in 76 graph layers Data type float32 numpy.ndarray",6667  6667  12,

Unnamed: 0,Array,Chunk
Bytes,1.99 GiB,0.95 MiB
Shape,"(12, 6667, 6667)","(1, 500, 500)"
Dask graph,2352 chunks in 76 graph layers,2352 chunks in 76 graph layers
Data type,float32 numpy.ndarray,float32 numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,1.99 GiB,0.95 MiB
Shape,"(12, 6667, 6667)","(1, 500, 500)"
Dask graph,2352 chunks in 80 graph layers,2352 chunks in 80 graph layers
Data type,float32 numpy.ndarray,float32 numpy.ndarray
"Array Chunk Bytes 1.99 GiB 0.95 MiB Shape (12, 6667, 6667) (1, 500, 500) Dask graph 2352 chunks in 80 graph layers Data type float32 numpy.ndarray",6667  6667  12,

Unnamed: 0,Array,Chunk
Bytes,1.99 GiB,0.95 MiB
Shape,"(12, 6667, 6667)","(1, 500, 500)"
Dask graph,2352 chunks in 80 graph layers,2352 chunks in 80 graph layers
Data type,float32 numpy.ndarray,float32 numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,1.99 GiB,0.95 MiB
Shape,"(12, 6667, 6667)","(1, 500, 500)"
Dask graph,2352 chunks in 80 graph layers,2352 chunks in 80 graph layers
Data type,float32 numpy.ndarray,float32 numpy.ndarray
"Array Chunk Bytes 1.99 GiB 0.95 MiB Shape (12, 6667, 6667) (1, 500, 500) Dask graph 2352 chunks in 80 graph layers Data type float32 numpy.ndarray",6667  6667  12,

Unnamed: 0,Array,Chunk
Bytes,1.99 GiB,0.95 MiB
Shape,"(12, 6667, 6667)","(1, 500, 500)"
Dask graph,2352 chunks in 80 graph layers,2352 chunks in 80 graph layers
Data type,float32 numpy.ndarray,float32 numpy.ndarray


In [16]:
storage_config = Storage.new_local_filesystem("icechunk-test-30m")
repo = Repository.open_or_create(storage_config)

In [None]:
for i, t in enumerate(monthly_cloud_free.time.values):
    print(t)
    session = repo.writable_session("main")

    ds = monthly_cloud_free.sel(time=slice(t, t))
    ds.to_zarr(session.store, zarr_format=3, consolidated=False, append_dim="time" if i > 0 else None)
    session.commit(f"add {t} data to store")

2024-01-31T00:00:00.000000000


  _reproject(


2024-02-29T00:00:00.000000000
