Data connect setup

In [13]:
import requests
import json

dc_port = "8089"
dc_host = "255.255.55.55"
dc_base_url = "http://{}:{}".format(dc_host,dc_port)

wes_run = "/ga4gh/wes/v1/runs"

wes_1 = "http://255.255.55.55:8080"
drs_1 = "255.255.55.55:8081"

wes_2 = "http://255.255.55.55:8080/ga4gh/wes/v1/runs"
drs_2 = "255.255.55.55:8081"

service_info_path = "/service-info"
tables_path = "/tables"
table_info_path = "/table/{}/info"
table_data_path = "/table/{}/data"
search_path = "/search"

def pretty_print_json(response):
    print(json.dumps(response.json(), indent=4))

Confirm DC structure

In [14]:
dc_list_tables_resp = requests.request("GET", dc_base_url+tables_path)
pretty_print_json(dc_list_tables_resp)

{
    "tables": [
        {
            "name": "trino.ontology.axiom",
            "data_model": {
                "$ref": "http://154.114.10.13:8089/table/trino.ontology.axiom/info"
            }
        },
        {
            "name": "trino.ontology.temporary_axiom",
            "data_model": {
                "$ref": "http://154.114.10.13:8089/table/trino.ontology.temporary_axiom/info"
            }
        },
        {
            "name": "trino.public.databasechangelog",
            "data_model": {
                "$ref": "http://154.114.10.13:8089/table/trino.public.databasechangelog/info"
            }
        },
        {
            "name": "trino.public.databasechangeloglock",
            "data_model": {
                "$ref": "http://154.114.10.13:8089/table/trino.public.databasechangeloglock/info"
            }
        },
        {
            "name": "trino.public.fact",
            "data_model": {
                "$ref": "http://154.114.10.13:8089/table/trino.public.f

Query space

In [20]:
header = {"content-type":"application/json"}

#allows for custom search queries for use with more populous data-connect
query = input("Submit your sql query:")
if query:
    request_body = {
      "query": query
    }
else:
    #default search query just pulls file DRIs from the default table
    request_body = {
  "query": "SELECT url FROM trino.public.links"
    }
dc_search_resp = requests.post(dc_base_url+search_path, json = request_body, headers = header)
pretty_print_json(dc_search_resp)

Submit your sql query:
{
    "data": [],
    "pagination": {
        "next_page_url": "http://154.114.10.13:8089/search/v1/statement/queued/20230714_120512_00048_72yy3/y2b7d37e48764bed63df2ecb5512d428add4e367c/1?queryJobId=20230714_120512_00048_72yy3"
    }
}


Run the below if the data field is empty, data-connect is still querying the trino (may need to be rerun multiple times)

In [29]:
#convert the response to a json and cast it to python readable dictionary format.
dc_search_resp=json.dumps(dc_search_resp.json())
dc_search_resp=json.loads(dc_search_resp)
#data-connect will provide next page urls to allow for processing delays. we call the next page to check if the data has loaded yet
dc_search_resp = requests.get(dc_search_resp["pagination"]["next_page_url"], json = request_body, headers = header)
pretty_print_json(dc_search_resp)

AttributeError: 'str' object has no attribute 'json'

Setup for connecting to the WES service

In [25]:
#dc_search_resp must be formatted into a python dictionary to make it iterable more smoothly.
#until the data connect stream is returning data, this code block will error code because it's trying to jsonify an http response

dc_search_resp=json.dumps(dc_search_resp.json())
samples=json.loads(dc_search_resp)

#Below will query dataconnect for a table relating WES and DRS servers
#dc_search_resp = requests.post(dc_base_url+search_path, json = {"query":"SELECT * FROM WES_DRS_reference"}, headers = {"content-type":"application/json"})
#dc_search_resp=json.dumps(dc_search_resp.json())
#dc_reference=json.loads(dc_search_resp)
dc_reference={drs_1:wes_1+wes_run,
              drs_2:wes_2+wes_run}


def pretty_print_json(response):
    # pretty print JSON in blue color
    print("\033[38;2;8;75;138m"+json.dumps(response.json(), indent=4)+"\033[0m")
    
def print_head(text):
    # print in green color
    print("\033[38;2;8;138;75m"+text+"\033[0m")

#Switch logic for mapping DRS url to respective WES server. For the base case these are static enough to be scripted in.
#A dataconnect search request could provide a dictionary mapping for DRS uri to WES server should the code be deployed
def optimize(url):
    #extract the end of the port number
    x=url[6:].index("/")+6
    #extract host ip + port 
    drs=url[6:x]
    #using a switch here for ease of expansion. 
    match drs:
            case drs_1:
                #returning the same host with a wes port though future implementation may return
                #different host as well
                return wes_1+wes_run
            case drs_2:
                return wes_2+wes_run
            case default:
                return "http://localhost:6000"+wes_run
            
#rough data connect version of optimiser logic
def optimize_dict(url):
    #extract the end of the port number
    x=url[6:].index("/")+6
    #extract host ip + port 
    drs=url[6:x]
    #compare the given url to the reference ones in DC
    try:
        return dc_reference[drs]
    except:    
        return "http://localhost:6000"+wes_run

Submit workflow to WES

In [26]:
http_method = "POST"

#replace this with a relevant workflow, for demo purposes we are reusing the starterkit workflow by Yash Puligundla of GA4GH
nextflow_workflow_url = "https://github.com/yash-puligundla/samtools-head-nf"
#array to store run ID's
response_aggregate = []

#loop through all matching samples
for entry in samples["data"]:
    #implement error check for duplicate params to prevent 409 on s3 storage
    input_file = entry["url"]
    #for user to verify input file url
    print(input_file)
    #optimize reads the drs URI and replies with the appropriate WES url
    request_url=optimize(input_file)
    #specify workflow type, location, and input
    data = {
        'workflow_type': 'NEXTFLOW',
        'workflow_type_version': '21.04.0',
        'workflow_url': nextflow_workflow_url,
        'workflow_params': f'{{"input":"{input_file}"}}'
    }

    print_head("{} request sent to {}".format(http_method, request_url))
    # Post a Nextflow workflow
    wes_post_workflow_response = requests.request(http_method, request_url, data = data)
    # print the run id
    pretty_print_json(wes_post_workflow_response)
    try:
        #save run id to array
        response_aggregate.append(request_url+"/"+wes_post_workflow_response.json()["run_id"])
    except:
        print("run failed to start"+input_file)

drs://154.114.10.100:5000/063e092e-b9e2-496f-ac5a-27dc3a1764e5
[38;2;8;138;75mPOST request sent to http://154.114.10.100:6000/ga4gh/wes/v1/runs[0m
[38;2;8;75;138m{
    "run_id": "468ba909-c36f-4417-8d70-d4f6cbb0e5be"
}[0m
drs://154.114.10.176:5000/8e18bfb64168994489bc9e7fda0acd4f
[38;2;8;138;75mPOST request sent to http://154.114.10.176:6000/ga4gh/wes/v1/runs[0m
[38;2;8;75;138m{
    "run_id": "2c7ae809-00a5-46f1-a045-102a7192cfd9"
}[0m


Print status of runs

In [28]:
http_method = "GET"

#loop through jobs to check status
for run in response_aggregate:
    print_head("{} request to {}".format(http_method, run))
# Get request to /runs/{run_id}
    monitor_run_response = requests.request(http_method, run)
# print the response
    pretty_print_json(monitor_run_response)
    
#we expect the first run to fail due to a mismatch of input filetype for the workflow

[38;2;8;138;75mGET request to http://154.114.10.100:6000/ga4gh/wes/v1/runs/468ba909-c36f-4417-8d70-d4f6cbb0e5be[0m
[38;2;8;75;138m{
    "run_id": "468ba909-c36f-4417-8d70-d4f6cbb0e5be",
    "request": {
        "workflow_params": {
            "input": "drs://154.114.10.100:5000/063e092e-b9e2-496f-ac5a-27dc3a1764e5"
        },
        "workflow_type": "NEXTFLOW",
        "workflow_type_version": "21.04.0",
        "workflow_url": "https://github.com/yash-puligundla/samtools-head-nf"
    },
    "state": "EXECUTOR_ERROR",
    "run_log": {
        "name": "yash-puligundla/samtools-head-nf",
        "cmd": [
            "#!/bin/bash -ue",
            "samtools head https://s3.us-east-2.amazonaws.com/ga4gh-ismb-tutorial-2022/data/ref/GRCh38_full_analysis_set_plus_decoy_hla.fa.fai > output_samtools_head.txt"
        ],
        "start_time": "2023-07-14T12:05:47Z",
        "end_time": "2023-07-14T12:05:54Z",
        "stdout": "http://localhost:6000/ga4gh/wes/v1/logs/nextflow/stdout/468ba90