# Using the APEx Product Catalogue with openEO

This notebook demonstrates how the APEx Product Catalogue can be integrated with an openEO-based platform.

> **Important**\
> Please note that openEO is an open standard, and not all platforms may support every use case. Additionally, the generated STAC metadata may vary based on your project's requirements. Therefore, we strongly recommend verifying your specific use cases with your chosen provider.

## Supported Use Cases

1. Scripted upload of results generated by openEO (supported by all openEO instances).
2. Direct publishing of results to object storage (requires support for the openEO 'workspace' API).
3. Direct publishing of results to a STAC catalogue (requires support for the openEO 'workspace' API, including merging into an existing catalogue).

In this notebook, we will focus on demonstrating use case 1 in detail, with a brief overview of how to implement use cases 2 and 3.


In [1]:
!pip install openeo owslib

import openeo
import pystac
import requests
import json

from owslib.ogcapi.records import Records
from owslib.util import Authentication



## Building the openEO Processing Graph

Before diving into the different use cases, we first build a simple openEO workflow. In this example, we calculate the NDVI for a single Sentinel-2 observation over Gran Canaria. This section assumes basic familiarity with openEO, so that we can focus on the integration with the APEx Product Catalogue.
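
As a reminder of what the workflow computes, NDVI is the normalized difference between the near-infrared and red reflectances, which for Sentinel-2 correspond to bands B08 and B04. The sketch below shows the per-pixel arithmetic in plain Python. It is only an illustration, since openEO evaluates the index on the backend, and returning NaN when both bands are zero is an assumption made here.

```python
import math


def ndvi(red: float, nir: float) -> float:
    """Per-pixel NDVI: (NIR - red) / (NIR + red)."""
    total = nir + red
    if total == 0:
        # Assumption: treat an undefined ratio as "no data".
        return math.nan
    return (nir - red) / total


# Hypothetical reflectance values for a vegetated pixel.
print(ndvi(red=0.1, nir=0.5))
```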


In [9]:
con = openeo.connect("openeo.dataspace.copernicus.eu").authenticate_oidc()

Authenticated using refresh token.


In [3]:
spatial_extent = {
      "type": "Feature",
      "properties": {},
      "geometry": {
        "coordinates": [
          [
            [
              -15.918605217697035,
              28.205553799401358
            ],
            [
              -15.918605217697035,
              27.72422378541127
            ],
            [
              -15.305188375387814,
              27.72422378541127
            ],
            [
              -15.305188375387814,
              28.205553799401358
            ],
            [
              -15.918605217697035,
              28.205553799401358
            ]
          ]
        ],
        "type": "Polygon"
      }
    }

In [26]:
cube = con.load_collection(
    "SENTINEL2_L2A",
    bands=["B04", "B08"],
    temporal_extent="2019-08-19",
    spatial_extent=spatial_extent
)
cube.result_node().update_arguments(featureflags={"tilesize": 256})  # force block size in output TIFF

ndvi = cube.ndvi()

result = ndvi.resample_spatial(resolution=38.21851414258813, projection="EPSG:3857")
result
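
The resolution passed to `resample_spatial` appears to match the ground resolution of Web Mercator (EPSG:3857) zoom level 12 for 256-pixel tiles, which would line up with the `tilesize` setting above. The following sketch reproduces that number from the standard tiling formula. The function name is hypothetical, and the link to zoom level 12 is an inference rather than something this notebook states.

```python
import math

EARTH_RADIUS_M = 6378137.0  # WGS 84 semi-major axis used by EPSG:3857


def web_mercator_resolution(zoom: int, tile_size: int = 256) -> float:
    """Metres per pixel at the equator for a Web Mercator zoom level."""
    world_width_m = 2 * math.pi * EARTH_RADIUS_M
    return world_width_m / (tile_size * 2**zoom)


print(web_mercator_resolution(12))
```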

## Use Case 1: Scripted Upload

This use case is the most widely supported, as it involves retrieving the STAC metadata of openEO job results, downloading it, and then ingesting it into an APEx Product Catalogue.

Key drawbacks of this method include:

- Data must first be downloaded from the openEO backend and then re-uploaded, which can be time-consuming and bandwidth-intensive, and may be interrupted by network instability. Use cases 2 and 3 avoid this round trip.
- Additional project-specific code is required to automate this task.

We start this example by downloading the result through an openEO batch job.

In [25]:
job = result.execute_batch("gran_canaria.tiff", title="APEx Demo - Gran Canaria - NDVI")

0:00:00 Job 'j-250313114527493e83f90fc346ad8fee': send 'start'
0:00:13 Job 'j-250313114527493e83f90fc346ad8fee': created (progress 0%)
0:00:18 Job 'j-250313114527493e83f90fc346ad8fee': created (progress 0%)
0:00:25 Job 'j-250313114527493e83f90fc346ad8fee': created (progress 0%)
0:00:33 Job 'j-250313114527493e83f90fc346ad8fee': created (progress 0%)
0:00:43 Job 'j-250313114527493e83f90fc346ad8fee': running (progress N/A)
0:00:55 Job 'j-250313114527493e83f90fc346ad8fee': running (progress N/A)
0:01:11 Job 'j-250313114527493e83f90fc346ad8fee': running (progress N/A)
0:01:30 Job 'j-250313114527493e83f90fc346ad8fee': running (progress N/A)
0:01:54 Job 'j-250313114527493e83f90fc346ad8fee': running (progress N/A)
0:02:24 Job 'j-250313114527493e83f90fc346ad8fee': running (progress N/A)
0:03:01 Job 'j-250313114527493e83f90fc346ad8fee': finished (progress 100%)


In [28]:
job = con.job("j-250313110405469ea3095a5b2b06ce85")
job

In the next step, we download the job metadata and create a STAC collection using the `pystac` library.

In [29]:
stac_metadata_dict = job.get_results().get_metadata()
stac_metadata_dict["id"] = "gran_canaria_ndvi"


# Optionally remove the collection assets and rely on the item links instead:
# del stac_metadata_dict["assets"]
collection = pystac.Collection.from_dict(stac_metadata_dict)
collection.license = "CC-BY-4.0"
collection

### Converting an online openEO Collection to a local Collection

An important consideration is that the STAC metadata currently contains openEO API URLs, some of which may require authentication. In addition, openEO stores batch job results only temporarily. If long-term access to the results is needed, the files should therefore be stored in separate, accessible storage, such as an S3 bucket. Use cases 2 and 3 provide a solution for this.

The code below provides an example of how to update the STAC metadata to point to a URL outside of openEO. For this example, we use a placeholder value (`/tmp`).

Currently, there is an [open issue](https://github.com/Open-EO/openeo-python-client/issues/184) aimed at improving this behavior in openEO.

In [30]:
# Remove collection and canonical links from both the collection and its items
collection.remove_links(rel="collection")
collection.remove_links(rel="canonical")

items = list(collection.get_stac_objects(rel=pystac.RelType.ITEM))
for i in items:
    i.remove_links(rel="collection")
    i.remove_links(rel="canonical")


# Normalize all the hrefs to use the new path /tmp
collection.set_self_href("./collection.json")
collection.normalize_hrefs('/tmp/', skip_unresolved=True)

collection.save(catalog_type=pystac.CatalogType.SELF_CONTAINED)
collection

## Upload to STAC API

The collection metadata has now been cleaned up and can be added to the STAC API.
Here we create the full collection, but it's also possible to only add the generated item to a new collection.

At this point, it is also recommended to improve the metadata quality by adding additional metadata properties.
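
As an illustration of such enrichment, the sketch below adds a title, keywords, and a provider entry to a collection dictionary such as the one returned by `collection.to_dict()`. `title`, `keywords`, and `providers` are standard STAC Collection fields. The helper name and all values shown are placeholders to be replaced with project-specific information.

```python
import copy


def enrich_collection(coll: dict, title: str, keywords: list, provider: dict) -> dict:
    """Return a copy of a STAC collection dict with extra descriptive metadata."""
    enriched = copy.deepcopy(coll)
    enriched["title"] = title
    # Merge keywords without duplicates, preserving order.
    merged = list(enriched.get("keywords", []))
    for keyword in keywords:
        if keyword not in merged:
            merged.append(keyword)
    enriched["keywords"] = merged
    enriched.setdefault("providers", []).append(provider)
    return enriched


# Placeholder values for illustration only.
example = enrich_collection(
    {"id": "gran_canaria_ndvi", "keywords": ["NDVI"]},
    title="NDVI over Gran Canaria",
    keywords=["NDVI", "Sentinel-2"],
    provider={"name": "Example Project", "roles": ["producer"]},
)
print(example["keywords"])
```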

### Important note

At this point, the actual GeoTIFF is still located on the openEO backend and can be accessed by any tool via the signed URL.
This URL will expire, however, so for more permanent catalogues the data file needs to be moved to a more durable location.
To do this, download the TIFF file and upload it to an online location of your preference.
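
Once the file has been copied, the asset hrefs in the item metadata need to point to the new location as well. A minimal sketch of that rewrite on a plain STAC item dictionary is shown below. The helper name, the signed URL, and the bucket URL are all hypothetical, and the sketch assumes that each file keeps its original name in the new location.

```python
import copy
import posixpath
from urllib.parse import urlparse


def rewrite_asset_hrefs(item: dict, base_url: str) -> dict:
    """Return a copy of a STAC item dict whose assets point below base_url."""
    updated = copy.deepcopy(item)
    for asset in updated.get("assets", {}).values():
        # Keep only the file name; drop the openEO path and any query string.
        filename = posixpath.basename(urlparse(asset["href"]).path)
        asset["href"] = f"{base_url.rstrip('/')}/{filename}"
    return updated


# Hypothetical signed URL and target bucket, for illustration only.
item = {
    "id": "gran_canaria",
    "assets": {
        "data": {"href": "https://openeo.example/results/assets/abc/gran_canaria.tiff?expires=123"}
    },
}
moved = rewrite_asset_hrefs(item, "https://my-bucket.example/ndvi/")
print(moved["assets"]["data"]["href"])
```

With `pystac` objects, the same idea applies by assigning the new value to each asset's `href` attribute before saving or uploading the item.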


In [31]:
CATALOGUE = "https://catalogue.demo.apex.esa.int"

### Authentication for APEx Product Catalogue

To interact with an APEx Product Catalogue, you need a valid authentication token. This token ensures that you can write metadata to an APEx-initialized STAC catalogue, provided you have been granted sufficient access rights.

To obtain an authentication token, follow the dedicated guide on [generating your OIDC token](token.md). This step is essential for securely accessing and modifying catalogue records.

In [None]:
# Helper class that attaches a bearer token to requests made by owslib and requests
class BearerAuth(requests.auth.AuthBase):
    def __init__(self, token):
        self.token = token

    def __call__(self, r):
        # Add the authorization header to the outgoing request
        r.headers["authorization"] = "Bearer " + self.token
        return r


token = input("Please provide your OIDC token here: ")
auth = BearerAuth(token)

In [34]:
r = Records(CATALOGUE, auth=Authentication(auth_delegate=auth))
print(json.dumps(r.collections(), indent=2))

{
  "collections": [],
  "links": [
    {
      "rel": "root",
      "type": "application/json",
      "href": "https://catalogue.demo.apex.esa.int/"
    },
    {
      "rel": "parent",
      "type": "application/json",
      "href": "https://catalogue.demo.apex.esa.int/"
    },
    {
      "rel": "self",
      "type": "application/json",
      "href": "https://catalogue.demo.apex.esa.int/collections"
    }
  ]
}


In [35]:
coll_dict = collection.to_dict()

default_auth = {
    "_auth": {
        "read": ["anonymous"],
        "write": ["stac-admin-prod"]
    }
}

coll_dict.update(default_auth)

In [36]:
response = requests.post(f"{CATALOGUE}/collections", auth=auth, json=coll_dict)
response

<Response [201]>
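
The request above registers only the collection. If the catalogue also accepts item uploads, the items gathered earlier could be posted in a similar way. The sketch below assumes that the catalogue implements the `POST /collections/{id}/items` endpoint of the STAC API Transaction extension, which this notebook does not confirm. The helper name and the example item are hypothetical.

```python
import copy


def build_item_requests(catalogue_url: str, collection_id: str, items: list) -> list:
    """Pair each STAC item dict with the endpoint it would be posted to."""
    endpoint = f"{catalogue_url.rstrip('/')}/collections/{collection_id}/items"
    requests_to_send = []
    for item in items:
        payload = copy.deepcopy(item)
        # Items must reference the collection they are added to.
        payload["collection"] = collection_id
        requests_to_send.append((endpoint, payload))
    return requests_to_send


# Hypothetical item, for illustration only.
pending = build_item_requests(
    "https://catalogue.demo.apex.esa.int",
    "gran_canaria_ndvi",
    [{"type": "Feature", "id": "gran_canaria", "collection": "old-id"}],
)
print(pending[0][0])
```

Each pair could then be sent with `requests.post(url, auth=auth, json=payload)`, reusing the `auth` object defined earlier, with the item dictionaries obtained from `[i.to_dict() for i in items]`.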

In [37]:
print(json.dumps(r.collections()['collections'][0], indent=2))

{
  "id": "gran_canaria_ndvi",
  "description": "Results for batch job j-250313110405469ea3095a5b2b06ce85",
  "stac_version": "1.0.0",
  "links": [
    {
      "rel": "items",
      "type": "application/geo+json",
      "href": "https://catalogue.demo.apex.esa.int/collections/gran_canaria_ndvi/items"
    },
    {
      "rel": "parent",
      "type": "application/json",
      "href": "https://catalogue.demo.apex.esa.int/"
    },
    {
      "rel": "root",
      "type": "application/json",
      "href": "https://catalogue.demo.apex.esa.int/"
    },
    {
      "rel": "self",
      "type": "application/json",
      "href": "https://catalogue.demo.apex.esa.int/collections"
    },
    {
      "href": "https://catalogue.demo.apex.esa.int/eodata/Sentinel-2/MSI/L2A_N0500/2019/08/19/S2A_MSIL2A_20190819T115221_N0500_R123_T28RDR_20230507T090239.SAFE",
      "rel": "derived_from",
      "type": "application/json",
      "title": "Derived from /eodata/Sentinel-2/MSI/L2A_N0500/2019/08/19/S2A_MSIL2A_

In [23]:
# Clean up: remove the demo collection from the catalogue
requests.delete(f"{CATALOGUE}/collections/{collection.id}", auth=auth)

<Response [204]>

## Use Cases 2 and 3: Using the `export_workspace` Process

In these use cases, openEO exports the collection directly to external storage. This is enabled through the [Workspaces](https://docs.openeo.cloud/workspaces/) API, which gives you greater control over your data by allowing you to define where it is saved and from where it is loaded.

There are multiple options for creating a workspace. You can either provision one through a given backend or register your own storage location, such as an S3 bucket, to directly store openEO results.  

Since these use cases rely on the availability of the `export_workspace` process, we first check whether it is supported.  

In [38]:
con.describe_process("export_workspace")
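
For a scripted check that yields a boolean rather than a process description, the list of processes advertised by the backend can be searched instead. The sketch below works on a plain list of process dictionaries. The helper name and the sample listing are hypothetical.

```python
def supports_process(processes: list, process_id: str) -> bool:
    """Check whether a process ID appears in a list of openEO process descriptions."""
    return any(p.get("id") == process_id for p in processes)


# Hypothetical excerpt of a process listing, for illustration only.
sample = [{"id": "ndvi"}, {"id": "export_workspace"}]
print(supports_process(sample, "export_workspace"))
```

Against a live connection, the listing would come from `con.list_processes()`, for example `supports_process(con.list_processes(), "export_workspace")`.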

In [None]:
exported_result = result.export_workspace("my_workspace_id", merge="my_stac_collection")
job = exported_result.execute_batch("gran_canaria.tiff", title="APEx Demo - Gran Canaria - NDVI", filename_prefix="gran_canaria")