In [1]:
import urlparse
import requests
import os
import json

## Configuration

Get the endpoint and authentication info from the environment variables `WES_API_PROTO`, `WES_API_HOST`, and `WES_API_AUTH` (**note:** the values were provided by Abraham via email).

In [2]:
# Endpoint/auth settings are read from the environment rather than being
# hard-coded in the notebook. WES_API_PROTO falls back to 'http' (the same
# default WorkflowServiceSession uses) so that an unset variable does not
# produce a malformed 'None://...' URL.
proto = os.environ.get('WES_API_PROTO', 'http')
host = os.environ.get('WES_API_HOST')
auth = os.environ.get('WES_API_AUTH')

In [3]:
# Module-level copies of the request settings (WorkflowServiceSession builds
# the same endpoint URL and Authorization header for its own use).
endpoint = '{proto}://{host}/ga4gh/wes/v1'.format(proto=proto, host=host)
headers = {'Authorization': auth}

Create a simple object to store session/connection info.

In [4]:
class WorkflowServiceSession(object):
    """
    API session with a workflow execution service (WES) endpoint.

    Holds the base URL of the GA4GH WES v1 API and the headers that the
    request helpers send with every call.

    Args:
        host: host name (optionally with port) of the WES server.
        auth: value for the ``Authorization`` request header.
        proto: URL scheme, e.g. 'http' or 'https'. A falsy value such as
            ``None`` (e.g. from an unset environment variable) falls back
            to 'http'.

    Attributes:
        endpoint: base URL, '<proto>://<host>/ga4gh/wes/v1'.
        headers: dict of HTTP headers to send with each request.
    """
    def __init__(self, host, auth, proto='http'):
        # An explicit None would otherwise produce a 'None://...' URL.
        self.endpoint = '{}://{}/ga4gh/wes/v1'.format(proto or 'http', host)
        self.headers = {'Authorization': auth}


In [5]:
# One session object, passed explicitly to every request helper below.
wes = WorkflowServiceSession(host, auth, proto=proto)

## Getting info from WES

Create a generic `GET` request function (to minimize repeated code).

In [6]:
def wes_get_request(wes, target, params=None, timeout=None):
    """
    Generic workflow execution service API 'GET' request.

    Args:
        wes: session object providing ``endpoint`` (base URL) and
            ``headers`` (e.g. a WorkflowServiceSession).
        target: path appended to ``wes.endpoint``, e.g. '/service-info'.
        params: optional dict of query-string parameters, passed through
            to ``requests.get``.
        timeout: optional request timeout in seconds, passed through to
            ``requests.get``; None (the default) waits indefinitely.

    Returns:
        The decoded JSON body of the response. HTTP error statuses are not
        raised here; whatever JSON the server sends back (e.g. an error
        object) is returned as-is. ``res.json()`` may raise ValueError if
        the body is not JSON.
    """
    res = requests.get(
        '{}{}'.format(wes.endpoint, target),
        headers=wes.headers,
        params=params,
        timeout=timeout,
    )
    # TODO: add some exception handling for different responses
    return res.json()

Use `get_info` to retrieve summary information about the WES endpoint.

In [7]:
def get_info(wes):
    """
    Fetch the service-info summary of a workflow execution service endpoint
    (engine versions, supported workflow types and filesystem protocols,
    run-state counts, ...).
    """
    return wes_get_request(wes, target='/service-info')

In [8]:
# Summary of the endpoint's capabilities
get_info(wes=wes)

{u'engine_versions': {u'cwltool': u'1.0.20171107133715',
  u'dockstore': u'1.3.0'},
 u'key_values': {u'flavour': u'e.g. (r2.medium), instance type descriptor for AWS'},
 u'supported_filesystem_protocols': [u'http',
  u'https',
  u'sftp',
  u's3',
  u'gs',
  u'synapse'],
 u'supported_wes_versions': [u'v1.0'],
 u'system_state_counts': {u'Complete': 46},
 u'workflow_type_versions': {u'cwl': {u'workflow_type_version': [u'1.0']},
  u'wdl': {u'workflow_type_version': [u'1.0']}}}

Use `list_workflow_runs` to list all workflow instances/runs/jobs (completed, initialized, or running) that have been recorded in the history of the WES endpoint.

In [9]:
def list_workflow_runs(wes, page_size=100, page_token=0):
    """
    List workflow runs recorded by the workflow execution service endpoint.

    Args:
        wes: session object providing ``endpoint`` and ``headers``.
        page_size: maximum number of runs to return, sent as the
            ``page_size`` query parameter (omitted when None).
        page_token: token of the page to fetch, e.g. the
            ``next_page_token`` from a previous response. Omitted when
            falsy (including the default 0), so the first page is
            requested by default.

    Returns:
        The decoded JSON response, e.g.
        {'workflows': [{'state': ..., 'workflow_id': ...}, ...],
         'next_page_token': ...}.
    """
    # Python 2 keeps urlencode in `urllib`; Python 3 moved it to `urllib.parse`.
    try:
        from urllib import urlencode
    except ImportError:
        from urllib.parse import urlencode

    query = {}
    if page_size is not None:
        query['page_size'] = page_size
    if page_token:
        query['page_token'] = page_token

    target = '/workflows'
    if query:
        # sorted() keeps the query string deterministic
        target = '{}?{}'.format(target, urlencode(sorted(query.items())))
    return wes_get_request(wes, target)
    

In [10]:
# Every run the endpoint has recorded, with its state
list_workflow_runs(wes=wes)

{u'next_page_token': None,
 u'workflows': [{u'state': u'Complete', u'workflow_id': u'1'},
  {u'state': u'Complete', u'workflow_id': u'2'},
  {u'state': u'Complete', u'workflow_id': u'3'},
  {u'state': u'Complete', u'workflow_id': u'4'},
  {u'state': u'Complete', u'workflow_id': u'5'},
  {u'state': u'Complete', u'workflow_id': u'6'},
  {u'state': u'Complete', u'workflow_id': u'7'},
  {u'state': u'Complete', u'workflow_id': u'8'},
  {u'state': u'Complete', u'workflow_id': u'9'},
  {u'state': u'Complete', u'workflow_id': u'10'},
  {u'state': u'Complete', u'workflow_id': u'11'},
  {u'state': u'Complete', u'workflow_id': u'12'},
  {u'state': u'Complete', u'workflow_id': u'13'},
  {u'state': u'Complete', u'workflow_id': u'14'},
  {u'state': u'Complete', u'workflow_id': u'15'},
  {u'state': u'Complete', u'workflow_id': u'16'},
  {u'state': u'Complete', u'workflow_id': u'17'},
  {u'state': u'Complete', u'workflow_id': u'18'},
  {u'state': u'Complete', u'workflow_id': u'19'},
  {u'state': u'Com

Use `get_workflow_run` to retrieve details about a specific workflow run by its ID.

In [11]:
def get_workflow_run(wes, workflow_run_id):
    """
    Fetch the full record of one workflow run by its ID (the original
    request, state, outputs and task logs).
    """
    return wes_get_request(wes, '/workflows/{}'.format(workflow_run_id))

In [12]:
# Details of the last run in the listing (assumed to be the most recent one)
get_workflow_run(
    wes,
    list_workflow_runs(wes)['workflows'][-1]['workflow_id'],
)

{u'outputs': [{u'location': None,
   u'name': None,
   u'type': None,
   u'value': None}],
 u'request': {u'key_values': None,
  u'workflow_descriptor': u'#!/usr/bin/env cwl-runner\n\nclass: CommandLineTool\nid: "BAMStats"\nlabel: "BAMStats tool"\ncwlVersion: v1.0 \ndescription: |\n    ![build_status](https://quay.io/repository/collaboratory/dockstore-tool-bamstats/status)\n    A Docker container for the BAMStats command. See the [BAMStats](http://bamstats.sourceforge.net/) website for more information.\n\ndct:creator:\n  "@id": "http://orcid.org/0000-0002-7681-6415"\n  foaf:name: Brian O\'Connor\n  foaf:mbox: "mailto:briandoconnor@gmail.com"\n\nrequirements:\n  - class: DockerRequirement\n    dockerPull: "quay.io/collaboratory/dockstore-tool-bamstats:1.25-6_1.0"\n\nhints:\n  - class: ResourceRequirement\n    coresMin: 1\n    ramMin: 4092\n    outdirMin: 512000\n    doc: "the process requires at least 4G of RAM"\n\ninputs:\n  mem_gb:\n    type: int\n    default: 4\n    doc: "The memory,

Use `get_workflow_run_status` to retrieve the status of a workflow run (this is the same as the `state` property of a workflow run object). 

In [13]:
def get_workflow_run_status(wes, workflow_run_id):
    """
    Fetch only the state of one workflow run by its ID, e.g.
    {'state': 'Complete', 'workflow_id': '46'}.
    """
    status_path = '/workflows/{}/status'.format(workflow_run_id)
    return wes_get_request(wes, status_path)

In [14]:
# State of the last run in the listing
get_workflow_run_status(
    wes,
    list_workflow_runs(wes)['workflows'][-1]['workflow_id'],
)

{u'state': u'Complete', u'workflow_id': u'46'}

## Submitting workflows

Use `build_workflow_run_packet` to collect and format parameters/instructions for a new workflow run.

In [15]:
def build_workflow_run_packet(
    workflow_params, workflow_type, workflow_type_version,
    workflow_descriptor=None, tags=None, workflow_engine_parameters=None,
    workflow_url=None, key_values=None
):
    """
    Assemble the JSON request body for submitting a workflow run.

    Field names follow the GA4GH WES WorkflowRequest definition:
    http://ga4gh.github.io/workflow-execution-service-schemas/definitions/ga4gh_wes_WorkflowRequest

    Any field passed as None is left out of the returned dict entirely, so
    only the fields actually supplied are sent to the server.
    """
    fields = [
        ('workflow_descriptor', workflow_descriptor),
        ('workflow_params', workflow_params),
        ('workflow_type', workflow_type),
        ('workflow_type_version', workflow_type_version),
        ('tags', tags),
        ('workflow_engine_parameters', workflow_engine_parameters),
        ('workflow_url', workflow_url),
        ('key_values', key_values),
    ]
    packet = {}
    for name, value in fields:
        if value is not None:
            packet[name] = value
    return packet
    

`read_file` is a small helper function from Abraham (used here to read in JSON parameters file).

In [16]:
def read_file(file_name):
    """
    Return the entire contents of a text file as a string.

    Args:
        file_name: path of the file to read.

    Returns:
        The file's contents.
    """
    # `with` guarantees the handle is closed, even if read() raises.
    with open(file_name, 'r') as file_r:
        return file_r.read()

### 1. Testing with `dockstore-tool-bamstats`

Example workflow from Abraham's gist/notebook. See his notebook for the contents of `bams.json` (I copied them into a local file).

In [17]:
# Workflow inputs come from a local file (bams.json in the working
# directory). Parsing and re-dumping fails fast on malformed JSON and
# produces a single-line JSON string.
params = json.dumps(json.loads(read_file('bams.json')))

workflow_type = 'cwl'
workflow_type_version = '1.0'
# Consonance-specific key/value pairs
key_values = {
    'flavour': 'c4.xlarge',
    'tracker_id': 'test1',
}

Create a generic `POST` request function (to minimize repeated code). It returns the decoded JSON body of the response.

In [18]:
def wes_post_request(wes, target, json):
    """
    Generic workflow execution service API 'POST' request.

    Sends ``json`` as the JSON request body to ``wes.endpoint`` + ``target``
    with the session headers, and returns the decoded JSON response.

    Note: inside this function the ``json`` parameter shadows the ``json``
    module; the name is kept because callers pass it by keyword.
    """
    url = '{}{}'.format(wes.endpoint, target)
    response = requests.post(url, headers=wes.headers, json=json)
    # TODO: add some exception handling for different responses
    return response.json()

Use `submit_workflow_run` to initiate a new workflow run with the specified parameters. The response for this request will give the ID of the submitted workflow run.

In [19]:
def submit_workflow_run(wes, workflow_run_packet, tags=None):
    """
    Submit a new workflow run to the WES endpoint.

    Args:
        wes: session object providing ``endpoint`` and ``headers``.
        workflow_run_packet: request dict (e.g. from
            build_workflow_run_packet) whose 'workflow_params' value is a
            JSON-encoded string. It is not modified.
        tags: optional dict merged into the decoded workflow params before
            submission (this notebook uses it to add a 'track_id').

    Returns:
        The decoded JSON response, e.g. {'workflow_id': '33'}, or an error
        object such as {'code': 400, 'message': ...}.
    """
    target = '/workflows'
    params = json.loads(workflow_run_packet['workflow_params'])
    # tags defaults to None; dict.update(None) would raise TypeError.
    if tags:
        params.update(tags)
    # Shallow copy so the caller's packet is left untouched.
    packet = dict(workflow_run_packet)
    packet['workflow_params'] = json.dumps(params)
    # print request data for testing
    print(packet)
    return wes_post_request(wes, target, json=packet)

#### 1.1. Using Dockstore ID

This test follows Abraham's example, using a Dockstore tool ID for the `workflow_descriptor` field. 

> I don't think this technically conforms with the WES spec (?) — my understanding is that `_descriptor` should be the actual descriptor object, and `_url` should be a URL path to the descriptor.

In [20]:
# Dockstore tool ID (an image reference, not a descriptor document)
descriptor = 'quay.io/collaboratory/dockstore-tool-bamstats:1.25-6_1.0'

In [21]:
# Submit with the Dockstore tool ID as the descriptor; the track_id tag
# lets us find the run again later.
test1_1 = submit_workflow_run(
    wes,
    build_workflow_run_packet(
        params,
        workflow_type,
        workflow_type_version,
        workflow_descriptor=descriptor,
        key_values=key_values,
    ),
    tags={'track_id': 'test1_1'},
)
test1_1

{'key_values': {'flavour': 'c4.xlarge', 'tracker_id': 'test1'}, 'workflow_type_version': '1.0', 'workflow_params': '{"bam_input": {"path": "https://s3-us-west-2.amazonaws.com/achave11-redwood-storage/consonance-outputs/rna.SRR948778.bam", "class": "File", "format": "http://edamontology.org/format_2572"}, "bamstats_report": {"path": "/tmp/bamstats_report.zip", "class": "File"}, "track_id": "test1_1"}', 'workflow_descriptor': 'quay.io/collaboratory/dockstore-tool-bamstats:1.25-6_1.0', 'workflow_type': 'cwl'}


{u'workflow_id': u'33'}

> I think there's a bug with the Consonance workflow run counter (or the `POST` endpoint in WES). We saw above that there were over 40 workflow runs, so the new run should have an ID corresponding to a larger number. Let's double-check:

In [22]:
# Re-list the runs to see which ID the new submission actually received
list_workflow_runs(wes=wes)

{u'next_page_token': None,
 u'workflows': [{u'state': u'Complete', u'workflow_id': u'1'},
  {u'state': u'Complete', u'workflow_id': u'2'},
  {u'state': u'Complete', u'workflow_id': u'3'},
  {u'state': u'Complete', u'workflow_id': u'4'},
  {u'state': u'Complete', u'workflow_id': u'5'},
  {u'state': u'Complete', u'workflow_id': u'6'},
  {u'state': u'Complete', u'workflow_id': u'7'},
  {u'state': u'Complete', u'workflow_id': u'8'},
  {u'state': u'Complete', u'workflow_id': u'9'},
  {u'state': u'Complete', u'workflow_id': u'10'},
  {u'state': u'Complete', u'workflow_id': u'11'},
  {u'state': u'Complete', u'workflow_id': u'12'},
  {u'state': u'Complete', u'workflow_id': u'13'},
  {u'state': u'Complete', u'workflow_id': u'14'},
  {u'state': u'Complete', u'workflow_id': u'15'},
  {u'state': u'Complete', u'workflow_id': u'16'},
  {u'state': u'Complete', u'workflow_id': u'17'},
  {u'state': u'Complete', u'workflow_id': u'18'},
  {u'state': u'Complete', u'workflow_id': u'19'},
  {u'state': u'Com

Add a small function to find the actual ID of the workflow run by searching the most recent runs for the `track_id` tag we added to `workflow_params`.

In [23]:
def find_workflow_run(wes, tag_key, tag_val, n=5):
    """
    Find the ID of the most recent workflow run tagged with a given value.

    Searches the last ``n`` entries of ``list_workflow_runs(wes)`` and returns
    the ID of the last-listed run whose decoded 'workflow_params' JSON has
    ``params[tag_key] == tag_val``. (In this notebook the submit response
    returned a stale 'workflow_id', so runs are matched by a tag injected
    into their params instead.)

    Args:
        wes: session object providing ``endpoint`` and ``headers``.
        tag_key: key looked up in each run's workflow params, e.g. 'track_id'.
        tag_val: value that key must have.
        n: number of entries at the end of the run listing to search.

    Returns:
        The matching workflow ID.

    Raises:
        IndexError: if none of the searched runs carries the tag.
    """
    recent_runs = list_workflow_runs(wes)['workflows'][-n:]
    # Walk newest-listed first so we can stop at the first match instead of
    # fetching the details of every run.
    for wr in reversed(recent_runs):
        request = get_workflow_run(wes, wr['workflow_id'])['request'] or {}
        raw_params = request.get('workflow_params')
        if not raw_params:
            # A run submitted without params cannot carry the tag.
            continue
        params = json.loads(raw_params)
        if isinstance(params, dict) and params.get(tag_key) == tag_val:
            return wr['workflow_id']
    raise IndexError(
        'no workflow run with {}={!r} among the last {} runs'.format(
            tag_key, tag_val, n)
    )

In [24]:
# Real run ID of test 1.1, located via its track_id tag
test1_1_id = find_workflow_run(wes, tag_key='track_id', tag_val='test1_1')
test1_1_id

u'47'

OK, now that I have the actual workflow run ID, I can check its status and details using the same functions as above.

In [27]:
get_workflow_run_status(wes, workflow_run_id=test1_1_id)

{u'state': u'Complete', u'workflow_id': u'47'}

> Assume I re-ran the above cell until the `state` was `Complete`.

In [28]:
get_workflow_run(wes, workflow_run_id=test1_1_id)

{u'outputs': [{u'location': None,
   u'name': None,
   u'type': None,
   u'value': None}],
 u'request': {u'key_values': None,
  u'workflow_descriptor': u'#!/usr/bin/env cwl-runner\n\nclass: CommandLineTool\nid: "BAMStats"\nlabel: "BAMStats tool"\ncwlVersion: v1.0 \ndescription: |\n    ![build_status](https://quay.io/repository/collaboratory/dockstore-tool-bamstats/status)\n    A Docker container for the BAMStats command. See the [BAMStats](http://bamstats.sourceforge.net/) website for more information.\n\ndct:creator:\n  "@id": "http://orcid.org/0000-0002-7681-6415"\n  foaf:name: Brian O\'Connor\n  foaf:mbox: "mailto:briandoconnor@gmail.com"\n\nrequirements:\n  - class: DockerRequirement\n    dockerPull: "quay.io/collaboratory/dockstore-tool-bamstats:1.25-6_1.0"\n\nhints:\n  - class: ResourceRequirement\n    coresMin: 1\n    ramMin: 4092\n    outdirMin: 512000\n    doc: "the process requires at least 4G of RAM"\n\ninputs:\n  mem_gb:\n    type: int\n    default: 4\n    doc: "The memory,

#### Test 1.1 Results

> Aside from the fact that my `key_values` don't appear to have been correctly parsed/set from the request, everything else seems to have worked fine.

#### 1.2. Using CWL content

Next attempt is to use the actual CWL content/document for the `workflow_descriptor` field. First, I'll grab the content from the descriptor URL.

In [29]:
# Dockstore (GA4GH tools API) URL of the bamstats tool's plain-CWL descriptor
descriptor_url = 'https://dockstore.org:8443/api/ga4gh/v2/tools/quay.io%2Fcollaboratory%2Fdockstore-tool-bamstats/versions/1.25-6_1.0/plain-CWL/descriptor/%2FDockstore.cwl'

# Get the descriptor from the URL. Raise on an HTTP error status so that an
# error page is never silently used as the workflow descriptor.
descriptor_response = requests.get(descriptor_url)
descriptor_response.raise_for_status()
descriptor_content = descriptor_response.content

Descriptor content looks like:

In [30]:
# Show the raw CWL text fetched from descriptor_url
descriptor_content

'#!/usr/bin/env cwl-runner\n\nclass: CommandLineTool\nid: "BAMStats"\nlabel: "BAMStats tool"\ncwlVersion: v1.0 \ndescription: |\n    ![build_status](https://quay.io/repository/collaboratory/dockstore-tool-bamstats/status)\n    A Docker container for the BAMStats command. See the [BAMStats](http://bamstats.sourceforge.net/) website for more information.\n\ndct:creator:\n  "@id": "http://orcid.org/0000-0002-7681-6415"\n  foaf:name: Brian O\'Connor\n  foaf:mbox: "mailto:briandoconnor@gmail.com"\n\nrequirements:\n  - class: DockerRequirement\n    dockerPull: "quay.io/collaboratory/dockstore-tool-bamstats:1.25-6_1.0"\n\nhints:\n  - class: ResourceRequirement\n    coresMin: 1\n    ramMin: 4092\n    outdirMin: 512000\n    doc: "the process requires at least 4G of RAM"\n\ninputs:\n  mem_gb:\n    type: int\n    default: 4\n    doc: "The memory, in GB, for the reporting tool"\n    inputBinding:\n      position: 1\n\n  bam_input:\n    type: File\n    doc: "The BAM file used as input, it must be s

In [31]:
# Same request as test 1.1, but with the fetched CWL text as the descriptor
test1_2 = submit_workflow_run(
    wes,
    workflow_run_packet=build_workflow_run_packet(
        params,
        workflow_type,
        workflow_type_version,
        workflow_descriptor=descriptor_content,
        key_values=key_values,
    ),
    tags={'track_id': 'test1_2'},
)
test1_2

{'key_values': {'flavour': 'c4.xlarge', 'tracker_id': 'test1'}, 'workflow_type_version': '1.0', 'workflow_params': '{"bam_input": {"path": "https://s3-us-west-2.amazonaws.com/achave11-redwood-storage/consonance-outputs/rna.SRR948778.bam", "class": "File", "format": "http://edamontology.org/format_2572"}, "bamstats_report": {"path": "/tmp/bamstats_report.zip", "class": "File"}, "track_id": "test1_2"}', 'workflow_descriptor': '#!/usr/bin/env cwl-runner\n\nclass: CommandLineTool\nid: "BAMStats"\nlabel: "BAMStats tool"\ncwlVersion: v1.0 \ndescription: |\n    ![build_status](https://quay.io/repository/collaboratory/dockstore-tool-bamstats/status)\n    A Docker container for the BAMStats command. See the [BAMStats](http://bamstats.sourceforge.net/) website for more information.\n\ndct:creator:\n  "@id": "http://orcid.org/0000-0002-7681-6415"\n  foaf:name: Brian O\'Connor\n  foaf:mbox: "mailto:briandoconnor@gmail.com"\n\nrequirements:\n  - class: DockerRequirement\n    dockerPull: "quay.io/co

{u'workflow_id': u'33'}

In [32]:
# Real run ID of test 1.2, located via its track_id tag
test1_2_id = find_workflow_run(wes, tag_key='track_id', tag_val='test1_2')
test1_2_id

u'48'

In [41]:
get_workflow_run_status(wes, workflow_run_id=test1_2_id)

{u'state': u'Complete', u'workflow_id': u'48'}

In [42]:
get_workflow_run(wes, workflow_run_id=test1_2_id)

{u'outputs': [{u'location': None,
   u'name': None,
   u'type': None,
   u'value': None}],
 u'request': {u'key_values': None,
  u'workflow_descriptor': None,
  u'workflow_params': u'{"bam_input": {"path": "https://s3-us-west-2.amazonaws.com/achave11-redwood-storage/consonance-outputs/rna.SRR948778.bam", "class": "File", "format": "http://edamontology.org/format_2572"}, "bamstats_report": {"path": "/tmp/bamstats_report.zip", "class": "File"}, "track_id": "test1_2"}',
  u'workflow_type': u'cwl',
  u'workflow_type_version': u'1.0'},
 u'state': u'Complete',
 u'task_logs': [{u'cmd': None,
   u'endTime': u'2018-05-03 04:22:01.895187',
   u'exitCode': None,
   u'name': u'cwl',
   u'startTime': u'2018-05-03 04:20:07.436',
   u'stderr': u"/usr/local/bin/cwltool 1.0.20160712154127\n['docker', 'pull', 'quay.io/collaboratory/dockstore-tool-bamstats:1.25-6_1.0']\n1.25-6_1.0: Pulling from collaboratory/dockstore-tool-bamstats\n3f992ab3df53: Pulling fs layer\n0aa0bd28396f: Pulling fs layer\ndb7bb1508

#### Test 1.2 Results

> `workflow_descriptor` appears to be unset (failed to parse?); however, there's no other indication that the job failed. The `stderr` may be held over from a previous job, but I can't confirm that. Hard to tell what's happening...

#### 1.3 Using descriptor URL

Now I'll provide the descriptor URL in the `workflow_url` field — which, based on my understanding of the WES spec, is a valid option.

In [43]:
# Point at the descriptor via `workflow_url` instead of embedding it
test1_3 = submit_workflow_run(
    wes,
    workflow_run_packet=build_workflow_run_packet(
        params,
        workflow_type,
        workflow_type_version,
        workflow_url=descriptor_url,
        key_values=key_values,
    ),
    tags={'track_id': 'test1_3'},
)
test1_3

{'key_values': {'flavour': 'c4.xlarge', 'tracker_id': 'test1'}, 'workflow_url': 'https://dockstore.org:8443/api/ga4gh/v2/tools/quay.io%2Fcollaboratory%2Fdockstore-tool-bamstats/versions/1.25-6_1.0/plain-CWL/descriptor/%2FDockstore.cwl', 'workflow_type_version': '1.0', 'workflow_params': '{"bam_input": {"path": "https://s3-us-west-2.amazonaws.com/achave11-redwood-storage/consonance-outputs/rna.SRR948778.bam", "class": "File", "format": "http://edamontology.org/format_2572"}, "bamstats_report": {"path": "/tmp/bamstats_report.zip", "class": "File"}, "track_id": "test1_3"}', 'workflow_type': 'cwl'}


{u'code': 400, u'message': u'Unable to process JSON'}

#### Test 1.3 Results

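> Fail. :( The server rejected the request with `400: Unable to process JSON`. Presumably this endpoint does not accept `workflow_url`, which is the only difference from the earlier requests.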

#### 1.4 Using descriptor URL, part 2

Maybe if I provide the descriptor URL as the `workflow_descriptor`...

In [44]:
# Pass the descriptor URL in the `workflow_descriptor` field instead
test1_4 = submit_workflow_run(
    wes,
    workflow_run_packet=build_workflow_run_packet(
        params,
        workflow_type,
        workflow_type_version,
        workflow_descriptor=descriptor_url,
        key_values=key_values,
    ),
    tags={'track_id': 'test1_4'},
)
test1_4

{'key_values': {'flavour': 'c4.xlarge', 'tracker_id': 'test1'}, 'workflow_type_version': '1.0', 'workflow_params': '{"bam_input": {"path": "https://s3-us-west-2.amazonaws.com/achave11-redwood-storage/consonance-outputs/rna.SRR948778.bam", "class": "File", "format": "http://edamontology.org/format_2572"}, "bamstats_report": {"path": "/tmp/bamstats_report.zip", "class": "File"}, "track_id": "test1_4"}', 'workflow_descriptor': 'https://dockstore.org:8443/api/ga4gh/v2/tools/quay.io%2Fcollaboratory%2Fdockstore-tool-bamstats/versions/1.25-6_1.0/plain-CWL/descriptor/%2FDockstore.cwl', 'workflow_type': 'cwl'}


{u'workflow_id': u'33'}

In [45]:
# Real run ID of test 1.4, located via its track_id tag
test1_4_id = find_workflow_run(wes, tag_key='track_id', tag_val='test1_4')
test1_4_id

u'49'

In [47]:
get_workflow_run_status(wes, workflow_run_id=test1_4_id)

{u'state': u'Complete', u'workflow_id': u'49'}

In [48]:
get_workflow_run(wes, workflow_run_id=test1_4_id)

{u'outputs': [{u'location': None,
   u'name': None,
   u'type': None,
   u'value': None}],
 u'request': {u'key_values': None,
  u'workflow_descriptor': u'#!/usr/bin/env cwl-runner\n\nclass: CommandLineTool\nid: "BAMStats"\nlabel: "BAMStats tool"\ncwlVersion: v1.0 \ndescription: |\n    ![build_status](https://quay.io/repository/collaboratory/dockstore-tool-bamstats/status)\n    A Docker container for the BAMStats command. See the [BAMStats](http://bamstats.sourceforge.net/) website for more information.\n\ndct:creator:\n  "@id": "http://orcid.org/0000-0002-7681-6415"\n  foaf:name: Brian O\'Connor\n  foaf:mbox: "mailto:briandoconnor@gmail.com"\n\nrequirements:\n  - class: DockerRequirement\n    dockerPull: "quay.io/collaboratory/dockstore-tool-bamstats:1.25-6_1.0"\n\nhints:\n  - class: ResourceRequirement\n    coresMin: 1\n    ramMin: 4092\n    outdirMin: 512000\n    doc: "the process requires at least 4G of RAM"\n\ninputs:\n  mem_gb:\n    type: int\n    default: 4\n    doc: "The memory,

#### Test 1.4 Results

> Inconclusive. The descriptor document seems to have been correctly retrieved and set, but I'm still unsure if I can trust the `stderr`. I probably need to test sequentially with different workflows.