In [30]:
from typing import Iterable

from pystac import Collection
from pystac_client import Client
from pystac_client.stac_api_io import StacApiIO
from urllib3 import Retry

In [23]:
# Refer to pystac-client docs:
# https://pystac-client.readthedocs.io/en/stable/usage.html

retry = Retry(
    total=5, backoff_factor=1, status_forcelist=[502, 503, 504], allowed_methods=None
)
stac_api_io = StacApiIO(max_retries=retry)

catalog = Client.open(
    "http://localhost:8000", stac_io=stac_api_io
)

# Checking if the API supports item searching
item_search = catalog.conforms_to("ITEM_SEARCH")
print("Item search:", item_search)

# Get all available collections in STAC API
for collection in catalog.get_all_collections():
    print(collection.id)

Item search: True
canari
icenet


In [56]:
class STAC:
    def __init__(self, STAC_FASTAPI_URL: str) -> None:
        self._url = STAC_FASTAPI_URL
        self._catalog = Client.open(STAC_FASTAPI_URL)

    def get_all_collection_ids(self, resolve: bool = False) -> Iterable[Collection] | tuple[Collection]:
        # Get all available collections in STAC API
        collections = self._catalog.get_all_collections()
        return tuple(collections) if resolve else collections

    def get_all_forecast_init_dates(self, collection_id):
        search = self._catalog.search(collections=[collection_id], max_items=None)
        datetimes = sorted({item.datetime for item in search.items() if item.datetime is not None})
        return datetimes


In [57]:
STAC_FASTAPI_URL = "http://localhost:8000"
collection_id = "icenet"

stac = STAC(STAC_FASTAPI_URL)
collections = stac.get_all_collection_ids(resolve=True)
forecast_init_dates = stac.get_all_forecast_init_dates(collection_id)

In [58]:
print(forecast_init_dates)

[datetime.datetime(2025, 1, 2, 0, 0, tzinfo=tzutc()), datetime.datetime(2025, 1, 3, 0, 0, tzinfo=tzutc()), datetime.datetime(2025, 1, 4, 0, 0, tzinfo=tzutc()), datetime.datetime(2025, 1, 5, 0, 0, tzinfo=tzutc()), datetime.datetime(2025, 1, 6, 0, 0, tzinfo=tzutc()), datetime.datetime(2025, 1, 7, 0, 0, tzinfo=tzutc()), datetime.datetime(2025, 1, 8, 0, 0, tzinfo=tzutc()), datetime.datetime(2025, 1, 9, 0, 0, tzinfo=tzutc()), datetime.datetime(2025, 1, 10, 0, 0, tzinfo=tzutc()), datetime.datetime(2025, 1, 11, 0, 0, tzinfo=tzutc()), datetime.datetime(2025, 1, 12, 0, 0, tzinfo=tzutc()), datetime.datetime(2025, 1, 13, 0, 0, tzinfo=tzutc()), datetime.datetime(2025, 1, 14, 0, 0, tzinfo=tzutc()), datetime.datetime(2025, 1, 15, 0, 0, tzinfo=tzutc()), datetime.datetime(2025, 1, 16, 0, 0, tzinfo=tzutc()), datetime.datetime(2025, 1, 17, 0, 0, tzinfo=tzutc()), datetime.datetime(2025, 1, 18, 0, 0, tzinfo=tzutc()), datetime.datetime(2025, 1, 20, 0, 0, tzinfo=tzutc()), datetime.datetime(2025, 1, 21, 0, 0

In [53]:
for i, collection in enumerate(collections):
    search = stac._catalog.search(collections=[collection.id], max_items=None)
    # Extract all unique datetimes
    datetimes = sorted({item.datetime for item in search.items() if item.datetime is not None})
    print(i, datetimes)


0 [datetime.datetime(2000, 2, 1, 0, 0, tzinfo=tzutc())]
1 [datetime.datetime(2025, 1, 2, 0, 0, tzinfo=tzutc()), datetime.datetime(2025, 1, 3, 0, 0, tzinfo=tzutc()), datetime.datetime(2025, 1, 4, 0, 0, tzinfo=tzutc()), datetime.datetime(2025, 1, 5, 0, 0, tzinfo=tzutc()), datetime.datetime(2025, 1, 6, 0, 0, tzinfo=tzutc()), datetime.datetime(2025, 1, 7, 0, 0, tzinfo=tzutc()), datetime.datetime(2025, 1, 8, 0, 0, tzinfo=tzutc()), datetime.datetime(2025, 1, 9, 0, 0, tzinfo=tzutc()), datetime.datetime(2025, 1, 10, 0, 0, tzinfo=tzutc()), datetime.datetime(2025, 1, 11, 0, 0, tzinfo=tzutc()), datetime.datetime(2025, 1, 12, 0, 0, tzinfo=tzutc()), datetime.datetime(2025, 1, 13, 0, 0, tzinfo=tzutc()), datetime.datetime(2025, 1, 14, 0, 0, tzinfo=tzutc()), datetime.datetime(2025, 1, 15, 0, 0, tzinfo=tzutc()), datetime.datetime(2025, 1, 16, 0, 0, tzinfo=tzutc()), datetime.datetime(2025, 1, 17, 0, 0, tzinfo=tzutc()), datetime.datetime(2025, 1, 18, 0, 0, tzinfo=tzutc()), datetime.datetime(2025, 1, 20, 