In [1]:
%load_ext autoreload
%autoreload 2

In [3]:
import os
import json
import cromwell_manager as cwm
from google.cloud import storage
from IPython import display

## Basic Cromwell Manager Functionality

This notebook walks through the basic functionality of cromwell-manager. A typical workflow involves defining and running some WDL workflows using a cromwell server. 

This server can be either a locally running server, which you access through a localhost URL, or a web service, which will have its own URL. For the purposes of this demo notebook, we'll assume you're setting up a local server. We also assume you've set up and authenticated a version of the google-cloud-sdk and can make a successful call to `google.cloud.storage.Client()`.

## Set up clients & confirm they're running

In [6]:
# no username and password for localhost. 
PORT = 'secret'  # replace with a number
google_project = 'broad-dsde-mint-dev'
cromwell_url = 'http://localhost:{PORT}'.format(PORT=PORT)
local_config = {'cromwell_url': cromwell_url}
cromwell = cwm.Cromwell(**local_config)

The constructor for `Cromwell` confirms that you've pointed it at a valid, running server. However, you can re-run this check at any time with `Cromwell.server_is_running()`. This can also help debug rare cases where your server shuts down mid-workflow.

In [7]:
cromwell.server_is_running()

True
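If the server was started only moments ago, a single check may come too early. The sketch below polls a check until it passes or a timeout expires. `wait_until` is a hypothetical helper, not part of cromwell-manager, and the fake check stands in for `cromwell.server_is_running`, which is assumed to return a boolean as the output above suggests.

```python
import time


def wait_until(predicate, timeout=30, delay=1):
    """Poll `predicate` until it returns True or `timeout` seconds elapse."""
    deadline = time.monotonic() + timeout
    while True:
        if predicate():
            return True
        if time.monotonic() >= deadline:
            return False
        time.sleep(delay)


# Stand-in for `cromwell.server_is_running`: reports True on the third call.
calls = {'n': 0}


def fake_server_is_running():
    calls['n'] += 1
    return calls['n'] >= 3


server_up = wait_until(fake_server_is_running, timeout=5, delay=0.01)
print(server_up)
```

With a real server, one would pass `cromwell.server_is_running` as the predicate instead of the fake.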

In [8]:
# confirm client is properly authenticated by listing the buckets
client = storage.Client(project=google_project)
buckets = list(client.list_buckets())
print(buckets[:2]) # just list the first two

[<Bucket: artifacts.broad-dsde-mint-dev.appspot.com>, <Bucket: broad-dsde-mint-dev>]


## Define an example workflow

Cromwell is set up to accept local files, google storage endpoints, and http or https endpoints for its inputs. To demonstrate the https capabilities, this demo will pull the files directly from the `cromwell-manager` git repository. We'll download a WDL file that runs a testing workflow that spins up an inexpensive google instance and sleeps for 15 seconds. We will attach a monitoring script to it so we can see how memory and disk usage fluctuate across the run. 

First, however, assuming you're running this notebook from a cloned repository, let's take a look at the files we're about to run. 

In [25]:
data_dir = '../cromwell_manager/test/data/' 
local_wdl = data_dir + 'testing.wdl'
local_inputs = data_dir + 'testing_example_inputs.json'
local_options = data_dir + 'options.json'

We can visualize each of these modules because they're very simple. The inputs file provides a single input, the amount of time to sleep. The options file provides the monitoring script and turns off call caching to make sure the submission provokes a fresh run. Finally, the WDL defines a task that takes the time input and sleeps for that amount of time. 

In [27]:
!cat $local_inputs

{
  "Sleep.time": 15
}


In [28]:
!cat $local_options

{
  "monitoring_script": "gs://broad-dsde-mint-dev-teststorage/10x/benchmark/scripts/monitor.sh",
  "read_from_cache":false,
  "write_to_cache":false
}
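Files like these two can also be generated from Python dictionaries, which is convenient when the sleep time or the caching flags need to vary between runs. This is a minimal sketch using only the standard library; the variable names are illustrative and the values are copied from the files shown above.

```python
import json

# Same content as testing_example_inputs.json.
example_inputs = {'Sleep.time': 15}

# Same content as options.json; Python booleans serialize to JSON true/false.
example_options = {
    'monitoring_script': 'gs://broad-dsde-mint-dev-teststorage/10x/benchmark/scripts/monitor.sh',
    'read_from_cache': False,
    'write_to_cache': False,
}

inputs_text = json.dumps(example_inputs, indent=2)
options_text = json.dumps(example_options, indent=2)
print(inputs_text)
print(options_text)
```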

In [29]:
!cat $local_wdl


task SleepAWhile {
  Int time

  command {
    lsblk
    df -k
    sleep ${time}
    echo "something"
  }

  runtime {
    cpu: "1"
    docker: "ubuntu:zesty"
    memory: "1 GB"
    disks: "local-disk 10 HDD"
  }
}

workflow Sleep {
  Int time

  call SleepAWhile {
    input:
      time = time
  }
}


Now we define the URLs of the files we want Cromwell to download:

In [34]:
wdl = 'https://raw.githubusercontent.com/ambrosejcarr/cromwell-manager/master/src/cromwell_manager/test/data/testing.wdl'
inputs = 'https://raw.githubusercontent.com/ambrosejcarr/cromwell-manager/master/src/cromwell_manager/test/data/testing_example_inputs.json'
options = 'https://raw.githubusercontent.com/ambrosejcarr/cromwell-manager/master/src/cromwell_manager/test/data/options.json'

## Submit and explore the workflow

The Cromwell Manager package defines two main classes: `Cromwell` and `Workflow`. An instance of `Cromwell` checks on start-up that it points to a valid, authenticated, active Cromwell server, and it defines all of the REST API methods supported by Cromwell. In contrast, a `Workflow` instance represents a workflow that the `Cromwell` server is aware of. Thus, it has two constructors: one that submits a new workflow, and one that builds the object from an existing run. We will explore both below. 

First, we'll use the secondary constructor to submit a new workflow. Later we'll query `Cromwell` and use the discovered run_id to demonstrate the primary constructor. 
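The two-constructor design is a common Python pattern: `__init__` wraps something that already exists, and a `classmethod` creates it first. The sketch below illustrates the pattern with hypothetical `ToyWorkflow` and `ToyServer` classes. It is not the cromwell-manager implementation, and the in-memory server exists only so the example runs without Cromwell.

```python
import uuid


class ToyServer:
    """In-memory fake server so the sketch runs without Cromwell."""

    def __init__(self):
        self.runs = {}

    def submit(self, wdl, inputs_json):
        workflow_id = str(uuid.uuid4())
        self.runs[workflow_id] = {'wdl': wdl, 'inputs': inputs_json}
        return workflow_id


class ToyWorkflow:
    """Hypothetical stand-in illustrating the two-constructor pattern."""

    def __init__(self, workflow_id, server):
        # Primary constructor: wrap a run the server already knows about.
        self.id = workflow_id
        self.server = server

    @classmethod
    def from_submission(cls, wdl, inputs_json, server):
        # Secondary constructor: submit first, then wrap the resulting ID.
        workflow_id = server.submit(wdl, inputs_json)
        return cls(workflow_id, server)


server = ToyServer()
submitted = ToyWorkflow.from_submission('testing.wdl', 'inputs.json', server)
rebuilt = ToyWorkflow(submitted.id, server)
print(submitted.id == rebuilt.id)
```

Both objects refer to the same run; only the way they were obtained differs.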

One useful capability of Cromwell that this package exposes is the ability to add custom labels to runs. This will help us find the workflow we're submitting with a query. 

In [35]:
custom_labels = {'type': 'basicfunctionalitytest'}

Documentation for any function in the cromwell-manager package can be accessed by printing its `__doc__` attribute, or by using `?cwm.<object_name>` in Jupyter.

In [32]:
# also accessible with ?cwm.Workflow.from_submission
print(cwm.Workflow.from_submission.__doc__)

Submit a new workflow, returning a Workflow object.


        :param str wdl: wdl that defines this workflow
        :param str inputs_json: inputs to this wdl
        :param Cromwell cromwell_server: an authenticated cromwell server
        :param storage.Client storage_client: authenticated google storage client

        :param str workflow_dependencies:
        :param dict custom_labels:
        :param str options_json: options file for the workflow

        :param bool wait: if True, wait until workflow recognizes as submitted (default: True)
        :param int timeout: maximum time to wait
        :param int delay: time between status queries
        :param bool verbose: if True, print request results
        :param args: additional positional args to pass to requests.post
        :param kwargs: additional keyword args to pass to request.post

        :return dict: Cromwell submission result
        


In [36]:
test_workflow = cwm.Workflow.from_submission(
    wdl=wdl, 
    inputs_json=inputs, 
    cromwell_server=cromwell,
    storage_client=client,
    custom_labels=custom_labels,
    options_json=options)

The Cromwell REST API exposes a number of useful endpoints that we can use to interact with a running workflow and evaluate its outcome. For any command to a `Cromwell` instance, specifying `verbose=True` will print the response in addition to storing the output, and specifying `open_browser=True` for any GET request will display the JSON response in your browser. 

Both the `Cromwell` and `Workflow` classes provide access to most of the endpoints. In most cases the `Workflow` API is simpler but less fully featured. Below, we demonstrate both ways of getting the status of a workflow. In the second case, we both open a browser window and print the request with `verbose`.  

In [37]:
# version 1
test_workflow.status

{'id': '7e940c5a-ed9c-4558-84ea-fbc83387b682', 'status': 'Submitted'}

In [38]:
# version 2
response = cromwell.status(test_workflow.id, open_browser=True, verbose=True)

GET Request: http://localhost:6361/api/workflows/v1/7e940c5a-ed9c-4558-84ea-fbc83387b682/status
Response: 200
Response Content:
{
  "status": "Submitted",
  "id": "7e940c5a-ed9c-4558-84ea-fbc83387b682"
}


We can also get a run's metadata, which we read in as a traversable dictionary:

In [39]:
test_workflow.metadata.keys()

dict_keys(['workflowName', 'submittedFiles', 'calls', 'outputs', 'workflowRoot', 'id', 'inputs', 'labels', 'submission', 'status', 'start'])

Or we can print the whole dictionary:

In [40]:
test_workflow.metadata

{'calls': {'Sleep.SleepAWhile': [{'attempt': 1,
    'backend': 'JES',
    'callCaching': {'allowResultReuse': False,
     'effectiveCallCachingMode': 'CallCachingOff'},
    'executionStatus': 'Starting',
    'shardIndex': -1,
    'start': '2017-10-13T19:03:01.530-04:00'}]},
 'id': '7e940c5a-ed9c-4558-84ea-fbc83387b682',
 'inputs': {'Sleep.time': 15},
 'labels': {'cromwell-workflow-id': 'cromwell-7e940c5a-ed9c-4558-84ea-fbc83387b682',
  'type': 'basicfunctionalitytest'},
 'outputs': {},
 'start': '2017-10-13T19:02:59.057-04:00',
 'status': 'Running',
 'submission': '2017-10-13T19:02:49-04:00',
 'submittedFiles': {'inputs': '{"Sleep.time":15}',
  'labels': '{"type": "basicfunctionalitytest"}',
  'options': '{\n  "monitoring_script": "gs://broad-dsde-mint-dev-teststorage/10x/benchmark/scripts/monitor.sh",\n  "read_from_cache": false,\n  "write_to_cache": false\n}',
  'workflow': '\ntask SleepAWhile {\n  Int time\n\n  command {\n    lsblk\n    df -k\n    sleep ${time}\n    echo "something"
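Because the metadata is a plain nested dictionary, small helpers can pull out just the parts of interest. The sketch below collects the `executionStatus` of every attempt of every call. `call_statuses` is a hypothetical helper, and `sample_metadata` is a trimmed copy of the dictionary printed above.

```python
# Trimmed copy of the metadata shown above.
sample_metadata = {
    'calls': {
        'Sleep.SleepAWhile': [
            {'attempt': 1, 'backend': 'JES',
             'executionStatus': 'Starting', 'shardIndex': -1},
        ],
    },
    'status': 'Running',
}


def call_statuses(metadata):
    """Map each call name to the executionStatus of each of its attempts."""
    return {
        name: [attempt.get('executionStatus') for attempt in attempts]
        for name, attempts in metadata.get('calls', {}).items()
    }


print(call_statuses(sample_metadata))
# prints {'Sleep.SleepAWhile': ['Starting']}
```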

## Explore Workflow results and resource utilization

After the workflow completes, we can automatically parse information on the tasks that were run. In this case, we ran a monitoring script, so we can find out how much memory and disk the task used. While it's not necessary to do this, we can first look at the actual output of the monitoring script.

In [41]:
# can take up to two minutes, considering overhead required to spin up the instance
test_workflow.wait_until_complete(timeout=120, delay=5)

We can display the logs for the successful run. Unfortunately, Cromwell doesn't consider the output of our monitoring script a log, so to find it we will need the workflow root. 

In [42]:
cromwell.logs(test_workflow.id, verbose=True)

GET Request: http://localhost:6361/api/workflows/v1/7e940c5a-ed9c-4558-84ea-fbc83387b682/logs
Response: 200
Response Content:
{
  "calls": {
    "Sleep.SleepAWhile": [
      {
        "stdout": "gs://broad-dsde-mint-dev-cromwell-execution/Sleep/7e940c5a-ed9c-4558-84ea-fbc83387b682/call-SleepAWhile/SleepAWhile-stdout.log",
        "shardIndex": -1,
        "stderr": "gs://broad-dsde-mint-dev-cromwell-execution/Sleep/7e940c5a-ed9c-4558-84ea-fbc83387b682/call-SleepAWhile/SleepAWhile-stderr.log",
        "attempt": 1,
        "backendLogs": {
          "log": "gs://broad-dsde-mint-dev-cromwell-execution/Sleep/7e940c5a-ed9c-4558-84ea-fbc83387b682/call-SleepAWhile/SleepAWhile.log"
        }
      }
    ]
  },
  "id": "7e940c5a-ed9c-4558-84ea-fbc83387b682"
}


<Response [200]>

We can use an accessory class, `GSObject`, to easily download any of the above logs and take a look through them. In this case our run succeeded, so the `stdout` and `stderr` logs are pretty boring. We'll get the backend log instead.

In [43]:
# sleepy_run_log = cromwell.logs(test_workflow.id)[<indexers>]  # also works!
sleepy_run_log = test_workflow.logs['calls']['Sleep.SleepAWhile'][0]['backendLogs']['log']

In [44]:
gs_log = cwm.io_util.GSObject(sleepy_run_log, client=client)
log_data = gs_log.download_as_string()
print(log_data)

2017/10/13 23:03:44 I: Switching to status: pulling-image
2017/10/13 23:03:44 I: Calling SetOperationStatus(pulling-image)
2017/10/13 23:03:44 I: SetOperationStatus(pulling-image) succeeded
2017/10/13 23:03:44 I: Pulling image "ubuntu@sha256:da2fd4e2e10e0ab991f251353a2d3e32d38c75a83a917dbca0a307efd8730f49"
2017/10/13 23:03:59 I: Pulled image "ubuntu@sha256:da2fd4e2e10e0ab991f251353a2d3e32d38c75a83a917dbca0a307efd8730f49" successfully.
2017/10/13 23:03:59 I: Switching to status: localizing-files
2017/10/13 23:03:59 I: Calling SetOperationStatus(localizing-files)
2017/10/13 23:03:59 I: SetOperationStatus(localizing-files) succeeded
2017/10/13 23:03:59 I: Docker file /cromwell_root/monitoring.sh maps to host location /mnt/local-disk/monitoring.sh.
2017/10/13 23:03:59 I: Running command: sudo gsutil -q -m cp gs://broad-dsde-mint-dev-teststorage/10x/benchmark/scripts/monitor.sh /mnt/local-disk/monitoring.sh
2017/10/13 23:04:01 I: Docker file /cromwell_root/exec.sh maps to host location /mnt
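Downloading from a `gs://` location ultimately requires a bucket name and an object name. The sketch below shows one way to split a URI into those two parts with the standard library. `split_gs_uri` is a hypothetical helper, and this is not necessarily how `GSObject` does it internally.

```python
from urllib.parse import urlsplit


def split_gs_uri(uri):
    """Split 'gs://bucket/path/to/object' into (bucket, object name)."""
    parts = urlsplit(uri)
    if parts.scheme != 'gs' or not parts.netloc:
        raise ValueError('not a gs:// URI: {}'.format(uri))
    # The object name is the path without its leading slash.
    return parts.netloc, parts.path.lstrip('/')


bucket_name, blob_name = split_gs_uri(
    'gs://broad-dsde-mint-dev-cromwell-execution/Sleep/'
    '7e940c5a-ed9c-4558-84ea-fbc83387b682/call-SleepAWhile/SleepAWhile.log')
print(bucket_name)
print(blob_name)
```

With an authenticated `google.cloud.storage` client, the pair could then be passed to `client.bucket(bucket_name).blob(blob_name)` to obtain a downloadable blob.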

The above log shows an example of what goes on in a normal Cromwell task. Now we'll look at the resource utilization for our run. Another way to access files is through the workflow root for the Cromwell run. This is recorded in the run metadata, but is also exposed as a property for convenience. `inputs`, `outputs`, `logs`, `root`, `metadata`, and `status` are all available in this fashion.

In [45]:
# our call was called SleepAWhile; we can get the file from google storage. 
log_filename = test_workflow.root + 'call-SleepAWhile/monitoring.log'
print(test_workflow.root)
print(log_filename)

gs://broad-dsde-mint-dev-cromwell-execution/Sleep/7e940c5a-ed9c-4558-84ea-fbc83387b682/
gs://broad-dsde-mint-dev-cromwell-execution/Sleep/7e940c5a-ed9c-4558-84ea-fbc83387b682/call-SleepAWhile/monitoring.log


Download and print the monitoring log. 

In [46]:
gs_monitoring = cwm.io_util.GSObject(log_filename, client=client)
log_data = gs_monitoring.download_as_string()
print(log_data)

--- General Information ---
#CPU: 1
Total Memory (MB): 1700
Total Disk space (KB): 10190136

--- Runtime Information ---
* Memory usage (%): 7.53%
* Memory usage (MB): 128
* Disk usage (%): 0.23%
* Disk usage (KB): 23044
* Memory usage (%): 7.59%
* Memory usage (MB): 128
* Disk usage (%): 0.23%
* Disk usage (KB): 23044
* Memory usage (%): 7.53%
* Memory usage (MB): 128
* Disk usage (%): 0.23%
* Disk usage (KB): 23044



This information is automatically parsed by the `Task` object and stored in a `ResourceUtilization` object, which is created when you call `tasks()` on a workflow. 

In [47]:
for name, task in test_workflow.tasks().items():
    print(name)
    print(task.resource_utilization)

Sleep.SleepAWhile
SleepAWhile Monitoring Summary:
Max Memory Usage (MB): 128
Available Memory (MB): 1700
Max disk usage   (KB): 23044
Available disk   (KB): 10190136
Disk Utilized     (%): 0.002
Memory Utilized   (%): 0.075
Robust Estimate?     : True
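To make the relationship between the raw monitoring log and the summary concrete, here is a minimal sketch of how such a log could be reduced to peak usage figures. It is not the package's actual parser: `summarize_monitoring_log` and its output keys are hypothetical, and the sample text is a shortened copy of the log printed earlier.

```python
import re

# Shortened copy of the monitoring log shown earlier.
SAMPLE_LOG = """\
--- General Information ---
#CPU: 1
Total Memory (MB): 1700
Total Disk space (KB): 10190136

--- Runtime Information ---
* Memory usage (%): 7.53%
* Memory usage (MB): 128
* Disk usage (%): 0.23%
* Disk usage (KB): 23044
* Memory usage (%): 7.59%
* Memory usage (MB): 128
* Disk usage (%): 0.23%
* Disk usage (KB): 23044
"""


def summarize_monitoring_log(text):
    """Return peak and available memory/disk parsed from a monitoring log."""
    def ints(pattern):
        return [int(value) for value in re.findall(pattern, text)]

    total_memory = ints(r'Total Memory \(MB\): (\d+)')[0]
    total_disk = ints(r'Total Disk space \(KB\): (\d+)')[0]
    max_memory = max(ints(r'Memory usage \(MB\): (\d+)'))
    max_disk = max(ints(r'Disk usage \(KB\): (\d+)'))
    return {
        'max_memory_mb': max_memory,
        'available_memory_mb': total_memory,
        'max_disk_kb': max_disk,
        'available_disk_kb': total_disk,
        # Fractions of the available resource, rounded to three decimals.
        'memory_fraction': round(max_memory / total_memory, 3),
        'disk_fraction': round(max_disk / total_disk, 3),
    }


summary = summarize_monitoring_log(SAMPLE_LOG)
print(summary)
```

For this sample the two fractions come out as 0.075 and 0.002, matching the "Utilized" figures in the summary above, which suggests those figures are fractions of the available resource rather than percentages.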



This information can be saved to a file for later analysis. Below I create a tempfile to avoid polluting the filesystem of those running this demo, and print the contents to prove the save worked. However, one could easily pass an open file or a string filename. 

In [48]:
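import tempfile

# the context manager closes (and deletes) the temporary file on exit
with tempfile.TemporaryFile(mode='w+') as tf:
    test_workflow.save_resource_utilization(tf)
    tf.seek(0)  # rewind so we can read back what was written
    print(tf.read())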

SleepAWhile Monitoring Summary:
Max Memory Usage (MB): 128
Available Memory (MB): 1700
Max disk usage   (KB): 23044
Available disk   (KB): 10190136
Disk Utilized     (%): 0.002
Memory Utilized   (%): 0.075
Robust Estimate?     : True
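Accepting either an open file or a filename is a small but handy convention. The sketch below shows one way a function can support both. `write_report` is a hypothetical helper written for illustration and makes no claim about how `save_resource_utilization` is implemented.

```python
import io
import os
import tempfile


def write_report(report, file_or_name):
    """Write `report` to an open text file object or to a path given as a string."""
    if isinstance(file_or_name, str):
        # A filename: open it, write, and close it again.
        with open(file_or_name, 'w') as f:
            f.write(report)
    else:
        # An already-open file object: the caller remains responsible for closing it.
        file_or_name.write(report)


report = 'Max Memory Usage (MB): 128\n'

# Case 1: an open file-like object.
buffer = io.StringIO()
write_report(report, buffer)

# Case 2: a string filename inside a fresh temporary directory.
path = os.path.join(tempfile.mkdtemp(), 'utilization.txt')
write_report(report, path)
with open(path) as f:
    from_disk = f.read()

print(buffer.getvalue() == from_disk)
```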



## Interact with previously completed workflows

Earlier we mentioned that there are two `Workflow` constructors. Let's use some of the other Cromwell functionality to show how the other constructor works. First, let's find our workflow using Cromwell's query syntax. 

In [49]:
cromwell.query(status=['Succeeded'], names=['Sleep'], verbose=True)

GET Request: http://localhost:6361/api/workflows/v1/query?name=Sleep&status=Succeeded
Response: 200
Response Content:
{
  "results": [
    {
      "name": "Sleep",
      "id": "7e940c5a-ed9c-4558-84ea-fbc83387b682",
      "status": "Succeeded",
      "end": "2017-10-13T19:04:29.405-04:00",
      "start": "2017-10-13T19:02:59.057-04:00"
    }
  ]
}


<Response [200]>
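The GET request printed above shows how the list-valued filters end up in the query string. A minimal sketch of that encoding with the standard library follows. `build_query_url` is a hypothetical helper, the endpoint path is copied from the request above, and the keys are the singular parameter names that appear in that URL.

```python
from urllib.parse import urlencode


def build_query_url(base_url, **filters):
    """Encode list-valued filters as repeated query-string parameters."""
    query = urlencode(filters, doseq=True)
    return '{}/api/workflows/v1/query?{}'.format(base_url, query)


url = build_query_url('http://localhost:6361', name=['Sleep'], status=['Succeeded'])
print(url)
# prints http://localhost:6361/api/workflows/v1/query?name=Sleep&status=Succeeded
```

With `doseq=True`, a filter holding several values becomes one repeated parameter per value, for example `status=Succeeded&status=Failed`.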

A query like this can return several runs. The last one, which was run today, is the one we're looking for. 

Below we use the other constructor to create a `Workflow` from a workflow ID. 

In [51]:
workflow_id = cromwell.query(status=['Succeeded'], names=['Sleep']).json()['results'][-1]['id']
duplicate_workflow = cwm.Workflow(workflow_id=workflow_id, cromwell_server=cromwell, storage_client=client)
duplicate_workflow.status  # same as above. 

{'id': '7e940c5a-ed9c-4558-84ea-fbc83387b682', 'status': 'Succeeded'}
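Indexing with `[-1]` relies on the order in which the server returns results. A slightly more defensive sketch picks the run with the latest `start` timestamp instead. `most_recent` is a hypothetical helper, the `older-run` entry is made up for illustration, and `datetime.fromisoformat` requires Python 3.7 or newer.

```python
from datetime import datetime

# The first entry mirrors the query result above; 'older-run' is invented.
sample_results = [
    {'id': '7e940c5a-ed9c-4558-84ea-fbc83387b682',
     'start': '2017-10-13T19:02:59.057-04:00'},
    {'id': 'older-run',
     'start': '2017-10-12T10:00:00.000-04:00'},
]


def most_recent(results):
    """Return the result whose 'start' timestamp is the latest."""
    return max(results, key=lambda r: datetime.fromisoformat(r['start']))


latest_id = most_recent(sample_results)['id']
print(latest_id)
# prints 7e940c5a-ed9c-4558-84ea-fbc83387b682
```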

We can also look at the timing diagram, which opens in another window. For this workflow it is not very interesting.

In [52]:
duplicate_workflow.timing()

## Other miscellaneous functionality

Display cromwell backends:

In [53]:
response = cromwell.backends(verbose=True)

GET Request: http://localhost:6361/api/workflows/v1/backends
Response: 200
Response Content:
{
  "supportedBackends": [
    "JES",
    "Local",
    "SGE"
  ],
  "defaultBackend": "JES"
}


Display run outputs (note: our task doesn't have any!)

In [54]:
outputs = cromwell.outputs(test_workflow.id, verbose=True)

GET Request: http://localhost:6361/api/workflows/v1/7e940c5a-ed9c-4558-84ea-fbc83387b682/outputs
Response: 200
Response Content:
{
  "outputs": {},
  "id": "7e940c5a-ed9c-4558-84ea-fbc83387b682"
}


Abort a workflow (this will fail, since our workflow is already complete!)

In [55]:
test_workflow.abort()

{'message': "Couldn't abort 7e940c5a-ed9c-4558-84ea-fbc83387b682 because no workflow with that ID is in progress",
 'status': 'error'}

Finally, open the swagger API for your instance:

In [56]:
cromwell.swagger()