## ECOPOTENTIAL-00003 Data Pipeline Coodinator Queue - NDVI Curonian Lagoon

This coordinator feeds the ecopotential-00003 data pipeline queue for the Curonian Lagoon protected area

### Data pipeline parameters

In [1]:
data_pipeline = 'ecopotential-00003'

In [2]:
wps_url_triggers = 'https://ec-ecopotential-triggers-deployer.terradue.com/zoo/'
wps_url_apps = 'https://ec-ecopotential-apps-deployer.terradue.com/zoo/'

### Data transformation application parameters

* Application: dcs-s2-ndvi-boa
* Application repository: https://github.com/ec-ecopotential/dcs-s2-ndvi-boa
* Build (CI): https://build.terradue.com/job/communities/job/Github%20Organizations/job/ec-ecopotential/job/dcs-s2-ndviboa/

In [3]:
process_id = 'ec_ecopotential_dcs_s2_ndvi_boa_ndvi_boa_1_9'

In [4]:
pa_code = 'CL'
pa_name = 'Curonian Lagoon'
resolution = '10'
plot = 'False'
flag_expr = 'saturated_l1a_B4'

### Data selection parameters

In [5]:
series = 'https://catalog.terradue.com/sentinel2/description'

In [6]:
product_type = 'S2MSI2Ap'

In [7]:
cloud_coverage = '100['

In [8]:
wkt = 'POLYGON((21.056 55.722,21.094 55.722,21.083 55.73,21.128 55.707,21.123 55.706,21.165 55.647,21.164 55.643,21.146 55.661,21.143 55.661,21.152 55.639,21.187 55.582,21.238 55.596,21.248 55.569,21.281 55.559,21.243 55.537,21.23 55.537,21.234 55.53,21.223 55.524,21.245 55.493,21.315 55.497,21.341 55.49,21.316 55.473,21.298 55.472,21.283 55.461,21.261 55.424,21.248 55.425,21.247 55.412,21.271 55.403,21.299 55.425,21.375 55.412,21.399 55.423,21.43 55.424,21.425 55.415,21.436 55.398,21.461 55.397,21.459 55.389,21.423 55.352,21.439 55.329,21.456 55.325,21.441 55.317,21.469 55.29,21.489 55.291,21.489 55.284,21.469 55.274,21.495 55.262,21.537 55.26,21.563 55.238,21.594 55.237,21.575 55.233,21.59 55.225,21.587 55.219,21.576 55.21,21.579 55.203,21.616 55.19,21.608 55.19,21.646 55.183,21.593 55.187,21.566 55.197,21.551 55.181,21.592 55.149,21.618 55.085,21.528 55.07,21.372 55.067,21.292 55.022,21.248 55.018,21.22 55.028,21.21 55.009,21.242 54.978,21.249 54.958,21.242 54.943,21.21 54.92,21.152 54.915,21.101 54.906,21.085 54.891,21.059 54.901,21.029 54.892,21.018 54.9,20.996 54.891,20.89 54.907,20.866 54.901,20.79 54.947,20.692 54.931,20.619 54.94,20.574 54.935,20.548 54.942,20.532 54.956,20.529 54.951,20.52 54.956,20.497 54.946,20.503 54.951,20.528 54.959,20.532 54.968,20.523 54.975,20.497 54.969,20.578 55,20.68 55.059,20.952 55.281,20.917 55.287,20.977 55.364,21.042 55.486,21.055 55.525,21.056 55.722))'

In [9]:
from shapely.wkt import loads 

In [10]:
geom = loads(wkt).buffer(0.2).simplify(0.01, preserve_topology=False).to_wkt().replace(',', '%2C').replace(' ', '%20')

### User Mapping

In [11]:
api_key = 'AKCp5aTSAokLQTzhVH52Q8tcfobtaVQtJ3xPiGgB8AdxSPvDpUXU1DW2uYby4EjJcEiJtaoPu'

### Coordinator parameters

In [12]:
start_queue = '${coord:formatTime(coord:dateOffset(coord:nominalTime(), -10, \'DAY\'), "yyyy-MM-dd\'T\'HH:mm:ss\'Z\'")}'
end_queue = '${coord:formatTime(coord:dateOffset(coord:nominalTime(), -9, \'DAY\'), "yyyy-MM-dd\'T\'HH:mm:ss\'Z\'")}'

In [13]:
co_trigger_queue_process_id = 'coordinator_ec_ecopotential_tg_ecopotential_00003_queue_tg_ecopotential_00003_1_12'

In [14]:
coordinator_date_start = '2017-01-01T00:00Z'
coordinator_date_stop = '2018-12-31T00:00Z'
coordinator_period = '0 0 * * *'

### Imports

In [15]:
import owslib
from owslib.wps import monitorExecution
from owslib.wps import WebProcessingService
import sys
import os
sys.path.append('/opt/anaconda/bin/')
import lxml.etree as etree
import requests
import shapely
import dateutil.parser

from datetime import datetime, timedelta
from io import BytesIO
import json

### Common Parameters

In [16]:
tg_quotation = 'No'
recovery = 'No'
_T2Username = data_pipeline

### Check data transformation application

In [17]:
wps = WebProcessingService(wps_url_apps, verbose=False, skip_caps=False)

In [18]:
found_process = False

message = "The process %s is not deployed" % process_id  
    
for index, elem in enumerate(wps.processes):

    if elem.identifier == process_id:
        message = "The process %s is deployed" % process_id  
        found_process = True
        
print message

if not found_process:
    raise Exception()

The process ec_ecopotential_dcs_s2_ndvi_boa_ndvi_boa_1_9 is deployed


In [19]:
process = wps.describeprocess(process_id)

In [20]:
for input in process.dataInputs:
    print(input.identifier)

source
pa_code
pa_name
resolution
plot
flag_expr
_T2Username


### Check trigger coordinator

In [21]:
wps = WebProcessingService(wps_url_triggers, verbose=False, skip_caps=False)

In [22]:
found_process = False

message = "The queue coordinator process %s is not deployed" % co_trigger_queue_process_id  
    
for index, elem in enumerate(wps.processes):

    if elem.identifier == co_trigger_queue_process_id:
        message = "The queue coordinator process %s is deployed" % co_trigger_queue_process_id  
        found_process = True
        
print message

if not found_process:
    raise Exception()

The queue coordinator process coordinator_ec_ecopotential_tg_ecopotential_00003_queue_tg_ecopotential_00003_1_12 is deployed


### Feed the queue

In [23]:
process = wps.describeprocess(co_trigger_queue_process_id)

In [24]:
process.title

'Trigger for ecopotential-00003 Coordinator'

In [25]:
process.abstract

'Coordinator: Trigger for ecopotential-00003'

In [26]:
for input in process.dataInputs:
    print(input.identifier)

series
data_pipeline
wps_url
process_id
update
geom
product_type
cloud_coverage
tg_quotation
api_key
recovery
mode
pa_code
pa_name
resolution
plot
flag_expr
t2_coordinator_date_start
t2_coordinator_date_stop
t2_coordinator_period
_T2Username


##### Define the input parameters


In [27]:
mode = 'Queue'

In [28]:
inputs = [('series', series),
        ('data_pipeline', data_pipeline),
        ('wps_url', wps_url_apps),
        ('process_id', process_id),
        ('update', '%s/%s' % (start_queue, end_queue)),
        ('geom', geom),
        ('product_type', product_type),
        ('cloud_coverage', cloud_coverage),
        ('tg_quotation', tg_quotation),
        ('api_key', api_key),
        ('recovery', recovery),
        ('mode', mode),
        ('pa_code', pa_code),
        ('pa_name', pa_name),
        ('resolution', resolution),
        ('plot', plot),
        ('flag_expr', flag_expr),
        ('t2_coordinator_date_start', coordinator_date_start), 
        ('t2_coordinator_date_stop', coordinator_date_stop),
        ('t2_coordinator_period', coordinator_period),
        ('_T2Username', data_pipeline)]

##### Identify the outputs

In [29]:
for output in process.processOutputs:
    print(output.identifier)

coordinatorIds


##### Submit the coordinator request

In [30]:
execution = owslib.wps.WPSExecution(url=wps.url)

In [31]:
execution_request = execution.buildRequest(co_trigger_queue_process_id, 
                                           inputs, 
                                           output = [('coordinatorIds', False)])

In [32]:
execution_response = execution.submitRequest(etree.tostring(execution_request, pretty_print=True))

In [33]:
if False:
    print(etree.tostring(execution_request, pretty_print=True))

In [34]:
execution.parseResponse(execution_response)

In [35]:
if False:
    print(etree.tostring(execution_response, pretty_print=True))

In [36]:
execution.statusLocation

'http://ec-ecopotential-triggers-deployer.terradue.com/zoo-bin/zoo_loader.cgi?request=Execute&service=WPS&version=1.0.0&Identifier=GetStatus&DataInputs=sid=2ea0f754-6e28-11e8-9cf4-0242ac110005&RawDataOutput=Result'

In [37]:
monitorExecution(execution)

In [38]:
execution.isSucceded()

True

In [39]:
for output in execution.processOutputs:
    print(output.identifier)

coordinatorIds
result_osd
QUOTATION


In [40]:
coordinator_id = str(json.loads(execution.processOutputs[0].data[0])['coordinatorsId'][0]['oozieId'])

In [41]:
coordinator_id

'0002146-180330140554685-oozie-oozi-C'