The purpose of this notebook is to demonstrate how we can build a catalog/index of FIM maps and query it.

In [23]:
import os
import typer
import pystac
import pandas as pd
from pathlib import Path
from typing import Optional
from datetime import datetime
from google.cloud import storage
from google.cloud import bigquery

import sqlite3
from typing import Tuple, Optional

from rich.progress import Progress

In [19]:
def upload_to_gcs(local_file, bucket_name, dest_blob_name):
    """Upload one local file to a Google Cloud Storage bucket.

    Args:
        local_file: Path of the file on the local filesystem.
        bucket_name: Name of the destination bucket (without ``gs://``).
        dest_blob_name: Object name to write inside the bucket.
    """
    target = storage.Client().bucket(bucket_name).blob(dest_blob_name)
    target.upload_from_filename(local_file)
    print(f"Uploaded to gs://{bucket_name}/{dest_blob_name}")

In [24]:
def upload_directory_to_gcs_with_progress(local_dir, bucket_name, gcs_prefix=""):
    """Recursively upload a local directory to a GCS bucket with a progress bar.

    Args:
        local_dir: Directory whose files (including subdirectories) are uploaded.
        bucket_name: Name of the destination bucket.
        gcs_prefix: Prefix joined in front of each file's path relative to
            ``local_dir`` to form the object name.
    """
    bucket = storage.Client().bucket(bucket_name)

    # Build the full (source, destination) list first so the progress bar
    # knows the total number of uploads.
    uploads = []
    for dirpath, _, filenames in os.walk(local_dir):
        for name in filenames:
            src = os.path.join(dirpath, name)
            rel = os.path.relpath(src, local_dir)
            # Object names use forward slashes regardless of the local OS.
            dest = os.path.join(gcs_prefix, rel).replace("\\", "/")
            uploads.append((src, dest))

    with Progress() as progress:
        task = progress.add_task("Uploading", total=len(uploads), filename="")

        for src, dest in uploads:
            progress.update(task, filename=os.path.basename(src))
            bucket.blob(dest).upload_from_filename(src)
            progress.advance(task)

## Build STAC Catalog

In [51]:
# Settings for building the STAC catalog.

gcs_bucket = 'com_res_fim_output'
prefix = ''
extension = ".cog"
output_dir = './catalog'

# List every object in the bucket under `prefix` and keep the gs:// URI of
# each object whose name ends with `extension`.
print(
    f"Gathering COG files in {gcs_bucket} with prefix '{prefix}' and extension '{extension}'...",
    end="",
)
client = storage.Client()
bucket = client.bucket(gcs_bucket)
blobs = client.list_blobs(bucket, prefix=prefix)
matching_files = [
    f"gs://{gcs_bucket}/{blob.name}"
    for blob in blobs
    if blob.name.endswith(extension)
]
print("done")

print(f'Found {len(matching_files)} matching files')

Gathering COG files in com_res_fim_output with prefix '' and extension '.cog'...done
Found 22293 matching files


In [52]:

# Group the matching files by reach id. The reach id, stage and flow are
# parsed from the file name (e.g. 8584888__12_5_m__3668_cms_inundation.cog)
# and kept for building the catalog later on.
items = {}
for url in matching_files:
    filename = url.rsplit('/', 1)[-1]
    filename_parts = filename.split('__')
    item_id = filename.replace(extension, "")
    reach_id = int(filename_parts[0])
    # "12_5_m" -> "12.5": the first two tokens are the whole and fractional parts.
    stage = float('.'.join(filename_parts[1].split('_')[:2]))
    # "3668_cms_inundation.cog" -> 3668.0
    flow = float(filename_parts[2].split('_')[0])

    items.setdefault(reach_id, []).append(
        {
            'url': url,
            'item_id': item_id,
            'reach_id': reach_id,
            'stage': stage,
            'flow': flow,
        }
    )

In [53]:
# Root catalog; one child catalog is added per reach id.
catalog = pystac.Catalog(
    id="fim-data-catalog", description="Flood maps indexed by id, stage, and flow"
)

# For each reach_id, create a sub-catalog and add one STAC item per COG.
for reach_id, items_for_id in items.items():
    sub_catalog = pystac.Catalog(id=f"reach-{reach_id}", description=f"Catalog for reach_id {reach_id}")
    catalog.add_child(sub_catalog)

    for attrs in items_for_id:
        stac_item = pystac.Item(
            id=attrs['item_id'],
            geometry=None,
            bbox=None,
            datetime=datetime(
                1970, 1, 1, 0, 0, 0
            ),  # placeholder because this parameter is required
            # The reach id is stored under the "id" property; the query and
            # export cells read it back with properties.get("id").
            properties={"id": attrs['reach_id'], "stage": attrs['stage'], "flow": attrs['flow']},
        )

        stac_item.add_asset(
            "cog",
            pystac.Asset(
                href=attrs['url'],
                media_type=pystac.MediaType.COG,
                roles=["data"],
                title="Cloud Optimized GeoTiff",
            ),
        )

        sub_catalog.add_item(stac_item)

# Save once, from the root. normalize_and_save() walks the whole tree, assigns
# hrefs to every child catalog and item under output_dir, and writes them all.
# Saving each sub-catalog individually beforehand only wrote every file twice.
catalog.normalize_and_save(root_href=output_dir, catalog_type=pystac.CatalogType.SELF_CONTAINED)



In [36]:
# Upload the local 'catalog' directory to the bucket under the "catalog" prefix.
upload_directory_to_gcs_with_progress('catalog', gcs_bucket, gcs_prefix="catalog")

Output()

## Build SQLite from STAC

Unfortunately, this database cannot be directly accessed when stored in a GCS bucket.

In [75]:
def extract_metadata_from_item(item: pystac.Item) -> Optional[Tuple[int, float, float, str]]:
    """Turn a STAC item into a ``(reach_id, stage, flow, cog_path)`` row.

    Reads the ``id``, ``stage`` and ``flow`` properties and the href of the
    ``cog`` asset.

    Returns:
        The row tuple, or ``None`` when the item has no ``cog`` asset or when
        reading/converting any value raises (a "Skipping item" message is
        printed in that case).
    """
    try:
        props = item.properties
        reach_id, stage, flow = (
            int(props.get("id")),
            float(props.get("stage")),
            float(props.get("flow")),
        )
        asset = item.assets.get("cog")
        if asset:
            return (reach_id, stage, flow, asset.href)
    except Exception as e:
        print(f"Skipping item {item.id}: {e}")
    return None

In [76]:
def sync_stac_to_sqlite(stac_path: str, sqlite_path: str = "inundation_index.db", allow_deletes: bool = False):
    """Mirror the items of a STAC catalog into a SQLite lookup table.

    Every item under ``<stac_path>/catalog.json`` is converted to a
    ``(reach_id, stage, flow, cog_path)`` row with
    ``extract_metadata_from_item`` and written to the ``inundation`` table,
    which is unique on ``(reach_id, stage, flow)``.

    Args:
        stac_path: Directory containing the root ``catalog.json``.
        sqlite_path: SQLite database file to create or update.
        allow_deletes: When true, rows whose ``(reach_id, stage, flow)`` key
            no longer appears in the catalog are removed.
    """
    print(f"Loading STAC catalog: {stac_path}")
    catalog = pystac.Catalog.from_file(os.path.join(stac_path, "catalog.json"))
    items = list(catalog.get_all_items())
    print(f"Found {len(items)} items in catalog.")

    # Rows currently described by the catalog (items that cannot be converted
    # are skipped by extract_metadata_from_item).
    current_data = set()
    for item in items:
        row = extract_metadata_from_item(item)
        if row:
            current_data.add(row)

    conn = sqlite3.connect(sqlite_path)
    try:
        c = conn.cursor()

        # Create table if missing
        c.execute("""
    CREATE TABLE IF NOT EXISTS inundation (
        reach_id INTEGER,
        stage REAL,
        flow REAL,
        cog_path TEXT,
        UNIQUE(reach_id, stage, flow)
    )
    """)
        conn.commit()

        # Load existing entries
        c.execute("SELECT reach_id, stage, flow, cog_path FROM inundation")
        existing_data = set(c.fetchall())

        # Insert new rows; a row whose key already exists but whose cog_path
        # changed is replaced through the UNIQUE constraint.
        to_insert = current_data - existing_data
        if to_insert:
            c.executemany("INSERT OR REPLACE INTO inundation VALUES (?, ?, ?, ?)", list(to_insert))
            print(f"Inserted or updated {len(to_insert)} items.")

        # Optionally delete removed items. Compare on the key only: comparing
        # full rows would also select rows whose cog_path just changed, and the
        # key-based DELETE would then remove the freshly updated row.
        if allow_deletes:
            current_keys = {row[:3] for row in current_data}
            stale_keys = {row[:3] for row in existing_data} - current_keys
            if stale_keys:
                c.executemany(
                    "DELETE FROM inundation WHERE reach_id=? AND stage=? AND flow=?",
                    list(stale_keys),
                )
                print(f"Deleted {len(stale_keys)} items no longer in catalog.")

        # Ensure the lookup index exists
        c.execute("CREATE INDEX IF NOT EXISTS idx_reach_stage_flow ON inundation(reach_id, stage, flow)")
        conn.commit()
    finally:
        # Always release the database handle, even when a statement fails.
        conn.close()

    print("✅ Sync complete.")

In [84]:
# Build/refresh the SQLite index from the catalog on disk, removing rows that
# are no longer in the catalog.
# NOTE(review): this reads the "fim-catalog" directory, whereas the build cell
# writes the catalog to './catalog' — confirm which directory is intended.
sync_stac_to_sqlite("fim-catalog", sqlite_path="fim_catalog_index.db", allow_deletes=True)

Loading STAC catalog: fim-catalog
Found 801 items in catalog.
✅ Sync complete.


Upload the SQLite index file to the GCS bucket.

In [85]:
# Upload the SQLite index file to the root of the com_res_fim_output bucket.
# NOTE(review): this repeats what upload_to_gcs() does and rebinds the
# notebook-level `client`, `bucket` and `blob` names.
client = storage.Client()
bucket = client.bucket('com_res_fim_output')
blob = bucket.blob('fim_catalog_index.db')
blob.upload_from_filename('fim_catalog_index.db')

## Build BigQuery from STAC

In [63]:
def flatten_stac_items(catalog_path: str) -> pd.DataFrame:
    """Flatten every item of a STAC catalog into one DataFrame row.

    Args:
        catalog_path: Directory containing the root ``catalog.json``.

    Returns:
        A DataFrame with columns ``item_id``, ``reach_id``, ``stage``,
        ``flow``, ``datetime`` (ISO string or ``None``), ``asset_url`` (href of
        the ``cog`` asset) and ``public_url`` (the same object addressed
        through ``https://storage.googleapis.com/``). Both URL columns are
        ``None`` for items without a ``cog`` asset.
    """
    root_catalog = pystac.Catalog.from_file(str(Path(catalog_path) / "catalog.json"))

    rows = []
    for item in root_catalog.get_all_items():
        # Look the asset up once so both URL columns share the same guard;
        # indexing item.assets["cog"] directly raised KeyError when it was missing.
        cog_asset = item.assets.get("cog")
        asset_url = cog_asset.href if cog_asset is not None else None
        public_url = (
            f'https://storage.googleapis.com/{asset_url.replace("gs://", "")}'
            if asset_url is not None
            else None
        )

        rows.append(
            {
                "item_id": item.id,
                "reach_id": item.properties.get("id"),
                "stage": item.properties.get("stage"),
                "flow": item.properties.get("flow"),
                "datetime": item.datetime.isoformat() if item.datetime else None,
                "asset_url": asset_url,
                "public_url": public_url,
            }
        )

    return pd.DataFrame(rows)

In [64]:
# Flatten the local 'catalog' directory into one row per STAC item.
df = flatten_stac_items("catalog")

In [66]:
# Write one JSON record per line; the BigQuery load below uses the
# NEWLINE_DELIMITED_JSON source format.
df.to_json("fim_catalog_index.jsonl", orient="records", lines=True)

In [67]:
# Stage the JSONL file in the bucket so BigQuery can load it from a gs:// URI.
upload_to_gcs("fim_catalog_index.jsonl", "com_res_fim_output", "fim_catalog_index.jsonl")

Uploaded to gs://com_res_fim_output/fim_catalog_index.jsonl


In [68]:
def load_stac_to_bigquery(gcs_uri: str, table_id: str):
    """Load a newline-delimited JSON file from GCS into a BigQuery table.

    The schema is auto-detected and any existing table contents are replaced
    (``WRITE_TRUNCATE``). Blocks until the load job finishes.

    Args:
        gcs_uri: ``gs://`` URI of the JSONL file to load.
        table_id: Destination table identifier.
    """
    bq = bigquery.Client()
    config = bigquery.LoadJobConfig(
        source_format=bigquery.SourceFormat.NEWLINE_DELIMITED_JSON,
        autodetect=True,
        write_disposition=bigquery.WriteDisposition.WRITE_TRUNCATE,
    )

    job = bq.load_table_from_uri(gcs_uri, table_id, job_config=config)
    job.result()  # block until the load job completes
    print(f"Loaded {job.output_rows} rows to {table_id}")

In [69]:
# Replace the contents of com-res.flood_data.fim_catalog with the staged JSONL file.
load_stac_to_bigquery("gs://com_res_fim_output/fim_catalog_index.jsonl", "com-res.flood_data.fim_catalog")

Loaded 22293 rows to com-res.flood_data.fim_catalog


## Query FIM STAC Catalog

The purpose of this section is to demonstrate how to query the STAC catalog of FIM maps directly.

In [86]:
import pystac

In [87]:
# Load the root catalog from its public URL in the GCS bucket.
catalog_url = "https://storage.googleapis.com/com_res_fim_output/catalog/catalog.json"
# To read the locally built copy instead, use:
#catalog_url = "./catalog/catalog.json"
catalog = pystac.Catalog.from_file(catalog_url)

In [88]:
catalog

In [89]:
# NOTE(review): this rebinds `items`, which the catalog-building cells use for
# the dict of files keyed by reach id, and the result is not used by the next
# cell (it calls catalog.get_all_items() again).
items = catalog.get_all_items()

In [90]:
%%time
matching_items = []
for item in catalog.get_all_items():
    if item.properties.get("id") == 8584996:
        matching_items.append(item)


print(f"Found {len(matching_items)} matching items")

Found 50 matching items
CPU times: user 19.7 s, sys: 1.71 s, total: 21.4 s
Wall time: 2min 9s


## Query SQLite

Iterating over the remote STAC catalog is slow (about two minutes for the query above), so instead we'll use the SQLite index for our lookups.

Test searching for a specific file.

In [80]:
%%time

conn = sqlite3.connect("fim_catalog_index.db")
c = conn.cursor()

reach_id = 8584888
stage = 12.5
flow = 3668

results = c.execute("""
    SELECT cog_path
    FROM inundation
    WHERE reach_id = ? AND stage = ? AND flow = ?
""", (reach_id, stage, flow)).fetchall()

for row in results:
    print(row[0])

conn.close()

gs://com_res_fim_output/flood_11010001/11010001_inundation/8584888/8584888__12_5_m__3668_cms_inundation.cog
CPU times: user 1.26 ms, sys: 1.1 ms, total: 2.37 ms
Wall time: 1.18 ms


Test searching for all files that belong to a single reach.

In [82]:
%%time

conn = sqlite3.connect("fim_catalog_index.db")
c = conn.cursor()

reach_id = 8584888

results = c.execute("""
    SELECT cog_path
    FROM inundation
    WHERE reach_id = ? 
""", (reach_id,)).fetchall()

for row in results:
    print(row[0])

conn.close()

gs://com_res_fim_output/flood_11010001/11010001_inundation/8584888/8584888__0_5_m__13_cms_inundation.cog
gs://com_res_fim_output/flood_11010001/11010001_inundation/8584888/8584888__1_0_m__40_cms_inundation.cog
gs://com_res_fim_output/flood_11010001/11010001_inundation/8584888/8584888__1_5_m__78_cms_inundation.cog
gs://com_res_fim_output/flood_11010001/11010001_inundation/8584888/8584888__2_0_m__125_cms_inundation.cog
gs://com_res_fim_output/flood_11010001/11010001_inundation/8584888/8584888__2_5_m__180_cms_inundation.cog
gs://com_res_fim_output/flood_11010001/11010001_inundation/8584888/8584888__3_0_m__244_cms_inundation.cog
gs://com_res_fim_output/flood_11010001/11010001_inundation/8584888/8584888__3_5_m__315_cms_inundation.cog
gs://com_res_fim_output/flood_11010001/11010001_inundation/8584888/8584888__4_0_m__394_cms_inundation.cog
gs://com_res_fim_output/flood_11010001/11010001_inundation/8584888/8584888__4_5_m__485_cms_inundation.cog
gs://com_res_fim_output/flood_11010001/11010001_i

Unfortunately, this won't work if we want to access the database remotely :( 

## Query BigQuery

Query a single item

In [27]:
from google.cloud import bigquery

# BigQuery client bound to the "com-res" project; the query cells below use it.
client = bigquery.Client(project="com-res")

In [28]:
%%time 

# Look up rows for one exact (reach_id, stage, flow) combination. The values
# are passed as named query parameters (@reach_id, @stage, @flow) rather than
# formatted into the SQL text.
query = """
SELECT *
FROM `com-res.flood_data.fim_catalog`
WHERE reach_id = @reach_id
  AND stage = @stage
  AND flow = @flow
LIMIT 100
"""

job_config = bigquery.QueryJobConfig(
    query_parameters=[
        bigquery.ScalarQueryParameter("reach_id", "INT64", 8585042),
        bigquery.ScalarQueryParameter("stage", "FLOAT64", 3.1),
        bigquery.ScalarQueryParameter("flow", "FLOAT64", 2000),
    ]
)

query_job = client.query(query, job_config=job_config)

# Iterating the job yields the result rows; nothing is printed when the
# query matches no rows.
for row in query_job:
    print(row)

CPU times: user 20 ms, sys: 8.54 ms, total: 28.6 ms
Wall time: 1.21 s


In [29]:
# Inspect the column names of the last row printed above.
# NOTE(review): `row` is only bound if the preceding query loop yielded at
# least one row; presumably the recorded NameError comes from an empty result.
row.keys()

NameError: name 'row' is not defined

In [25]:
row[3]

3.1

In [40]:
%%time 

query = """
SELECT *
FROM `com-res.flood_data.fim_catalog`
WHERE reach_id = @reach_id
ORDER BY stage ASC
"""

job_config = bigquery.QueryJobConfig(
    query_parameters=[
        bigquery.ScalarQueryParameter("reach_id", "INT64", 8584970),
    ]
)

query_job = client.query(query, job_config=job_config)

results = dict(files = [],
           flows_cms = [],
           stages_m = [])

for row in query_job:
    results['files'].append(row['public_url'])
    results['stages_m'].append(row['stage'])
    results['flows_cms'].append(row['flow'])

KeyError: "no row field 'public_url'"

In [32]:
results

{'files': ['https://storage.googleapis.com/com_res_fim_output/flood_11010001/11010001_inundation/8584970/8584970__0_5_m__24_cms_inundation.cog',
  'https://storage.googleapis.com/com_res_fim_output/flood_11010001/11010001_inundation/8584970/8584970__1_0_m__75_cms_inundation.cog',
  'https://storage.googleapis.com/com_res_fim_output/flood_11010001/11010001_inundation/8584970/8584970__1_5_m__145_cms_inundation.cog',
  'https://storage.googleapis.com/com_res_fim_output/flood_11010001/11010001_inundation/8584970/8584970__2_0_m__232_cms_inundation.cog',
  'https://storage.googleapis.com/com_res_fim_output/flood_11010001/11010001_inundation/8584970/8584970__2_5_m__333_cms_inundation.cog',
  'https://storage.googleapis.com/com_res_fim_output/flood_11010001/11010001_inundation/8584970/8584970__3_0_m__448_cms_inundation.cog',
  'https://storage.googleapis.com/com_res_fim_output/flood_11010001/11010001_inundation/8584970/8584970__3_5_m__577_cms_inundation.cog',
  'https://storage.googleapis.com/

In [138]:
print(results['flows_cms'])

[24.0, 75.0, 145.0, 232.0, 333.0, 448.0, 577.0, 716.0, 867.0, 1029.0, 1199.0, 1378.0, 1569.0, 1771.0, 1994.0, 2222.0, 2461.0, 2720.0, 2989.0, 3259.0, 3547.0, 3852.0, 4173.0, 4520.0, 4878.0, 5251.0, 5641.0, 6046.0, 6456.0, 6901.0, 7387.0, 7885.0, 8387.0, 8926.0, 9480.0, 10047.0, 10634.0, 11263.0, 11941.0, 12610.0, 13333.0, 14082.0, 14849.0, 15635.0, 16465.0, 17321.0, 18225.0, 19146.0, 20111.0, 21103.0]


In [6]:
print(results['files'])

['gs://com_res_fim_output/flood_11010001/11010001_inundation/8584970/8584970__0_5_m__24_cms_inundation.cog', 'gs://com_res_fim_output/flood_11010001/11010001_inundation/8584970/8584970__1_0_m__75_cms_inundation.cog', 'gs://com_res_fim_output/flood_11010001/11010001_inundation/8584970/8584970__1_5_m__145_cms_inundation.cog', 'gs://com_res_fim_output/flood_11010001/11010001_inundation/8584970/8584970__2_0_m__232_cms_inundation.cog', 'gs://com_res_fim_output/flood_11010001/11010001_inundation/8584970/8584970__2_5_m__333_cms_inundation.cog', 'gs://com_res_fim_output/flood_11010001/11010001_inundation/8584970/8584970__3_0_m__448_cms_inundation.cog', 'gs://com_res_fim_output/flood_11010001/11010001_inundation/8584970/8584970__3_5_m__577_cms_inundation.cog', 'gs://com_res_fim_output/flood_11010001/11010001_inundation/8584970/8584970__4_0_m__716_cms_inundation.cog', 'gs://com_res_fim_output/flood_11010001/11010001_inundation/8584970/8584970__4_5_m__867_cms_inundation.cog', 'gs://com_res_fim_ou