In [1]:
import requests
import time
import os
import requests
import json

In [2]:
# Configuration
NAMESPACE = "ns1"
ARGO_SERVER_URL = f"http://localhost:2746/api/v1"

# this token is generated by the argo server with the following command:
# kubectl get -n ns1 secret argo.service-account-token -o=jsonpath='{.data.token}' | base64 --decode
AUTH_TOKEN = "eyJhbGciOiJSUzI1NiIsImtpZCI6ImNtR2dMYUdaLWk4MS1IbjkyNWs3ejhrMkl3VFdOZWZnRS12QzdfdkwtTGsifQ.eyJpc3MiOiJrdWJlcm5ldGVzL3NlcnZpY2VhY2NvdW50Iiwia3ViZXJuZXRlcy5pby9zZXJ2aWNlYWNjb3VudC9uYW1lc3BhY2UiOiJuczEiLCJrdWJlcm5ldGVzLmlvL3NlcnZpY2VhY2NvdW50L3NlY3JldC5uYW1lIjoiYXJnby5zZXJ2aWNlLWFjY291bnQtdG9rZW4iLCJrdWJlcm5ldGVzLmlvL3NlcnZpY2VhY2NvdW50L3NlcnZpY2UtYWNjb3VudC5uYW1lIjoiYXJnbyIsImt1YmVybmV0ZXMuaW8vc2VydmljZWFjY291bnQvc2VydmljZS1hY2NvdW50LnVpZCI6Ijg1NTllMzQ3LWJlYWYtNGQzNS1hYmM2LTk4OTg2NThlOGRiNSIsInN1YiI6InN5c3RlbTpzZXJ2aWNlYWNjb3VudDpuczE6YXJnbyJ9.Bxe3oPSK_pjjfanfeSskJ4QMFaVr6joDBbugBuDNS2KYzEFOkcaaUXaGmSXTqIsRPc1VWCrl9hKHrJL2I4mPsIA-AmxLcjh0N2I0Gvj_DRuqWhcAu1ENVT8Gtme5m7iZ6dyjS6_M-agfV9QmhADjowmW58gQwMlEVdXnAw_LISwFBUFX2_CNnw5sOFrpADVP7C7ksa8Q0IUHJebTpMTeDmTaJv-pBd2c7nbdLAoDnzKCD27LUcmGZJruieOJccLXztsHQfD8v3lz7KIBH2264cyHUH74AQIlR1WDWMDEu6xZCdHhmQ2fRT12GlyXZB7DsYhuw575qqtpdRs6Y2qa2w"


## List available "Processes"

In [3]:
# Headers for API request
headers = {
    "Authorization": f"Bearer {AUTH_TOKEN}",
}

print(f"API endpoint: GET {ARGO_SERVER_URL}/workflow-templates/{NAMESPACE}")

response = requests.get(f"{ARGO_SERVER_URL}/workflow-templates/{NAMESPACE}", headers=headers)

print(response.status_code)


API endpoint: GET http://localhost:2746/api/v1/workflow-templates/ns1
200


Get information about the available "Processes"

In [4]:
response.json().get("items", {})

for item in response.json().get("items", {}):
    print(item.get("metadata", {}).get("name", {}))
    print(item.get("metadata", {}).get("annotations", {}).get("workflows.argoproj.io/description", {}))
    print(item.get("metadata", {}).get("annotations", {}).get("eoap.ogc.org/version", "N/A"))
    print("---")

argo-cwl-runner
This workflow template is a CWL runner for Argo Workflows.

N/A
---
water-bodies-detection
This workflow template detects water bodies.

1.0.0
---


## Describe "Process"

In [5]:
id = "water-bodies-detection"

# Headers for API request
headers = {
    "Authorization": f"Bearer {AUTH_TOKEN}",
}

print(f"API endpoint: GET {ARGO_SERVER_URL}/workflow-templates/{NAMESPACE}/{id}")

response = requests.get(f"{ARGO_SERVER_URL}/workflow-templates/{NAMESPACE}/{id}", headers=headers)

print(response.status_code)

API endpoint: GET http://localhost:2746/api/v1/workflow-templates/ns1/water-bodies-detection
200


"Processes" have an entrypoint:

In [6]:
entry_point = response.json().get("spec", {}).get("entrypoint", None)

entry_point

'main'

Get the "Process" entry point template:

In [7]:
[template for template in response.json().get("spec", {}).get("templates", {}) if template["name"] == entry_point]

[{'name': 'main',
  'inputs': {'parameters': [{'name': 'items',
     'description': 'JSON array of STAC items to process'},
    {'name': 'aoi',
     'description': "Area of interest expressed as a bounding box 'minx,miny,maxx,maxy'"},
    {'name': 'epsg',
     'default': 'EPSG:4326',
     'description': 'EPSG code of the bounding box coordinates'}]},
  'outputs': {'parameters': [{'name': 'results',
     'valueFrom': {'expression': "steps['argo-cwl'].outputs.parameters['results']"},
     'description': 'JSON array results produced by the CWL execution'},
    {'name': 'log',
     'valueFrom': {'expression': "steps['argo-cwl'].outputs.parameters['log']"},
     'description': 'Log produced by the CWL execution'},
    {'name': 'usage-report',
     'valueFrom': {'expression': "steps['argo-cwl'].outputs.parameters['usage-report']"},
     'description': 'Usage report produced by the CWL execution'},
    {'name': 'stac-catalog',
     'valueFrom': {'expression': "steps['argo-cwl'].outputs.parame

In [8]:
main = [template for template in response.json().get("spec", {}).get("templates", {}) if template["name"] == entry_point][0]

Get the "Process" inputs

In [9]:
for input_parameter in main["inputs"]["parameters"]:
    print(f"id: {input_parameter['name']}")
    print(f"description: {input_parameter.get('description', 'N/A')}")
    print(f"default value: {input_parameter.get('default', 'N/A')}")
    print("---")

id: items
description: JSON array of STAC items to process
default value: N/A
---
id: aoi
description: Area of interest expressed as a bounding box 'minx,miny,maxx,maxy'
default value: N/A
---
id: epsg
description: EPSG code of the bounding box coordinates
default value: EPSG:4326
---


Get the "Process" outputs

In [10]:
for output_parameter in main["outputs"]["parameters"]:
    print(f"id: {output_parameter['name']}")
    print(f"description: {output_parameter.get('description', 'N/A')}")
    print("---")

id: results
description: JSON array results produced by the CWL execution
---
id: log
description: Log produced by the CWL execution
---
id: usage-report
description: Usage report produced by the CWL execution
---
id: stac-catalog
description: STAC catalog with the detected water bodies and published on S3
---
id: feature-collection
description: Feature collection with the detected water bodies
---


Get the "Process" artifacts

In [11]:
for artifact in main["outputs"]["artifacts"]:
    print(artifact)
    print(f"id: {artifact['name']}")
    print("---")

{'name': 'tool-logs', 'fromExpression': "steps['argo-cwl'].outputs.artifacts['tool-logs']"}
id: tool-logs
---
{'name': 'calrissian-output', 'fromExpression': "steps['argo-cwl'].outputs.artifacts['calrissian-output']"}
id: calrissian-output
---
{'name': 'calrissian-stderr', 'fromExpression': "steps['argo-cwl'].outputs.artifacts['calrissian-stderr']"}
id: calrissian-stderr
---
{'name': 'calrissian-report', 'fromExpression': "steps['argo-cwl'].outputs.artifacts['calrissian-report']"}
id: calrissian-report
---


## "Process" execution submission

Create the payload

In [12]:
parameters = [{"items":["https://earth-search.aws.element84.com/v0/collections/sentinel-s2-l2a-cogs/items/S2A_10TFK_20210708_0_L2A"]},
      {"aoi": "-121.399,39.834,-120.74,40.472"},
      {"epsg": "EPSG:4326"}]

Convert the parameters to a list of strings

In [13]:
payload_parameters = []
for param in parameters:
    for key, value in param.items():
        if isinstance(value, list):
            # Join the list items with double quotes and format with brackets
            items_str = '[' + ','.join(f'"{item}"' for item in value) + ']'
            payload_parameters.append(f"{key}={items_str}")
        else:
            payload_parameters.append(f"{key}={value}")

payload_parameters

['items=["https://earth-search.aws.element84.com/v0/collections/sentinel-s2-l2a-cogs/items/S2A_10TFK_20210708_0_L2A"]',
 'aoi=-121.399,39.834,-120.74,40.472',
 'epsg=EPSG:4326']

Execution payload:

In [14]:
# Payload with parameters
payload = {
  "resourceKind": "WorkflowTemplate",
  "resourceName": id,
  "submitOptions": {
    "parameters": payload_parameters
  }
}

payload

{'resourceKind': 'WorkflowTemplate',
 'resourceName': 'water-bodies-detection',
 'submitOptions': {'parameters': ['items=["https://earth-search.aws.element84.com/v0/collections/sentinel-s2-l2a-cogs/items/S2A_10TFK_20210708_0_L2A"]',
   'aoi=-121.399,39.834,-120.74,40.472',
   'epsg=EPSG:4326']}}

In [15]:
# Headers for API request
headers = {
    "Authorization": f"Bearer {AUTH_TOKEN}",
    "Content-Type": "application/json"
}

print(f"API endpoint: POST {ARGO_SERVER_URL}/workflows/{NAMESPACE}/submit")


# Submit the workflow and handle the response
response = requests.post(
    f"{ARGO_SERVER_URL}/workflows/{NAMESPACE}/submit",
    headers=headers,
    json=payload,
    verify=False  # Use verify=True if you have valid SSL certificates
)

response.status_code

API endpoint: POST http://localhost:2746/api/v1/workflows/ns1/submit


200

In [16]:

if response.status_code == 200 or response.status_code == 201:
    # Decode JSON response
    workflow_info = response.json()
    workflow_name = workflow_info["metadata"]["name"]
    print(f"Workflow submitted successfully with name: {workflow_name}")
else:
    print("Failed to submit workflow:")
    print(response.text)


Workflow submitted successfully with name: water-bodies-detection-82n94


## List "Jobs"

In [17]:
print(f"API endpoint: GET {ARGO_SERVER_URL}/workflows/{NAMESPACE}")

response = requests.get(f"{ARGO_SERVER_URL}/workflows/{NAMESPACE}", headers=headers)

API endpoint: GET http://localhost:2746/api/v1/workflows/ns1


In [18]:
for item in response.json().get("items", {}):
    print(f"name: {item.get('metadata', {}).get('name', {})}")
    print(f"submission date: {item.get('metadata', {}).get('creationTimestamp', {})}")
    print(f"uid: {item.get('metadata', {}).get('uid', {})}")
    print(f"submission date: {item.get('metadata', {}).get('creationTimestamp', {})}")
    print(f"job type: {item.get('spec', {}).get('workflowTemplateRef', {}).get('name', {})}") 
    print(f"status: {item.get('status', {}).get('phase', {})}")  
    print(f"started at: {item.get('status', {}).get('startedAt', {})}") 
    print(f"finished at: {item.get('status', {}).get('finishedAt', {})}")  
    print(f"progress: {item.get('status', {}).get('progress', {})}")      
    print("---")

name: water-bodies-detection-82n94
submission date: 2024-11-07T11:14:43Z
uid: 7c915cc5-5adf-4ec0-92b6-9c8ae10eb093
submission date: 2024-11-07T11:14:43Z
job type: water-bodies-detection
status: {}
started at: None
finished at: None
progress: {}
---
name: water-bodies-detection-hrvcq
submission date: 2024-11-07T11:04:33Z
uid: c2d6730e-4431-4b9a-aa85-f54adbc0dbe8
submission date: 2024-11-07T11:04:33Z
job type: water-bodies-detection
status: Failed
started at: 2024-11-07T11:04:33Z
finished at: 2024-11-07T11:06:20Z
progress: 6/7
---
name: water-bodies-detection-vfntj
submission date: 2024-11-07T10:46:32Z
uid: 8446017b-f2a2-447e-ab1f-aaec0a3d8cb6
submission date: 2024-11-07T10:46:32Z
job type: water-bodies-detection
status: Failed
started at: 2024-11-07T10:46:32Z
finished at: 2024-11-07T10:48:25Z
progress: 6/7
---
name: water-bodies-detection-wpptm
submission date: 2024-11-07T08:01:58Z
uid: 97e5af4a-1a82-455c-85cd-a8c1f7a010b0
submission date: 2024-11-07T08:01:58Z
job type: water-bodies-det

The element `status.nodes` provides detailed information about the workflow nodes

In [19]:
job_name = workflow_info.get("metadata", {}).get("name", {}) 

job_name


'water-bodies-detection-82n94'

## "Job" monitoring

In [20]:

print(f"API endpoint: GET {ARGO_SERVER_URL}/workflows/{NAMESPACE}/{workflow_name}")

# Headers for API request
headers = {
    "Authorization": f"Bearer {AUTH_TOKEN}",
    "Content-Type": "application/json"
}

def get_workflow_status(workflow_name):
    """Fetches the current status of the workflow."""
    response = requests.get(
        f"{ARGO_SERVER_URL}/workflows/{NAMESPACE}/{workflow_name}",
        headers=headers,
        verify=False  # Use verify=True with valid SSL certificates
    )

    if response.status_code == 200:
        workflow_info = response.json()
        status = workflow_info.get("status", {}).get("phase", "Unknown")
        return status, workflow_info
    else:
        print(f"Failed to retrieve workflow status: {response.status_code}")
        return None

# Monitor the workflow status until it reaches a terminal state
while True:
    status, workflow_status = get_workflow_status(job_name)
    
    if status:
        print(f"Workflow Status: {status}")

        # Check if the workflow has completed
        if status in ["Succeeded", "Failed", "Error"]:
            print(f"Workflow has completed with status: {status}")
            break
    else:
        print("Unable to retrieve workflow status. Exiting monitoring.")
        break

    # Wait for 10 seconds before the next check
    time.sleep(10)

API endpoint: GET http://localhost:2746/api/v1/workflows/ns1/water-bodies-detection-82n94
Workflow Status: Running
Workflow Status: Running
Workflow Status: Running
Workflow Status: Running
Workflow Status: Running
Workflow Status: Running
Workflow Status: Running
Workflow Status: Running
Workflow Status: Running
Workflow Status: Running
Workflow Status: Running
Workflow Status: Running
Workflow Status: Succeeded
Workflow has completed with status: Succeeded


In [21]:
print(f"name: {workflow_status.get('metadata', {}).get('name', {})}")
print(f"submission date: {workflow_status.get('metadata', {}).get('creationTimestamp', {})}")
print(f"uid: {workflow_status.get('metadata', {}).get('uid', {})}")
print(f"submission date: {workflow_status.get('metadata', {}).get('creationTimestamp', {})}")
print(f"job type: {workflow_status.get('spec', {}).get('workflowTemplateRef', {}).get('name', {})}") 
print(f"status: {workflow_status.get('status', {}).get('phase', {})}")  
print(f"started at: {workflow_status.get('status', {}).get('startedAt', {})}") 
print(f"finished at: {workflow_status.get('status', {}).get('finishedAt', {})}")  
print(f"progress: {workflow_status.get('status', {}).get('progress', {})}")      
print("---")

name: water-bodies-detection-82n94
submission date: 2024-11-07T11:14:43Z
uid: 7c915cc5-5adf-4ec0-92b6-9c8ae10eb093
submission date: 2024-11-07T11:14:43Z
job type: water-bodies-detection
status: Succeeded
started at: 2024-11-07T11:14:43Z
finished at: 2024-11-07T11:16:39Z
progress: 7/7
---


## Get the "job" outputs: parameters and artifacts

List the "job" outputs parameters

In [22]:
for output_parameter in workflow_status.get("status").get("nodes").get(job_name).get("outputs").get("parameters"):
    print(f"id: {output_parameter['name']}")
    print(f"description: {output_parameter.get('description', 'N/A')}")
    print(f"value: {output_parameter.get('value', 'N/A')}")
    print("---")


id: results
description: JSON array results produced by the CWL execution
value: {
    "stac_catalog": {
        "location": "file:///calrissian/results/6us9l4pd",
        "basename": "6us9l4pd",
        "class": "Directory",
        "listing": [
            {
                "class": "File",
                "location": "file:///calrissian/results/6us9l4pd/catalog.json",
                "basename": "catalog.json",
                "checksum": "sha1$f1994683c2e2b8d21aab7f0c0c05b76b850ac21e",
                "size": 359,
                "path": "/calrissian/results/6us9l4pd/catalog.json"
            },
            {
                "class": "Directory",
                "location": "file:///calrissian/results/6us9l4pd/S2A_10TFK_20210708_0_L2A",
                "basename": "S2A_10TFK_20210708_0_L2A",
                "listing": [
                    {
                        "class": "File",
                        "location": "file:///calrissian/results/6us9l4pd/S2A_10TFK_20210708_0_L2A/S2A

List the "job" outputs artifacts

In [23]:
for artifact in workflow_status.get("status").get("nodes").get(job_name).get("outputs").get("artifacts"):
    print(f"id: {artifact['name']}")
    print(f"location in artifact bucket: {artifact.get('s3').get('key')}")
    print("---")

id: tool-logs
location in artifact bucket: water-bodies-detection-82n94-7c915cc5-5adf-4ec0-92b6-9c8ae10eb093-artifacts/tool-logs.tgz
---
id: calrissian-output
location in artifact bucket: water-bodies-detection-82n94-7c915cc5-5adf-4ec0-92b6-9c8ae10eb093-artifacts/calrissian-output.tgz
---
id: calrissian-stderr
location in artifact bucket: water-bodies-detection-82n94-7c915cc5-5adf-4ec0-92b6-9c8ae10eb093-artifacts/calrissian-stderr.tgz
---
id: calrissian-report
location in artifact bucket: water-bodies-detection-82n94-7c915cc5-5adf-4ec0-92b6-9c8ae10eb093-artifacts/calrissian-report.tgz
---


In [24]:
workflow_status.get("status").get("nodes").get(job_name).get("outputs").get("artifacts")

[{'name': 'tool-logs',
  'path': '/calrissian/logs',
  's3': {'key': 'water-bodies-detection-82n94-7c915cc5-5adf-4ec0-92b6-9c8ae10eb093-artifacts/tool-logs.tgz'}},
 {'name': 'calrissian-output',
  'path': '/tmp/calrissian-output.json',
  's3': {'key': 'water-bodies-detection-82n94-7c915cc5-5adf-4ec0-92b6-9c8ae10eb093-artifacts/calrissian-output.tgz'}},
 {'name': 'calrissian-stderr',
  'path': '/tmp/calrissian-stderr.txt',
  's3': {'key': 'water-bodies-detection-82n94-7c915cc5-5adf-4ec0-92b6-9c8ae10eb093-artifacts/calrissian-stderr.tgz'}},
 {'name': 'calrissian-report',
  'path': '/tmp/calrissian-report.json',
  's3': {'key': 'water-bodies-detection-82n94-7c915cc5-5adf-4ec0-92b6-9c8ae10eb093-artifacts/calrissian-report.tgz'}}]

## Download the "job" artifacts

In [25]:
[artifact.get("name") for artifact in workflow_status.get("status").get("nodes").get(job_name).get("outputs").get("artifacts")]

['tool-logs', 'calrissian-output', 'calrissian-stderr', 'calrissian-report']

In [26]:
# Configuration

id_discriminator = "workflows"
id = job_name
node_id = job_name
ARTIFACT_BASE_URL = f"http://localhost:2746/artifact-files/{NAMESPACE}/{id_discriminator}/{id}/{node_id}/outputs" #/tool-logs"


# Headers for authentication
headers = {
    "Authorization": f"Bearer {AUTH_TOKEN}"
}

# Download the artifacts
for artifact in workflow_status.get("status").get("nodes").get(job_name).get("outputs").get("artifacts"):
    
    print(f"API endpoint: GET {ARTIFACT_BASE_URL}/{artifact.get('name')}")
    response = requests.get(f"{ARTIFACT_BASE_URL}/{artifact.get('name')}", headers=headers, stream=True)

    output_file = f"{artifact.get('name')}.tar.gz"  
    if response.status_code == 200:
        with open(output_file, "wb") as file:
            for chunk in response.iter_content(chunk_size=8192):
                file.write(chunk)
        print(f"Artifact downloaded successfully and saved as {output_file}")
    else:
        print(f"Failed to download artifact. Status code: {response.status_code}")
        print(response.text)

API endpoint: GET http://localhost:2746/artifact-files/ns1/workflows/water-bodies-detection-82n94/water-bodies-detection-82n94/outputs/tool-logs
Artifact downloaded successfully and saved as tool-logs.tar.gz
API endpoint: GET http://localhost:2746/artifact-files/ns1/workflows/water-bodies-detection-82n94/water-bodies-detection-82n94/outputs/calrissian-output
Artifact downloaded successfully and saved as calrissian-output.tar.gz
API endpoint: GET http://localhost:2746/artifact-files/ns1/workflows/water-bodies-detection-82n94/water-bodies-detection-82n94/outputs/calrissian-stderr
Artifact downloaded successfully and saved as calrissian-stderr.tar.gz
API endpoint: GET http://localhost:2746/artifact-files/ns1/workflows/water-bodies-detection-82n94/water-bodies-detection-82n94/outputs/calrissian-report
Artifact downloaded successfully and saved as calrissian-report.tar.gz


## Delete "Job"

In [27]:
print(f"API endpoint: DELETE {ARGO_SERVER_URL}/workflows/{NAMESPACE}/{job_name}S")

response = requests.delete(f"{ARGO_SERVER_URL}/workflows/{NAMESPACE}/{job_name}", headers=headers)

print(response.status_code)

API endpoint: DELETE http://localhost:2746/api/v1/workflows/ns1/water-bodies-detection-82n94S
200


Check the "job" no longer exists:

In [28]:
response = requests.get(f"{ARGO_SERVER_URL}/workflows/{NAMESPACE}/{job_name}", headers=headers)

response.status_code

404