# Launching WES workflow on DRS objects based on a Data Connect query

## Check if endpoints are available

1) Check if WES endpoints are available

In [794]:
import requests
import json

def pretty_print_json(response):
    """Pretty-print the JSON body of an HTTP response in blue (24-bit ANSI colour).

    If the body is not valid JSON (e.g. an HTML error page from a node that is down),
    print the HTTP status code and raw body text instead of raising, so one bad
    response does not abort a loop over several endpoints.
    """
    try:
        body = json.dumps(response.json(), indent=4)
    except ValueError:
        # JSON decode errors raised by response.json() subclass ValueError.
        body = "HTTP {}: {}".format(response.status_code, response.text)
    print("\033[38;2;8;75;138m" + body + "\033[0m")
    
def print_head(text):
    """Print ``text`` as a heading wrapped in a green 24-bit ANSI colour escape."""
    green, reset = "\033[38;2;8;138;75m", "\033[0m"
    print("".join((green, text, reset)))

In [795]:
node_ips = ['ga4gh-starter-kit.ilifu.ac.za','154.114.10.54','154.114.10.62']
wes_port = "6000"
service_info_path = "/service-info"
runs_path = "/runs"
http_method = "GET"
# Seconds to wait for a node before declaring it unreachable.
request_timeout_s = 10

for node_ip in node_ips:
    ga4gh_base_url = "http://" + node_ip + ":{}/ga4gh/{}/v1"
    wes_base_url = ga4gh_base_url.format(wes_port,"wes")
    request_url = wes_base_url+service_info_path
    print_head("{} request to {}".format(http_method, request_url))
    # GET request to service-info endpoint. A node that is down is reported and skipped
    # rather than raising and aborting the check for the remaining nodes.
    try:
        wes_service_info_resp = requests.request(http_method, request_url, timeout=request_timeout_s)
    except requests.exceptions.RequestException as err:
        print("{} is not reachable: {}".format(request_url, err))
        continue
    # print the response
    pretty_print_json(wes_service_info_resp)

[38;2;8;138;75mGET request to http://ga4gh-starter-kit.ilifu.ac.za:6000/ga4gh/wes/v1/service-info[0m
[38;2;8;75;138m{
    "id": "org.ga4gh.starterkit.wes",
    "name": "GA4GH Starter Kit WES Service",
    "description": "An open source, community-driven implementation of the GA4GH Workflow Execution Service (WES)API specification.",
    "contactUrl": "mailto:info@ga4gh.org",
    "documentationUrl": "https://github.com/ga4gh/ga4gh-starter-kit-wes",
    "createdAt": "2020-01-15T12:00:00Z",
    "updatedAt": "2020-01-15T12:00:00Z",
    "environment": "test",
    "version": "0.3.2",
    "type": {
        "group": "org.ga4gh",
        "artifact": "wes",
        "version": "1.0.1"
    },
    "organization": {
        "name": "Global Alliance for Genomics and Health",
        "url": "https://ga4gh.org"
    },
    "workflow_type_versions": {
        "WDL": [
            "1.0"
        ],
        "NEXTFLOW": [
            "21.04.0"
        ]
    },
    "workflow_engine_versions": {
        "NA

2) Check if DRS endpoints are available

In [796]:
node_ips = ['ga4gh-starter-kit.ilifu.ac.za','154.114.10.54','154.114.10.62']
drs_port = "5000"
service_info_path = "/service-info"
http_method = "GET"
# Seconds to wait for a node before declaring it unreachable.
request_timeout_s = 10

for node_ip in node_ips:
    ga4gh_base_url = "http://" + node_ip + ":{}/ga4gh/{}/v1"
    drs_base_url = ga4gh_base_url.format(drs_port,"drs")
    request_url = drs_base_url+service_info_path
    print_head("{} request to {}".format(http_method, request_url))
    # GET request to service-info endpoint. A node that is down is reported and skipped
    # rather than raising and aborting the check for the remaining nodes.
    try:
        drs_service_info_resp = requests.request(http_method, request_url, timeout=request_timeout_s)
    except requests.exceptions.RequestException as err:
        print("{} is not reachable: {}".format(request_url, err))
        continue
    # print the response
    pretty_print_json(drs_service_info_resp)

[38;2;8;138;75mGET request to http://ga4gh-starter-kit.ilifu.ac.za:5000/ga4gh/drs/v1/service-info[0m
[38;2;8;75;138m{
    "id": "org.ga4gh.starterkit.drs",
    "name": "GA4GH Starter Kit DRS Service",
    "description": "An open source, community-driven implementation of the GA4GH Data Repository Service (DRS) API specification.",
    "contactUrl": "mailto:info@ga4gh.org",
    "documentationUrl": "https://github.com/ga4gh/ga4gh-starter-kit-drs",
    "createdAt": "2020-01-15T12:00:00Z",
    "updatedAt": "2020-01-15T12:00:00Z",
    "environment": "test",
    "version": "0.3.2",
    "type": {
        "group": "org.ga4gh",
        "artifact": "drs",
        "version": "1.3.0experimental"
    },
    "organization": {
        "name": "Global Alliance for Genomics and Health",
        "url": "https://ga4gh.org"
    }
}[0m
[38;2;8;138;75mGET request to http://154.114.10.54:5000/ga4gh/drs/v1/service-info[0m
[38;2;8;75;138m{
    "id": "org.ga4gh.starterkit.drs",
    "name": "GA4GH Starter

## Check if DRS objects exist on each endpoint

1. Check ga4gh-starter-kit.ilifu.ac.za

In [797]:
# Fetch the metadata of each listed DRS object from the ilifu (South Africa) node.
node_ip = 'ga4gh-starter-kit.ilifu.ac.za'
drs_port = "5000"
http_method = "GET"
object_path_get = "/objects/{}"
object_path_post = "/objects"
access_path = "/objects/{}/access/{}"
drs_ids = [
    'c542689dba1e54669335c8e25abe6207',
    '5a436bec951fab59dd975bcd10f316f1',
]

ga4gh_base_url = "http://" + node_ip + ":{}/ga4gh/{}/v1"
drs_base_url = ga4gh_base_url.format(drs_port, "drs")
for drs_id in drs_ids:
    request_url = f"{drs_base_url}{object_path_get.format(drs_id)}"
    print_head(f"{http_method} request to {request_url}")
    # GET /objects/{object_id}, then show the returned object description
    drs_object_response = requests.request(http_method, request_url)
    pretty_print_json(drs_object_response)

[38;2;8;138;75mGET request to http://ga4gh-starter-kit.ilifu.ac.za:5000/ga4gh/drs/v1/objects/c542689dba1e54669335c8e25abe6207[0m
[38;2;8;75;138m{
    "id": "c542689dba1e54669335c8e25abe6207",
    "description": "Patient: HG01857, Country: KHV, Region: EAS, Sex: female\n",
    "created_time": "2023-07-24T13:45:58Z",
    "mime_type": "application/cram",
    "name": "HG01857.final.chrX_15494566-15607236",
    "size": 500309,
    "updated_time": "2023-07-24T13:45:58Z",
    "checksums": [
        {
            "checksum": "70ef6da9822aecf071acda427a61e31cbbf8f25b",
            "type": "sha1"
        },
        {
            "checksum": "b9e9f9a1f85de2ee57ba05d3f75de865458c2645284e42deef20de38b9e6a37c",
            "type": "sha256"
        },
        {
            "checksum": "c542689dba1e54669335c8e25abe6207",
            "type": "md5"
        }
    ],
    "self_uri": "drs://ga4gh-starter-kit.ilifu.ac.za:5000/c542689dba1e54669335c8e25abe6207",
    "access_methods": [
        {
          

2. Check 154.114.10.54 (Uganda)

In [798]:
# Fetch the metadata of each listed DRS object from the Uganda node.
node_ip = '154.114.10.54'
drs_port = "5000"
http_method = "GET"
object_path_get = "/objects/{}"
object_path_post = "/objects"
access_path = "/objects/{}/access/{}"
drs_ids = [
    '6fa43c7de04b60c1a73a42aa2efc977d',
    'be145a60bc059c154475a2561af0df6b',
]

ga4gh_base_url = "http://" + node_ip + ":{}/ga4gh/{}/v1"
drs_base_url = ga4gh_base_url.format(drs_port, "drs")
for drs_id in drs_ids:
    request_url = f"{drs_base_url}{object_path_get.format(drs_id)}"
    print_head(f"{http_method} request to {request_url}")
    # GET /objects/{object_id}, then show the returned object description
    drs_object_response = requests.request(http_method, request_url)
    pretty_print_json(drs_object_response)

[38;2;8;138;75mGET request to http://154.114.10.54:5000/ga4gh/drs/v1/objects/6fa43c7de04b60c1a73a42aa2efc977d[0m
[38;2;8;75;138m{
    "id": "6fa43c7de04b60c1a73a42aa2efc977d",
    "description": "Patient: HG01879, Country: ACB, Region: AFR, Sex: male\n",
    "created_time": "2023-07-26T13:08:05Z",
    "mime_type": "application/cram",
    "name": "HG01879.final.chrX_15494566-15607236",
    "size": 309197,
    "updated_time": "2023-07-26T13:08:05Z",
    "checksums": [
        {
            "checksum": "af30841a49bba8733cf1f070ff725dd9cdfc91f1",
            "type": "sha1"
        },
        {
            "checksum": "a61d4570f6d210220fd14b4f7744df46d6dce61df0da35782b528660def996f1",
            "type": "sha256"
        },
        {
            "checksum": "6fa43c7de04b60c1a73a42aa2efc977d",
            "type": "md5"
        }
    ],
    "self_uri": "drs://154.114.10.54:5000/6fa43c7de04b60c1a73a42aa2efc977d",
    "access_methods": [
        {
            "access_url": {
                

3. Check 154.114.10.62 (Mali)

In [799]:
# Fetch the metadata of each listed DRS object from the Mali node.
node_ip = '154.114.10.62'
drs_port = "5000"
http_method = "GET"
object_path_get = "/objects/{}"
object_path_post = "/objects"
access_path = "/objects/{}/access/{}"
drs_ids = [
    'a68c60133f942881983d0e15827bf88f',
    '168d353c6f474ca72e35e9209f921a59',
]

ga4gh_base_url = "http://" + node_ip + ":{}/ga4gh/{}/v1"
drs_base_url = ga4gh_base_url.format(drs_port, "drs")
for drs_id in drs_ids:
    request_url = f"{drs_base_url}{object_path_get.format(drs_id)}"
    print_head(f"{http_method} request to {request_url}")
    # GET /objects/{object_id}, then show the returned object description
    drs_object_response = requests.request(http_method, request_url)
    pretty_print_json(drs_object_response)


[38;2;8;138;75mGET request to http://154.114.10.62:5000/ga4gh/drs/v1/objects/a68c60133f942881983d0e15827bf88f[0m
[38;2;8;75;138m{
    "id": "a68c60133f942881983d0e15827bf88f",
    "description": "Patient: HG01880, Country: ACB, Region: AFR, Sex: female\n",
    "created_time": "2023-07-26T12:59:15Z",
    "mime_type": "application/cram",
    "name": "HG01880.final.chrX_15494566-15607236",
    "size": 449026,
    "updated_time": "2023-07-26T12:59:15Z",
    "checksums": [
        {
            "checksum": "5747e183e64f2fe4d87da568fd30d0159086d1fd",
            "type": "sha1"
        },
        {
            "checksum": "50284d24e63c3c859f1e3d46a8aa54869e82f6fb37099926f2968fe7e64c15d7",
            "type": "sha256"
        },
        {
            "checksum": "a68c60133f942881983d0e15827bf88f",
            "type": "md5"
        }
    ],
    "self_uri": "drs://154.114.10.62:5000/a68c60133f942881983d0e15827bf88f",
    "access_methods": [
        {
            "access_url": {
              

## Launch workflow on test DRS objects from South Africa, Mali and Uganda (using WES on ga4gh-starter-kit.ilifu.ac.za, 154.114.10.54 or 154.114.10.62)

1. Launch workflow
- Change the node_ip to run on a different WES endpoint
- Change the input_file to run on different DRS objects

In [800]:
#node_ip = 'ga4gh-starter-kit.ilifu.ac.za'
node_ip = '154.114.10.62'
#node_ip = '154.114.10.54'
wes_port = "6000"
service_info_path = "/service-info"
runs_path = "/runs"
ga4gh_base_url = "http://" + node_ip + ":{}/ga4gh/{}/v1"
wes_base_url = ga4gh_base_url.format(wes_port,"wes")

http_method = "POST"
request_url = wes_base_url + runs_path

nextflow_workflow_url = "https://github.com/grbot/cram-qc"
#input_file = "drs://ga4gh-starter-kit.ilifu.ac.za:5000/c542689dba1e54669335c8e25abe6207 drs://ga4gh-starter-kit.ilifu.ac.za:5000/5a436bec951fab59dd975bcd10f316f1"
#input_file = "drs://154.114.10.62:5000/a68c60133f942881983d0e15827bf88f drs://154.114.10.62:5000/168d353c6f474ca72e35e9209f921a59"
input_file = "drs://ga4gh-starter-kit.ilifu.ac.za:5000/c542689dba1e54669335c8e25abe6207 drs://ga4gh-starter-kit.ilifu.ac.za:5000/5a436bec951fab59dd975bcd10f316f1 drs://154.114.10.54:5000/6fa43c7de04b60c1a73a42aa2efc977d drs://154.114.10.54:5000/be145a60bc059c154475a2561af0df6b drs://154.114.10.62:5000/a68c60133f942881983d0e15827bf88f drs://154.114.10.62:5000/168d353c6f474ca72e35e9209f921a59"


data = {
    'workflow_type': 'NEXTFLOW',
    'workflow_type_version': '21.04.0',
    'workflow_url': nextflow_workflow_url,
    # Serialise with json.dumps so the value is escaped; hand-built JSON text breaks
    # as soon as the input contains a quote or backslash.
    'workflow_params': json.dumps({"input": input_file})
}

print_head("{} request to {}".format(http_method, request_url))

# Post a Nextflow workflow
wes_post_workflow_response = requests.request(http_method, request_url, data = data)

# print the response
pretty_print_json(wes_post_workflow_response)
# Surface the HTTP error instead of a confusing KeyError on "run_id" below.
wes_post_workflow_response.raise_for_status()

current_run_id = wes_post_workflow_response.json()["run_id"]

print_head("run_id = {}".format(current_run_id))

[38;2;8;138;75mPOST request to http://154.114.10.62:6000/ga4gh/wes/v1/runs[0m
[38;2;8;75;138m{
    "run_id": "6ef2c369-3778-40e9-9918-a7fb678fd7aa"
}[0m
[38;2;8;138;75mrun_id = 6ef2c369-3778-40e9-9918-a7fb678fd7aa[0m


2. Check output

In [802]:
# Fetch the current state and log of the run submitted above: GET /runs/{run_id}.
http_method = "GET"
request_url = "/".join((wes_base_url + runs_path, current_run_id))
print_head(f"{http_method} request to {request_url}")
monitor_run_response = requests.request(http_method, request_url)
pretty_print_json(monitor_run_response)

[38;2;8;138;75mGET request to http://154.114.10.62:6000/ga4gh/wes/v1/runs/6ef2c369-3778-40e9-9918-a7fb678fd7aa[0m
[38;2;8;75;138m{
    "run_id": "6ef2c369-3778-40e9-9918-a7fb678fd7aa",
    "request": {
        "workflow_params": {
            "input": "drs://ga4gh-starter-kit.ilifu.ac.za:5000/c542689dba1e54669335c8e25abe6207 drs://ga4gh-starter-kit.ilifu.ac.za:5000/5a436bec951fab59dd975bcd10f316f1 drs://154.114.10.54:5000/6fa43c7de04b60c1a73a42aa2efc977d drs://154.114.10.54:5000/be145a60bc059c154475a2561af0df6b drs://154.114.10.62:5000/a68c60133f942881983d0e15827bf88f drs://154.114.10.62:5000/168d353c6f474ca72e35e9209f921a59"
        },
        "workflow_type": "NEXTFLOW",
        "workflow_type_version": "21.04.0",
        "workflow_url": "https://github.com/grbot/cram-qc"
    },
    "state": "COMPLETE",
    "run_log": {
        "name": "grbot/cram-qc",
        "cmd": [
            "#!/bin/bash -ue",
            "samtools flagstat     -@ 1     HG01883.final.chrX_15494566-15607236.c

# Data Connect
1. Check service-info

In [825]:
import requests
import json

dc_port = "8089"
dc_base_url = "http://ga4gh-starter-kit.ilifu.ac.za:{}".format(dc_port)

# Data Connect endpoint paths, appended to dc_base_url.
service_info_path = "/service-info"
tables_path = "/tables"
table_info_path = "/table/{}/info"
table_data_path = "/table/{}/data"
search_path = "/search"

# pretty_print_json is intentionally not redefined here: the first code cell already
# defines it, and a second `def` would silently shadow that definition.

In [804]:
# Ask the Data Connect server to describe itself.
dc_service_info_resp = requests.get("{}{}".format(dc_base_url, service_info_path))
pretty_print_json(dc_service_info_resp)

{
    "id": "",
    "name": "GA4GH Discovery Search API",
    "description": "",
    "documentationUrl": "",
    "contactUrl": "",
    "version": ""
}


2. Check data table we will be querying

In [805]:
# Show the auto-generated JSON schema of the table queried below.
dc_service_info_resp = requests.get(dc_base_url + table_info_path.format("trino.public.genome_ilifu"))
pretty_print_json(dc_service_info_resp)

{
    "name": "trino.public.genome_ilifu",
    "description": "Automatically generated schema",
    "data_model": {
        "$id": "http://ga4gh-starter-kit.ilifu.ac.za:8089/table/trino.public.genome_ilifu/info",
        "description": "Automatically generated schema",
        "$schema": "http://json-schema.org/draft-07/schema#",
        "properties": {
            "sample_id": {
                "format": "varchar",
                "type": "string",
                "$comment": "varchar"
            },
            "population_id": {
                "format": "varchar",
                "type": "string",
                "$comment": "varchar"
            },
            "super_population_id": {
                "format": "varchar",
                "type": "string",
                "$comment": "varchar"
            },
            "sex": {
                "format": "varchar",
                "type": "string",
                "$comment": "varchar"
            },
            "cram_drs_id": {
   

### Select address and id functions

In [806]:
import re

def get_address(s):
    """Return the host part of a DRS URI.

    Example: ``drs://154.114.10.54:5000/abc`` -> ``154.114.10.54``. The host ends at the
    port separator ``:`` or, when no port is given, at the first ``/`` of the object path,
    so ``drs://example.org/abc`` -> ``example.org``.
    """
    address = s.replace("drs://","")
    return re.split(r'[:/]', address, maxsplit=1)[0]
    
def get_drs_id(s):
    """Return the object id of a DRS URI: the text after its last ``/``."""
    _, _, object_id = s.rpartition('/')
    return object_id

### Use case 1

![Use case 1](use_case_1.png)

1. Do query
Select CRAM DRS ids for all African samples. Limit search to 10 samples for now.

In [824]:
import requests, json
# Select CRAM DRS URIs for African (AFR) samples; capped at 10 rows for this demo.
q2 = {
  "query": "select cram_drs_id from trino.public.genome_ilifu where super_population_id='AFR' limit 10",
  "parameters": []
}
# Reuse the configured Data Connect base URL rather than repeating the host literal.
r = requests.post(dc_base_url + search_path, json = q2)
r.raise_for_status()
data = r.json()
print(json.dumps(data, indent=3))

{
   "data": [],
   "pagination": {
      "next_page_url": "http://ga4gh-starter-kit.ilifu.ac.za:8089/search/v1/statement/queued/20230731_095003_00003_69yj9/y16c2f4ad18cdc29060397427633e35d2ed7fa9ae/1?queryJobId=20230731_095003_00003_69yj9"
   }
}


2. Parse through pages to get results

In [813]:
# Follow next_page_url links until the result set is exhausted, collecting the
# cram_drs_id of every row on every page. Queued/executing pages carry an empty
# "data" list, so they contribute nothing; the last page has no next_page_url.
drs_ids = []
page = data
while True:
    drs_ids.extend(row['cram_drs_id'] for row in page.get('data', []))
    next_page = (page.get('pagination') or {}).get('next_page_url')
    if not next_page:
        break
    dc_service_info_resp = requests.request("GET", next_page)
    dc_service_info_resp.raise_for_status()
    page = dc_service_info_resp.json()
data = page

# Space-separated form used as the workflow "input" parameter.
drs_str = " ".join(drs_ids)


3. Map DRS server to DRS objects

In [814]:
# Group the selected DRS URIs by the host that serves them; each server entry also
# gets placeholders that later cells fill in (sizes, ingress, run id, outputs).
drs_servers = {}
for drs_id in drs_ids:
    server_entry = drs_servers.setdefault(get_address(drs_id), {
        'drs_ids': [],
        'total_file_size': 0,
        'ingress': 0,
        'run_id': 0,
        'drs_ids_str': "",
        'outputs': {},
    })
    server_entry['drs_ids'].append(drs_id)
print (drs_servers)

drs://154.114.10.54:5000/6fa43c7de04b60c1a73a42aa2efc977d drs://154.114.10.62:5000/a68c60133f942881983d0e15827bf88f drs://154.114.10.62:5000/45ca586b0921ffedf6a63679fbaacb68 drs://ga4gh-starter-kit.ilifu.ac.za:5000/5a436bec951fab59dd975bcd10f316f1 drs://154.114.10.54:5000/be145a60bc059c154475a2561af0df6b drs://154.114.10.62:5000/d36019bb63182abad672205a140f7e83 drs://154.114.10.62:5000/b809bb9b81a9583ec67e787b0449e9bd drs://154.114.10.62:5000/168d353c6f474ca72e35e9209f921a59 drs://154.114.10.54:5000/9a45659fe478e5bb39d1dd1b08bd1807 drs://154.114.10.54:5000/82adbcf7cc72c31a86e65d73bf6ef81b
{'154.114.10.54': {'drs_ids': ['drs://154.114.10.54:5000/6fa43c7de04b60c1a73a42aa2efc977d', 'drs://154.114.10.54:5000/be145a60bc059c154475a2561af0df6b', 'drs://154.114.10.54:5000/9a45659fe478e5bb39d1dd1b08bd1807', 'drs://154.114.10.54:5000/82adbcf7cc72c31a86e65d73bf6ef81b'], 'total_file_size': 0, 'ingress': 0, 'run_id': 0, 'drs_ids_str': '', 'outputs': {}}, '154.114.10.62': {'drs_ids': ['drs://154.114

4. Go through DRS servers, check file size of objects and calculate total file size

In [815]:
# Total the reported size of every DRS object, per server that hosts it.
for drs_server, server_entry in drs_servers.items():
    http_method = "GET"
    drs_port = "5000"
    object_path_get = "/objects/{}"
    ga4gh_base_url = "http://" + drs_server + ":{}/ga4gh/{}/v1"
    total_file_size = 0
    for drs_id in server_entry['drs_ids']:
        drs_base_url = ga4gh_base_url.format(drs_port, "drs")
        request_url = "{}{}".format(drs_base_url, object_path_get.format(get_drs_id(drs_id)))
        drs_object_response = requests.request(http_method, request_url)
        data = drs_object_response.json()
        total_file_size += data['size']
    server_entry['total_file_size'] = total_file_size

5. Go through DRS servers, calculate ingress

In [816]:
# Ingress of a server = total bytes hosted on all *other* servers, i.e. the data that
# would have to be pulled in if the workflow ran there. The value is assigned rather
# than accumulated, so re-running this cell does not keep adding to the previous result.
grand_total_size = sum(info['total_file_size'] for info in drs_servers.values())
for drs_server in drs_servers:
    drs_servers[drs_server]['ingress'] = grand_total_size - drs_servers[drs_server]['total_file_size']
 

6. Get the endpoint hosting the largest total file size (no longer used for selection; step 7 now selects the endpoint with the least ingress)

In [817]:
# Server hosting the most bytes (informational only; selection now uses least ingress).
# max() keeps the first server among ties; a best size of 0 selects nothing.
server_sizes = {name: info['total_file_size'] for name, info in drs_servers.items()}
drs_server_selected = max(server_sizes, key=server_sizes.get, default="")
largest_file_set = server_sizes.get(drs_server_selected, 0)
if largest_file_set <= 0:
    drs_server_selected, largest_file_set = "", 0

7. Get endpoint with least ingress (least data needs to be transferred)

In [818]:
# Pick the server with the least ingress (least data to transfer). min() returns the
# first minimal server, and, unlike a `least_ingress == 0` sentinel, it keeps a server
# whose ingress is genuinely 0. With no servers, nothing is selected.
drs_server_selected = min(drs_servers, key=lambda server: drs_servers[server]['ingress'], default="")
least_ingress = drs_servers[drs_server_selected]['ingress'] if drs_server_selected else 0
        

8. Launch the workflow with the selected DRS objects on the endpoint with the least ingress (this currently works only because the data is stored under the same path on all nodes)

In [819]:
wes_port = "6000"
ga4gh_base_url = "http://" + drs_server_selected + ":{}/ga4gh/{}/v1"
wes_base_url = ga4gh_base_url.format(wes_port,"wes")

service_info_path = "/service-info"
runs_path = "/runs"

http_method = "POST"
request_url = wes_base_url + runs_path

nextflow_workflow_url = "https://github.com/grbot/cram-qc"
input_file = drs_str

data = {
    'workflow_type': 'NEXTFLOW',
    'workflow_type_version': '21.04.0',
    'workflow_url': nextflow_workflow_url,
    # Serialise with json.dumps so the value is escaped; hand-built JSON text breaks
    # as soon as the input contains a quote or backslash.
    'workflow_params': json.dumps({"input": input_file})
}

print_head("{} request to {}".format(http_method, request_url))

# Post a Nextflow workflow
wes_post_workflow_response = requests.request(http_method, request_url, data = data)

# print the response
pretty_print_json(wes_post_workflow_response)
# Surface the HTTP error instead of a confusing KeyError on "run_id" below.
wes_post_workflow_response.raise_for_status()

current_run_id = wes_post_workflow_response.json()["run_id"]

print_head("run_id = {}".format(current_run_id))

[38;2;8;138;75mPOST request to http://154.114.10.62:6000/ga4gh/wes/v1/runs[0m
{
    "run_id": "87cf33fc-1cc1-4c17-aeba-f1fc2ebe0e13"
}
[38;2;8;138;75mrun_id = 87cf33fc-1cc1-4c17-aeba-f1fc2ebe0e13[0m


9. Get run outputs

In [820]:
import time

# Terminal states of the GA4GH WES 1.0 State enum. Polling stops on any of them, not just
# COMPLETE, so a failed or cancelled run cannot keep this cell looping forever.
TERMINAL_STATES = {"COMPLETE", "EXECUTOR_ERROR", "SYSTEM_ERROR", "CANCELED"}

http_method = "GET"
request_url = wes_base_url + runs_path + "/" + current_run_id

print_head("{} request to {}".format(http_method, request_url))

# Right after submission the server may answer 400 {"msg": "Could not load WES run log"}
# with no "state" key. Wait before the first poll, and treat a missing state as
# "not finished yet" instead of failing with KeyError.
time.sleep(15)
monitor_run_response = requests.request(http_method, request_url)
state = monitor_run_response.json().get("state")

# Poll until the run reaches a terminal state
while state not in TERMINAL_STATES:
    print("Current job status: " + str(state))
    time.sleep(5)
    monitor_run_response = requests.request(http_method, request_url)
    state = monitor_run_response.json().get("state")

print("Job running status: " + state)
pretty_print_json(monitor_run_response)
if state != "COMPLETE":
    raise RuntimeError("WES run {} finished in state {}".format(current_run_id, state))

[38;2;8;138;75mGET request to http://154.114.10.62:6000/ga4gh/wes/v1/runs/87cf33fc-1cc1-4c17-aeba-f1fc2ebe0e13[0m
Job running status: COMPLETE
{
    "run_id": "87cf33fc-1cc1-4c17-aeba-f1fc2ebe0e13",
    "request": {
        "workflow_params": {
            "input": "drs://154.114.10.54:5000/6fa43c7de04b60c1a73a42aa2efc977d drs://154.114.10.62:5000/a68c60133f942881983d0e15827bf88f drs://154.114.10.62:5000/45ca586b0921ffedf6a63679fbaacb68 drs://ga4gh-starter-kit.ilifu.ac.za:5000/5a436bec951fab59dd975bcd10f316f1 drs://154.114.10.54:5000/be145a60bc059c154475a2561af0df6b drs://154.114.10.62:5000/d36019bb63182abad672205a140f7e83 drs://154.114.10.62:5000/b809bb9b81a9583ec67e787b0449e9bd drs://154.114.10.62:5000/168d353c6f474ca72e35e9209f921a59 drs://154.114.10.54:5000/9a45659fe478e5bb39d1dd1b08bd1807 drs://154.114.10.54:5000/82adbcf7cc72c31a86e65d73bf6ef81b"
        },
        "workflow_type": "NEXTFLOW",
        "workflow_type_version": "21.04.0",
        "workflow_url": "https://github.co

10. Post `multiqc_report.html` to the DRS server

In [821]:
import importlib
import upload_to_drs
importlib.reload(upload_to_drs)

run_id = monitor_run_response.json()['run_id']
outputs = monitor_run_response.json()["outputs"]

# Reset so that a drs_id left over from an earlier cell is never mistaken for the report.
drs_id = None
for key in outputs:
    if 'multiqc_report.html' in key:
        print (outputs[key])
        # WES outputs are file:// URIs; the uploader is given the plain local path.
        file = outputs[key]
        if file.startswith("file://"):
            file = file[len("file://"):]
        file_ext = file.split(".")[-1]
        meta_d = upload_to_drs.files_metadata_test(run_id, file, file_ext)
        upload_to_drs.add_file_to_server(meta_d, file_ext, drs_server_selected,'5001') #adds drs object
        # NOTE(review): presumably meta_d[0][3] is the new object's DRS id — confirm in upload_to_drs.
        drs_id = meta_d[0][3]

if drs_id is None:
    raise RuntimeError("multiqc_report.html not found in the outputs of run {}".format(run_id))

file:///opt/ga4gh-starter-kit-wes/wes_runs/87/cf/33/87cf33fc-1cc1-4c17-aeba-f1fc2ebe0e13/work/6a/8e4d50ba57c25ca2ebd749058dca89/multiqc_report.html
http://154.114.10.62:5001/admin/ga4gh/drs/v1/objects


11. Retrieve the results

In [822]:
import urllib.request

drs_port = 5000

object_path_get = "/objects/{}"
http_method = "GET"
ga4gh_base_url = "http://" + drs_server_selected + ":{}/ga4gh/{}/v1"
drs_base_url = ga4gh_base_url.format(drs_port,"drs")
request_url = drs_base_url + object_path_get.format(drs_id)
drs_object_response = requests.request(http_method, request_url)
drs_object_response.raise_for_status()
data = drs_object_response.json()
# Choose the access method by content rather than by a hard-coded list position:
# take the first one that carries an access_id to resolve via /access/{access_id}.
access_id = next((m['access_id'] for m in data['access_methods'] if m.get('access_id')), None)
if access_id is None:
    raise RuntimeError("DRS object {} has no access method with an access_id".format(drs_id))
access_url = request_url + "/access/" + access_id
drs_object_response = requests.request(http_method, access_url)
drs_object_response.raise_for_status()
download_url = drs_object_response.json()["url"]
print(download_url)
urllib.request.urlretrieve(download_url, "multiqc_report.html")

http://154.114.10.62:5000/ga4gh/drs/v1/stream/51f94f95c2e3b85bea590f3076f92bfc/cc23ef5f-b9e5-426f-9827-b8b30a2dd037


('multiqc_report.html', <http.client.HTTPMessage at 0x7fdd1aab2b60>)

### Use case 2 
### Now we initiate a run in a gather/scatter/federated manner
#### - Runs are initiated on individual nodes (calculate flagstat)
#### - Output is gathered and MultiQC is run on the flagstat results on one WES endpoint

![Use case 2](use_case_2.png)

1) Launch flagstat runs. Use the dictionary structure generated previously. Launch workflow at WES endpoint on DRS objects only from that endpoint.

In [823]:
# Populate with drs_ids_str
for drs_server in drs_servers:
    print ("Launching jobs on server: " + drs_server)
    drs_ids = drs_servers[drs_server]['drs_ids']
    drs_ids_str = ""
    for drs_id in drs_ids:
        drs_ids_str = drs_ids_str + drs_id + " "
    drs_ids_str = drs_ids_str[:-1]
    print (drs_ids_str)
    drs_servers[drs_server]['drs_ids_str'] = drs_ids_str

# Launch workflow
for drs_server in drs_servers:
    wes_port = "6000"
    ga4gh_base_url = "http://" + drs_server + ":{}/ga4gh/{}/v1"
    wes_base_url = ga4gh_base_url.format(wes_port,"wes")

    runs_path = "/runs"

    http_method = "POST"
    request_url = wes_base_url + runs_path

    nextflow_workflow_url = "https://github.com/grbot/flagstat"

    input_file = drs_servers[drs_server]['drs_ids_str']
    
    data = {
        'workflow_type': 'NEXTFLOW',
        'workflow_type_version': '21.04.0',
        'workflow_url': nextflow_workflow_url,
        'workflow_params': f'{{"input":"{input_file}"}}'
    }

    print_head("{} request to {}".format(http_method, request_url))

    # Post a Nextflow workflow
    wes_post_workflow_response = requests.request(http_method, request_url, data = data)

    # print the response
    pretty_print_json(wes_post_workflow_response)

    current_run_id = wes_post_workflow_response.json()["run_id"]

    print_head("run_id = {}".format(current_run_id))

    ## We don't launch in parallel for now. Just poll a submitted job and retrieve the results
    http_method = "GET"
    request_url = wes_base_url + runs_path + "/" + current_run_id

    print_head("{} request to {}".format(http_method, request_url))

    # Get request to /runs/{run_id}
    monitor_run_response = requests.request(http_method, request_url) 
    time.sleep(15)
    monitor_run_response = requests.request(http_method, request_url)
    
    # Poll until job is complete

    print(monitor_run_response.json())
    while monitor_run_response.json()["state"] != "COMPLETE":
        print("Current job status: " + monitor_run_response.json()["state"])
        time.sleep(5)
        monitor_run_response = requests.request(http_method, request_url)

    print("Job running status: " + monitor_run_response.json()["state"])
    pretty_print_json(monitor_run_response)

    drs_servers[drs_server]['run_id'] = current_run_id
    drs_servers[drs_server]['outputs'] = monitor_run_response.json()["outputs"]


Launching jobs on server: 154.114.10.54
drs://154.114.10.54:5000/6fa43c7de04b60c1a73a42aa2efc977d drs://154.114.10.54:5000/be145a60bc059c154475a2561af0df6b drs://154.114.10.54:5000/9a45659fe478e5bb39d1dd1b08bd1807 drs://154.114.10.54:5000/82adbcf7cc72c31a86e65d73bf6ef81b
Launching jobs on server: 154.114.10.62
drs://154.114.10.62:5000/a68c60133f942881983d0e15827bf88f drs://154.114.10.62:5000/45ca586b0921ffedf6a63679fbaacb68 drs://154.114.10.62:5000/d36019bb63182abad672205a140f7e83 drs://154.114.10.62:5000/b809bb9b81a9583ec67e787b0449e9bd drs://154.114.10.62:5000/168d353c6f474ca72e35e9209f921a59
Launching jobs on server: ga4gh-starter-kit.ilifu.ac.za
drs://ga4gh-starter-kit.ilifu.ac.za:5000/5a436bec951fab59dd975bcd10f316f1
[38;2;8;138;75mPOST request to http://154.114.10.54:6000/ga4gh/wes/v1/runs[0m
{
    "run_id": "a776be25-d14f-40ff-8ce7-dfa6305d688f"
}
[38;2;8;138;75mrun_id = a776be25-d14f-40ff-8ce7-dfa6305d688f[0m
[38;2;8;138;75mGET request to http://154.114.10.54:6000/ga4gh/we

2. a) Upload results to individual DRS servers and get a list of DRS objects (the resulting DRS URIs cannot be used by the combine step; run 2. b) instead, where the reason is explained)

In [828]:
importlib.reload(upload_to_drs)

# Upload each server's flagstat outputs back to that server's DRS and collect their
# drs:// URIs. NOTE: these URIs are not usable by the combine step (see 2. b)).
flagstat_drs_ids = []

for drs_server in drs_servers:

    run_id = drs_servers[drs_server]['run_id']
    outputs = drs_servers[drs_server]["outputs"]

    for key in outputs:
        if '.flagstat' in key:
            # WES outputs are file:// URIs; the uploader is given the plain local path.
            file = outputs[key]
            if file.startswith("file://"):
                file = file[len("file://"):]
            file_ext = file.split(".")[-1]
            print (file)
            meta_d = upload_to_drs.files_metadata_test(run_id, file, file_ext)
            upload_to_drs.add_file_to_server(meta_d, file_ext, drs_server,'5001') #adds drs object
            drs_id = "drs://" + drs_server + ":5000/" + meta_d[0][3]
            flagstat_drs_ids.append(drs_id)

# Joining avoids the leading space and the chopped-off last character that
# " " + id concatenation followed by [:-1] produced.
drs_ids_str = " ".join(flagstat_drs_ids)

    

/opt/ga4gh-starter-kit-wes/wes_runs/a7/76/be/a776be25-d14f-40ff-8ce7-dfa6305d688f/work/b0/8497e623f7cc1a60371f44e9bba870/HG01896.final.chrX_15494566-15607236.cram.flagstat
http://154.114.10.54:5001/admin/ga4gh/drs/v1/objects
/opt/ga4gh-starter-kit-wes/wes_runs/a7/76/be/a776be25-d14f-40ff-8ce7-dfa6305d688f/work/ae/7ee1c4951c7040d34324a6e418f75a/HG01885.final.chrX_15494566-15607236.cram.flagstat
http://154.114.10.54:5001/admin/ga4gh/drs/v1/objects
/opt/ga4gh-starter-kit-wes/wes_runs/a7/76/be/a776be25-d14f-40ff-8ce7-dfa6305d688f/work/d9/5ecd91c25bbf4a70a511d702190163/HG01894.final.chrX_15494566-15607236.cram.flagstat
http://154.114.10.54:5001/admin/ga4gh/drs/v1/objects
/opt/ga4gh-starter-kit-wes/wes_runs/a7/76/be/a776be25-d14f-40ff-8ce7-dfa6305d688f/work/e4/7dcded2c1c857a54a70e9d8e41c7cc/HG01879.final.chrX_15494566-15607236.cram.flagstat
http://154.114.10.54:5001/admin/ga4gh/drs/v1/objects
/opt/ga4gh-starter-kit-wes/wes_runs/3e/30/0c/3e300cd5-e94f-4150-88c4-3bba8e342d12/work/4d/a514314132

2. b) Upload results to individual DRS servers and get a list of DRS access URLs
   

In [829]:
importlib.reload(upload_to_drs)

# DRS read-API settings shared by every request below.
drs_port = 5000
object_path_get = "/objects/{}"
http_method = "GET"

drs_urls = []

for drs_server in drs_servers:

    run_id = drs_servers[drs_server]['run_id']
    outputs = drs_servers[drs_server]["outputs"]
    ga4gh_base_url = "http://" + drs_server + ":{}/ga4gh/{}/v1"
    drs_base_url = ga4gh_base_url.format(drs_port,"drs")

    for key in outputs:
        if '.flagstat' in key:
            print (key)
            # WES outputs are file:// URIs; the uploader is given the plain local path.
            file = outputs[key]
            if file.startswith("file://"):
                file = file[len("file://"):]
            file_ext = file.split(".")[-1]
            meta_d = upload_to_drs.files_metadata_test(run_id, file, file_ext)
            upload_to_drs.add_file_to_server(meta_d, file_ext, drs_server,'5001') #adds drs object
            drs_id = meta_d[0][3]
            request_url = drs_base_url + object_path_get.format(drs_id)
            drs_object_response = requests.request(http_method, request_url)
            drs_object_response.raise_for_status()
            data = drs_object_response.json()
            # DRS URIs cannot be passed on here: they resolve only to a server-local path, which
            # fails when that path does not exist on the node running the workflow. Stream URLs
            # work across nodes; the drawback is that the original file name is lost.
            access_id = next((m['access_id'] for m in data['access_methods'] if m.get('access_id')), None)
            if access_id is None:
                raise RuntimeError("DRS object {} has no access method with an access_id".format(drs_id))
            access_url = request_url + "/access/" + access_id
            drs_object_response = requests.request(http_method, access_url)
            drs_object_response.raise_for_status()
            drs_urls.append(drs_object_response.json()["url"])

drs_urls_str = " ".join(drs_urls)


HG01896.final.chrX_15494566-15607236.cram.flagstat
http://154.114.10.54:5001/admin/ga4gh/drs/v1/objects
HG01885.final.chrX_15494566-15607236.cram.flagstat
http://154.114.10.54:5001/admin/ga4gh/drs/v1/objects
HG01894.final.chrX_15494566-15607236.cram.flagstat
http://154.114.10.54:5001/admin/ga4gh/drs/v1/objects
HG01879.final.chrX_15494566-15607236.cram.flagstat
http://154.114.10.54:5001/admin/ga4gh/drs/v1/objects
HG01880.final.chrX_15494566-15607236.cram.flagstat
http://154.114.10.62:5001/admin/ga4gh/drs/v1/objects
HG01890.final.chrX_15494566-15607236.cram.flagstat
http://154.114.10.62:5001/admin/ga4gh/drs/v1/objects
HG01882.final.chrX_15494566-15607236.cram.flagstat
http://154.114.10.62:5001/admin/ga4gh/drs/v1/objects
HG01889.final.chrX_15494566-15607236.cram.flagstat
http://154.114.10.62:5001/admin/ga4gh/drs/v1/objects
HG01886.final.chrX_15494566-15607236.cram.flagstat
http://154.114.10.62:5001/admin/ga4gh/drs/v1/objects
HG01883.final.chrX_15494566-15607236.cram.flagstat
http://ga4gh-

3. Launch the workflow that will combine the results

In [830]:
wes_port = "6000"
drs_server_central = "ga4gh-starter-kit.ilifu.ac.za"
ga4gh_base_url = "http://" + drs_server_central + ":{}/ga4gh/{}/v1"
wes_base_url = ga4gh_base_url.format(wes_port,"wes")

service_info_path = "/service-info"
runs_path = "/runs"

http_method = "POST"
request_url = wes_base_url + runs_path

nextflow_workflow_url = "https://github.com/grbot/multiqc"
input_file = drs_urls_str

data = {
    'workflow_type': 'NEXTFLOW',
    'workflow_type_version': '21.04.0',
    'workflow_url': nextflow_workflow_url,
    # Serialise with json.dumps so the value is escaped; hand-built JSON text breaks
    # as soon as the input contains a quote or backslash.
    'workflow_params': json.dumps({"input": input_file})
}

print_head("{} request to {}".format(http_method, request_url))

# Post a Nextflow workflow
wes_post_workflow_response = requests.request(http_method, request_url, data = data)

# print the response
pretty_print_json(wes_post_workflow_response)
# Surface the HTTP error instead of a confusing KeyError on "run_id" below.
wes_post_workflow_response.raise_for_status()

current_run_id = wes_post_workflow_response.json()["run_id"]

print_head("run_id = {}".format(current_run_id))

[38;2;8;138;75mPOST request to http://ga4gh-starter-kit.ilifu.ac.za:6000/ga4gh/wes/v1/runs[0m
{
    "run_id": "c3a499fe-ecfa-4340-8ebe-d126fd15bab8"
}
[38;2;8;138;75mrun_id = c3a499fe-ecfa-4340-8ebe-d126fd15bab8[0m


4. Poll for results

In [831]:
import time

# Poll GET /runs/{run_id} until the run reaches a terminal state.
#
# Right after submission the server may not have a run log yet and answers with
# HTTP 400 {"msg": "Could not load WES run log"}. Such responses are treated as
# "state not available yet" and retried, instead of relying on a fixed delay.

# Terminal states of the GA4GH WES State enum: the run will not progress past
# these, so polling must stop on any of them, not only on COMPLETE.
TERMINAL_STATES = {"COMPLETE", "EXECUTOR_ERROR", "SYSTEM_ERROR", "CANCELED"}
POLL_INTERVAL_SECONDS = 5
POLL_TIMEOUT_SECONDS = 60 * 60  # give up after one hour


def get_run_state(response):
    """Return the run state from a /runs/{run_id} response.

    Returns None when the request was not successful (e.g. the run log is not
    available yet) or the body carries no "state" field.
    """
    if not response.ok:
        return None
    return response.json().get("state")


http_method = "GET"
request_url = wes_base_url + runs_path + "/" + current_run_id

print_head("{} request to {}".format(http_method, request_url))

deadline = time.monotonic() + POLL_TIMEOUT_SECONDS
monitor_run_response = requests.request(http_method, request_url)
state = get_run_state(monitor_run_response)
while state not in TERMINAL_STATES:
    if time.monotonic() > deadline:
        raise TimeoutError("Run {} did not finish within {} s (last state: {})".format(
            current_run_id, POLL_TIMEOUT_SECONDS, state))
    print("Current job status: {}".format(state if state is not None else "run log not available yet"))
    time.sleep(POLL_INTERVAL_SECONDS)
    monitor_run_response = requests.request(http_method, request_url)
    state = get_run_state(monitor_run_response)

print("Job running status: " + state)
pretty_print_json(monitor_run_response)

# Later cells read the run outputs; fail loudly here if the run did not succeed.
if state != "COMPLETE":
    raise RuntimeError("Run {} ended in state {}".format(current_run_id, state))

[38;2;8;138;75mGET request to http://ga4gh-starter-kit.ilifu.ac.za:6000/ga4gh/wes/v1/runs/c3a499fe-ecfa-4340-8ebe-d126fd15bab8[0m
<Response [200]>
Job running status: COMPLETE
{
    "run_id": "c3a499fe-ecfa-4340-8ebe-d126fd15bab8",
    "request": {
        "workflow_params": {
            "input": "http://154.114.10.54:5000/ga4gh/drs/v1/stream/aa9db3115e60350c8c295ebeedd75898/36ea9e85-59fe-4fcf-b50b-e4246b573244 http://154.114.10.54:5000/ga4gh/drs/v1/stream/7a2d378b7a0f8ac3b68938471c074d49/6bdf4071-76fd-49ae-afa3-529a2269229a http://154.114.10.54:5000/ga4gh/drs/v1/stream/e9e6bc591311b1ccbfb3ed96ada7fda9/f09e543f-15cc-4970-8a37-2f695e13ed0e http://154.114.10.54:5000/ga4gh/drs/v1/stream/3e4d5866f570dd3d78a2b86c3e83169a/b1f49292-4312-48f7-b75d-7ac06e3c66c1 http://154.114.10.62:5000/ga4gh/drs/v1/stream/eb7f2b0fea02a4d7a9f9f0e357d44740/25abb585-f827-413a-b3f0-bba36a331290 http://154.114.10.62:5000/ga4gh/drs/v1/stream/be343b6986696166bceb5f291fb23ec4/30e05669-b077-4f06-b8d2-3034202e16a1 ht

5. Upload `multiqc_report.html` to central DRS server

In [832]:
import importlib
from urllib.parse import urlparse

import upload_to_drs
importlib.reload(upload_to_drs)  # pick up edits to the local helper module without restarting the kernel

# Register every run output whose name contains multiqc_report.html as a DRS
# object on the central DRS server (admin port 5001).
run_id = monitor_run_response.json()['run_id']
outputs = monitor_run_response.json()["outputs"]

# Reset so a drs_id left over from an earlier execution cannot leak into the download cell.
drs_id = None
for key in outputs:
    if 'multiqc_report.html' in key:
        print(outputs[key])
        # Outputs are file:// URLs on the WES host; keep only the local path.
        # urlparse(...).path also copes with plain paths, unlike slicing off 7 characters.
        file = urlparse(outputs[key]).path
        file_ext = file.split(".")[-1]
        meta_d = upload_to_drs.files_metadata_test(run_id, file, file_ext)
        upload_to_drs.add_file_to_server(meta_d, file_ext, drs_server_central, '5001')  # adds drs object
        # NOTE(review): assumes the 4th field of the first metadata record is the DRS object id — confirm in upload_to_drs
        drs_id = meta_d[0][3]

assert drs_id is not None, "no multiqc_report.html found in the run outputs"

file:///opt/ga4gh-starter-kit-wes/wes_runs/c3/a4/99/c3a499fe-ecfa-4340-8ebe-d126fd15bab8/work/ef/43e148faa635d4562d9a2f839b8750/multiqc_report.html
http://ga4gh-starter-kit.ilifu.ac.za:5001/admin/ga4gh/drs/v1/objects


6. Download `multiqc_report.html`

In [833]:
import urllib.request


def select_access_id(drs_object):
    """Return the ``access_id`` of the first access method that provides one.

    An ``access_id`` is resolved through ``/objects/{id}/access/{access_id}``
    into a downloadable URL. Selecting by presence avoids assuming a fixed
    position in ``access_methods``.

    Raises:
        ValueError: if no access method of the object carries an ``access_id``.
    """
    for access_method in drs_object.get("access_methods", []):
        if "access_id" in access_method:
            return access_method["access_id"]
    raise ValueError("DRS object {} has no access method with an access_id".format(drs_object.get("id")))


# Resolve the uploaded report's DRS object on the central server's public DRS
# port (5000) to a stream URL and download it locally.
drs_port = 5000

object_path_get = "/objects/{}"
http_method = "GET"
ga4gh_base_url = "http://" + drs_server_central + ":{}/ga4gh/{}/v1"
drs_base_url = ga4gh_base_url.format(drs_port, "drs")
request_url = drs_base_url + object_path_get.format(drs_id)

drs_object_response = requests.request(http_method, request_url)
drs_object_response.raise_for_status()
data = drs_object_response.json()

access_url = request_url + "/access/" + select_access_id(data)
drs_object_response = requests.request(http_method, access_url)
drs_object_response.raise_for_status()
download_url = drs_object_response.json()["url"]
print(download_url)
urllib.request.urlretrieve(download_url, "multiqc_report_2.html")

http://ga4gh-starter-kit.ilifu.ac.za:5000/ga4gh/drs/v1/stream/d7dbba132b1a721a8caec07aa768f630/9cfe1861-497d-4984-8453-cd380a9a2d65


('multiqc_report_2.html', <http.client.HTTPMessage at 0x7fdd1aab33d0>)