# STAC Client Walkthrough

This notebook mirrors the usage patterns from the [PySTAC Client documentation](https://pystac-client.readthedocs.io/en/stable/) against the EDITO data lake. It shows how to open a catalog, traverse links, and query items programmatically.

## Prerequisites
- Activate the project virtual environment (`source .venv/bin/activate.fish`).
- Install `pystac-client` (already listed in `requirements.txt`).
- Export `EDITO_API_TOKEN` or `EDITO_ACCESS_TOKEN`, or drop a fallback token into `MANUAL_TOKEN` below.

In [1]:
# Installs the project requirements; comment this line out if they are already installed
%pip install -q -r "../requirements.txt"

Note: you may need to restart the kernel to use updated packages.


In [2]:
import json
import os
from collections import deque
from typing import Dict, List, Optional

import pandas as pd
from dotenv import load_dotenv
from pystac import STACError
from pystac_client import Client
from pystac_client.exceptions import APIError, ParametersError

In [3]:
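load_dotenv()

# Optional fallback token: read it from the environment or paste one here.
MANUAL_TOKEN = os.getenv("MANUAL_TOKEN", "")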

API_BASE = os.getenv("EDITO_DATA_BASE_URL", "https://api.dive.edito.eu/data").rstrip("/")

TOKEN = (
    os.getenv("EDITO_API_TOKEN")
    or os.getenv("EDITO_ACCESS_TOKEN")
    or MANUAL_TOKEN
)
TOKEN_SOURCE = (
    "EDITO_API_TOKEN"
    if os.getenv("EDITO_API_TOKEN")
    else "EDITO_ACCESS_TOKEN"
    if os.getenv("EDITO_ACCESS_TOKEN")
    else "MANUAL_TOKEN"
)

if not TOKEN:
    raise RuntimeError(
        "No EDITO token detected. Export EDITO_API_TOKEN / EDITO_ACCESS_TOKEN "
        "or set MANUAL_TOKEN in your environment before running this notebook."
    )

if TOKEN_SOURCE == "MANUAL_TOKEN":
    print("⚠️ Using MANUAL_TOKEN baked into the notebook; refresh it if you encounter 401s.")

auth_headers = {"Authorization": f"Bearer {TOKEN}"}
stac_root = API_BASE
client = Client.open(stac_root, headers=auth_headers)
client

python-dotenv could not parse statement starting at line 2


APIError: UnauthorizedError: jwt expired
    at new UnauthorizedError (/node_modules/express-jwt/dist/errors/UnauthorizedError.js:22:28)
    at /node_modules/express-jwt/dist/index.js:139:38
    at step (/node_modules/express-jwt/dist/index.js:33:23)
    at Object.next (/node_modules/express-jwt/dist/index.js:14:53)
    at fulfilled (/node_modules/express-jwt/dist/index.js:5:58)


## Inspect the catalog hierarchy
The tutorial explores how STAC catalogs relate to one another. The code below lists the immediate `child` links exposed by the EDITO root.

> ℹ️ `pystac-client` does not handle authentication itself; it only sends the headers you pass to `Client.open` to the underlying STAC API[^1]. An expired or missing bearer token therefore surfaces as `401 Unauthorized`, as in the `jwt expired` error above. Refresh your EDITO token before opening the client.
>
> [^1]: See “Working with APIs” in the [PySTAC documentation](https://pystac.readthedocs.io/en/stable/api.html#pystac-client) for details about the `headers` argument.
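
Since the failure above is an expired token, it can help to check expiry locally before calling `Client.open`. The sketch below assumes the EDITO token is a standard three-part JWT carrying an `exp` claim (the `jwt expired` message suggests this, but it is not confirmed here). `jwt_expiry` and `is_expired` are hypothetical helper names, and the payload is decoded without verifying the signature, so this is only a convenience check.

```python
import base64
import json
import time
from typing import Optional


def jwt_expiry(token: str) -> Optional[float]:
    """Return the `exp` claim (seconds since epoch), or None if it cannot be read."""
    parts = token.split(".")
    if len(parts) != 3:
        return None
    payload_b64 = parts[1]
    # JWT segments drop base64 padding; restore it before decoding.
    payload_b64 += "=" * (-len(payload_b64) % 4)
    try:
        payload = json.loads(base64.urlsafe_b64decode(payload_b64))
    except ValueError:  # covers bad base64, bad UTF-8, and bad JSON
        return None
    exp = payload.get("exp") if isinstance(payload, dict) else None
    return float(exp) if isinstance(exp, (int, float)) else None


def is_expired(token: str, leeway_seconds: float = 30.0, now: Optional[float] = None) -> bool:
    """True when the token expires within `leeway_seconds`; False if expiry is unknown."""
    exp = jwt_expiry(token)
    if exp is None:
        return False
    current = time.time() if now is None else now
    return current >= exp - leeway_seconds
```

In this notebook, a guard such as `if is_expired(TOKEN): raise RuntimeError("Refresh the EDITO token")` could run just before `Client.open`.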

In [None]:
root_children = [
    {
        "title": link.title,
        "rel": link.rel,
        "href": link.href,
    }
    for link in client.get_links("child")
]
pd.set_option("display.max_colwidth", None)
pd.DataFrame(root_children)

### Explore the full catalog hierarchy
Walk breadth-first through every nested catalog so you can see how providers are organized. Adjust `EDITO_CATALOG_DEPTH` in your environment if you want to crawl deeper than the default.

In [None]:
MAX_CATALOG_DEPTH = int(os.getenv("EDITO_CATALOG_DEPTH", "1"))


def _link_target_id(link) -> Optional[str]:
    target_id = getattr(link, "target_id", None)
    if target_id:
        return target_id
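    extra_fields = getattr(link, "extra_fields", None)
    if isinstance(extra_fields, dict):
        candidate = extra_fields.get("title") or extra_fields.get("href")
        if candidate:
            return candidate
    # Fall back to the link's own title or href when extra_fields has nothing usable.
    return link.title or link.href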


def crawl_catalog_hierarchy(root_client: Client, max_depth: int = MAX_CATALOG_DEPTH) -> pd.DataFrame:
    """Breadth-first traversal of catalog child links up to the requested depth."""
    queue = deque([(root_client, 0, None)])
    seen_hrefs = set()
    rows: List[Dict] = []

    while queue:
        current_client, depth, parent_id = queue.popleft()
        self_href = getattr(current_client, "href", None)
        if not self_href and hasattr(current_client, "get_self_href"):
            self_href = current_client.get_self_href()

        if self_href in seen_hrefs:
            continue
        if self_href:
            seen_hrefs.add(self_href)

        child_links = list(current_client.get_links("child"))
        rows.append(
            {
                "depth": depth,
                "id": getattr(current_client, "id", None),
                "title": getattr(current_client, "title", None),
                "parent_id": parent_id,
                "child_count": len(child_links),
                "href": self_href,
            }
        )

        if depth >= max_depth:
            continue

        for link in child_links:
            link_id = _link_target_id(link)
            try:
                child_client = Client.open(link.href, headers=auth_headers)
                queue.append((child_client, depth + 1, getattr(current_client, "id", None)))
            except Exception as exc:  # includes APIError, ParametersError, STACError, and network failures
                rows.append(
                    {
                        "depth": depth + 1,
                        "id": link_id,
                        "title": link.title,
                        "parent_id": getattr(current_client, "id", None),
                        "child_count": None,
                        "href": link.href,
                        "error": str(exc),
                    }
                )

    hierarchy_df = pd.DataFrame(rows)
    if not hierarchy_df.empty:
        hierarchy_df = hierarchy_df.sort_values(["depth", "id"], na_position="last").reset_index(drop=True)
    return hierarchy_df

catalog_hierarchy_df = crawl_catalog_hierarchy(client)
catalog_hierarchy_df
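
The traversal logic above can be tried offline, without a token, on a toy hierarchy. The sketch below is a simplified stand-in, not the notebook's function: `bfs_levels` and the `toy` mapping are hypothetical, and a plain dict of node name to child names replaces the STAC `child` links. It keeps the same three ingredients: a FIFO queue, a `seen` set that guards against cycles, and a depth cut-off.

```python
from collections import deque
from typing import Dict, List, Tuple


def bfs_levels(children: Dict[str, List[str]], root: str, max_depth: int) -> List[Tuple[int, str]]:
    """Visit nodes breadth-first, returning (depth, node) pairs up to max_depth."""
    queue = deque([(root, 0)])
    seen = set()
    visited: List[Tuple[int, str]] = []

    while queue:
        node, depth = queue.popleft()
        if node in seen:
            continue  # already visited, e.g. a link that points back up the tree
        seen.add(node)
        visited.append((depth, node))

        if depth >= max_depth:
            continue  # record the node but do not expand its children

        for child in children.get(node, []):
            queue.append((child, depth + 1))

    return visited


# Toy hierarchy with a cycle: "a" links back to "root".
toy = {"root": ["a", "b"], "a": ["a1", "root"], "b": []}
print(bfs_levels(toy, "root", max_depth=1))
print(bfs_levels(toy, "root", max_depth=2))
```

With `max_depth=1` only the root and its direct children are visited, which corresponds to the default `EDITO_CATALOG_DEPTH` of `1`.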

## Enumerate collections via `Client.get_collections`
The PySTAC Client docs highlight `client.get_collections()` for retrieving collection metadata lazily. Convert the generator to a DataFrame so it is easy to scan provider details.

In [None]:
collections = list(client.get_collections())
collections_df = pd.DataFrame(
    [
        {
            "id": c.id,
            "title": c.title,
            "license": c.license,
            "providers": ", ".join(p.name for p in c.providers or []),
            "provider_roles": ", ".join("; ".join(p.roles or []) for p in c.providers or []),
            "provider_urls": ", ".join(p.url or "" for p in c.providers or []),
            "keywords": ", ".join(c.keywords or []),
            "description": c.description,
            "source_href": next((link.href for link in c.links if link.rel == "derived_from"), None),
        }
        for c in collections
    ]
)
with pd.option_context("display.max_rows", None, "display.max_colwidth", None):
    display(collections_df)
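
The provider columns above are built from three nested joins, which are easy to misread. As a minimal sketch, the same flattening can be checked on a plain collection dictionary (the shape returned by `Collection.to_dict()`). The helper name `summarize_providers` and the example record are made up for illustration.

```python
from typing import Any, Dict, List


def summarize_providers(collection: Dict[str, Any]) -> Dict[str, str]:
    """Flatten a collection's provider list into comma-separated strings."""
    providers: List[Dict[str, Any]] = collection.get("providers") or []
    return {
        "providers": ", ".join(p.get("name", "") for p in providers),
        # Roles of one provider are joined with "; ", providers with ", ".
        "provider_roles": ", ".join("; ".join(p.get("roles") or []) for p in providers),
        "provider_urls": ", ".join(p.get("url") or "" for p in providers),
    }


# Hypothetical record; real EDITO collections will differ.
example = {
    "id": "demo-collection",
    "providers": [
        {"name": "Agency A", "roles": ["producer", "licensor"], "url": "https://example.org/a"},
        {"name": "Host B", "roles": ["host"]},
    ],
}
print(summarize_providers(example))
```

A provider without a `url` contributes an empty string, so the positions in the three columns stay aligned.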

## Inspect a specific collection
Inspired by the CDSE example notebook, parameterize the collection ID you want to study and materialize its metadata so you can understand the spatial/temporal extents before querying items.

In [None]:
SPECIFIC_COLLECTION_ID = os.getenv("EDITO_SPECIFIC_COLLECTION", "SENTINEL1_EW_GRDM_1SDH")
specific_collection = next(
    (collection for collection in collections if collection.id == SPECIFIC_COLLECTION_ID),
    None,
)
if specific_collection is None:
    available_ids = sorted(collection.id for collection in collections[:20])
    raise ValueError(
        f"Collection {SPECIFIC_COLLECTION_ID} not found in the cached catalog list. "
        f"Set EDITO_SPECIFIC_COLLECTION to one of the available IDs (sample: {available_ids})."
    )

collection_summary = specific_collection.to_dict().get("summaries", {})
collection_row = {
    "id": specific_collection.id,
    "title": specific_collection.title,
    "license": specific_collection.license,
    "providers": ", ".join(provider.name for provider in specific_collection.providers or []),
    "spatial_extent": specific_collection.extent.spatial.bboxes if specific_collection.extent else None,
    "temporal_extent": specific_collection.extent.temporal.intervals if specific_collection.extent else None,
    "platform": collection_summary.get("platform"),
}
pd.DataFrame([collection_row])

### Search the collection with CDSE-style filters
Mirror the CDSE example by running a spatial/temporal search against the chosen collection. Adjust the bounding box or time range if your area of interest differs.

In [None]:
COLLECTION_SEARCH_BBOX = [-10.0, 50.0, 5.0, 60.0]
COLLECTION_SEARCH_DATETIME = "2024-01-01/2024-01-10"
COLLECTION_SEARCH_LIMIT = 5

specific_search = client.search(
    collections=[SPECIFIC_COLLECTION_ID],
    bbox=COLLECTION_SEARCH_BBOX,
    datetime=COLLECTION_SEARCH_DATETIME,
    max_items=COLLECTION_SEARCH_LIMIT,
)
specific_items = list(specific_search.items())  # items() replaces the deprecated get_items()
if not specific_items:
    raise ValueError(
        "No items returned for the specified collection/bbox/datetime; adjust the filters and retry."
    )

specific_item_rows: List[Dict] = []
for item in specific_items:
    props = item.to_dict().get("properties", {})
    specific_item_rows.append(
        {
            "id": item.id,
            "datetime": props.get("datetime"),
            "bbox": item.bbox,
            "asset_count": len(item.assets or {}),
        }
    )
pd.DataFrame(specific_item_rows)
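
An empty result is often caused by a malformed filter rather than missing data, so it can be worth validating the inputs before sending the search. The sketch below assumes the usual STAC conventions: `bbox` as `[west, south, east, north]` in degrees, and `datetime` as a `start/end` interval where `..` marks an open end. `validate_bbox` and `parse_interval` are hypothetical helpers, not part of `pystac-client`.

```python
from datetime import datetime
from typing import Optional, Sequence, Tuple


def validate_bbox(bbox: Sequence[float]) -> Tuple[float, float, float, float]:
    """Check a 2D bbox given as [west, south, east, north] in degrees."""
    if len(bbox) != 4:
        raise ValueError("bbox must have exactly four values")
    west, south, east, north = (float(v) for v in bbox)
    if not (-180 <= west <= 180 and -180 <= east <= 180):
        raise ValueError("longitudes must lie within [-180, 180]")
    if not (-90 <= south <= north <= 90):
        raise ValueError("latitudes must satisfy -90 <= south <= north <= 90")
    # west > east is allowed: it denotes a box crossing the antimeridian.
    return west, south, east, north


def parse_interval(value: str) -> Tuple[Optional[datetime], Optional[datetime]]:
    """Parse 'start/end'; '..' or an empty side means an open end."""
    parts = value.split("/")
    if len(parts) != 2:
        raise ValueError("expected a 'start/end' interval")

    def parse(part: str) -> Optional[datetime]:
        return None if part in ("", "..") else datetime.fromisoformat(part)

    start, end = parse(parts[0]), parse(parts[1])
    if start is not None and end is not None and start > end:
        raise ValueError("interval start is after its end")
    return start, end


print(validate_bbox([-10.0, 50.0, 5.0, 60.0]))
print(parse_interval("2024-01-01/2024-01-10"))
```

The two calls use the same values as `COLLECTION_SEARCH_BBOX` and `COLLECTION_SEARCH_DATETIME` above. Timestamps with a trailing `Z` are not handled by `datetime.fromisoformat` on older Python versions, so this sketch is limited to plain ISO dates and offsets.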

## Open a provider catalog and inspect collections
Pick any catalog from above (for example `copernicus-marine-products`) and fetch its `collection` links.

In [None]:
TARGET_CATALOG_HREF = f"{API_BASE}/catalogs/copernicus-marine-products"
provider_client = Client.open(TARGET_CATALOG_HREF, headers=auth_headers)
collection_links = [
    {
        # Read target_id defensively in case the link object does not define it.
        "id": getattr(link, "target_id", None),
        "title": link.title,
        "href": link.href,
    }
    for link in provider_client.get_links("collection")
]
pd.DataFrame(collection_links).head(10)

## Search for CMEMS items
The tutorial demonstrates `Client.search` for filtering items. Here we search a single CMEMS collection, `NWSHELF_ANALYSISFORECAST_PHY_004_013`, and cap the result at 10 items.

In [None]:
TARGET_COLLECTION = "NWSHELF_ANALYSISFORECAST_PHY_004_013"
search = client.search(collections=[TARGET_COLLECTION], max_items=10)
items = list(search.items())  # items() replaces the deprecated get_items()
if not items:
    raise ValueError("No items returned; try a different collection or relax filters.")

item_rows: List[Dict] = []
for item in items:
    props = item.to_dict().get("properties", {})
    item_rows.append(
        {
            "id": item.id,
            "collection": item.collection_id,
            "datetime": props.get("datetime"),
            "asset_keys": list(item.assets.keys()),
        }
    )
pd.DataFrame(item_rows)

### ItemCollection summary
Per the PySTAC Client guide, materialize the full STAC `ItemCollection` to inspect counts and aggregate metadata for the current search result.

In [None]:
item_collection = search.item_collection()
item_collection_dict = item_collection.to_dict()
summary = {
    "returned_features": len(item_collection_dict.get("features", [])),
    "matched": item_collection_dict.get("numberMatched"),
    "returned": item_collection_dict.get("numberReturned"),
    "bbox": item_collection_dict.get("bbox"),
}
pd.DataFrame([summary])
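
If the `bbox` field in the summary comes back empty, an aggregate extent can be derived from the individual item bounding boxes instead. This is a minimal sketch with a hypothetical helper, `union_bbox`; it assumes 2D boxes in `[west, south, east, north]` order and does not handle boxes that cross the antimeridian.

```python
from typing import Iterable, List, Optional, Sequence


def union_bbox(bboxes: Iterable[Optional[Sequence[float]]]) -> Optional[List[float]]:
    """Smallest box covering all given 2D boxes; missing boxes are skipped."""
    boxes = [b for b in bboxes if b is not None and len(b) == 4]
    if not boxes:
        return None
    return [
        min(b[0] for b in boxes),  # west
        min(b[1] for b in boxes),  # south
        max(b[2] for b in boxes),  # east
        max(b[3] for b in boxes),  # north
    ]


print(union_bbox([[-10.0, 50.0, 5.0, 60.0], None, [0.0, 55.0, 8.0, 62.0]]))
```

In this notebook it could be called as `union_bbox(item.bbox for item in items)`.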

## Inspect assets from the first item
Mimicking the tutorial, expose the available assets, media types, and roles so you can decide what to download next.

In [None]:
first_item = items[0]
assets_df = pd.DataFrame(
    [
        {
            "asset_key": key,
            "roles": asset.roles,
            "media_type": asset.media_type,
            "href": asset.href,
        }
        for key, asset in first_item.assets.items()
    ]
)
assets_df
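
Once the asset table is visible, the next step is usually to narrow it to the assets worth downloading. The sketch below filters a plain asset dictionary (the `assets` section of `first_item.to_dict()`) by role and media-type prefix. `select_assets` is a hypothetical helper, and the asset keys, roles, and media types in the example are invented for illustration.

```python
from typing import Any, Dict, List, Optional


def select_assets(
    assets: Dict[str, Dict[str, Any]],
    role: Optional[str] = None,
    media_type_prefix: Optional[str] = None,
) -> List[str]:
    """Return the keys of assets matching the given role and media-type prefix."""
    selected = []
    for key, asset in assets.items():
        roles = asset.get("roles") or []
        media_type = asset.get("type") or ""  # STAC JSON stores the media type under "type"
        if role is not None and role not in roles:
            continue
        if media_type_prefix is not None and not media_type.startswith(media_type_prefix):
            continue
        selected.append(key)
    return selected


# Hypothetical assets; real EDITO items will expose different keys.
example_assets = {
    "data": {"href": "https://example.org/data.nc", "type": "application/x-netcdf", "roles": ["data"]},
    "preview": {"href": "https://example.org/preview.png", "type": "image/png", "roles": ["thumbnail"]},
    "docs": {"href": "https://example.org/readme", "roles": None},
}
print(select_assets(example_assets, role="data"))
print(select_assets(example_assets, media_type_prefix="image/"))
```

Assets with missing `roles` or `type` are treated as having none, so they are only returned when no filter is given.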

## Preview the full STAC item JSON
Use this when you need to inspect geometry, providers, or asset metadata in detail.

In [None]:
print(json.dumps(first_item.to_dict(), indent=2)[:2000])