## ITS_LIVE Serverless Query

The serverless search for ITS_LIVE takes advantage of the rustac package, it uses static geoparquet files and queries them with DuckDB.
In order to make this search faster, the ITS_LIVE collection has been spatially partitioned in lat-lon squares and Uber's H3 cells. The search uses `get_overlapping_grid_names()` to retrieve the spatial partitions that overlap with a geojson polygon so we don't have to scan the whole collection.

This notebook uses 2 approaches, one uses rustac and the STAC API search notation, which is simpler and more user friendly, and a DuckDB only query, which as of now is not totally compelte and doesn't serialize each row into a STAC item. Using DuckDB directly is faster as we avoid the serialization and retrieve only the assets' links.

In [1]:
import json
import logging
import os
import math

import rustac
import duckdb
import s3fs

# from pqdm.threads import pqdm
from shapely.geometry import shape, box

logging.basicConfig()
logger = logging.getLogger()
logger.setLevel(logging.INFO)


# Connect to DuckDB
con = duckdb.connect()

# Load spatial extension required for the spatial queries
con.execute("INSTALL spatial")
con.execute("LOAD spatial")


colombia = {
  "type": "Feature",
  "geometry": {
    "type": "Polygon",
    "coordinates": [
          [
            [
              -76.8118477368562,
              10.725951506531445
            ],
            [
              -76.8118477368562,
              4.325087420870503
            ],
            [
              -69.60861905318234,
              4.325087420870503
            ],
            [
              -69.60861905318234,
              10.725951506531445
            ],
            [
              -76.8118477368562,
              10.725951506531445
            ]
          ]
        ]
  },
  "properties": {}
}

hma = {
  "type": "Feature",
  "properties": {},
  "geometry": {
    "type": "Polygon",
    "coordinates": [
      [
        [83.221436, 31.503629],
        [84.968262, 31.503629],
        [84.968262, 32.833443],
        [83.221436, 32.833443],
        [83.221436, 31.503629]
      ]
    ]
  }
}

greenland = {
  "type": "Feature",
  "properties": {},
  "geometry": {
    "type": "Polygon",
    "coordinates": [
    [
        [-45.05, 61.93527564358849],
        [-44.537947278569234, 61.93439779524844],
        [-44.07595464722729, 61.931764440940775],
        [-43.61408214918781, 61.927376152621925],
        [-43.1523897340054, 61.92123388294972],
        [-43.13737905069332, 62.13891672639419],
        [-43.122122552714636, 62.35679581675177],
        [-43.10661415425373, 62.57486983167596],
        [-43.09084756700362, 62.793137439704125],
        [-43.56790381583535, 62.79950724447399],
        [-44.04515874612781, 62.804058153589295],
        [-44.52254622269042, 62.80678911869741],
        [-45.05, 62.80769951036124],
        [-45.05, 62.5892987291677],
        [-45.05, 62.37109363832143],
        [-45.05, 62.153085518672796],
        [-45.05, 61.93527564358849]]
    ]
  }
}

### Serverless Search logic

In [2]:
def get_overlapping_grid_names(geojson_geometry: dict = {},
                               base_href: str = "s3://its-live-data/test-space/stac/geoparquet/latlon",
                               partition_type: str = "latlon",
                               resolution: int = 2,
                               overlap: str = "overlap"):
    """
    Generates a list of S3 path prefixes corresponding to spatial grid tiles that overlap
    with the provided GeoJSON geometry. These paths are intended for discovering Parquet files
    in a spatially partitioned STAC dataset.

    This is a workaround: ideally, spatial filtering should be handled within the Parquet metadata
    or using spatial indices rather than inferring intersecting tiles manually.

    Parameters:
    ----------
    geojson_geometry : dict, optional
        A GeoJSON geometry dictionary specifying the spatial region of interest.
        The function will find grid cells (by centroid) that intersect with this geometry.
    base_href : str, optional
        The base S3 path where partitioned STAC data is stored. The function will append
        grid identifiers and mission names to this prefix.
    partition_type : str, optional
        Type of partitioning used. Supports:
        - "latlon": Fixed 10x10 degree lat/lon grids with cell names like "N60W040"
        - "h3": H3 hexagonal grid system using resolution and overlap
    resolution : int, optional
        Only used if `partition_type` is "h3". Specifies the resolution of the H3 hex cells.
    overlap : str, optional
        Only used if `partition_type` is "h3". Passed to the `h3shape_to_cells_experimental` function
        to control overlap behavior.

    Returns:
    -------
    List[str]
        A list of valid S3-style path prefixes (with wildcards) that point to `.parquet` files
        under spatial partitions overlapping the input geometry.
    
    """
    if partition_type == "latlon":
        # ITS_LIVE uses a fixed 10 by 10 grid  (centroid as name for the cell e.g. N60W040)
        def lat_prefix(lat):
            return f"N{abs(lat):02d}" if lat >= 0 else f"S{abs(lat):02d}"

        def lon_prefix(lon):
            return f"E{abs(lon):03d}" if lon >= 0 else f"W{abs(lon):03d}"
            
        geom = shape(geojson_geometry)
        missions = ["landsatOLI", "sentinel1", "sentinel2"]
        
        if not geom.is_valid:
            geom = geom.buffer(0)
    
        minx, miny, maxx, maxy = geom.bounds
    
        # Center-based grid! 
        lon_center_start = int(math.floor((minx - 5) / 10.0)) * 10
        lon_center_end   = int(math.ceil((maxx + 5) / 10.0)) * 10
        lat_center_start = int(math.floor((miny - 5) / 10.0)) * 10
        lat_center_end   = int(math.ceil((maxy + 5) / 10.0)) * 10
        
        grids = set()
        for lon_c in range(lon_center_start, lon_center_end + 1, 10):
            for lat_c in range(lat_center_start, lat_center_end + 1, 10):
                tile = box(lon_c - 5, lat_c - 5, lon_c + 5, lat_c + 5)
                if geom.intersects(tile):
                    name = f"{lat_prefix(lat_c)}{lon_prefix(lon_c)}"
                    grids.add(name)
                    
        prefixes = [f"{base_href}/{p}/{i}" for p in missions for i in list(grids)]
        search_prefixes = [f"{path}/**/*.parquet" for path in prefixes if path_exists(path)]       
        return search_prefixes
    elif partition_type=="h3":
        import h3
        grids_hex = h3.h3shape_to_cells_experimental(h3.geo_to_h3shape(geojson_geometry), resolution, overlap)
        grids = [int(hs, 16) for hs in grids_hex]
        prefixes = [f"{base_href}/{p}" for p in grids]
        search_prefixes = [f"{prefix}/**/*.parquet" for prefix in prefixes if path_exists(prefix)]
        return search_prefixes
    else:
        raise NotImplementedError(f"Partition {partition_type} not implemented.")

def expr_to_sql(expr):
    """
    Transform a cql expression into SQL, I wonder if the library does it.
    """
    op = expr["op"]
    left, right = expr["args"]
    
    # Get property name if dict with "property" key, else literal
    def val_to_sql(val):
        if isinstance(val, dict) and "property" in val:
            prop = val["property"]
            if not prop.isidentifier():
                return f'"{prop}"'
            return prop
        elif isinstance(val, str):
            # quote strings
            return f"'{val}'"
        else:
            return str(val)

    left_sql = val_to_sql(left)
    right_sql = val_to_sql(right)

    # Map operators
    op_map = {
        "=": "=",
        "==": "=",
        ">=": ">=",
        "<=": "<=",
        ">": ">",
        "<": "<",
        "!=": "<>",
        "<>": "<>"
    }
    sql_op = op_map.get(op, op)
    return f"{left_sql} {sql_op} {right_sql}"

def filters_to_where(filters):
    # filters is a list of expressions combined with AND
    sql_parts = [expr_to_sql(f) for f in filters]
    return " AND ".join(sql_parts)
    
def path_exists(path: str) -> bool:
    if path.startswith("s3://"):
        fs = s3fs.S3FileSystem(anon=True)
        return fs.exists(path)
    else:
        return os.path.exists(path)

def build_cql2_filter(filters_list):
    if not filters_list:
        return None
    return filters_list[0] if len(filters_list) == 1 else {"op": "and", "args": filters_list}


def serverless_search(base_catalog_href: str = "s3://its-live-data/test-space/stac/geoparquet/latlon",
                      search_kwargs: dict = {},
                      engine: str = "rustac",
                      reduce_spatial_search=True,
                      partition_type: str = "latlon",
                      resolution: int = 2,
                      overlap: str = "overlap", 
                      asset_type: str = ".nc"):
    """
    Performs a serverless!! search over partitioned STAC catalogs stored in Parquet format for the ITS_LIVE project.

    Parameters
    ----------
    base_catalog_href : str
        Base URI of the ITS_LIVE STAC catalog or geoparquet collection. This should point to the
        root location where spatial partitions are stored (e.g. "s3://its-live-data/test-space/stac/geoparquet/latlon").
    search_kwargs : dict, optional
        Dictionary of search parameters compatible with the STAC API. Can include spatial queries
        (e.g., `intersects`) and metadata filters (e.g., `datetime`, `platform`, etc).
    engine : str, optional
        The backend engine to use for querying. Supported options:
        - "rustac": Uses the Rust STAC client (`rustac.DuckdbClient`)
        - "duckdb": Uses DuckDB SQL for querying parquet partitions
    reduce_spatial_search : bool, optional
        Whether to pre-filter the list of parquet files using overlapping spatial partitions.
        If False, all files under the base path will be searched.
    partition_type : str, optional
        The spatial partitioning scheme used. Supports:
        - "latlon": 10x10 degree tiles (default)
        - "h3": Hexagonal grid (requires `resolution` and `overlap`)
    resolution : int, optional
        Only used if `partition_type` is "h3". Defines the granularity of H3 spatial partitioning.
    overlap : str, optional
        Only used with H3 partitioning. Passed to the `h3shape_to_cells_experimental()` function
        to handle partial overlaps.
    asset_type : str, optional
        A string suffix filter to match asset HREFs (e.g., ".nc" for NetCDF files).

    Returns
    -------
    List[str]
        A list of asset URLs (typically `.nc` NetCDF files) that match the search criteria.

    """
    client = rustac.DuckdbClient()
    store = base_catalog_href

    if reduce_spatial_search:
        if "intersects" in search_kwargs:
            search_prefixes = get_overlapping_grid_names(base_href=store,
                                                         geojson_geometry=search_kwargs["intersects"],
                                                         partition_type=partition_type,
                                                         resolution=resolution,
                                                         overlap=overlap)
    else:
        if partition_type == "latlon":
            search_prefixes = [f"{store}/{mission}/**/*.parquet" for mission in ["landsatOLI", "sentinel1", "sentinel2"]]
        else:
            search_prefixes = [f"{store}/**/*.parquet"]

    print((f"Searching in {search_prefixes}"))

    filters = search_kwargs.get("filter")
    filters_sql = filters_to_where(filters)
    search_kwargs["filter"] = build_cql2_filter(filters)

    hrefs = []
    # TODO: this could run in parallel on a thread or could be passed all to DuckDB/rustac as a combined list of paths.
    # for debugging purposes querying one by one is more convenient for now.
    for prefix in search_prefixes:
        try:
            if engine == "duckdb":
                # TODO: make it more flexible
                print(f"Filters as SQL: {filters_sql}")
                geojson_str = json.dumps(search_kwargs["intersects"])
                query = f"""
                    SELECT 
                        '{prefix}' AS source_parquet,
                        assets -> 'data' ->> 'href' AS data_href
                    FROM read_parquet('{prefix}', union_by_name=true)
                    WHERE ST_Intersects(
                        geometry,
                        ST_GeomFromGeoJSON('{geojson_str}')
                    ) AND {filters_sql}
                """
                items = con.execute(query).df()
                links = items["data_href"].to_list()
                hrefs.extend(links)
            elif engine == "rustac":
                # can we use include to only bring the asset links?
                items = client.search(prefix, **search_kwargs)
                for item in items:
                    for asset in item["assets"].values():
                        if "data" in asset["roles"] and asset["href"].endswith(".nc"):
                            hrefs.append(asset["href"])
            else:
                raise NotImplementedError(f"Not a valid query engine: {engine}")
            print(f"Prefx: {prefix} items found: {len(items)}")
        except Exception as e:
            print(f"Error while searching in {prefix}: {e}")
        
    return sorted(list(set(hrefs)))

In [3]:
%%time

filters = [
    {"op": ">=", "args": [{"property": "percent_valid_pixels"}, 1]},
    {'op': '=', 'args': [{'property': 'proj:code'}, 'EPSG:3413']}
]

# filters = cql2.parse_text("percent_valid_pixels>=1 AND proj:code='EPSG:3413'").to_json() 

search_kwargs = {
    "intersects": greenland["geometry"], # <- has to be in lat lon 
    # "datetime": "1980-01-01T00:00:00Z/2025-12-31T23:59:59Z",
    "filter": filters
}

catalog_base_href = "s3://its-live-data/test-space/stac/geoparquet/latlon"
# catalog_base_href = "../catalog/h3v2r1"
results = serverless_search(catalog_base_href, search_kwargs=search_kwargs)
len(results)

Searching in ['s3://its-live-data/test-space/stac/geoparquet/latlon/landsatOLI/N60W040/**/*.parquet', 's3://its-live-data/test-space/stac/geoparquet/latlon/landsatOLI/N60W050/**/*.parquet', 's3://its-live-data/test-space/stac/geoparquet/latlon/sentinel1/N60W040/**/*.parquet', 's3://its-live-data/test-space/stac/geoparquet/latlon/sentinel1/N60W050/**/*.parquet', 's3://its-live-data/test-space/stac/geoparquet/latlon/sentinel2/N60W040/**/*.parquet', 's3://its-live-data/test-space/stac/geoparquet/latlon/sentinel2/N60W050/**/*.parquet']
Prefx: s3://its-live-data/test-space/stac/geoparquet/latlon/landsatOLI/N60W040/**/*.parquet items found: 26470
Prefx: s3://its-live-data/test-space/stac/geoparquet/latlon/landsatOLI/N60W050/**/*.parquet items found: 0
Prefx: s3://its-live-data/test-space/stac/geoparquet/latlon/sentinel1/N60W040/**/*.parquet items found: 4029
Prefx: s3://its-live-data/test-space/stac/geoparquet/latlon/sentinel1/N60W050/**/*.parquet items found: 0
Prefx: s3://its-live-data/tes

57334

In [None]:
results[0]

## DuckDB queries using Uber's H3 grid

In [4]:
%%time

filters = [
    {"op": ">=", "args": [{"property": "percent_valid_pixels"}, 1]},
    {'op': '=', 'args': [{'property': 'proj:code'}, 'EPSG:3413']}
]

search_kwargs = {
    "intersects": greenland["geometry"], # <- has to be in lat lon 
    # "datetime": "1980-01-01T00:00:00Z/2025-12-31T23:59:59Z",
    "filter": filters
}

catalog_base_href = "s3://its-live-data/test-space/stac/geoparquet/h3r2" # <- base catalog url changes to use the h3 grids!
duck_results = serverless_search(catalog_base_href,
                            search_kwargs=search_kwargs,
                            engine="duckdb",
                            partition_type = "h3",
                            resolution = 2,
                            overlap = "bbox_overlap")
len(duck_results)

Searching in ['s3://its-live-data/test-space/stac/geoparquet/h3r2/585580101744197631/**/*.parquet', 's3://its-live-data/test-space/stac/geoparquet/h3r2/585587798325592063/**/*.parquet', 's3://its-live-data/test-space/stac/geoparquet/h3r2/585589997348847615/**/*.parquet', 's3://its-live-data/test-space/stac/geoparquet/h3r2/585945689360433151/**/*.parquet']
Filters as SQL: percent_valid_pixels >= 1 AND "proj:code" = 'EPSG:3413'
Prefx: s3://its-live-data/test-space/stac/geoparquet/h3r2/585580101744197631/**/*.parquet items found: 0
Filters as SQL: percent_valid_pixels >= 1 AND "proj:code" = 'EPSG:3413'


FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

Prefx: s3://its-live-data/test-space/stac/geoparquet/h3r2/585587798325592063/**/*.parquet items found: 3813
Filters as SQL: percent_valid_pixels >= 1 AND "proj:code" = 'EPSG:3413'


FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

Prefx: s3://its-live-data/test-space/stac/geoparquet/h3r2/585589997348847615/**/*.parquet items found: 26783
Filters as SQL: percent_valid_pixels >= 1 AND "proj:code" = 'EPSG:3413'


FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

Prefx: s3://its-live-data/test-space/stac/geoparquet/h3r2/585945689360433151/**/*.parquet items found: 26738
CPU times: user 8.52 s, sys: 606 ms, total: 9.13 s
Wall time: 37.6 s


57334

In [6]:
%%time

catalog_base_href = "s3://its-live-data/test-space/stac/geoparquet/h3r1" # <- base catalog url changes again to use the h3 level 1 grids!

filters = [
    {"op": ">=", "args": [{"property": "percent_valid_pixels"}, 1]},
    {'op': '=', 'args': [{'property': 'proj:code'}, 'EPSG:3413']}
]

search_kwargs = {
    "intersects": greenland["geometry"], # <- has to be in lat lon 
    # "datetime": "1980-01-01T00:00:00Z/2025-12-31T23:59:59Z",
    "filter": filters
}

duck_results = serverless_search(catalog_base_href,
                            search_kwargs=search_kwargs,
                            engine="duckdb",
                            partition_type = "h3",
                            resolution = 1,
                            overlap = "bbox_overlap")
len(duck_results)

Searching in ['s3://its-live-data/test-space/stac/geoparquet/h3r1/581078701140082687/**/*.parquet', 's3://its-live-data/test-space/stac/geoparquet/h3r1/581087497233104895/**/*.parquet', 's3://its-live-data/test-space/stac/geoparquet/h3r1/581443739000504319/**/*.parquet']
Filters as SQL: percent_valid_pixels >= 1 AND "proj:code" = 'EPSG:3413'


FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

Prefx: s3://its-live-data/test-space/stac/geoparquet/h3r1/581078701140082687/**/*.parquet items found: 0
Filters as SQL: percent_valid_pixels >= 1 AND "proj:code" = 'EPSG:3413'


FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

Prefx: s3://its-live-data/test-space/stac/geoparquet/h3r1/581087497233104895/**/*.parquet items found: 31136
Filters as SQL: percent_valid_pixels >= 1 AND "proj:code" = 'EPSG:3413'


FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

Prefx: s3://its-live-data/test-space/stac/geoparquet/h3r1/581443739000504319/**/*.parquet items found: 26198
CPU times: user 7.04 s, sys: 813 ms, total: 7.85 s
Wall time: 41.9 s


57334

## STAC API

In [None]:
from pystac_client import Client

def search_stac(stac_catalog, page_size=100, filter_list=[], **kwargs):
    """
    Returns only the url to the data asset, for ITS_LIVE a netcdf file.
    """
    catalog = Client.open(stac_catalog)
    search_kwargs = {
        "collections": ["itslive-granules"],
        "limit": page_size,
        **kwargs
    }

    def build_cql2_filter(filters_list):
        if not filters_list:
            return None
        return filters_list[0] if len(filters_list) == 1 else {"op": "and", "args": filters_list}
    if filter_list:
        filters = build_cql2_filter(filter_list)
        search_kwargs["filter"] = build_cql2_filter(filters)
        search_kwargs["filter_lang"] = "cql2-json"

    search = catalog.search(**search_kwargs)
    
    hrefs = []
    pages_count = 0
    proj_matches = 0
    for page in search.pages():
        pages_count += 1
        for item in page:
            for asset in item.assets.values():
                if "data" in asset.roles and asset.href.endswith(".nc"):
                    hrefs.append(asset.href)
    print(f"Requested pages: {pages_count}")
    return hrefs

In [None]:
%%time

filters = [
    {"op": ">=", "args": [{"property": "percent_valid_pixels"}, 1]},
    {"op": "=", "args": [{"property": "proj:code"}, "EPSG:3413"]}
]

items = search_stac("https://stac.itslive.cloud/",
                    # datetime="2010-06-01/2023-06-30",
                    intersects=greenland["geometry"],
                    filter_list=filters,
                    page_size=2000)
len(items)

## DuckDB queries

Because of its size, queryng all the catalog(without spatial punning) is slow over the network.
Can we make this more efficient if we use a global dataset instead of these partitioned parquets? probably. 

In [7]:
%%time

catalog_url = "s3://its-live-data/test-space/stac/geoparquet/latlon"

duckdb.sql(f"select count(*) from read_parquet('{catalog_url}/**/*.parquet', union_by_name=true)")

┌──────────────┐
│ count_star() │
│    int64     │
├──────────────┤
│     38158260 │
└──────────────┘

This query will use a local copy

In [9]:
%%time


catalog_url = "../catalog/geoparquet"

filters = [
    {"op": ">=", "args": [{"property": "percent_valid_pixels"}, 1]},
    {"op": "=", "args": [{"property": "proj:code"}, "EPSG:3413"]}
]

geojson_str = json.dumps(greenland["geometry"])

filters_sql = filters_to_where(filters)

query = f"""
    SELECT 
        assets -> 'data' ->> 'href' AS data_href
    FROM read_parquet('{catalog_url}/**/*.parquet', union_by_name=true)
    WHERE ST_Intersects(
        geometry,
        ST_GeomFromGeoJSON('{geojson_str}')
    ) AND {filters_sql}
"""

items = con.execute(query).df()
links = items["data_href"].to_list()
len(links)

FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

CPU times: user 28.6 s, sys: 3.29 s, total: 31.9 s
Wall time: 6.09 s


57334