## Submitting a job with the [`otello`](https://github.com/hysds/otello) Python library

#### Once your job-type has been registered and built (see: [pge_create.ipynb](pge_create.ipynb)), jobs can be submitted from Python using the steps in this notebook.

#### While this notebook only shows submission of a single job/parameter-set, you can map or iterate over a collection of input parameter sets to efficiently submit large batches of jobs.

### Establish an otello `Mozart` instance to communicate with the HySDS cluster controller.
#### It will be necessary to provide credentials the first time you initialise otello.

#### When prompted for the HySDS host, include the protocol, e.g. https://my-mozart.jpl.nasa.gov

#### When prompted for "HySDS cluster authenticated", enter 'y' if the cluster requires a password to access.

In [1]:
import getpass
import json
import os
import re
import shutil
from pathlib import Path
from pprint import pprint

import otello
import requests
import urllib3
from requests.auth import HTTPBasicAuth

if not os.path.exists(f"{Path.home()}/.config/otello/config.yml"):
    otello.client.initialize()

m = otello.mozart.Mozart()

#### Log in to the Pele REST API to obtain an API token, and define a helper that retrieves a product record by ID:

In [2]:
mozart_base_url = 'https://137.78.250.114'
pele_base_url = f'{mozart_base_url}/pele/api/v0.1'

user = 'alexander.e.dunn@jpl.nasa.gov' #  input("Enter email address then press <Enter>: ")
print("Enter your Pele account password then press <Enter>.")
password = getpass.getpass()

# verify=False skips TLS certificate verification for this request
r = requests.post(pele_base_url + '/login', auth=HTTPBasicAuth(user, password), verify=False)

# expect 200; fail here, before trying to read the token, if login did not succeed
print("status code: {}".format(r.status_code))
assert r.status_code == 200
print(json.dumps(r.json(), indent=2))

# extract API token
token = r.json()['token']
print("token: {}".format(token))

def retrieve_product(product_id: str) -> dict:
    """Fetch a product's record from Pele by its ID, authenticating with the API token."""
    product_request = requests.get(
        f'{pele_base_url}/pele/dataset/{product_id}',
        headers={'X-API-KEY': token},
        verify=False,
    )
    # print("status code: {}".format(product_request.status_code))
    # print(json.dumps(product_request.json(), indent=2))
    assert product_request.status_code == 200
    return product_request.json()['result']

Enter your Pele account password then press <Enter>.


 ··········


status code: 200
{
  "success": true,
  "message": null,
  "token": "eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJzdWIiOiJhbGV4YW5kZXIuZS5kdW5uQGpwbC5uYXNhLmdvdiIsImlhdCI6MTYyNDQ5NzQxMywiZXhwIjoxNjI0NTgzODEzfQ.75QfYUVhC_QrjbuDZc7WN20HyipvTHFxoG4weqxdOng"
}
token: eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJzdWIiOiJhbGV4YW5kZXIuZS5kdW5uQGpwbC5uYXNhLmdvdiIsImlhdCI6MTYyNDQ5NzQxMywiZXhwIjoxNjI0NTgzODEzfQ.75QfYUVhC_QrjbuDZc7WN20HyipvTHFxoG4weqxdOng
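
#### The token returned by the login call has the three-part, `eyJ...`-prefixed shape of a JSON Web Token (JWT). Assuming it is a standard JWT, its payload can be inspected locally to see when it expires. The sketch below is illustrative only: `jwt_payload` and `token_expiry` are hypothetical helper names, not part of Pele or otello, and the signature is not verified.

```python
import base64
import json
from datetime import datetime, timezone


def jwt_payload(token: str) -> dict:
    """Decode the payload segment of a JWT without verifying its signature."""
    payload_b64 = token.split(".")[1]
    # JWT segments are base64url-encoded with padding stripped; restore it.
    payload_b64 += "=" * (-len(payload_b64) % 4)
    return json.loads(base64.urlsafe_b64decode(payload_b64))


def token_expiry(token: str) -> datetime:
    """Return the token's `exp` claim as a timezone-aware UTC datetime."""
    return datetime.fromtimestamp(jwt_payload(token)["exp"], tz=timezone.utc)
```

#### Calling `token_expiry(token)` after logging in shows when a fresh login will be needed. For the token printed above, the `iat` and `exp` claims are 86400 seconds (24 hours) apart.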


#### The input volcano_anomaly_aoi product is retrieved:

In [3]:

aoi_product_id = 'VOLCANO_ANOMALY_AOI-2021-06-24T00:25:14.326925-d7951'
aoi_product_data = retrieve_product(aoi_product_id)

print(aoi_product_data)


{'browse_urls': ['http://soamc-dev-rs-fwd.s3-website-us-west-2.amazonaws.com/browse/VOLCANO_ANOMALY_AOI/v1.0/2021/06/24/VOLCANO_ANOMALY_AOI-2021-06-24T00:25:14.326925-d7951', 's3://s3-us-west-2.amazonaws.com:80/soamc-dev-rs-fwd/browse/VOLCANO_ANOMALY_AOI/v1.0/2021/06/24/VOLCANO_ANOMALY_AOI-2021-06-24T00:25:14.326925-d7951'], 'urls': ['http://soamc-dev-rs-fwd.s3-website-us-west-2.amazonaws.com/products/VOLCANO_ANOMALY_AOI/v1.0/2021/06/24/VOLCANO_ANOMALY_AOI-2021-06-24T00:25:14.326925-d7951', 's3://s3-us-west-2.amazonaws.com:80/soamc-dev-rs-fwd/products/VOLCANO_ANOMALY_AOI/v1.0/2021/06/24/VOLCANO_ANOMALY_AOI-2021-06-24T00:25:14.326925-d7951'], 'version': 'v1.0', 'objectid': 'VOLCANO_ANOMALY_AOI-2021-06-24T00:25:14.326925-d7951', 'continent': 'North America', 'center': {'type': 'point', 'coordinates': [-97.45325225477447, 22.822836269854633]}, 'location': {'type': 'multipolygon', 'coordinates': [[[[-96.419273, 21.798433], [-96.074295, 23.425812], [-98.505028, 23.842773], [-98.819763, 22.2

#### The mintpy input product is retrieved:

In [4]:

mintpy_input_product_id = 'S1-TIMESERIES-MINTPY-A124-20190214-718d4'
mintpy_product_data = retrieve_product(mintpy_input_product_id)

#### The input products are mapped to PGE parameters:

In [5]:
mintpy_input_product_base_url = next(filter(lambda url: url.startswith('http'), mintpy_product_data['urls']))

pge_parameters = {
    'start_time': aoi_product_data['starttime'],
    'end_time': aoi_product_data['endtime'],
    'location': aoi_product_data['location'],
    'active_pixel_x_min': aoi_product_data['metadata']['active_pixel_x_min'],
    'active_pixel_x_max': aoi_product_data['metadata']['active_pixel_x_max'],
    'active_pixel_y_min': aoi_product_data['metadata']['active_pixel_y_min'],
    'active_pixel_y_max': aoi_product_data['metadata']['active_pixel_y_max'],
    'outer_pixel_x_min': aoi_product_data['metadata']['outer_pixel_x_min'],
    'outer_pixel_x_max': aoi_product_data['metadata']['outer_pixel_x_max'],
    'outer_pixel_y_min': aoi_product_data['metadata']['outer_pixel_y_min'],
    'outer_pixel_y_max': aoi_product_data['metadata']['outer_pixel_y_max'],
    'input_timeseries_dem_err_url': f'{mintpy_input_product_base_url}/timeseries_demErr.h5'
}
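
#### When the same mapping has to be applied to many AOI products, it can help to wrap it in a function. The sketch below is one possible arrangement, not the notebook's own code: `first_http_url` and `build_pge_parameters` are hypothetical helper names, and the record layout (`starttime`, `endtime`, `location`, `metadata`) is assumed to match the product used above.

```python
def first_http_url(urls):
    """Return the first HTTP(S) URL in `urls`, or raise a descriptive error."""
    for url in urls:
        if url.startswith("http"):
            return url
    raise ValueError(f"no HTTP URL found among: {urls}")


def build_pge_parameters(aoi, timeseries_base_url):
    """Map an AOI product record onto the PGE's input parameters."""
    # The eight pixel-window keys: active/outer x x/y x min/max.
    pixel_keys = [
        f"{region}_pixel_{axis}_{bound}"
        for region in ("active", "outer")
        for axis in ("x", "y")
        for bound in ("min", "max")
    ]
    parameters = {
        "start_time": aoi["starttime"],
        "end_time": aoi["endtime"],
        "location": aoi["location"],
        "input_timeseries_dem_err_url": f"{timeseries_base_url}/timeseries_demErr.h5",
    }
    # Copy the pixel-window bounds straight from the product metadata.
    parameters.update({key: aoi["metadata"][key] for key in pixel_keys})
    return parameters
```

#### Unlike `next(filter(...))`, which raises a bare `StopIteration` when no URL matches, `first_http_url` reports which URLs were examined.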

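#### The job is initialized and provided with the runtime arguments. Parameters that are not set here (`lstm_grid_size` and `lstm_time_window` in the output below) appear to keep the values they were initialized with: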

In [6]:
job_type_name = 'job-volcano-anomaly_pge:main'

job_type = m.get_job_types()[job_type_name]
job_type.initialize()
job_type.set_input_params(pge_parameters)
pprint(job_type.get_input_params())

{'active_pixel_x_max': 512.0,
 'active_pixel_x_min': 488.0,
 'active_pixel_y_max': 512.0,
 'active_pixel_y_min': 488.0,
 'end_time': '2019-02-15T00:00:00',
 'input_timeseries_dem_err_url': 'http://soamc-dev-rs-fwd.s3-website-us-west-2.amazonaws.com/products/S1-TIMESERIES-MINTPY/v1.0/2019/02/14/S1-TIMESERIES-MINTPY-A124-20190214-718d4/timeseries_demErr.h5',
 'location': {'coordinates': [[[[-96.419273, 21.798433],
                                [-96.074295, 23.425812],
                                [-98.505028, 23.842773],
                                [-98.819763, 22.218616],
                                [-96.419273, 21.798433]]]],
              'type': 'multipolygon'},
 'lstm_grid_size': 3,
 'lstm_time_window': 9,
 'outer_pixel_x_max': 524.0,
 'outer_pixel_x_min': 476.0,
 'outer_pixel_y_max': 524.0,
 'outer_pixel_y_min': 476.0,
 'start_time': '2019-01-14T00:00:00'}


### Submitting the job
#### A job tag (useful for finding the job later) and job queue are specified. Both are optional. Job submission is asynchronous, so this call will return almost immediately.

In [7]:
from datetime import datetime

sample_job_tag = f'{datetime.now().strftime("%Y%m%d")}_volcano_anomaly'
print(sample_job_tag)

job_run = job_type.submit_job(tag=sample_job_tag, queue="factotum-job_worker-small")

20210624_volcano_anomaly
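
#### As noted in the introduction, the same steps can be repeated over a collection of parameter sets. The following is a minimal sketch of that loop, using only the `set_input_params` and `submit_job` calls shown in this notebook. `submit_batch` is a hypothetical helper name, and the tag format with a numeric suffix is an assumption chosen so that each job in the batch can be found individually.

```python
from datetime import datetime


def submit_batch(job_type, parameter_sets, tag_prefix, queue=None):
    """Submit one job per parameter set and return the resulting job handles."""
    date_stamp = datetime.now().strftime("%Y%m%d")
    job_runs = []
    for index, parameters in enumerate(parameter_sets):
        job_type.set_input_params(parameters)
        # Give each job a distinct tag, e.g. 20210624_volcano_anomaly_000.
        submit_kwargs = {"tag": f"{date_stamp}_{tag_prefix}_{index:03d}"}
        if queue is not None:
            # Only pass a queue when the caller chose one.
            submit_kwargs["queue"] = queue
        job_runs.append(job_type.submit_job(**submit_kwargs))
    return job_runs
```

#### For example, `submit_batch(job_type, list_of_parameter_dicts, 'volcano_anomaly', queue='factotum-job_worker-small')` would submit every parameter set to the queue used above. Because submission is asynchronous, the loop itself does not wait for any job to finish.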


### Determining job completion
#### The job state is printed periodically (about every 30 seconds in the run below) until the job finishes, at which point the call returns the final status.

In [8]:
job_run.wait_for_completion()

92ba2bf1-b80e-4222-8a97-8e4b956839ae: job-queued 2021-06-24T01:16:57.763234
92ba2bf1-b80e-4222-8a97-8e4b956839ae: job-started 2021-06-24T01:17:27.921228
92ba2bf1-b80e-4222-8a97-8e4b956839ae: job-started 2021-06-24T01:17:58.065503
92ba2bf1-b80e-4222-8a97-8e4b956839ae: job-started 2021-06-24T01:18:28.230678
92ba2bf1-b80e-4222-8a97-8e4b956839ae: job-completed 2021-06-24T01:18:58.380734


'job-completed'
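
#### In a script, the returned status can be checked so that an unsuccessful job does not pass unnoticed. The sketch below assumes only what the output above shows, namely that `wait_for_completion()` returns the final status string and that success is reported as `'job-completed'`. `require_completed` is a hypothetical helper name, and the other status values a job may end in are not shown in this notebook.

```python
def require_completed(job_run):
    """Wait for the job to finish; raise unless it ended as 'job-completed'."""
    final_status = job_run.wait_for_completion()
    if final_status != "job-completed":
        raise RuntimeError(f"job ended with status {final_status!r}")
    return final_status
```

#### Calling `require_completed(job_run)` in place of `job_run.wait_for_completion()` prints the same progress lines, since the waiting is still done by otello.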