## SatCen GEO-DAMP bridge

In [1]:
from owslib.etree import etree
import json
import geopandas as gp
import requests
import pandas as pd
import owslib
from owslib.wps import monitorExecution
from owslib.wps import WebProcessingService
import sys
import os
import time
from requests.auth import HTTPBasicAuth
import cioppy
from urlparse import urlparse
# Let DataFrame cells show up to 120 characters, enough to inspect long WPS parameter strings.
pd.options.display.max_colwidth = 120

### Parameters

The **data_pipeline** parameter is the catalog index where the data items and their associated results are posted.

In [2]:
# Notebook parameter: catalog index where the data items and their results are posted.
# Its 'value' is also used as the basic-auth user name and as `_T2Username` further down.
# NOTE(review): the dict([('id', ...), ('title', ...), ('abstract', ...), ('value', ...)])
# layout looks like a parameter convention that tooling may read — keep its shape.
data_pipeline = dict([('id', 'data_pipeline', ),
              ('title', 'Ellip data pipeline'),
              ('abstract', 'Ellip data pipeline'),
              ('value', 'nextgeoss-satcen-003')])

The **wps_url** parameter is the WPS end-point of the service to be invoked by the data item

In [3]:
# Notebook parameter: WPS end-point of the application service that the queued data item invokes.
wps_url = dict([('id', 'wps_url'),
              ('title', 'Application WPS end point URL'),
              ('abstract', 'Application WPS end point URL'),
              ('value', 'https://ec-nextgeoss-apps-deployer-hetzner.terradue.com/zoo/?')])

The **wps_url_queue** parameter is the WPS end-point of the generic service that generates, posts and eventually pipes the data items.

In [4]:
# Notebook parameter: WPS end-point hosting the generic queue-up process.
# NOTE(review): unlike the other parameter cells this one has no 'title'/'abstract' entries.
wps_url_queue = dict([('id', 'wps_url_queue'),
                      ('value', 'https://ec-better-triggers-deployer.terradue.com/zoo/?')])

The **api_key** parameter is the Terradue API key associated with the **data_pipeline** user.

In [5]:
# Notebook parameter: Ellip API key of the data_pipeline user.
# Its 'value' is passed to the queue-up process (processing_parameters['api_key'])
# and used as the HTTP basic-auth password when the results feed is fetched.
# SECURITY(review): this credential is hard-coded in the notebook and is echoed
# whenever processing_parameters is displayed. Rotate it and inject it at run time
# (environment variable / secret store) instead of committing it.
api_key = dict([('id', 'api_key'),
              ('title', 'Ellip API key for data pipeline'),
              ('abstract', 'Ellip API key for data pipeline'),
              ('value', 'AKCp5dKENhKvxfYsnrXuVeBwWbF4DuEtFoF8Paykhtpcy4un3cdvHB8rMmJrF1sSDSDovdLcs')])

The **publication_categories** parameter is optional and allows the results associated with a data item to be tagged (category element) with comma-separated `key=value` pairs.

In [6]:
# Notebook parameter: optional comma-separated key=value tags for the published results.
# NOTE(review): no later cell reads this variable; processing_parameters['publication_categories']
# keeps the queue-up process default instead — confirm whether it should be copied over.
publication_categories = dict([('id', 'publication_categories'),
                    ('title', 'Publication categories'),
                    ('abstract', 'Publication categories (key=value)'),
                    ('value', 'volcano_id=211060,wvar=id0003')])

### A few functions 

In [7]:
def shorten_identifier(row):
    """Return the process identifier without its trailing version suffix.

    The identifier is assumed to end with the version's dot-separated parts
    joined by underscores (version '0.5' -> suffix '_0_5'), so as many
    '_'-separated trailing parts are dropped as the version has parts.

    Args:
        row: mapping-like row (e.g. a DataFrame row) with 'identifier' and
            'version' string entries.

    Returns:
        str: the shortened identifier ('' if nothing remains).
    """
    suffix_len = len(row['version'].split('.'))
    name_parts = row['identifier'].split('_')
    return '_'.join(name_parts[:-suffix_len])

In [8]:
def get_parameter_string(wps_url, process_id, wps=None):
    """Build a comma-separated 'key=value' string of a WPS process' default inputs.

    Selection rules:
      * '_T2Username' is never emitted;
      * mandatory inputs (minOccurs != 0) are emitted;
      * 't2_coordinator_name' is emitted even when optional.

    Commas inside default values are escaped as '|' so the result can be split
    on ',' and the commas restored by the caller. Inputs without a default get
    an empty value ('name=').

    Args:
        wps_url: WPS end-point URL; used to create a client when `wps` is None.
        process_id: identifier of the process to describe.
        wps: optional existing WebProcessingService-like client to reuse
            instead of creating a new one on every call.

    Returns:
        str: e.g. 'source=a|b,epsg=EPSG:32632'; '' when no input is selected.
    """
    if wps is None:
        wps = WebProcessingService(wps_url, verbose=False, skip_caps=True)

    process = wps.describeprocess(process_id)

    data_inputs = []

    for data_input in process.dataInputs:

        if data_input.identifier == '_T2Username':
            continue

        # Optional inputs are skipped, except t2_coordinator_name.
        if data_input.minOccurs == 0 and data_input.identifier != 't2_coordinator_name':
            continue

        default = data_input.defaultValue
        # '|' stands in for ',' because ',' separates the key=value pairs.
        value = '' if default is None else default.replace(',', '|')
        data_inputs.append('{}={}'.format(data_input.identifier, value))

    return ','.join(data_inputs)

### Check app service

In [9]:
# Application WPS client; skip_caps=False requests the capabilities so `wps.processes` can be listed.
wps = WebProcessingService(url=wps_url['value'], skip_caps=False, verbose=False)



In [10]:
# Application process to queue; its trailing '_0_5' encodes the process version (0.5).
app_process_id = 'ec_nextgeoss_ewf_satcen_s1_change_detection_ewf_satcen_s1_change_detection_0_5'

# One [identifier, default WPS parameters, version] row per matching process.
app_rows = [[process.identifier,
             get_parameter_string(wps_url['value'], process.identifier),
             process.processVersion]
            for process in wps.processes
            if process.identifier == app_process_id]

# Fail here with a clear message rather than with an IndexError in later cells
# (app_process.wps_params.values[0]) when the process is not deployed.
assert app_rows, 'process {} not found at {}'.format(app_process_id, wps_url['value'])

app_process = gp.GeoDataFrame(app_rows, columns=['identifier', 'wps_params', 'version']).sort_values(by='identifier')

app_process['short_identifier'] = app_process.apply(shorten_identifier, axis=1)



In [11]:
# Show the selected application process: identifier, default parameters, version.
app_process

Unnamed: 0,identifier,wps_params,version,short_identifier
0,ec_nextgeoss_ewf_satcen_s1_change_detection_ewf_satcen_s1_change_detection_0_5,source=https://catalog.terradue.com/sentinel1/search?format=atom&uid=S1B_S3_GRDH_1SDV_20180924T173034_20180924T17304...,0.5,ec_nextgeoss_ewf_satcen_s1_change_detection_ewf_satcen_s1_change_detection


### Check queue-up service

In [12]:
# Queue-up WPS client; skip_caps=False requests the capabilities so `wps_queue.processes` can be listed.
wps_queue = WebProcessingService(url=wps_url_queue['value'], skip_caps=False, verbose=False)



In [13]:
# Identifier of the queue-up process on wps_url_queue; the trailing '_0_3' is its version (0.3).
queue_process_id = 'ec_better_tg_queue_up_tg_queue_up_0_3'

In [14]:
# One [identifier, default WPS parameters, version] row per matching queue-up process.
queue_rows = [[process.identifier,
               get_parameter_string(wps_url_queue['value'], process.identifier),
               process.processVersion]
              for process in wps_queue.processes
              if process.identifier == queue_process_id]

# Fail here with a clear message rather than with an IndexError in later cells
# (queue_process.iloc[0]) when the process is not deployed.
assert queue_rows, 'process {} not found at {}'.format(queue_process_id, wps_url_queue['value'])

queue_process = gp.GeoDataFrame(queue_rows, columns=['identifier', 'wps_params', 'version']).sort_values(by='identifier')

queue_process['short_identifier'] = queue_process.apply(shorten_identifier, axis=1)



In [15]:
# Show the selected queue-up process: identifier, default parameters, version.
queue_process

Unnamed: 0,identifier,wps_params,version,short_identifier
0,ec_better_tg_queue_up_tg_queue_up_0_3,"source=dummy,data_pipeline=fbrito,wps_url=https://ec-better-apps-deployer.terradue.com/zoo/?,api_key=AKCp5bBrPgXEzo1...",0.3,ec_better_tg_queue_up_tg_queue_up


### List the processing parameters

In [16]:
# Turn the queue-up defaults ("key=value,key=value,...") into a dict.
# A value may itself contain '='; commas inside values were escaped as '|'
# by get_parameter_string and are restored here.
processing_parameters = {}

for pair in queue_process.iloc[0].wps_params.split(','):
    name, _sep, raw_value = pair.partition('=')
    processing_parameters[name] = raw_value.replace('|', ',')

In [17]:
# Inspect the queue-up defaults before they are overridden.
# NOTE(review): this output includes the api_key value.
processing_parameters

{'api_key': '<redacted>',
 'data_pipeline': 'fbrito',
 'execution_mode': 'synchronous',
 'process_id': 'ec_better_ewf_sen2cor_ewf_sen2cor_0_12',
 'publication_categories': 'volcano_id=211060,wvar=id0003',
 'source': 'dummy',
 'wps_params': 'source=https://catalog.terradue.com:443/sentinel2/search?uid=S2B_MSIL1C_20190712T162839_N0208_R083_T17ULU_20190712T194612,https://catalog.terradue.com/sentinel2/search?uid=S2A_MSIL1C_20180723T041551_N0206_R090_T46QEF_20180723T075132,resolution=all,dem=Yes,aerosol_type=RURAL,mid_latitude=SUMMER,ozone_content=331,wv_correction=1,vis_update_mode=1,wv_watermask=1,cirrus_correction=FALSE,brdf_correction=0,brdf_lower_bound=0.22,dem_unit=0,adj_km=1.000,visibility=40.0,altitude=0.100,smooth_wv_map=100.0,wv_threshold_cirrus=0.25,l2a_process_timeout=3600',
 'wps_url': 'https://ec-better-apps-deployer.terradue.com/zoo/?'}

Update the values according to the WPS service to invoke:

In [18]:
# Point the queue-up request at the application process selected above.
# Both `_T2Username` and `data_pipeline` are set to the data_pipeline value.
# A list of pairs keeps the assignment order identical on every Python version.
processing_parameters.update([
    ('api_key', api_key['value']),
    ('wps_params', app_process.wps_params.values[0]),
    ('process_id', app_process.identifier.values[0]),
    ('wps_url', wps_url['value']),
    ('execution_mode', 'synchronous'),
    ('_T2Username', data_pipeline['value']),
    ('data_pipeline', data_pipeline['value']),
])

**NOTE:** parameters such as **source**, **subset_wkt** and **epsg** must be updated as well; this notebook submits the application's default parameters.

In [19]:
# Final inputs sent to the queue-up process.
# NOTE(review): this output includes the api_key value.
processing_parameters

{'_T2Username': 'nextgeoss-satcen-003',
 'api_key': '<redacted>',
 'data_pipeline': 'nextgeoss-satcen-003',
 'execution_mode': 'synchronous',
 'process_id': 'ec_nextgeoss_ewf_satcen_s1_change_detection_ewf_satcen_s1_change_detection_0_5',
 'publication_categories': 'volcano_id=211060,wvar=id0003',
 'source': 'dummy',
 'wps_params': 'source=https://catalog.terradue.com/sentinel1/search?format=atom&uid=S1B_S3_GRDH_1SDV_20180924T173034_20180924T173045_012860_017BEB_CA29|https://catalog.terradue.com/sentinel1/search?format=atom&uid=S1B_S3_GRDH_1SDV_20180912T173034_20180912T173045_012685_017687_5033,orbit_type=Precise,polarization=VV,subset_wkt=POLYGON((8.22762498164556 15.07791004244666|8.763045262446788 12.364797810361907|11.449070337799624 12.826368693010892|10.503161175050785 15.534087438341302|8.22762498164556 15.07791004244666|8.22762498164556 15.07791004244666))),epsg=EPSG:32632,dem_threshold=510',
 'wps_url': 'https://ec-nextgeoss-apps-deployer-hetzner.terradue.com/zoo/?'}

### Submit the data item creation and monitor the execution

In [20]:
execution = owslib.wps.WPSExecution(url=wps_url_queue['value'])

# Execute request for the queue-up process, asking for its 'result_osd' output by reference.
execution_request = execution.buildRequest(
    identifier=queue_process.iloc[0].identifier,
    inputs=processing_parameters.items(),
    output=[('result_osd', False)],
)
request_xml = etree.tostring(execution_request, pretty_print=True)

execution_response = execution.submitRequest(request_xml)

# Fixed 10 s pause before parsing the first response.
# NOTE(review): purpose unclear — the next cell polls the status anyway.
time.sleep(10)

execution.parseResponse(execution_response)

# Status URL polled by monitorExecution.
execution.statusLocation



'http://ec-better-triggers-deployer.terradue.com/zoo-bin/zoo_loader.cgi?request=Execute&service=WPS&version=1.0.0&Identifier=GetStatus&DataInputs=sid=37d312d4-a799-11e9-8414-0242ac110019&RawDataOutput=Result'

In [21]:
# Poll the execution's status location until the queue-up run finishes.
monitorExecution(execution)



In [22]:
# After a successful synchronous queue-up run, follow its output to the queued
# data item's Atom entry and print the catalog URLs of its results.
if execution.isSucceded() and processing_parameters['execution_mode'] == 'synchronous':

    # Reference returned for the single requested output ('result_osd');
    # presumably an OpenSearch description URL, as it is used as the search end point.
    osd_results = execution.processOutputs[0].reference

    search_results = cioppy.Cioppy().search(end_point=osd_results,
                                            params=[],
                                            output_fields='identifier,enclosure',
                                            model='GeoTime',
                                            timeout=50000)

    # Fetch the first item's enclosure (an Atom feed) as the data_pipeline user.
    enclosure_response = requests.get(search_results[0]['enclosure'],
                                      auth=HTTPBasicAuth(data_pipeline['value'],
                                                         api_key['value']))
    # Surface HTTP errors (e.g. bad credentials) instead of failing inside the XML parser.
    enclosure_response.raise_for_status()

    root = etree.fromstring(enclosure_response.content)

    link_results = root.xpath('/atom:feed/atom:entry/atom:link[@rel = "results"]',
                              namespaces={'atom': 'http://www.w3.org/2005/Atom'})[0].attrib['href']

    print(link_results)

    # Swap only the trailing 'search' endpoint for 'description'; str.replace would
    # also rewrite 'search' occurring in the host or index name.
    head, found, tail = link_results.rpartition('search')
    link_results_osd = head + 'description' + tail if found else link_results

    print(link_results_osd)

    # The path has the form /<index>/<key>/<value>/search; rebuild it as
    # /<index>/search?<key>=<value>.
    o = urlparse(link_results)
    path_index, path_key, path_value = o.path.split('/')[1:4]

    link_results_alt = '{}://{}/{}/search?{}={}'.format(o.scheme, o.netloc, path_index, path_key, path_value)

    print(link_results_alt)


https://catalog.terradue.com:443/nextgeoss-satcen-003/cat/6949ea7ee6894f2c94a0c9f3f2e42a21/search
https://catalog.terradue.com:443/nextgeoss-satcen-003/cat/6949ea7ee6894f2c94a0c9f3f2e42a21/description
https://catalog.terradue.com:443/nextgeoss-satcen-003/search?cat=6949ea7ee6894f2c94a0c9f3f2e42a21
