# Process Maxar's Open Data catalog into OAM-specific STAC

In [1]:
# Enable IPython's autoreload extension in mode 2: all modules are reloaded
# before each cell executes, so edits to imported packages (such as
# `stactools.hotosm`, imported below) take effect without a kernel restart.
%load_ext autoreload
%autoreload 2

In [2]:
import datetime as dt
import json
from concurrent.futures import ThreadPoolExecutor, as_completed

import pystac
import requests

from stactools.hotosm.maxar.stac import create_collection, create_item

In [3]:
# HREF of the root STAC catalog; it is read with `pystac.read_file` below.
MAXAR_ROOT_CATALOG = "https://maxar-opendata.s3.amazonaws.com/events/catalog.json"
# HREF of a JSON document with one entry per event; each entry's "date"
# field (formatted YYYY-MM-DD) is parsed below.
MAXAR_EVENT_INFO = "https://maxar-opendata.s3.amazonaws.com/event_info.json"

In [4]:
# Download the event metadata. `requests` applies no timeout by default, so
# bound the wait to keep a stalled connection from hanging the notebook, and
# fail loudly on an HTTP error rather than trying to parse an error page as JSON.
resp = requests.get(MAXAR_EVENT_INFO, timeout=60)
resp.raise_for_status()
maxar_event_info = resp.json()

# Event dates in ascending order; their min/max are passed to
# `create_collection` further down.
maxar_event_dates = sorted(
    dt.datetime.strptime(info["date"], "%Y-%m-%d") for info in maxar_event_info
)

In [5]:
# Read the root catalog from its remote HREF. The bare expression on the last
# line makes the catalog the cell's displayed output.
maxar_catalog = pystac.read_file(MAXAR_ROOT_CATALOG)
maxar_catalog

In [7]:
# The earliest and latest event dates are handed to `create_collection`
# together with the source catalog.
earliest_event = min(maxar_event_dates)
latest_event = max(maxar_event_dates)
collection = create_collection(maxar_catalog, earliest_event, latest_event)

# Persist the collection to the working directory, omitting its "self" link.
pystac.write_file(
    collection,
    include_self_link=False,
    dest_href="maxar-opendata-collection.json",
)

# Display the collection as the cell output.
collection

In [8]:
def _list_items(stac_collection):
    """Return every Item found beneath ``stac_collection`` as a list."""
    return list(stac_collection.get_all_items())


def _cancel_pending(*future_maps):
    """Cancel all futures that have not started yet.

    Leaving the ``with ThreadPoolExecutor()`` block waits for every queued
    task, so without this an error would only surface after the entire
    backlog had been processed. Futures that are running or already done
    are unaffected by ``cancel()``.
    """
    for future_map in future_maps:
        for pending in future_map:
            pending.cancel()


# Three phases share one thread pool:
#   1. submit one "find" task per subcollection,
#   2. as each find task completes, submit one `create_item` task per Item,
#   3. collect the translated Items as they complete (completion order).
maxar_items = []
with ThreadPoolExecutor() as pool:
    # find future -> "<parent collection id>/<subcollection id>" (error context)
    futures_find = {}
    # translate future -> source Item it was created from (error context)
    futures_translate = {}

    # Find all items up front in threads since it takes a while to
    # scan through static catalog.
    # NOTE: the loop variable is deliberately not called `collection`, which
    # would rebind the notebook-level `collection` created in an earlier cell.
    for parent_collection in maxar_catalog.get_collections():
        for subcollection in parent_collection.get_collections():
            find_future = pool.submit(_list_items, subcollection)
            futures_find[find_future] = f"{parent_collection.id}/{subcollection.id}"

    print("Submitted all Collections. Finding Items")

    item_i = 0
    for find_future in as_completed(futures_find):
        collection_id = futures_find[find_future]
        try:
            items_found = find_future.result()
        except Exception as exc:
            # Scanning a subcollection can fail (e.g. an HREF that does not
            # resolve to a STAC object); report which one before re-raising,
            # mirroring the error handling of the translate phase below.
            print(f"Unexpected exception finding Items in {collection_id}! {exc}")
            _cancel_pending(futures_find, futures_translate)
            raise
        for item_found in items_found:
            item_i += 1
            if item_i % 5000 == 0:
                print(f"Submitting item {item_i}")
            translate_future = pool.submit(create_item, item_found)
            futures_translate[translate_future] = item_found

    print("Found all Items.")

    for rewrite_i, translate_future in enumerate(as_completed(futures_translate)):
        if rewrite_i % 2000 == 0:
            print(f"Processed item {rewrite_i} of {item_i}")
        original_item = futures_translate[translate_future]
        try:
            result = translate_future.result()
        except Exception as exc:
            print(f"Unexpected exception for {original_item}! {exc}")
            _cancel_pending(futures_translate)
            raise
        else:
            maxar_items.append(result)

# Timestamp (local time) embedded in the output file names written below.
converted_at = dt.datetime.now()

Submitted all Collections. Finding Items
Submitting item 5000


STACError: HREF: 'https://maxar-opendata.s3.amazonaws.com/events/Kahramanmaras-turkey-earthquake-23/ard/37/031133032312/2023-03-05/10300100E304B400.json' does not resolve to a STAC object

In [None]:
# Pretty-print one converted Item as a spot check. The preferred index is
# clamped to the last Item so the cell still works when fewer Items were
# converted, and an empty result is reported instead of raising IndexError.
SAMPLE_ITEM_INDEX = 24_000
if maxar_items:
    sample_item = maxar_items[min(SAMPLE_ITEM_INDEX, len(maxar_items) - 1)]
    print(json.dumps(sample_item.to_dict(), indent=2))
else:
    print("No STAC Items were converted; nothing to preview.")

## Write to NDJSON for ingestion into (PgSTAC) STAC Catalog

In [None]:
# Name the output file after the conversion timestamp.
destination = f"maxar-opendata-{converted_at.strftime('%Y%m%dT%H%M%S')}.ndjson"

# Emit one JSON-serialized Item per line.
with open(destination, "w") as ndjson_file:
    ndjson_file.writelines(
        f"{json.dumps(item.to_dict())}\n" for item in maxar_items
    )

print(f"Wrote {len(maxar_items)} STAC Items to {destination}")

## Write to STAC GeoParquet for ingestion into (PgSTAC) STAC Catalog

We can also write our Items to STAC GeoParquet as a serialization format. This has a few advantages over JSON or NDJSON:

* Smaller: Data stored in a compressed binary format takes up less space than the same data stored as JSON
* Searchable: We can query Items by reading columns for the desired properties and quickly subset rows that match
* Tooling: STAC GeoParquet is a new standard that the ecosystem is developing tools for

In [None]:
# Imported in this cell rather than with the imports at the top of the
# notebook, so `rustac` is only required when this cell is run.
import rustac

# Same timestamped naming scheme as the NDJSON output, with a .parquet extension.
destination = f"maxar-opendata-{converted_at.strftime('%Y%m%dT%H%M%S')}.parquet"

# `rustac.write` is awaited directly at the top level of the cell (there is no
# enclosing async function); it is given the Items converted to plain dicts.
await rustac.write(destination, [item.to_dict() for item in maxar_items])
print(f"Wrote {len(maxar_items)} STAC Items to {destination}")

In [None]:
# Shell out to list every file in the working directory whose name starts with
# "maxar-opendata-", with human-readable sizes.
! ls -lh maxar-opendata-*