# Demonstrate ADES Execution for OGC Application Packages
This notebook runs through some example API calls to the ADES (Application, Deployment Execution Service) component of the EODH Platform

In [None]:
from __future__ import annotations

!pip install urllib3

In [None]:
!pip install python-dotenv

In [None]:
import json
import time

import urllib3

http = urllib3.PoolManager(cert_reqs="CERT_NONE")
urllib3.disable_warnings()  # to avoid SSL warnings

In [None]:
# Define text colour for later output
class bcolors:
    OKBLUE = "\033[94m"
    OKGREEN = "\033[92m"
    WARNING = "\033[93m"
    ENDC = "\033[0m"

In [None]:
# Place your username and password here


workspace = "jakubstaszel"

# Update these variables as required to identify the running ades instance and specify workspace name
# If the workspace does not yet exect, it will be created by the ades automatically
ades_endpoint = "test.eodatahub.org.uk/ades"
user = "jakubstaszel"

## Additional process from Spyrosoft
raster-calculate is an example tool developed to make sure we are able to integrate with ADES. The rest of the code comes from the team developing ADES.

## Below are some example API requests you can make to the ADES component
Feel free to run these examples and change the inputs by specifying the application packages, process name and process inputs.
All outputs can be found in the S3 bucket [eodhp-ades](https://s3.console.aws.amazon.com/s3/buckets/eodhp-ades?region=eu-west-2&bucketType=general&tab=objects).

As an example we provide three EOEPCA-developed OGC Application Package to demonstrate the successful execution using the ADES deployment:
- [convert-url](https://github.com/EOEPCA/convert/blob/main/convert-url-app.cwl) - take an image specified by a URL and resize it by a given scale percentage
- [convert-stac](https://github.com/EOEPCA/convert/blob/main/convert-stac-app.cwl) - take an image specified by a stac item and resize it by a given scale percentage
- [water-bodies](https://github.com/EOEPCA/deployment-guide/blob/main/deploy/samples/requests/processing/water-bodies-app.cwl) - takes STAC items, area of interest, epsg definition and set of bands and identifies water bodies based on NDWI and Otsu threshold

This application is specified by configuring the below variable

In [None]:
process_to_be_run = "raster-calculate"

In [None]:
# Automated configuration of CWL script location, process name and inputs
if process_to_be_run == "raster-calculate":
    cwl_location = "https://raw.githubusercontent.com/EO-DataHub/eodh-workflows/main/cwl_files/raster-calculate-app.cwl"
    process_name = "raster-calculate"
    inputs_dict = {
        "inputs": {
            "workspace": workspace,
            "stac_collection": "sentinel-2-l2a",
            "aoi": '{"type": "Polygon","coordinates": [[[14.763294437090849, 50.833598186651244],[15.052268923898112, 50.833598186651244],[15.052268923898112, 50.989077215056824],[14.763294437090849, 50.989077215056824],[14.763294437090849, 50.833598186651244]]]}',
            "date_start": "2024-04-03",
            "date_end": "2024-08-01",
            "index": "ndvi",
        }
    }
elif process_to_be_run == "convert-url":
    process_name = "convert-url"
    cwl_location = "https://raw.githubusercontent.com/EOEPCA/deployment-guide/main/deploy/samples/requests/processing/convert-url-app.cwl"
    inputs_dict = {
        "inputs": {
            "workspace": workspace,
            "fn": "resize",
            "url": "https://eoepca.org/media_portal/images/logo6_med.original.png",
            "size": "50%",
        }
    }
elif process_to_be_run == "convert-stac":
    process_name = "convert-stac"
    cwl_location = "https://raw.githubusercontent.com/EOEPCA/deployment-guide/main/deploy/samples/requests/processing/convert-stac-app.cwl"
    inputs_dict = {
        "inputs": {
            "workspace": workspace,
            "fn": "resize",
            "stac": "https://raw.githubusercontent.com/EOEPCA/convert/main/stac/eoepca-logo.json",
            "size": "50%",
        }
    }

elif process_to_be_run == "water-bodies":
    process_name = "water-bodies"
    cwl_location = "https://raw.githubusercontent.com/EOEPCA/deployment-guide/main/deploy/samples/requests/processing/water-bodies-app.cwl"
    inputs_dict = {
        "inputs": {
            "workspace": workspace,
            "stac_items": [
                "https://test.eodatahub.org.uk/catalogue-data/element84-data/collections/sentinel-2-c1-l2a/items/S2B_T42MVU_20240319T054135_L2A.json"
            ],
            "aoi": "68.09, -6.42, 69.09, -5.43",
            "epsg": "EPSG:4326",
            "bands": ["green", "nir"],
        }
    }

else:
    raise ValueError(
        f"unknown process: {process_to_be_run}",
    )

### List processes

In [None]:
import os
from urllib.parse import urljoin

import requests
from dotenv import load_dotenv

load_dotenv()

PLATFORM_URL = os.environ.get("KEYCLOAK_URL", default="https://test.eodatahub.org.uk/")
KEYCLOAK_REALM = os.environ.get("KEYCLOAK_REALM", default="eodhp")
CLIENT_ID = os.environ.get("CLIENT_ID", default=None)
CLIENT_SECRET = os.environ.get("CLIENT_SECRET", default=None)
USERNAME = os.environ.get("PORTAL_USERNAME", default=None)
PASSWORD = os.environ.get("PORTAL_PASSWORD", default=None)
HEADERS = {"Content-Type": "application/x-www-form-urlencoded"}

TOKEN_URL = urljoin(
    PLATFORM_URL,
    f"/keycloak/realms/{KEYCLOAK_REALM}/protocol/openid-connect/token",
)


def get_refresh_token() -> str:
    response = requests.post(
        TOKEN_URL,
        headers=HEADERS,
        data={
            "client_id": CLIENT_ID,
            "client_secret": CLIENT_SECRET,
            "username": USERNAME,
            "password": PASSWORD,
            "grant_type": "password",
            "scope": "offline_access",
        },
    )
    if response.ok:
        return response.json()["refresh_token"]
    raise Exception("Failed to get offline refresh token: %s", response.json())


def get_access_token(refresh_token: str) -> str:
    response = requests.post(
        TOKEN_URL,
        headers=HEADERS,
        data={
            "client_id": CLIENT_ID,
            "client_secret": CLIENT_SECRET,
            "refresh_token": refresh_token,
            "grant_type": "refresh_token",
            "scope": "offline_access",
        },
    )
    if response.ok:
        return response.json()["access_token"]
    print(response.json())
    Exception("Failed to get user access token: %s", response.json())


def check_token(token: str) -> dict:
    response = requests.post(
        TOKEN_URL + "/introspect",
        headers=HEADERS,
        data={
            "client_id": CLIENT_ID,
            "client_secret": CLIENT_SECRET,
            "token": token,
        },
    )
    if response.ok:
        return response.json()
    Exception("Failed to check token: %s", response.json())


def test_token(token: str) -> bool:
    url = urljoin(PLATFORM_URL, "/api/demo/execute")
    response = requests.get(url, headers={"Authorization": f"Bearer {token}"})

    return 200 <= response.status_code < 300


refresh_token = get_refresh_token()
access_token = get_access_token(refresh_token)
token_status = check_token(access_token)

print(f"Refresh token: {refresh_token}\n")
print(f"Access token: {access_token}\n")
print(
    f"Token status: {json.dumps(token_status, indent=2)}\n",
)

if test_token(access_token):
    print("Test SUCCESS")
else:
    print("Test FAILED")

In [None]:
url = f"https://{ades_endpoint}/{user}/ogc-api/processes"
headers = {"Accept": "application/json"}
auth_dict = {
    "Authorization": f"Bearer {access_token}",
}
headers.update(auth_dict)
response = http.request("GET", url, headers=headers)
json.loads(response.data)

### Undeploy/Delete process

In [None]:
# Here a 204 response means the process was remove successfully, no other content is returned
url = f"https://{ades_endpoint}/{user}/ogc-api/processes/{process_name}"
headers = {"Accept": "application/json"}
headers.update(auth_dict)
response = http.request("DELETE", url, headers=headers)
response.status

### Deploy processes

In [None]:
url = f"https://{ades_endpoint}/{user}/ogc-api/processes"
headers = {"Accept": "application/json", "Content-Type": "application/json"}
headers.update(auth_dict)
params = {"executionUnit": {"href": cwl_location, "type": "application/cwl"}}
response = http.request("POST", url, headers=headers, body=json.dumps(params))
deployStatus = response.headers["Location"]
json.loads(response.data)

### Get deploy status

In [None]:
url = f"{deployStatus}"
headers = {"Accept": "application/json"}
headers.update(auth_dict)
response = http.request("GET", url, headers=headers)
json.loads(response.data)

### Get process details

In [None]:
url = f"https://{ades_endpoint}/{user}/ogc-api/processes/{process_name}"
headers = {"Accept": "application/json"}
headers.update(auth_dict)
response = http.request("GET", url, headers=headers)
json.loads(response.data)

### Execute process

In [None]:
url = f"https://{ades_endpoint}/{user}/ogc-api/processes/{process_name}/execution"
headers = {"Accept": "application/json", "Content-Type": "application/json", "Prefer": "respond-async"}
headers.update(auth_dict)
params = {**inputs_dict}
print(json.dumps(params))
time.sleep(5)
response = http.request("POST", url, headers=headers, body=json.dumps(params))
executeStatus = response.headers["Location"]
json.loads(response.data)

### Get execute status
See the following section to continually poll this function instead to determine once complete

In [None]:
url = f"{executeStatus}"
headers = {"Accept": "application/json"}
headers.update(auth_dict)
params = {}
# time.sleep(5)
response = http.request("GET", url, headers=headers)
json.loads(response.data)

### Get execute status (continuous polling)
Run this cell to keep polling the ExecuteStatus endpoint to determine when the process has finished running and also see it's final status: *SUCCESS* or *FAILED*

In [None]:
url = f"{executeStatus}"
headers = {"Accept": "application/json"}
headers.update(auth_dict)
response = http.request("GET", url, headers=headers)
data = json.loads(response.data)
status = data["status"]
message = data["message"]
print("Status is " + bcolors.OKBLUE + status.upper() + bcolors.ENDC)
print("Message is " + "\033[1m" + message + "\033[0m", end="")
old_message = message
old_status = status

while status == "running":
    time.sleep(2)
    response = http.request("GET", url, headers=headers)
    data = json.loads(response.data)
    status = data["status"]
    message = data["message"]
    if status != old_status:
        print("\n")
        print("Status is " + bcolors.OKBLUE + status.upper() + bcolors.ENDC)
        print("Message is " + "\033[1m" + message + "\033[0m", end="")
    elif message != old_message:
        print(".")
        print("Message is " + "\033[1m" + message + "\033[0m", end="")
    else:
        print(".", end="")
    old_message = message
    old_status = status

if status == "successful":
    print("\n")
    print(bcolors.OKGREEN + "SUCCESS" + bcolors.ENDC)

if status == "failed":
    print(bcolors.WARNING + "FAILED" + bcolors.ENDC)

### Get processing results

In [None]:
# Note, this will return a 500 response when no output is produced
url = f"{executeStatus}/results"
headers = {"Accept": "application/json"}
headers.update(auth_dict)
params = {}
response = http.request("GET", url, headers=headers)
json.loads(response.data)

### List jobs

In [None]:
url = f"https://{ades_endpoint}/{user}/ogc-api/jobs"
headers = {"Accept": "application/json"}
headers.update(auth_dict)
response = http.request("GET", url, headers=headers)
json.loads(response.data)

### Undeploy/Delete process

In [None]:
# Here a 204 response means the process was remove successfully, no other content is returned
url = f"https://{ades_endpoint}/{user}/ogc-api/processes/water-bodies"
headers = {"Accept": "application/json"}
headers.update(auth_dict)
response = http.request("DELETE", url, headers=headers)
response.status