# Processing
Demonstration of Processing endpoints.

In [None]:
%load_ext autoreload
%autoreload 2
import jwt
import json
from owslib.csw import CatalogueServiceWeb
import os
from time import sleep

## Initialise

Platform domain...

In [None]:
# Base DNS domain of the platform; the service URLs in this notebook are built from it
base_domain = "develop.eoepca.org"

### User
The user authenticates and the client receives an access token (JWT) that represents the user.

In [None]:
#-------------------------------------------------------------------------------
# Helper function to authenticate user and get access token
#-------------------------------------------------------------------------------
import requests

# Keycloak realm and the OpenID Connect token endpoint derived from it
realm = "master"
keycloak = f"https://keycloak-v1x.{base_domain}"
token_endpoint = f"{keycloak}/realms/{realm}/protocol/openid-connect/token"
# Client credentials sent with the token request
# NOTE(review): these look like demo placeholder values - confirm before use elsewhere
client_id = "admin-cli"
client_secret = "changeme"

def get_user_token(username, password):
    """Request an access token for a user with the password grant.

    Posts the user credentials, together with the module-level client
    credentials, to `token_endpoint`.

    Args:
        username: Name of the user to authenticate.
        password: Password of that user.

    Returns:
        The `access_token` value from the JSON response, or None when the
        response carries an error status.
    """
    headers = {
        "Cache-Control": "no-cache",
        "Content-Type": "application/x-www-form-urlencoded"
    }
    data = {
        "scope": "openid profile email",
        "grant_type": "password",
        "username": username,
        "password": password,
        "client_id": client_id,
        "client_secret": client_secret
    }
    # Without a timeout an unreachable identity service would hang the notebook
    response = requests.post(token_endpoint, headers=headers, data=data, timeout=30)
    if not response.ok:
        # Include status and body; the bare Response repr hides the reason
        print(f"Token request failed: {response.status_code} {response.text}")
        return None
    return response.json()["access_token"]

In [None]:
#-------------------------------------------------------------------------------
# Authenticate as user `eric` and get an access token
#-------------------------------------------------------------------------------
USER_NAME="eric"
USER_PASSWORD="changeme"
user_access_token = get_user_token(USER_NAME, USER_PASSWORD)
# get_user_token returns None on failure; stop here instead of sending
# "Bearer None" with every later request
if user_access_token is None:
    raise RuntimeError(f"Authentication failed for user {USER_NAME}")

## Workspace

In [None]:
# Workspace API endpoint, and the prefix from which workspace names are formed
workspace_prefix = "ws"
workspace_url = f"https://workspace-api.{base_domain}"

### Workspace: Get Details
Retrieve the access information for the user's workspace S3 bucket (bucket name, access key and secret access key).

In [None]:
# Workspace - Get Details
# The workspace name is the prefix joined to the lower-cased user name
workspace_name = workspace_prefix + "-" + USER_NAME.lower()

# Request headers carrying the user's bearer token
headers = {
    "Accept": "application/json",
    "Authorization": f"Bearer {user_access_token}"
}
workspace_details_url = f"{workspace_url}/workspaces/{workspace_name}"
response = requests.get(workspace_details_url, headers=headers)
response.raise_for_status()

# Show only the storage section of the workspace details
workspace_details = response.json()
storage_details = workspace_details["storage"]
print(json.dumps(storage_details, indent=2))

### Inspect User Workspace
Using the access credentials, the user can inspect their S3 bucket.

In [None]:
# Bucket details: bucket name and S3 key pair from the workspace storage credentials
storage_credentials = workspace_details["storage"]["credentials"]
bucket_name = storage_credentials["bucketname"]
s3_access = storage_credentials["access"]
s3_secret = storage_credentials["secret"]

In [None]:
# Quick hack S3 access
import boto3

# Create an S3 resource for the platform object-storage endpoint,
# authenticated with the workspace bucket credentials
S3_ENDPOINT = f"https://minio.{base_domain}"
session = boto3.session.Session()
s3resource = session.resource(
    's3',
    endpoint_url=S3_ENDPOINT,
    aws_access_key_id=s3_access,
    aws_secret_access_key=s3_secret,
)

# List bucket contents
bucket = s3resource.Bucket(bucket_name)
for bucket_object in bucket.objects.all():
    print(' ->', bucket_object.key)

### Upload Application Package and register it
The user uploads the application package to their workspace.

In [None]:
# Upload the CWL application package into the user's bucket
app_package_key = 'application-package/s-expression/s-expression-0_0_2.cwl'
# Named to avoid shadowing the `object` builtin for the rest of the notebook
app_package_object = s3resource.Object(bucket_name, app_package_key)
# The `with` block closes the file handle once the upload has finished
with open('../data/s-expression-cwl.cwl', 'rb') as app_package_file:
    result = app_package_object.put(Body=app_package_file)
# Default to an empty dict so a missing metadata section reports "not uploaded"
res = result.get('ResponseMetadata', {})
if res.get('HTTPStatusCode') == 200:
    print('Application package uploaded successfully')
else:
    print('Application package not uploaded')

User registers the Application Package through the Workspace API

In [None]:
# Register the uploaded application package through the Workspace API
application_url = f'{bucket_name}/application-package/s-expression/s-expression-0_0_2.cwl'
register_headers = {"Content-Type": "application/json", **headers}
register_payload = {
    "type": "application",
    "url": application_url,
}
response = requests.post(
    f"{workspace_url}/workspaces/{workspace_name}/register",
    headers=register_headers,
    json=register_payload,
)
response.raise_for_status()
response

User inspects the registered Application Package through the Resource Catalogue API

In [None]:
# Query the user's workspace catalogue and list the registered records
app_id = "s-expression"
workspace_endpoint = f'https://resource-catalogue.{workspace_name}.{base_domain}/csw'
print(f"{USER_NAME}'s workspace (catalogue): {workspace_endpoint}")

csw = CatalogueServiceWeb(workspace_endpoint, timeout=30, headers=headers)
csw.getrecords2(maxrecords=10)

print(f"{USER_NAME}'s workspace records:\n")
for record in csw.records.values():
    print(f'identifier: {record.identifier}\ntype: {record.type}\ntitle: {record.title}\n')

# Print the heading once, outside the loop, so it directly precedes the references
print(f"URLs for Application {app_id}:")
csw.records[app_id].references

## ADES
The ADES provides user-specific endpoints, using a URL path prefix.

In [None]:
# ADES URLs - the user name forms the path prefix of the user-specific endpoint
ades_base_url = "https://zoo." + base_domain
ades_proc_url = "/".join((ades_base_url, USER_NAME, "ogc-api"))
print(f"ADES API Processes endpoint for user {USER_NAME}: {ades_proc_url}")

### ADES API Processes

In [None]:
# Identifier of the application (process) addressed by the ADES requests below
app_name = "convert-url"

#### ADES: List Processes
**GET {service_url}/processes**<br>
Provides a list of all processes.

In [None]:
# API Processes - List Processes
processes_url = f"{ades_proc_url}/processes"
response = requests.get(processes_url, headers=headers)
response.raise_for_status()

process_list = response.json()
process_ids = [entry["id"] for entry in process_list["processes"]]
print("Processes...")
for process_id in process_ids:
    print("  ", process_id)

# Remember whether the application is already present in the listing
app_is_already_deployed = app_name in process_ids

#### ADES: Undeploy Application (in case it's already deployed)
**DELETE {service_url}/processes/{application_name}**<br>
Undeploys application from the ADES

In [None]:
# API Processes - Undeploy Application

if not app_is_already_deployed:
    print(f"Application {app_name} is NOT yet deployed. No need to delete.")
else:
    print(f"Application {app_name} is already deployed. Undeploying...")

    # Delete the existing deployment
    undeploy_url = f"{ades_proc_url}/processes/{app_name}"
    response = requests.delete(undeploy_url, headers=headers)
    response.raise_for_status()

In [None]:
# API Processes - List Processes
processes_url = f"{ades_proc_url}/processes"
response = requests.get(processes_url, headers=headers)
response.raise_for_status()

# Print the identifier of every process in the listing
process_list = response.json()
print("Processes...")
for entry in process_list["processes"]:
    print("  ", entry["id"])

#### ADES: Deploy Application

**POST {service_url}/processes**<br>
Deploys application to the ADES.<br>
Request body is json in which the input is the Application Package describing the application.<br>
The Application Package is a CWL Workflow that is typically provided as an href with content type **_application/atom+xml_** or **_application/cwl_**. The URL of the application package can be either an http or an s3 endpoint.

In [None]:
# API Processes - Deploy Application
deploy_headers = {"Content-Type": "application/json", **headers}
deploy_request = {
    "executionUnit": {
        "href": "https://raw.githubusercontent.com/EOEPCA/deployment-guide/main/deploy/samples/requests/processing/convert-url-app.cwl",
        "type": "application/cwl"
    }
}
response = requests.post(
    f"{ades_proc_url}/processes",
    headers=deploy_headers,
    json=deploy_request,
)
# A 409 is reported as "already deployed"; any other error status raises
if response.status_code != 409:
    response.raise_for_status()
else:
    print(f"Application {app_name} is already deployed")
response

In [None]:
# Show the raw body of the most recent response
response.content

Check that the application has been correctly deployed

In [None]:
# API Processes - List Processes
listing_url = f"{ades_proc_url}/processes"
response = requests.get(listing_url, headers=headers)
response.raise_for_status()

# One line per process identifier
process_list = response.json()
print("Processes...")
for deployed in process_list["processes"]:
    print("  ", deployed["id"])

#### ADES: Get Application Details
**GET {service_url}/processes/{application_name}**<br>
Provides details of the deployed application<br>
The response includes the API Processes json application description.

In [None]:
# API Processes - Get Application Details
application_details_url = f"{ades_proc_url}/processes/{app_name}"
response = requests.get(application_details_url, headers=headers)
response.raise_for_status()

# Pretty-print the JSON application description
application_details = response.json()
print(json.dumps(application_details, indent=2))

#### ADES: Execute Application

**POST {service_url}/processes/{application_name}/execution**<br>
Request body is json as defined by API Processes to define the inputs and outputs, consistent with the CWL Workflow application package.<br>
The response returns **201 CREATED** to indicate that the job has been successfully initiated.<br>
The response **Location header** provides the path (/processes/{application_name}/jobs/{job_id}) to follow the job status.

In [None]:
# API Processes - Execute Application
# The "Prefer: respond-async" header is sent with the execution request
execute_headers = {
    "Content-Type": "application/json",
    "Prefer": "respond-async",
    **headers,
}
execute_request = {
    "inputs": {
        "fn": "resize",
        "url":  "https://eoepca.org/media_portal/images/logo6_med.original.png",
        "size": "50%"
    }
}
response = requests.post(
    f"{ades_proc_url}/processes/{app_name}/execution",
    headers=execute_headers,
    json=execute_request,
)
response.raise_for_status()
response.content

In [None]:
# Job status URL, taken from the Location header of the execute response
# NOTE(review): used later as a complete URL - confirm the service returns an absolute URL here
job_status_url = response.headers["Location"]
job_status_url

#### ADES: Job Status

**GET {service_url}/jobs/{job_id}**<br>
Checks the status of the previously submitted job - using the URL returned in the Location header of the execute request.<br>
The response body json provides a status string (successful/running/failed) and a % progress.<br>
In the case of a failure, a descriptive message is provided.

In [None]:
# API Processes - Job Status
def get_job_status():
    """Fetch the current status of the submitted job.

    Returns:
        The response of a GET request on `job_status_url`, sent with the
        shared `headers`. The status code is not checked here.
    """
    status_response = requests.get(job_status_url, headers=headers)
    return status_response

response = get_job_status()
response.raise_for_status()

# Pretty-print the JSON status document
job_status_body = response.json()
print(json.dumps(job_status_body, indent=2))

In [None]:
# API Processes - Job Status (keep polling for completion)
# Poll until the job reports a terminal status. Any other status (for example
# "accepted", before the job starts running) means the job is still in progress,
# so the loop must not depend on the status being exactly "running".
terminal_statuses = {"successful", "failed", "dismissed"}
status = "running"
interval = 10
error_count = 0
max_error_count = 5
while status not in terminal_statuses:
    response = get_job_status()
    # Expecting a 200 response
    if response.status_code == 200:
        error_count = 0
        status = response.json()["status"]
        print(f"Job status is: {status}", end="")
        if status in terminal_statuses:
            print(" - DONE")
            break
        print(" - waiting...")
    # Unexpected response, latch the error
    else:
        error_count += 1
        print(f"Unexpected response {response.status_code} requesting job status")
        # Too many consecutive errors
        if error_count > max_error_count:
            print("ERROR: Too many failed attempts to get job status")
            break
    # Wait before the next poll
    sleep(interval)

print(f"Processing completed with status: {status}")

In [None]:
# Dump the status code, headers and JSON body of the last response
response_headers_text = json.dumps(dict(response.headers), indent=2)
response_body_text = json.dumps(response.json(), indent=2)
print(f"status: {response.status_code}\nheaders: {response_headers_text}\nbody...\n{response_body_text}")

#### ADES: Job Result
**GET {service_url}/jobs/{job_id}/results**<br>
Returns details of the outputs for a successful job execution.<br>
The response body provides json data that includes the reference to the STAC file that indexes the processing outputs.

In [None]:
# API Processes - Job Result
job_results_url = f"{job_status_url}/results"
response = requests.get(job_results_url, headers=headers)
response.raise_for_status()

# Pretty-print the JSON results document
job_results_body = response.json()
print(json.dumps(job_results_body, indent=2))

From the S3 url let's retrieve the folder in the bucket where the result files are stored.

In [None]:
# Locate the "root" link of the job results; its href is used as the
# location of the root STAC Catalog
processing_results = response.json()
result_location = None
for link in processing_results.get("links", []):
    if link["rel"] == "root":
        result_location = link["href"]

# Fail with a clear message instead of an IndexError from the parsing below
if result_location is None:
    raise RuntimeError("No 'root' link found in the job results")

print(f"Processing results root STAC Catalog: {result_location}")

# Drop the file name, then the URL scheme, then the leading "//<bucket>/"
catalog_folder_url = result_location.rsplit('/', 1)[0]
result_folder_name = catalog_folder_url.rsplit(':', 1)[-1].replace("//"+bucket_name+"/","")
print(f"Folder name for results in Object Storage: {result_folder_name}")

#### ADES: List Jobs

In [None]:
# API Processes - List Jobs
jobs_url = f"{ades_proc_url}/jobs"
response = requests.get(jobs_url, headers=headers)
response.raise_for_status()

jobs_list = response.json()
print(f"Listing {jobs_list['numberTotal']} jobs...")
for job in jobs_list["jobs"]:
    # Jobs without a 'finished' entry are shown with a placeholder
    finished = job.get('finished', '???')
    print(f"  {finished:^24} - {job['jobID']} - {job['status']:<12} - {job['processID']}")

#### ADES: Undeploy Application
**DELETE {service_url}/processes/{application_name}**<br>
Undeploys application from the ADES

In [None]:
# API Processes - Undeploy Application
undeploy_url = f"{ades_proc_url}/processes/{app_name}"
response = requests.delete(undeploy_url, headers=headers)
response.raise_for_status()

In [None]:
# API Processes - List Processes
remaining_url = f"{ades_proc_url}/processes"
response = requests.get(remaining_url, headers=headers)
response.raise_for_status()

# Print the identifiers of the processes in the listing
process_list = response.json()
print("Processes...")
for remaining in process_list["processes"]:
    print("  ", remaining["id"])

#### Workspace: Inspect results
Let's inspect the result files in the user S3 bucket.

In [None]:
# Quick hack S3 access
import boto3

# Create an S3 resource for the platform object-storage endpoint,
# authenticated with the workspace bucket credentials
S3_ENDPOINT = f"https://minio.{base_domain}"
session = boto3.session.Session()
s3resource = session.resource(
    's3',
    endpoint_url=S3_ENDPOINT,
    aws_access_key_id=s3_access,
    aws_secret_access_key=s3_secret,
)

# List only the objects stored under the results folder
bucket = s3resource.Bucket(bucket_name)
result_objects = bucket.objects.filter(Prefix=result_folder_name)
for result_object in result_objects:
    print(' ->', result_object.key)