# GNEO Axis 3 Hub Processing demo
Demonstration of Processing endpoints.

In [13]:
%load_ext autoreload
%autoreload 2
import json
import os
import posixpath
from getpass import getpass
from time import sleep
from urllib.parse import urlparse

import jwt
import requests
from owslib.csw import CatalogueServiceWeb

The autoreload extension is already loaded. To reload it, use:
  %reload_ext autoreload


## Initialise

Platform domain...

In [14]:
# Base domain of the platform; the service URLs in this notebook are subdomains of it.
# Example of a derived URL (Keycloak token endpoint):
#   https://iam.ax3hub-dev.planetek.net/realms/axes-3/protocol/openid-connect/token
base_domain = "ax3hub-dev.planetek.net"

### User
The user authenticates and the client receives an access token (JWT) that represents the user.

In [26]:
#-------------------------------------------------------------------------------
# Helper function to authenticate user and get access token
#-------------------------------------------------------------------------------
import requests

# Keycloak realm and identity-server URL derived from the platform domain.
realm = "axes-3"
keycloak = f"https://iam.{base_domain}"
# OpenID Connect token endpoint that get_user_token() posts to.
token_endpoint = f"{keycloak}/realms/{realm}/protocol/openid-connect/token"
# Client identifier sent with the password grant.
client_id = "admin-cli"
# NOTE(review): client_secret is not sent by get_user_token() (the field is commented
# out there); "changeme" looks like a placeholder - confirm before relying on it.
client_secret = "changeme"

# Show the resolved endpoint so the target environment is visible in the cell output.
print(token_endpoint)
def get_user_token(username, password, timeout=30):
    """Obtain an access token for a user via the OAuth2 password grant.

    Posts the credentials to the module-level ``token_endpoint`` using the
    module-level ``client_id``.

    Args:
        username: Login name of the user.
        password: Password of the user.
        timeout: Seconds to wait for the identity server before giving up.

    Returns:
        The ``access_token`` string from the token response, or ``None`` when
        the server answers with an error status (the status and body are
        printed to help diagnose the failure).

    Raises:
        requests.exceptions.RequestException: On connection errors or timeout.
    """
    headers = {
        "Cache-Control": "no-cache",
        "Content-Type": "application/x-www-form-urlencoded"
    }
    # No "scope" or "client_secret" is sent with this request.
    data = {
        "grant_type": "password",
        "username": username,
        "password": password,
        "client_id": client_id,
    }
    # Without a timeout requests waits forever, which would hang the kernel
    # if the identity server is unreachable.
    response = requests.post(token_endpoint, headers=headers, data=data, timeout=timeout)
    if not response.ok:
        print(f"Token request failed: HTTP {response.status_code} - {response.text}")
        return None
    return response.json()["access_token"]

https://iam.ax3hub-dev.planetek.net/realms/axes-3/protocol/openid-connect/token


In [30]:
#-------------------------------------------------------------------------------
# Authenticate as the configured user and get an access token
#-------------------------------------------------------------------------------
# Credentials are read from the environment (with an interactive prompt as the
# fallback for the password) so that no secret is stored in the notebook.
USER_NAME = os.environ.get("AX3HUB_USERNAME", "gfenoy")
USER_PASSWORD = os.environ.get("AX3HUB_PASSWORD") or getpass(f"Password for {USER_NAME}: ")
user_access_token = get_user_token(USER_NAME, USER_PASSWORD)

# Stop here on a failed login: otherwise every later request would be sent
# with the literal header "Bearer None" and fail in a confusing way.
if user_access_token is None:
    raise RuntimeError(f"Authentication failed for user {USER_NAME}")

# Default headers for all subsequent API calls.
headers = {
    "Accept": "application/json",
    "Authorization": f"Bearer {user_access_token}"
}

## ZOO-Project-DRU
The ZOO-Project-DRU provides user-specific endpoints, using a URL path prefix.

In [31]:
# ZOO-Project-DRU URLs
# Processing service host for the platform domain.
ades_base_url = f"https://processing.{base_domain}"
# Per-user OGC API root: the user name is used as the URL path prefix.
ades_proc_url = f"{ades_base_url}/{USER_NAME}/ogc-api"
print(f"ZOO-Project-DRU API Processes endpoint for user {USER_NAME}: {ades_proc_url}")

ZOO-Project-DRU API Processes endpoint for user gfenoy: https://processing.ax3hub-dev.planetek.net/gfenoy/ogc-api


### ZOO-Project-DRU API Processes

In [32]:
# Identifier of the application used in the deploy, execute and undeploy cells below.
app_name = "convert-url"

#### ZOO-Project-DRU: List Processes
**GET {service_url}/processes**<br>
Provides a list of all processes 

In [33]:
# API Processes - List Processes
response = requests.get(f"{ades_proc_url}/processes", headers=headers)
response.raise_for_status()
process_list = response.json()

# Collect the identifiers once, print them, then record whether the demo
# application is among them.
print("Processes...")
deployed_ids = [process["id"] for process in process_list["processes"]]
for process_id in deployed_ids:
    print("  ", process_id)
app_is_already_deployed = app_name in deployed_ids

Processes...
   echo


#### ZOO-Project-DRU: Undeploy Application (in case it's already deployed)
**DELETE {service_url}/processes/{application_name}**<br>
Undeploys application from the ZOO-Project-DRU

In [34]:
# API Processes - Undeploy Application
# Only issue the DELETE when the listing above found the application.
if not app_is_already_deployed:
    print(f"Application {app_name} is NOT yet deployed. No need to delete.")
else:
    print(f"Application {app_name} is already deployed. Undeploying...")

    response = requests.delete(f"{ades_proc_url}/processes/{app_name}", headers=headers)
    response.raise_for_status()

Application convert-url is NOT yet deployed. No need to delete.


In [24]:
# API Processes - List Processes
response = requests.get(f"{ades_proc_url}/processes", headers=headers)
response.raise_for_status()
process_list = response.json()

# Print the identifier of every process currently visible to the user.
print("Processes...")
for process_id in (entry["id"] for entry in process_list["processes"]):
    print("  ", process_id)

Processes...
   echo


#### ZOO-Project-DRU: Deploy Application

**POST {service_url}/processes**<br>
Deploys application to the ZOO-Project-DRU.<br>
Request body is json in which the input is the Application Package describing the application.<br>
The Application Package is a CWL Workflow that is typically provided as an href with content type **_application/atom+xml_** or **_application/cwl_**. The URL of the application package can be either an HTTP or an S3 endpoint.

In [35]:
# API Processes - Deploy Application
# The execution unit points at the CWL application package to deploy.
response = requests.post(
    f"{ades_proc_url}/processes",
    headers={"Content-Type": "application/json", **headers},
    json={
        "executionUnit": {
            "href": "https://raw.githubusercontent.com/EOEPCA/deployment-guide/main/scripts/processing/oapip/examples/convert-url-app.cwl",
            "type": "application/cwl",
        }
    },
)
# A 409 is only reported, not raised; any other error status raises.
if response.status_code != 409:
    response.raise_for_status()
else:
    print(f"Application {app_name} is already deployed")
response

<Response [201]>

In [36]:
# Display the raw body of the most recent response.
response.content

b'{"id":"convert-url","title":"convert url app","description":"Convert URL","mutable":true,"version":"0.1.2","metadata":[{"role":"https://schema.org/softwareVersion","value":"0.1.2"}],"outputTransmission":["value","reference"],"jobControlOptions":["async-execute","dismiss"],"links":[{"rel":"http://www.opengis.net/def/rel/ogc/1.0/execute","type":"application/json","title":"Execute End Point","href":"https://processing.ax3hub-dev.planetek.net/gfenoy/ogc-api/processes/convert-url/execution"}]}\n'

Check that the application has been correctly deployed

In [37]:
# API Processes - List Processes
response = requests.get(f"{ades_proc_url}/processes", headers=headers)
response.raise_for_status()
process_list = response.json()

# The newly deployed application should now appear in this listing.
print("Processes...")
for process_id in (entry["id"] for entry in process_list["processes"]):
    print("  ", process_id)

Processes...
   echo
   convert-url


#### ZOO-Project-DRU: Get Application Details
**GET {service_url}/processes/{application_name}**<br>
Provides details of the deployed application<br>
The response includes the API Processes json application description.

In [38]:
# API Processes - Get Application Details
# Fetch the description of the deployed application and pretty-print it.
response = requests.get(
    f"{ades_proc_url}/processes/{app_name}",
    headers=headers,
)
response.raise_for_status()
print(json.dumps(response.json(), indent=2))

{
  "id": "convert-url",
  "title": "convert url app",
  "description": "Convert URL",
  "mutable": true,
  "version": "0.1.2",
  "metadata": [
    {
      "role": "https://schema.org/softwareVersion",
      "value": "0.1.2"
    }
  ],
  "outputTransmission": [
    "value",
    "reference"
  ],
  "jobControlOptions": [
    "async-execute",
    "dismiss"
  ],
  "links": [
    {
      "rel": "http://www.opengis.net/def/rel/ogc/1.0/execute",
      "type": "application/json",
      "title": "Execute End Point",
      "href": "https://processing.ax3hub-dev.planetek.net/gfenoy/ogc-api/processes/convert-url/execution"
    }
  ],
  "inputs": {
    "fn": {
      "title": "the operation to perform",
      "description": "the operation to perform",
      "schema": {
        "type": "string",
        "default": "resize",
        "nullable": true
      }
    },
    "size": {
      "title": "the percentage for a resize operation",
      "description": "the percentage for a resize operation",
      "

#### ZOO-Project-DRU: Execute Application

**POST {service_url}/processes/{application_name}/execution**<br>
Request body is json as defined by API Processes to define the inputs and outputs, consistent with the CWL Workflow application package.<br>
The response returns **201 CREATED** to indicate that the job has been successfully initiated.<br>
The response **Location header** provides the path (/processes/{application_name}/jobs/{job_id}) to follow the job status.

In [39]:
# API Processes - Execute Application
# "Prefer: respond-async" is sent so the job is submitted asynchronously.
response = requests.post(
    f"{ades_proc_url}/processes/{app_name}/execution",
    headers={
        "Content-Type": "application/json",
        "Prefer": "respond-async",
        **headers,
    },
    json={
        "inputs": {
            "fn": "resize",
            "url": "https://eoepca.org/media_portal/images/logo6_med.original.png",
            "size": "50%",
        }
    },
)
response.raise_for_status()
response.content

b'{"progress":0,"id":"539473e4-05a5-11f0-8314-befc17a3de49","jobID":"539473e4-05a5-11f0-8314-befc17a3de49","type":"process","processID":"convert-url","created":"2025-03-20T16:06:48.309Z","started":"2025-03-20T16:06:48.309Z","updated":"2025-03-20T16:06:48.309Z","status":"running","message":"ZOO-Kernel accepted to run your service!","links":[{"title":"Status location","rel":"monitor","type":"application/json","href":"https://processing.ax3hub-dev.planetek.net/gfenoy/ogc-api/jobs/539473e4-05a5-11f0-8314-befc17a3de49"}]}\n'

In [40]:
# The Location header of the execute response holds the URL used to follow the job status.
job_status_url = response.headers["Location"]
job_status_url

'https://processing.ax3hub-dev.planetek.net/gfenoy/ogc-api/jobs/539473e4-05a5-11f0-8314-befc17a3de49'

#### ZOO-Project-DRU: Job Status

**GET {service_url}/jobs/{job_id}**<br>
Checks the status of the previously submitted job - using the URL returned in the Location header of the execute request.<br>
The response body json provides a status string (successful/running/failed) and a % progress.<br>
In the case of a failure, a descriptive message is provided.

In [41]:
# API Processes - Job Status
def get_job_status():
    """Query the job status URL and return the raw HTTP response.

    Uses the module-level ``job_status_url`` and ``headers``; the caller
    decides how to treat error status codes.
    """
    status_response = requests.get(job_status_url, headers=headers)
    return status_response


# One-off status check right after submission.
response = get_job_status()
response.raise_for_status()
print(json.dumps(response.json(), indent=2))

{
  "progress": 23,
  "id": "539473e4-05a5-11f0-8314-befc17a3de49",
  "jobID": "539473e4-05a5-11f0-8314-befc17a3de49",
  "type": "process",
  "processID": "convert-url",
  "created": "2025-03-20T16:06:48.309Z",
  "started": "2025-03-20T16:06:48.309Z",
  "updated": "2025-03-20T16:07:35.991Z",
  "status": "running",
  "message": "execution submitted",
  "links": [
    {
      "title": "Status location",
      "rel": "monitor",
      "type": "application/json",
      "href": "https://processing.ax3hub-dev.planetek.net/gfenoy/ogc-api/jobs/539473e4-05a5-11f0-8314-befc17a3de49"
    }
  ]
}


In [None]:
# API Processes - Job Status (keep polling for completion)
# Poll until the job reaches a terminal status. Any other status (e.g.
# "accepted" or "running") means the job is still in flight, so keep waiting.
TERMINAL_STATUSES = {"successful", "failed", "dismissed"}
status = "running"
interval = 10          # seconds between polls
error_count = 0        # consecutive non-200 responses
max_error_count = 5
while status not in TERMINAL_STATUSES:
    response = get_job_status()
    # Expecting a 200 response
    if response.status_code == 200:
        error_count = 0
        status = response.json()["status"]
        print(f"Job status is: {status}", end="")
        if status in TERMINAL_STATUSES:
            print(" - DONE")
            break
        print(" - waiting...")
    # Unexpected response, latch the error
    else:
        error_count += 1
        print(f"Unexpected response {response.status_code} requesting job status")
        # Too many consecutive errors
        if error_count > max_error_count:
            print("ERROR: Too many failed attempts to get job status")
            break
    sleep(interval)

# Only claim completion when a terminal status was actually observed.
if status in TERMINAL_STATUSES:
    print(f"Processing completed with status: {status}")
else:
    print(f"Polling stopped before completion; last known status: {status}")

Job status is: running - waiting...
Job status is: running - waiting...
Job status is: running - waiting...
Job status is: running - waiting...


In [36]:
# Dump the most recent response (status code, headers and JSON body) for inspection.
print(f"status: {response.status_code}\nheaders: {json.dumps(dict(response.headers), indent=2)}\nbody...\n{json.dumps(response.json(), indent=2)}")

status: 200
headers: {
  "Content-Type": "application/json;charset=UTF-8",
  "Transfer-Encoding": "chunked",
  "Connection": "keep-alive",
  "Date": "Wed, 19 Mar 2025 12:01:44 GMT",
  "X-Powered-By": "ZOO-Project-DRU",
  "X-Also-Powered-By": "jwt.securityIn",
  "X-Also-Also-Powered-By": "dru.securityIn",
  "Server": "APISIX/3.11.0"
}
body...
{
  "id": "b2ff1aba-04b9-11f0-ba55-d6587e5c8cf1",
  "jobID": "b2ff1aba-04b9-11f0-ba55-d6587e5c8cf1",
  "type": "process",
  "processID": "convert-url",
  "created": "2025-03-19T12:00:07.372Z",
  "started": "2025-03-19T12:00:07.372Z",
  "finished": "2025-03-19T12:01:39.814Z",
  "updated": "2025-03-19T12:01:39.423Z",
  "status": "successful",
  "message": "ZOO-Kernel successfully run your service!",
  "links": [
    {
      "title": "Status location",
      "rel": "monitor",
      "type": "application/json",
      "href": "https://processing.ax3hub-dev.planetek.net/gfenoy/ogc-api/jobs/b2ff1aba-04b9-11f0-ba55-d6587e5c8cf1"
    },
    {
      "title": 

#### ZOO-Project-DRU: Job Result
**GET {service_url}/jobs/{job_id}/results**<br>
Returns details of the outputs for a successful job execution.<br>
The response body provides json data that includes the reference to the STAC file that indexes the processing outputs.

In [37]:
# API Processes - Job Result
# The results endpoint lives directly under the job status URL.
response = requests.get(
    f"{job_status_url}/results",
    headers=headers,
)
response.raise_for_status()
print(json.dumps(response.json(), indent=2))

{
  "type": "FeatureCollection",
  "features": [
    {
      "type": "Feature",
      "stac_version": "1.0.0",
      "id": "logo6_med.original-resize-1742385668.200845929",
      "properties": {
        "created": "2025-03-19T12:01:08.200Z",
        "datetime": "2025-03-19T12:01:08.200000Z",
        "updated": "2025-03-19T12:01:08.200Z"
      },
      "geometry": {
        "type": "Polygon",
        "coordinates": [
          [
            [
              -180,
              -90
            ],
            [
              -180,
              90
            ],
            [
              180,
              90
            ],
            [
              180,
              -90
            ],
            [
              -180,
              -90
            ]
          ]
        ]
      },
      "links": [
        {
          "rel": "root",
          "href": "s3://eks-1-processing/processing-results/b2ff1aba-04b9-11f0-ba55-d6587e5c8cf1/catalog.json",
          "type": "application/json"
      

From the S3 url let's retrieve the folder in the bucket where the result files are stored.

In [None]:
# Find the root STAC catalog link in the job results and derive the object
# storage bucket and folder from its URL.
processing_results = response.json()

# Look at the top-level links first; if no "root" link is there, fall back to
# the links carried by each feature of the collection.
candidate_links = list(processing_results.get("links", []))
if not any(link.get("rel") == "root" for link in candidate_links):
    for feature in processing_results.get("features", []):
        candidate_links.extend(feature.get("links", []))

result_location = "Not found"
for link in candidate_links:
    if link.get("rel") == "root":
        result_location = link["href"]

print(f"Processing results root STAC Catalog: {result_location}")

# For a URL such as s3://<bucket>/<folder>/catalog.json the network location
# is the bucket and the directory part of the path is the results folder.
parsed_location = urlparse(result_location)
if not parsed_location.netloc:
    raise ValueError(f"No usable root STAC catalog URL in the job results: {result_location}")

# Take the bucket from the URL itself so the cell does not depend on a
# variable defined outside the notebook.
bucket_name = parsed_location.netloc
result_folder_name = posixpath.dirname(parsed_location.path).lstrip("/")
print(f"Folder name for results in Object Storage: {result_folder_name}")

#### ZOO-Project-DRU: List Jobs

In [39]:
# API Processes - List Jobs
response = requests.get(f"{ades_proc_url}/jobs", headers=headers)
response.raise_for_status()

jobs_list = response.json()
print(f"Listing {jobs_list['numberTotal']} jobs...")
for job in jobs_list["jobs"]:
    # One line per job: 'finished' falls back to '???' when the key is absent and is
    # centred in a 24-character column; the status is left-aligned in 12 characters.
    print(f"  {job.get('finished', '???'):^24} - {job['jobID']} - {job['status']:<12} - {job['processID']}")

Listing 1 jobs...
  2025-03-19T12:01:39.814Z - b2ff1aba-04b9-11f0-ba55-d6587e5c8cf1 - successful   - convert-url


#### ZOO-Project-DRU: Undeploy Application
**DELETE {service_url}/processes/{application_name}**<br>
Undeploys application from the ZOO-Project-DRU

In [40]:
# API Processes - Undeploy Application
# Remove the demo application; an error status raises.
response = requests.delete(f"{ades_proc_url}/processes/{app_name}", headers=headers)
response.raise_for_status()

In [41]:
# API Processes - List Processes
response = requests.get(f"{ades_proc_url}/processes", headers=headers)
response.raise_for_status()
process_list = response.json()

# After undeploying, the demo application should no longer be listed.
print("Processes...")
for process_id in (entry["id"] for entry in process_list["processes"]):
    print("  ", process_id)

Processes...
   echo


#### Workspace: Inspect results
Let's inspect the result files in the user S3 bucket.

In [None]:
# Quick hack S3 access
import boto3

# Init an S3 session against the platform object store at https://minio.<base_domain>.
# NOTE(review): s3_access and s3_secret are not assigned in any visible cell - they must
# be defined before this cell runs; prefer environment variables over literals.
S3_ENDPOINT = f"https://minio.{base_domain}"
session = boto3.session.Session()
s3resource = session.resource('s3', aws_access_key_id=s3_access, aws_secret_access_key=s3_secret, endpoint_url=S3_ENDPOINT)

# List bucket contents
# bucket_name and result_folder_name must already be defined by earlier cells;
# only keys under the result folder prefix are printed.
bucket = s3resource.Bucket(bucket_name)
for obj in bucket.objects.filter(Prefix=result_folder_name):
    print(' ->', obj.key)