# Upload Global Mangrove Watch STAC metadata

Author: Henry Rodman

Rather than using the more powerful (but more complex) stactools-pipeline, I decided to generate the STAC metadata for this collection and post it to the STAC ingestor API from a single notebook.

In [None]:
# /// script
# requires-python = ">=3.11"
# dependencies = [
#     "boto3",
#     "httpx",
#     "pystac>=1.12.0",
#     "stactools-global-mangrove-watch==0.2.1",
#     "tqdm",
# ]
# ///

In [1]:
import asyncio
import json
import os
import re
import tempfile
import urllib.request
import zipfile
from collections import defaultdict
from typing import List

import boto3
import httpx
import tqdm
from botocore.exceptions import ClientError
from pystac import set_stac_version
from stactools.global_mangrove_watch.constants import COLLECTION_ID
from stactools.global_mangrove_watch.stac import create_item, create_collection

# Have pystac use STAC version 1.0.0 for everything created in this notebook
set_stac_version("1.0.0")

# STAC ingestor URL (base URL; the /collections and /ingestions endpoints
# are appended where the requests are made)
INGESTOR_URL = "https://stac-ingestor.maap-project.org"

# asset storage details: the S3 bucket and key prefix that the GeoTIFFs are
# uploaded to and later listed from when building the inventory
DESTINATION_BUCKET = "nasa-maap-data-store"
DESTINATION_PREFIX = "file-staging/nasa-map/global-mangrove-watch/v3"

## Dataset details
The raw files are stored in zip archives hosted by Zenodo. To get all of the assets, we need to download all of the GeoTIFF zip archives and upload their contents to a NASA S3 bucket. This process only needs to be run once. Each STAC item consists of one or two assets depending on the year (one asset for 1996, two assets for all other years), so we need to be able to pair up the `cog` and `change_cog` assets.

In [2]:
# Download URL template for one zip archive; {group} is filled with each
# entry of ALL_GROUPS.
ZIP_URL_FORMAT = "https://zenodo.org/records/6894273/files/gmw_v3_{group}_gtiff.zip"

# The year that the change archives are measured from (the "f" part of
# the f{BASE_YEAR}_t{year} group names built below).
BASE_YEAR = 1996
# Every other year in the dataset; each one gets both a plain archive and a
# change archive.
YEARS = [
    2007,
    2008,
    2009,
    2010,
    2015,
    2016,
    2017,
    2018,
    2019,
    2020,
]
# All archive group names: one per year (including BASE_YEAR), followed by
# one change group per year in YEARS.
ALL_GROUPS = [str(year) for year in [BASE_YEAR] + YEARS] + [
    f"f{BASE_YEAR}_t{year}" for year in YEARS
]

def parse_filename(filename: str) -> tuple[str, str]:
    """Parse a GMW filename into an item key and a create_item argument name.

    Args:
        filename: Bare filename (no directory part), for example
            ``GMW_N27W110_2020_v3.tif`` or
            ``GMW_N27W110_chng_f1996_t2020_v3.tif``.

    Returns:
        A ``(item_id, arg_name)`` tuple. ``item_id`` has the form
        ``{location}_{year}_v3``; for change files ``year`` is the ``t``
        (target) year, so a change file and the base file for the same tile
        and year share one key. ``arg_name`` is ``"change_asset_href"`` for
        change files and ``"cog_asset_href"`` for base files, to be used for
        passing the asset hrefs to create_item.

    Raises:
        ValueError: If the complete filename matches neither pattern.
    """
    # Pattern for change files: GMW_N27W110_chng_f1996_t2020_v3.tif
    change_pattern = r"(GMW_[NS]\d+[WE]\d+)_chng_f\d+_t(\d+)_v3\.tif"

    # Pattern for base files: GMW_N27W110_2020_v3.tif
    base_pattern = r"(GMW_[NS]\d+[WE]\d+)_(\d+)_v3\.tif"

    # fullmatch (not match) so that names with trailing text, such as
    # "..._v3.tif.aux.xml", are rejected instead of being silently mapped
    # onto an item key. The change pattern is tried first.
    for pattern, arg_name in (
        (change_pattern, "change_asset_href"),
        (base_pattern, "cog_asset_href"),
    ):
        match = re.fullmatch(pattern, filename)
        if match:
            location, year = match.groups()
            return f"{location}_{year}_v3", arg_name

    raise ValueError(f"Unrecognized filename pattern: {filename}")


def upload_files_to_s3() -> None:
    """Fetch each GMW zip archive and copy its GeoTIFFs to the S3 bucket.

    For every group in ALL_GROUPS the archive is downloaded and extracted
    into a fresh temporary directory. Each ``.tif`` file found there is
    uploaded under DESTINATION_PREFIX unless an object with the same key is
    already present in DESTINATION_BUCKET.

    Raises:
        ClientError: If the existence check fails with anything other than
            a 404.
    """
    s3 = boto3.client("s3")

    for group in ALL_GROUPS:
        with tempfile.TemporaryDirectory() as workdir:
            archive_url = ZIP_URL_FORMAT.format(group=group)
            archive_path = os.path.join(workdir, f"gmw_v3_{group}_gtiff.zip")

            print(f"Downloading {archive_url}")
            urllib.request.urlretrieve(archive_url, archive_path)

            with zipfile.ZipFile(archive_path, "r") as archive:
                archive.extractall(workdir)

            for dirpath, _, names in os.walk(workdir):
                for name in sorted(names):
                    if not name.endswith(".tif"):
                        continue

                    key = f"{DESTINATION_PREFIX}/{name}"

                    # head_object succeeds only when the key is already
                    # there; a 404 is the signal to upload.
                    try:
                        s3.head_object(Bucket=DESTINATION_BUCKET, Key=key)
                    except ClientError as err:
                        if err.response["Error"]["Code"] != "404":
                            # Any other failure is unexpected: propagate it
                            raise
                        print(f"Uploading to S3: {key}")
                        s3.upload_file(
                            os.path.join(dirpath, name), DESTINATION_BUCKET, key
                        )
                    else:
                        print(f"File already exists in S3: {key}")

        print(f"Completed processing group: {group}")


def create_inventory() -> List[dict[str, str]]:
    """Build the create_item keyword arguments from the GeoTIFFs in S3.

    Lists every object under DESTINATION_PREFIX in DESTINATION_BUCKET,
    parses each ``.tif`` filename with parse_filename, and groups the
    resulting ``s3://`` hrefs by item key. Filenames that parse_filename
    rejects are reported with a warning and skipped.

    Returns:
        One dict per item key, mapping argument name
        (``cog_asset_href`` / ``change_asset_href``) to the asset href.
    """
    s3 = boto3.client("s3")

    # Paginate so that listings longer than one page are fully covered
    listing = s3.get_paginator('list_objects_v2').paginate(
        Bucket=DESTINATION_BUCKET,
        Prefix=DESTINATION_PREFIX,
    )

    grouped = defaultdict(dict)
    for page in listing:
        # A page without 'Contents' contributes nothing
        for entry in page.get('Contents', []):
            key = entry['Key']
            name = os.path.basename(key)
            if not name.endswith('.tif'):
                continue

            try:
                item_key, arg_name = parse_filename(name)
            except ValueError as err:
                print(f"Warning: {err}")
                continue

            grouped[item_key][arg_name] = f"s3://{DESTINATION_BUCKET}/{key}"

    return list(grouped.values())

async def post_item(client: httpx.AsyncClient, item, token: str) -> None:
    """Submit one item to the ``/ingestions`` endpoint of the STAC ingestor.

    The item is serialized with ``item.to_dict()`` and sent as JSON with a
    bearer token. Any exception raised while sending the request or by
    ``raise_for_status`` is printed and then re-raised to the caller.
    """
    url = f"{INGESTOR_URL}/ingestions"
    auth_headers = {
        'Authorization': f'Bearer {token}',
        'Content-Type': 'application/json',
    }

    try:
        reply = await client.post(url, json=item.to_dict(), headers=auth_headers)
        reply.raise_for_status()
    except Exception as err:
        print(f"Error posting item: {err}")
        raise

async def post_all_items(items: List, token: str, max_concurrent: int = 20) -> None:
    """Post all items concurrently with a limit on concurrent requests.

    Args:
        items: Items to post; each is passed to post_item.
        token: Bearer token forwarded to post_item.
        max_concurrent: Maximum number of posts allowed in flight at once.

    Raises:
        Exception: The first error raised by any post is propagated. Before
            it propagates, every post that has not finished is cancelled so
            nothing keeps running against the client after it is closed.
    """
    async with httpx.AsyncClient(timeout=60) as client:
        semaphore = asyncio.Semaphore(max_concurrent)

        async def bounded_post(item):
            async with semaphore:
                return await post_item(client, item, token)

        # Create the tasks explicitly so we keep handles that can be
        # cancelled and awaited during cleanup.
        tasks = [asyncio.create_task(bounded_post(item)) for item in items]

        try:
            for finished in tqdm.tqdm(asyncio.as_completed(tasks), total=len(tasks)):
                await finished
        finally:
            # After a failure (or cancellation) stop the posts that are
            # still pending, then wait for all tasks while the client is
            # still open so no task is left un-awaited. On the success path
            # every task is already done and this is a no-op.
            for task in tasks:
                if not task.done():
                    task.cancel()
            await asyncio.gather(*tasks, return_exceptions=True)

### Download the zips, upload COGs
This only needs to be run once.

In [5]:
# Deliberately disabled with `if False` so that re-running the notebook does
# not repeat the download/upload; change the condition to run it.
if False:
    upload_files_to_s3()

## Create list of arguments for create_item

In [6]:
# One dict of create_item keyword arguments per item, built from the files
# currently in S3
inventory = create_inventory()
# Display the number of items as the cell output
len(inventory)

16731

## Get token for the STAC Ingestor API

In [7]:
# MAAP SMCE AWS credentials are needed here. No credentials are passed to
# boto3.Session below, so either add them as Session arguments or make sure
# they are available through boto3's usual credential lookup.
session = boto3.Session(
    region_name="us-west-2",
)
client = session.client("secretsmanager", region_name="us-west-2")

# MAAP STAC secret: holds the values used below to request a token
# (client_id, client_secret, cognito_domain, scope)
response = client.get_secret_value(
    SecretId="arn:aws:secretsmanager:us-west-2:916098889494:secret:MAAP-STAC-auth-dev/MAAP-workflows-EsykqB"
)

# The secret payload is a JSON string
settings = json.loads(response["SecretString"])

# function to get token for STAC ingestor
def get_token(
    client_id: str, 
    client_secret: str, 
    domain: str,
    scope: str
) -> str:
    """Request an access token with the OAuth2 client-credentials grant.

    Posts a form-encoded request to ``{domain}/oauth2/token``, passing the
    client id and secret as the ``auth`` pair, and returns the
    ``access_token`` field of the JSON response.

    Raises:
        httpx.HTTPStatusError: If ``raise_for_status`` rejects the response.
    """
    token_url = f"{domain}/oauth2/token"
    form = {
        "grant_type": "client_credentials",
        "scope": scope,
    }

    token_response = httpx.post(
        token_url,
        headers={"Content-Type": "application/x-www-form-urlencoded"},
        auth=(client_id, client_secret),
        data=form,
    )
    token_response.raise_for_status()

    payload = token_response.json()
    return payload["access_token"]


# Bearer token for the STAC ingestor, requested with the values loaded from
# the secret above
token = get_token(
    client_id = settings["client_id"],
    client_secret = settings["client_secret"],
    domain = settings["cognito_domain"],
    scope = settings["scope"],
)

## Create collection and post to ingestor API

In [10]:
# Build the collection with the stactools package's create_collection
collection = create_collection()
# Display the collection as the cell output
collection

In [9]:
# Send the collection to the ingestor's /collections endpoint.
# NOTE(review): the status code is not checked here; the JSON body of the
# response is printed whether or not the request succeeded.
post_collection = httpx.post(
    f"{INGESTOR_URL}/collections",
    json=collection.to_dict(),
    headers = {
        'Authorization': f'Bearer {token}',
        'Content-Type': 'application/json',
    }
)
print(post_collection.json())

['Successfully published: global-mangrove-watch-3.0']


## Create items and post to ingestor API

In [8]:
# Create one item per inventory entry; each entry is a dict of keyword
# arguments for create_item (see create_inventory).
items = []
for args in tqdm.tqdm(inventory):
    item = create_item(**args)
    # Attach the item to the collection by its ID
    item.collection_id = COLLECTION_ID
    items.append(item)

# Top-level await: this relies on the notebook's running event loop
await post_all_items(items, token)

100%|███████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████| 16731/16731 [00:01<00:00, 8691.47it/s]
