# Platform data management using command line tools



## Application 

The application below is a Python CLI that detects water bodies using the NDWI spectral index and the Otsu threshold.

In this context it represents the application that is then included in an Application Package encoded in CWL.

In [20]:
import os
import anyio
import asyncclick as clicka
import click
import pystac
import rasterio
from skimage.filters import threshold_otsu
from rasterio.mask import mask
from pyproj import Transformer
from shapely import box
from loguru import logger
import rasterio
import pystac
import shutil
import rio_stac
import numpy as np

np.seterr(divide="ignore", invalid="ignore")


def crop(asset: pystac.Asset, bbox, epsg):
    """_summary_

    Args:
        asset (_type_): _description_
        bbox (_type_): _description_
        epsg (_type_): _description_

    Returns:
        _type_: _description_
    """
    with rasterio.open(asset.get_absolute_href()) as src:

        transformer = Transformer.from_crs(epsg, src.crs, always_xy=True)

        minx, miny = transformer.transform(bbox[0], bbox[1])
        maxx, maxy = transformer.transform(bbox[2], bbox[3])

        transformed_bbox = box(minx, miny, maxx, maxy)

        logger.info(f"Crop {asset.get_absolute_href()}")

        out_image, out_transform = rasterio.mask.mask(
            src, [transformed_bbox], crop=True
        )
        out_meta = src.meta.copy()

        out_meta.update(
            {
                "height": out_image.shape[1],
                "width": out_image.shape[2],
                "transform": out_transform,
            }
        )

        return out_image.astype(np.float32), out_meta


def threshold(data):
    """Returns the Otsu threshold of a numpy array"""
    return data > threshold_otsu(data[np.isfinite(data)])


def normalized_difference(array1, array2):
    """Returns the normalized difference of two numpy arrays"""
    return (array1 - array2) / (array1 + array2)


def aoi2box(aoi):
    """Converts an area of interest expressed as a bounding box to a list of floats"""
    return [float(c) for c in aoi.split(",")]


def get_asset(item, common_name):
    """Returns the asset of a STAC Item defined with its common band name"""
    for _, asset in item.get_assets().items():
        if not "data" in asset.to_dict()["roles"]:
            continue

        eo_asset = pystac.extensions.eo.AssetEOExtension(asset)
        if not eo_asset.bands:
            continue
        for b in eo_asset.bands:
            if (
                "common_name" in b.properties.keys()
                and b.properties["common_name"] == common_name
            ):
                return asset


@click.command(
    short_help="Crop",
    help="Water bodies detection using the Normalized Difference Water Index (NDWI) and Otsu thresholding.",
)
@click.option(
    "--input-item",
    "item_url",
    help="Staged STAC catalog",
    required=True,
)
@click.option(
    "--aoi",
    "aoi",
    help="Area of interest expressed as a bounding box",
    required=True,
)
@click.option(
    "--epsg",
    "epsg",
    help="EPSG code",
    required=True,
)
@click.option(
    "--band",
    "bands",
    help="Common band name",
    required=True,
    multiple=True,
)
def water_bodies(item_url, aoi, bands, epsg):

    if os.path.isdir(item_url):
        catalog = pystac.read_file(os.path.join(item_url, "catalog.json"))
        item = next(catalog.get_items())
    else:
        item = pystac.read_file(item_url)

    logger.info(f"Read {item.id} from {item.get_self_href()}")

    cropped_assets = {}

    for band in bands:
        asset = get_asset(item, band)
        logger.info(f"Read asset {band} from {asset.get_absolute_href()}")

        if not asset:
            msg = f"Common band name {band} not found in the assets"
            logger.error(msg)
            raise ValueError(msg)

        bbox = aoi2box(aoi)

        out_image, out_meta = crop(asset, bbox, epsg)

        cropped_assets[band] = out_image[0]

    nd = normalized_difference(cropped_assets[bands[0]], cropped_assets[bands[1]])

    water_bodies = threshold(nd)

    out_meta.update(
        {
            "dtype": "uint8",
            "driver": "COG",
            "tiled": True,
            "compress": "lzw",
            "blockxsize": 256,
            "blockysize": 256,
        }
    )

    water_body = "otsu.tif"

    with rasterio.open(water_body, "w", **out_meta) as dst_dataset:
        logger.info(f"Write otsu.tif")
        dst_dataset.write(water_bodies, indexes=1)

    logger.info(f"Creating a STAC Catalog")
    cat = pystac.Catalog(id="catalog", description="water-bodies")

    if os.path.isdir(item_url):
        catalog = pystac.read_file(os.path.join(item_url, "catalog.json"))
        item = next(catalog.get_items())
    else:
        item = pystac.read_file(item_url)

    os.makedirs(item.id, exist_ok=True)
    shutil.copy(water_body, item.id)

    out_item = rio_stac.stac.create_stac_item(
        source=water_body,
        input_datetime=item.datetime,
        id=item.id,
        asset_roles=["data", "visual"],
        asset_href=os.path.basename(water_body),
        asset_name="data",
        with_proj=True,
        with_raster=False,
    )

    cat.add_items([out_item])

    cat.normalize_and_save(
        root_href="./", catalog_type=pystac.CatalogType.SELF_CONTAINED
    )

    os.remove(water_body)

    logger.info("Done!")


Let's print the CLI help:

In [21]:
from click.testing import CliRunner

runner = CliRunner()
result = runner.invoke(water_bodies, ['--help'])

print(result.output)

Usage: water-bodies [OPTIONS]

  Water bodies detection using the Normalized Difference Water Index (NDWI) and
  Otsu thresholding.

Options:
  --input-item TEXT  Staged STAC catalog  [required]
  --aoi TEXT         Area of interest expressed as a bounding box  [required]
  --epsg TEXT        EPSG code  [required]
  --band TEXT        Common band name  [required]
  --help             Show this message and exit.



  result = runner.invoke(water_bodies, ['--help'])


To run this application, we'll need to stage an EO acquisition

## Stage a Landsat-9 acquisition

The stage-in operation as described in OGC Best Practice for EO Application Packaging targets the creation of a STAC Catalog and Item.

In [22]:
import pystac
import stac_asset
import asyncio
import os
import nest_asyncio
nest_asyncio.apply()

In [23]:
config = stac_asset.Config(warn=True)

@clicka.command(
    short_help="Stage-in",
    help="Stages a Landsat-9 acquisition",
)
@clicka.option(
    "--href",
    "href",
    help="URL to Landsat-9 catalog entry",
    required=True,
)
async def stage(href: str):

    item = pystac.read_file(href)

    os.makedirs(os.path.join("staged", item.id), exist_ok=True)
    cwd = os.getcwd()

    os.chdir(os.path.join("staged", item.id))
    item = await stac_asset.download_item(item=item, directory=".", config=config)
    os.chdir(cwd)
    os.chdir("staged")

    cat = pystac.Catalog(
        id="catalog",
        description=f"catalog with staged {item.id}",
        title=f"catalog with staged {item.id}",
    )
    cat.add_item(item)

    cat.normalize_hrefs("./")
    cat.save(catalog_type=pystac.CatalogType.SELF_CONTAINED)
    os.chdir(cwd)
    return cat

We can print the stage-in CLI help:

In [32]:
from asyncclick.testing import CliRunner as ACliRunner

runner = ACliRunner()
result = await(runner.invoke(stage, ['--help']))

print(result.output)

Usage: stage [OPTIONS]

  Stages a Landsat-9 acquisition

Options:
  --href TEXT  URL to Landsat-9 catalog entry  [required]
  --help       Show this message and exit.



And stage a Landsat-9 acquisition:

In [33]:
arguments = ["--href", "https://planetarycomputer.microsoft.com/api/stac/v1/collections/landsat-c2-l2/items/LC09_L2SP_042033_20231015_02_T1"]

runner = ACliRunner()
result = await(runner.invoke(stage, args=arguments))

print(result.output)






Inspect the staged Landsat-9 product

In [37]:
cat = pystac.read_file(os.path.join("staged", "catalog.json"))
cat.describe()

* <Catalog id=catalog>
  * <Item id=LC09_L2SP_042033_20231015_02_T1>


In [42]:
for key, asset in next(cat.get_items()).get_assets().items():
    print(key, asset.get_absolute_href())

qa /workspace/stac-eoap/notebooks/staged/LC09_L2SP_042033_20231015_02_T1/LC09_L2SP_042033_20231015_20231016_02_T1_ST_QA.TIF
ang /workspace/stac-eoap/notebooks/staged/LC09_L2SP_042033_20231015_02_T1/LC09_L2SP_042033_20231015_20231016_02_T1_ANG.txt
red /workspace/stac-eoap/notebooks/staged/LC09_L2SP_042033_20231015_02_T1/LC09_L2SP_042033_20231015_20231016_02_T1_SR_B4.TIF
blue /workspace/stac-eoap/notebooks/staged/LC09_L2SP_042033_20231015_02_T1/LC09_L2SP_042033_20231015_20231016_02_T1_SR_B2.TIF
drad /workspace/stac-eoap/notebooks/staged/LC09_L2SP_042033_20231015_02_T1/LC09_L2SP_042033_20231015_20231016_02_T1_ST_DRAD.TIF
emis /workspace/stac-eoap/notebooks/staged/LC09_L2SP_042033_20231015_02_T1/LC09_L2SP_042033_20231015_20231016_02_T1_ST_EMIS.TIF
emsd /workspace/stac-eoap/notebooks/staged/LC09_L2SP_042033_20231015_02_T1/LC09_L2SP_042033_20231015_20231016_02_T1_ST_EMSD.TIF
trad /workspace/stac-eoap/notebooks/staged/LC09_L2SP_042033_20231015_02_T1/LC09_L2SP_042033_20231015_20231016_02_T1_ST

## Run the water bodies detection application using the staged Landsat-9 product



In [44]:
from click.testing import CliRunner

runner = CliRunner()
result = runner.invoke(water_bodies, ['--help'])

print(result.output)

Usage: water-bodies [OPTIONS]

  Water bodies detection using the Normalized Difference Water Index (NDWI) and
  Otsu thresholding.

Options:
  --input-item TEXT  Staged STAC catalog  [required]
  --aoi TEXT         Area of interest expressed as a bounding box  [required]
  --epsg TEXT        EPSG code  [required]
  --band TEXT        Common band name  [required]
  --help             Show this message and exit.



In [45]:
arguments = ["--input-item", "./staged",
                "--aoi", "-118.985,38.432,-118.183,38.938",
                "--epsg", "EPSG:4326",
                "--band", "green",
                "--band", "nir08"]

In [47]:
runner = CliRunner()
result = runner.invoke(water_bodies, args=arguments)

print(result.output)

[32m2024-10-08 05:37:30.229[0m | [1mINFO    [0m | [36m__main__[0m:[36mwater_bodies[0m:[36m128[0m - [1mRead LC09_L2SP_042033_20231015_02_T1 from /workspace/stac-eoap/notebooks/staged/LC09_L2SP_042033_20231015_02_T1/LC09_L2SP_042033_20231015_02_T1.json[0m
[32m2024-10-08 05:37:30.323[0m | [1mINFO    [0m | [36m__main__[0m:[36mwater_bodies[0m:[36m134[0m - [1mRead asset green from /workspace/stac-eoap/notebooks/staged/LC09_L2SP_042033_20231015_02_T1/LC09_L2SP_042033_20231015_20231016_02_T1_SR_B3.TIF[0m
[32m2024-10-08 05:37:30.353[0m | [1mINFO    [0m | [36m__main__[0m:[36mcrop[0m:[36m41[0m - [1mCrop /workspace/stac-eoap/notebooks/staged/LC09_L2SP_042033_20231015_02_T1/LC09_L2SP_042033_20231015_20231016_02_T1_SR_B3.TIF[0m
[32m2024-10-08 05:37:30.524[0m | [1mINFO    [0m | [36m__main__[0m:[36mwater_bodies[0m:[36m134[0m - [1mRead asset nir08 from /workspace/stac-eoap/notebooks/staged/LC09_L2SP_042033_20231015_02_T1/LC09_L2SP_042033_20231015_20231016_

  for key, value in x.items():



Inspect the generated STAC Catalog containing the application execution results:

In [49]:
results_cat = pystac.read_file(os.path.join(".", "catalog.json")) 

results_cat.describe()

* <Catalog id=catalog>
  * <Item id=LC09_L2SP_042033_20231015_02_T1>


In [50]:
for key, asset in next(results_cat.get_items()).get_assets().items():
    print(key, asset.get_absolute_href())

data /workspace/stac-eoap/notebooks/LC09_L2SP_042033_20231015_02_T1/otsu.tif


## Stage-out 

The stage-out operation as described in OGC Best Practice for EO Application Packaging targets publishing the results STAC Catalog and Item to a Platform storage.

This example uses an S3 object storage.

We'll use `boto3` and a `pystac` custom IO to write the STAC objects to S3.

In [51]:
import os
import sys
import pystac
import botocore
import boto3
import shutil
from pystac.stac_io import DefaultStacIO, StacIO
from urllib.parse import urlparse

aws_access_key_id = "test" 
aws_secret_access_key = "test" 
region_name = "us-east-1"
endpoint_url = "http://localstack:4566"

class CustomStacIO(DefaultStacIO):
    """Custom STAC IO class that uses boto3 to read from S3."""

    def __init__(self):
        self.session = botocore.session.Session()
        self.s3_client = self.session.create_client(
            service_name="s3",
            use_ssl=True,
            aws_access_key_id=aws_access_key_id,
            aws_secret_access_key=aws_secret_access_key,
            endpoint_url=endpoint_url,
            region_name=region_name,
        )

    def write_text(self, dest, txt, *args, **kwargs):
        parsed = urlparse(dest)
        if parsed.scheme == "s3":
            self.s3_client.put_object(
                Body=txt.encode("UTF-8"),
                Bucket=parsed.netloc,
                Key=parsed.path[1:],
                ContentType="application/geo+json",
            )
        else:
            super().write_text(dest, txt, *args, **kwargs)


client = boto3.client(
    "s3",
    aws_access_key_id=aws_access_key_id,
    aws_secret_access_key=aws_secret_access_key,
    endpoint_url=endpoint_url,
    region_name=region_name,
)

StacIO.set_default(CustomStacIO)


In [59]:
@click.command(
    short_help="Crop",
    help="Water bodies detection using the Normalized Difference Water Index (NDWI) and Otsu thresholding.",
)
@click.option(
    "--stac-catalog",
    "cat_url",
    help="STAC catalog to stage-out",
    required=True,
)
@click.option(
    "--s3-bucket",
    "bucket",
    help="S3 bucket",
    required=True,
)
@click.option(
    "--subfolder",
    "subfolder",
    help="Folder in S3 bucket",
    required=True,
)
def stage_out(cat_url, bucket, subfolder):

    shutil.copytree(cat_url, "/tmp/catalog")
    cat = pystac.read_file(os.path.join("/tmp/catalog", "catalog.json"))

    for item in cat.get_items():
        for key, asset in item.get_assets().items():
            s3_path = os.path.normpath(
                os.path.join(os.path.join(subfolder, item.id, asset.href))
            )
            logger.info(f"upload {asset.href} to s3://{bucket}/{s3_path}",file=sys.stderr)
            client.upload_file(
                asset.get_absolute_href(),
                bucket,
                s3_path,
            )
            asset.href = f"s3://{bucket}/{s3_path}"
            item.add_asset(key, asset)

    cat.normalize_hrefs(f"s3://{bucket}/{subfolder}")

    for item in cat.get_items():
        # upload item to S3
        logger.info(f"upload {item.id} to s3://{bucket}/{subfolder}", file=sys.stderr)
        pystac.write_file(item, item.get_self_href())

    # upload catalog to S3
    logger.info(f"upload catalog.json to s3://{bucket}/{subfolder}", file=sys.stderr)
    pystac.write_file(cat, cat.get_self_href())

    shutil.rmtree("/tmp/catalog/", )

    print(f"s3://{bucket}/{subfolder}/catalog.json", file=sys.stdout)

We can print the stage-out help:

In [60]:
from click.testing import CliRunner

runner = CliRunner()
result = runner.invoke(stage_out, ['--help'])

print(result.output)

Usage: stage-out [OPTIONS]

  Water bodies detection using the Normalized Difference Water Index (NDWI) and
  Otsu thresholding.

Options:
  --stac-catalog TEXT  STAC catalog to stage-out  [required]
  --s3-bucket TEXT     S3 bucket  [required]
  --subfolder TEXT     Folder in S3 bucket  [required]
  --help               Show this message and exit.



And invoke it:



In [61]:
arguments = ["--stac-catalog", ".", 
            "--s3-bucket", "results", 
            "--subfolder", "run-003"]

runner = CliRunner()
result = runner.invoke(stage_out, args=arguments)

print(result.output)

[32m2024-10-08 05:48:30.419[0m | [1mINFO    [0m | [36m__main__[0m:[36mstage_out[0m:[36m33[0m - [1mupload ./otsu.tif to s3://results/run-003/LC09_L2SP_042033_20231015_02_T1/otsu.tif[0m
[32m2024-10-08 05:48:30.434[0m | [1mINFO    [0m | [36m__main__[0m:[36mstage_out[0m:[36m46[0m - [1mupload LC09_L2SP_042033_20231015_02_T1 to s3://results/run-003[0m
[32m2024-10-08 05:48:30.634[0m | [1mINFO    [0m | [36m__main__[0m:[36mstage_out[0m:[36m50[0m - [1mupload catalog.json to s3://results/run-003[0m


s3://results/run-003/catalog.json

