## ADES demonstration - OGC API Processes 

Deploy a CWL application package as a WPS processing service, submit an execution request, and retrieve its results.

In [1]:
import requests
import os
import getpass
import json
import yaml
from time import sleep

## Discover a registered process

In [29]:
# Search endpoint of the resource manager catalog that is queried below.
rm_endpoint = 'https://catalog.terradue.com/eoepca-services/search'

This is the identifier of the application package, taken from the previous demonstration notebook:

In [None]:
# Catalog identifier (uid) of the application package; used for the catalog
# query and for the deploy request reference.
application_package_id = 'd5d2d9af95cade45f7602d36ff07aa22'

Query the resource manager catalog to discover the application package:

In [51]:
# Look the application package up by its uid and ask the catalog for JSON.
payload = dict(uid=application_package_id, format='json')

r = requests.get(rm_endpoint, params=payload)

print(r.status_code, r.reason)

200 OK


In [52]:
# Decode the JSON body of the catalog search response and display it.
response = r.json()

response

{'id': 'https://catalog.terradue.com:443/eoepca-services/search?uid=d5d2d9af95cade45f7602d36ff07aa22&format=json',
 'properties': {'links': [{'@href': 'https://catalog.terradue.com/eoepca-services/search?uid=d5d2d9af95cade45f7602d36ff07aa22&format=json',
    '@rel': 'self',
    '@title': 'Reference link',
    '@type': 'application/json'},
   {'@href': 'https://catalog.terradue.com/eoepca-services/description',
    '@rel': 'search',
    '@title': 'OpenSearch Description link',
    '@type': 'application/opensearchdescription+xml'}],
  'updated': '2020-10-21T16:32:30Z',
  'title': 'Result for OpenSearch query over type * in index eoepca-services',
  'debugRequest': None,
  'debugResponse': None,
  'totalResults': '1',
  'startIndex': '1',
  'itemsPerPage': '20',
  'Query': {'@role': 'request',
   '@geo:uid': 'd5d2d9af95cade45f7602d36ff07aa22'}},
 'features': [{'id': 'https://catalog.terradue.com:443/eoepca-services/search?format=json&uid=d5d2d9af95cade45f7602d36ff07aa22',
   'properties':

Here's the Application Package CWL:

In [53]:
# The CWL document is stored as text under the first feature's offering content.
offering_content = response['features'][0]['properties']['offering']['content']
application_package = yaml.safe_load(offering_content['#text'])

In [54]:
application_package

{'$graph': [{'baseCommand': 'vegetation-index',
   'class': 'CommandLineTool',
   'hints': {'DockerRequirement': {'dockerPull': 'terradue/eoepca-vegetation-index:0.1'}},
   'id': 'clt',
   'inputs': {'inp1': {'inputBinding': {'position': 1,
      'prefix': '--input-reference'},
     'type': 'Directory'},
    'inp2': {'inputBinding': {'position': 2, 'prefix': '--aoi'},
     'type': 'string'}},
   'outputs': {'results': {'outputBinding': {'glob': '.'}, 'type': 'Any'}},
   'requirements': {'EnvVarRequirement': {'envDef': {'PATH': '/opt/anaconda/envs/env_vi/bin:/opt/anaconda/envs/env_vi/bin:/opt/anaconda/envs/env_default/bin:/opt/anaconda/bin:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin',
      'PREFIX': '/opt/anaconda/envs/env_vi'}},
    'ResourceRequirement': {}}},
  {'class': 'Workflow',
   'doc': 'Vegetation index processor',
   'id': 'vegetation-index2',
   'inputs': {'aoi': {'doc': 'Area of interest in WKT',
     'label': 'Area of interest',
     'type': 'string'},
  

## Deploy the process

In [55]:
# Base URL of the processing service; every /wps3/... request below is built from it.
# NOTE(review): this is plain http://, so the bearer token in the request headers
# is sent unencrypted — confirm whether an https:// endpoint is available.
endpoint = 'http://wps-eoepca-dev.terradue.com' 

In [8]:
# Prompt for the bearer token interactively so it is never hardcoded in the notebook.
token = getpass.getpass('Token:')

Token: ···························································································································································································································································································································································································································································································································································································································································································································································································································································································································


In [56]:
# Headers for the deploy request: bearer-token authorization, JSON request body,
# JSON response.
# 'Prefer: respond-async' presumably asks the server to handle the request
# asynchronously (RFC 7240) — confirm the server honours it.
deploy_headers = {'Authorization': f'Bearer {token}',
                  'Content-Type': 'application/json',
                  'Accept': 'application/json', 
                  'Prefer': 'respond-async'} 

In [57]:
# Body of the deploy request.
# - The single 'applicationPackage' input is passed by reference: its href is the
#   catalog search URL for application_package_id (no 'format' parameter is set,
#   and the declared mimeType is application/xml).
# - The 'deployResult' output is requested with transmissionMode 'value'.
deploy_payload = {'inputs': [{'id': 'applicationPackage',
                              'input': {'format': {'mimeType': 'application/xml'},
                                        'value': {'href': f'https://catalog.terradue.com/eoepca-services/search?uid={application_package_id}'}}}],
                  'outputs': [{'format': {'mimeType': 'string',
                                          'schema': 'string',
                                          'encoding': 'string'},
                               'id': 'deployResult',
                               'transmissionMode': 'value'}],
                  'mode': 'auto',
                  'response': 'raw'}

In [58]:
# Deploy the application package by POSTing the payload to the processes collection.
deploy_url = f'{endpoint}/wps3/processes'

r = requests.post(deploy_url, headers=deploy_headers, json=deploy_payload)

print(r.status_code, r.reason)

201 Created


## /processes

In [59]:
# Headers shared by the GET requests below (process list, process description,
# job status and job result): bearer-token authorization and JSON responses.
headers = {'Authorization': f'Bearer {token}',
           'Content-Type': 'application/json',
           'Accept': 'application/json'}

In [60]:
# Retrieve the list of processes currently exposed by the service.
processes_url = f'{endpoint}/wps3/processes'

r = requests.get(processes_url, headers=headers)

print(r.status_code, r.reason)

200 OK


In [61]:
# Decode the process list returned by /wps3/processes and display it.
response = r.json()

response

{'processes': [{'id': 'eoepcaadesundeployprocess',
   'title': 'Eoepca Deploy Process',
   'abstract': 'This method will undeploy an application encapsulated within a Docker container as a process accessible through the WPS interface.',
   'version': '1.0.0',
   'jobControlOptions': ['sync-execute', 'async-execute', 'dismiss'],
   'outputTransmission': ['value', 'reference'],
   'links': [{'rel': 'canonical',
     'type': 'application/json',
     'title': 'Process Description',
     'href': '/watchjob/processes/eoepcaadesundeployprocess/'}]},
  {'id': 'eoepcaadesdeployprocess',
   'title': 'Eoepca Deploy Process',
   'abstract': 'This method will deploy an application encapsulated within a Docker container as a process accessible through the WPS interface.',
   'version': '1.0.0',
   'jobControlOptions': ['sync-execute', 'async-execute', 'dismiss'],
   'outputTransmission': ['value', 'reference'],
   'links': [{'rel': 'canonical',
     'type': 'application/json',
     'title': 'Process

In [62]:
# Print a short summary (id, title, abstract) of every listed process.
for proc in response['processes']:

    print(f"Process id: {proc['id']}\n{proc['title']} - {proc['abstract']}\n")

Process id: eoepcaadesundeployprocess
Eoepca Deploy Process - This method will undeploy an application encapsulated within a Docker container as a process accessible through the WPS interface.

Process id: eoepcaadesdeployprocess
Eoepca Deploy Process - This method will deploy an application encapsulated within a Docker container as a process accessible through the WPS interface.

Process id: GetStatus
Produce an updated ExecuteResponse document.  - Create an ExecuteResponse document from a sid (Service ID), it will use the niternal ZOO Kernel mechanisms to access the current status from a running Service and update the percentCompleted from the original backup file used by the ZOO Kernel when running a Service in background. 

Process id: vegetation_index_
Vegetation index - Vegetation index processor

Process id: vegetation_index2_
Vegetation index - Vegetation index processor



## /processes/{id}

In [63]:
# Identifier of the process whose description is requested next.
process_id = 'vegetation_index2_'

In [64]:
# Fetch the description of the selected process.
describe_url = f'{endpoint}/wps3/processes/{process_id}'

r = requests.get(describe_url, headers=headers)

print(r.status_code, r.reason)

200 OK


In [65]:
# Decode the process description and display it.
response = r.json()

response

{'process': {'id': 'vegetation_index2_',
  'title': 'Vegetation index',
  'abstract': 'Vegetation index processor',
  'jobControlOptions': ['sync-execute', 'async-execute', 'dismiss'],
  'outputTransmission': ['value', 'reference'],
  'links': [{'rel': 'canonical',
    'type': 'application/json',
    'title': 'Execute End Point',
    'href': '/watchjob/processes/vegetation_index2_/jobs/'}],
  'inputs': [{'id': 'aoi',
    'title': 'Area of interest',
    'abstract': 'Area of interest in WKT',
    'minOccurs': '1',
    'maxOccurs': '1',
    'input': {'literalDataDomains': [{'dataType': {'name': 'string'},
       'valueDefinition': {'anyValue': True}}]}},
   {'id': 'input_reference',
    'title': 'EO product for vegetation index',
    'abstract': 'EO product for vegetation index',
    'minOccurs': '1',
    'maxOccurs': '999',
    'input': {'formats': [{'default': True,
       'mimeType': 'application/opensearchdescription+xml'},
      {'default': False, 'mimeType': 'application/atom+xml'}

List the inputs

In [66]:
# Keep only the input definitions from the process description.
inputs = response['process']['inputs']

In [67]:
inputs

[{'id': 'aoi',
  'title': 'Area of interest',
  'abstract': 'Area of interest in WKT',
  'minOccurs': '1',
  'maxOccurs': '1',
  'input': {'literalDataDomains': [{'dataType': {'name': 'string'},
     'valueDefinition': {'anyValue': True}}]}},
 {'id': 'input_reference',
  'title': 'EO product for vegetation index',
  'abstract': 'EO product for vegetation index',
  'minOccurs': '1',
  'maxOccurs': '999',
  'input': {'formats': [{'default': True,
     'mimeType': 'application/opensearchdescription+xml'},
    {'default': False, 'mimeType': 'application/atom+xml'},
    {'default': False, 'mimeType': 'application/geo+json; profile=stac'}]}}]

## Execute - /processes/{id}/jobs

In [76]:
# Values for the two process inputs: the EO product reference and the area of interest (WKT).
product_reference = 'https://catalog.terradue.com/sentinel2/search?uid=S2A_MSIL2A_20191216T004701_N0213_R102_T53HPA_20191216T024808'
aoi_wkt = 'POLYGON((136.112726861895 -36.227897298303,137.333826362695 -36.2103069464338,137.305145407058 -35.2211228310596,136.099040812374 -35.2380875358202,136.112726861895 -36.227897298303))'


def _string_input(input_id, value):
    """Build one execute-request input entry carrying a string value."""
    return {'id': input_id,
            'input': {'dataType': {'name': 'string'},
                      'value': value}}


# Execute request body: two string inputs, one 'wf_output' output returned by
# value, submitted in 'async' mode with a 'raw' response.
execute_request = {
    'inputs': [
        _string_input('input_reference', product_reference),
        _string_input('aoi', aoi_wkt),
    ],
    'outputs': [
        {
            'format': {'mimeType': 'string',
                       'schema': 'string',
                       'encoding': 'string'},
            'id': 'wf_output',
            'transmissionMode': 'value',
        },
    ],
    'mode': 'async',
    'response': 'raw',
}

In [77]:
# Headers for the execute request: bearer-token authorization, JSON request body,
# JSON response.
# 'Prefer: respond-async' presumably asks the server to run the job
# asynchronously (RFC 7240) — confirm the server honours it.
execution_headers = {'Authorization': f'Bearer {token}',
                       'Content-Type': 'application/json',
                       'Accept': 'application/json', 
                       'Prefer': 'respond-async'}

In [78]:
# Identifier of the process to execute.
# NOTE(review): this is 'vegetation_index_', not the 'vegetation_index2_' process
# whose description was fetched earlier — confirm which process is intended.
process_id = 'vegetation_index_'

In [79]:
# Submit the execute request as a new job of the selected process.
jobs_url = f'{endpoint}/wps3/processes/{process_id}/jobs'

r = requests.post(jobs_url, headers=execution_headers, json=execute_request)

print(r.status_code, r.reason)

201 Created


In [80]:
# The Location response header holds the path of the newly created job.
# It is a server-relative path starting with '/', so it is appended directly
# to `endpoint` when the job is polled.
job_location = r.headers['Location']

job_location

'/watchjob/processes/vegetation_index_/jobs/0528ea60-144f-11eb-b6f2-0a580a8105b1'

## Poll /processes/{id}/jobs/{job_id}

In [81]:
# Request the current status document of the submitted job.
status_url = f'{endpoint}{job_location}'

r = requests.get(status_url, headers=headers)

print(r.status_code, r.reason)

200 OK


In [82]:
# Decode the job status document and display it.
response = r.json()

response

{'jobID': '0528ea60-144f-11eb-b6f2-0a580a8105b1',
 'status': 'running',
 'message': 'running',
 'progress': '0',
 'links': [{'href': 'http://wps-eoepca-dev.terradue.com/t2dep//processes/vegetation_index_/jobs/0528ea60-144f-11eb-b6f2-0a580a8105b1',
   'rel': 'self',
   'type': 'application/json',
   'title': 'get Status'}]}

In [None]:

# Poll the job status document until the job leaves an in-progress state, or
# until MAX_POLLS requests have been made so the cell cannot spin forever.
POLL_INTERVAL_S = 30   # seconds to wait between two status requests
MAX_POLLS = 120        # about one hour at the interval above

# 'accepted' is the queued state in the OGC API - Processes status model; it is
# treated as in progress so a queued job is polled too.
# NOTE(review): confirm this server actually reports 'accepted'.
PENDING_STATUSES = ('accepted', 'running')

polls_done = 0

while response['status'] in PENDING_STATUSES:

    if polls_done >= MAX_POLLS:

        print('Gave up after {} polls - last status: {}'.format(polls_done,
                                                                 response['status']))

        break

    r = requests.get(f'{endpoint}{job_location}',
                     headers=headers)

    # Fail loudly on an HTTP error instead of a confusing JSON/KeyError below.
    r.raise_for_status()

    response = r.json()

    polls_done += 1

    if response['status'] == 'failed':

        # Show the whole status document so the failure message is visible.
        print(response)

    elif response['status'] == 'successful':

        print(response['links'][0]['href'])

    else:

        print('Polling - {}'.format(response['status']))

        # Only wait when another poll is going to happen.
        if response['status'] in PENDING_STATUSES:

            sleep(POLL_INTERVAL_S)
        

Polling - running
Polling - running
Polling - running
Polling - running
Polling - running
Polling - running
Polling - running
Polling - running
Polling - running
Polling - running
Polling - running
Polling - running
Polling - running
Polling - running
Polling - running
Polling - running
Polling - running
Polling - running
Polling - running
Polling - running
Polling - running
Polling - running
Polling - running
Polling - running
Polling - running
Polling - running
Polling - running
Polling - running
Polling - running
Polling - running


## Results - /processes/{id}/jobs/{job_id}/result

In [None]:
response

In [None]:
# Fetch the job result document.
# job_location (taken from the Location header) already starts with '/', so it is
# appended to `endpoint` without an extra slash, as in the status requests;
# the previous f'{endpoint}/{job_location}/result' produced '//watchjob/...'.
r = requests.get(f'{endpoint}{job_location}/result',
                 headers=headers)

print(r.status_code, r.reason)

In [None]:
# Decode the job result document and display it.
response = r.json()

response

In [None]:
# The first output carries the result as an inline JSON string; parse it.
inline_value = response['outputs'][0]['value']['inlineValue']
results = json.loads(inline_value)

results

## Inspect the results

Use this STAC catalog endpoint in the "Exploiting the result.ipynb" notebook.

In [None]:
# Extract the href of the 'stac:catalog' entry from the parsed results and display it.
stac_catalog_endpoint = results['stac:catalog']['href']

stac_catalog_endpoint