 ### Combining GA4GH standards to perform an end-to-end workflow
 
#### Learning Objectives
Combine Data Connect, WES and DRS services  

What will participants do as part of the exercise?

 - Search for files with Data Connect
 - Obtain links to access files
 - Submit the files to a WES workflow
 - Retrieve the results of the analysis
 
 
 #### Icons in this Guide

 🖐 A hands-on section where you will code something or interact with the server
 
 #### 1. Run a cell in a Jupyter notebook
 
 ## Obtain Thousand Genomes files from SRA DRS and submit to Seven Bridges WES

🖐 Set up your project name

In [69]:
SB_PROJECT = 'forei/ismb-tutorial'
SB_API_KEY_PATH = '~/.keys/sbcgc_key.json'
DOWNLOAD_LOCATION = '~/Downloads'

In [82]:
from fasp.search import DataConnectClient

# Step 1 - Discovery
# query for relevant DRS objects
searchClient = DataConnectClient('https://ga4gh-search-adapter-presto-public.prod.dnastack.com/',debug=True)

query = '''SELECT f.sample_name, drs_id bam_drs_id, acc
FROM thousand_genomes.onek_genomes.ssd_drs s 
join thousand_genomes.onek_genomes.sra_drs_files f on f.sample_name = s.su_submitter_id 
where filetype = 'bam' and mapped = 'mapped' 
and sequencing_type ='exome' and  population = 'PUR' LIMIT 3'''

df = searchClient.run_query(query, returnType='dataframe')
df

Query: {"query":"SELECT f.sample_name, drs_id bam_drs_id, acc FROM thousand_genomes.onek_genomes.ssd_drs s  join thousand_genomes.onek_genomes.sra_drs_files f on f.sample_name = s.su_submitter_id  where filetype = 'bam' and mapped = 'mapped'  and sequencing_type ='exome' and  population = 'PUR' LIMIT 3", "parameters":[]}
Retrieving the query
____Page1_______________
b'{"data":[],"pagination":{"next_page_url":"https://ga4gh-search-adapter-presto-public.prod.dnastack.com/search/v1/statement/queued/20220630_012803_00003_qrrje/ya9ce1b4499de4b498a7e7698c5a7d09ff29d2230/1?queryJobId=20220630_012803_00003_qrrje"}}'
{
   "data": [],
   "pagination": {
      "next_page_url": "https://ga4gh-search-adapter-presto-public.prod.dnastack.com/search/v1/statement/queued/20220630_012803_00003_qrrje/ya9ce1b4499de4b498a7e7698c5a7d09ff29d2230/1?queryJobId=20220630_012803_00003_qrrje"
   }
}
____Page2_______________
b'{"data":[],"pagination":{"next_page_url":"https://ga4gh-search-adapter-presto-public.prod.

Unnamed: 0,sample_name,bam_drs_id,acc
0,HG00731,515ae091f29ac699a4d2e272812cea47,SRR1606560
1,HG00637,475dfc02f643c368036df6816d05afe4,SRR1596919
2,HG00640,58e2964f2a0adbf41ab0e8c7a95e7d0c,SRR1596923


In [83]:
json_result = searchClient.run_query(query, returnType='json')
json_result

Query: {"query":"SELECT f.sample_name, drs_id bam_drs_id, acc FROM thousand_genomes.onek_genomes.ssd_drs s  join thousand_genomes.onek_genomes.sra_drs_files f on f.sample_name = s.su_submitter_id  where filetype = 'bam' and mapped = 'mapped'  and sequencing_type ='exome' and  population = 'PUR' LIMIT 3", "parameters":[]}
Retrieving the query
____Page1_______________
b'{"data":[],"pagination":{"next_page_url":"https://ga4gh-search-adapter-presto-public.prod.dnastack.com/search/v1/statement/queued/20220630_013046_00004_qrrje/yc419b2e309e07bfcbe020a8b84c48b774ce08236/1?queryJobId=20220630_013046_00004_qrrje"}}'
{
   "data": [],
   "pagination": {
      "next_page_url": "https://ga4gh-search-adapter-presto-public.prod.dnastack.com/search/v1/statement/queued/20220630_013046_00004_qrrje/yc419b2e309e07bfcbe020a8b84c48b774ce08236/1?queryJobId=20220630_013046_00004_qrrje"
   }
}
____Page2_______________
b'{"data":[],"pagination":{"next_page_url":"https://ga4gh-search-adapter-presto-public.prod.

[{'sample_name': 'HG00731',
  'bam_drs_id': '515ae091f29ac699a4d2e272812cea47',
  'acc': 'SRR1606560'},
 {'sample_name': 'HG00637',
  'bam_drs_id': '475dfc02f643c368036df6816d05afe4',
  'acc': 'SRR1596919'},
 {'sample_name': 'HG00640',
  'bam_drs_id': '58e2964f2a0adbf41ab0e8c7a95e7d0c',
  'acc': 'SRR1596923'}]

### Use DRS to obtain file details

The method of calling the Data Connect client above returns a dataframe. This is convenient for many purposes, including listing the results as above. The default return type from the runQuery gives a list of lists.

In [71]:
results = searchClient.run_query(query)
results

Retrieving the query
____Page1_______________
____Page2_______________
____Page3_______________
____Page4_______________
____Page5_______________
____Page6_______________
____Page7_______________


[['HG00731', '515ae091f29ac699a4d2e272812cea47', 'SRR1606560'],
 ['HG00637', '475dfc02f643c368036df6816d05afe4', 'SRR1596919'],
 ['HG00640', '58e2964f2a0adbf41ab0e8c7a95e7d0c', 'SRR1596923']]

The following shows how the SRA DRS server can be used to determine where the files can be obtained from. The following shows this for the first DRS id from the query results. 

In [84]:
from fasp.loc import DRSClient

drsClient = DRSClient('https://locate.be-md.ncbi.nlm.nih.gov', public=True, debug=False)
test_id = json_result[0]['bam_drs_id']
print(test_id)
objInfo = drsClient.get_object(test_id)
objInfo

515ae091f29ac699a4d2e272812cea47


{'access_methods': [{'access_id': '8cc282a3e09887491fa5aa7ff1c209b1a4b9bf1cc55dd9767075e625968f364a',
   'region': 'gs.US',
   'type': 'https'},
  {'access_id': '2c8e9f0f20117987660e677c0d3556c198ec109447b378dfcc6f8639b6a0b5e2',
   'type': 'https'},
  {'access_id': 'fa57eb71b1f001a479f2462a0ca9fc9f35f64e544150086e4a55fc86d8eeaed3',
   'region': 's3.us-east-1',
   'type': 'https'}],
 'checksums': [{'checksum': '515ae091f29ac699a4d2e272812cea47',
   'type': 'md5'}],
 'created_time': '2013-05-08T10:25:13Z',
 'id': '515ae091f29ac699a4d2e272812cea47',
 'name': 'HG00731.mapped.ILLUMINA.bwa.PUR.exome.20130422.bam',
 'self_url': 'drs://locate.be-md.ncbi.nlm.nih.gov/515ae091f29ac699a4d2e272812cea47',
 'size': 32108614682}

A second DRS call can be used to obtain a url to access the file from one of the above locations.

In [5]:
access_id = objInfo['access_methods'][0]['access_id']
print('access_id:{}'.format(access_id))
url = drsClient.get_access_url(test_id, access_id=access_id)
print('url:{}'.format(url))

access_id:1e4846c05c81a49f684e7f940ffbd3a98e5f0e335f019ee4d32d85c72096b743
url:https://storage.googleapis.com/genomics-public-data/ftp-trace.ncbi.nih.gov/1000genomes/ftp/phase3/data/NA18948/exome_alignment/NA18948.mapped.ILLUMINA.bwa.JPT.exome.20121211.bam


### Set up a WES client

In [74]:
from fasp.workflow import sbcgcWESClient
wesClient = sbcgcWESClient(SB_PROJECT, api_key_path=SB_API_KEY_PATH)

#### Define a function to submit the workflow

In [80]:
import json
import requests

def runWorkflow(wesClient, fileurl, outfile):

    sam_view_app = 'sbg://forei/ismb-tutorial/samtools-view-drsurl-1-8-url'

    ref_drs_id = 'drs://cgc-ga4gh-api.sbgenomics.com/5caf7ebec80cb0e41b007adf'
    params = {
        "project": wesClient.project_id,
        "inputs": {
          "alignment_file_url": fileurl,
          "count_alignments": True,
          "reference_file": {
            "path": ref_drs_id,
            "name": "references-hs37d5-hs37d5.fasta",
            "class": "File"
          },
          "output_file_path": outfile
        }
     }


    body = {
      "workflow_params": (None, json.dumps(params), 'application/json'),
      "workflow_type": "CWL",
      "workflow_type_version": "sbg:draft-2",
      "workflow_url": sam_view_app
    }
    #print(wesClient.api_url_base)
    #response = requests.request("POST", wesClient.api_url_base+'/runs', 
    #                            headers=wesClient.headers, files = body)
    
    run_id= wesClient.run_generic_workflow(
        workflow_url=sam_view_app,
        workflow_params = json.dumps(params),
        workflow_type = "CWL",
        workflow_type_version = "sbg:draft-2",
        verbose=False
    )
    return(run_id)

This is the loop that would normally be within FASPRunner, but because of the different approach to access_id we will write a custom version

In [87]:
import datetime

# set the region we want to access data from
region = 's3.us-east-1'
my_runs = []
        
for row in json_result:

    print("subject={}, drsID={}".format(row['bam_drs_id'], row['sample_name']))
    drs_id = row['bam_drs_id']


    objInfo = drsClient.get_object(drs_id)
    url = drsClient.get_url_for_region(drs_id,region)

    # Step 3 - Run a pipeline on the file at the drs url
    if url != None:
        outfile = "{}.txt".format(row['sample_name'])
        time = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        run_id = runWorkflow(wesClient, url, outfile)
        print('Submitted run {} to {}'.format(run_id, wesClient.__class__.__name__))
        my_runs.append(run_id)
        row['run_id']=run_id
    print('_________________________________________________________________________')

subject=515ae091f29ac699a4d2e272812cea47, drsID=HG00731
Submitted run 9a8df37a-d265-4159-8d64-37244ff8f63a to sbcgcWESClient
_________________________________________________________________________
subject=475dfc02f643c368036df6816d05afe4, drsID=HG00637
Submitted run 493e58a7-b631-4482-9a3c-b069a0b73cba to sbcgcWESClient
_________________________________________________________________________
subject=58e2964f2a0adbf41ab0e8c7a95e7d0c, drsID=HG00640
Submitted run 25b1b213-6fee-4122-a26c-b348c08610fd to sbcgcWESClient
_________________________________________________________________________


In [88]:
json_result

[{'sample_name': 'HG00731',
  'bam_drs_id': '515ae091f29ac699a4d2e272812cea47',
  'acc': 'SRR1606560',
  'run_id': '9a8df37a-d265-4159-8d64-37244ff8f63a'},
 {'sample_name': 'HG00637',
  'bam_drs_id': '475dfc02f643c368036df6816d05afe4',
  'acc': 'SRR1596919',
  'run_id': '493e58a7-b631-4482-9a3c-b069a0b73cba'},
 {'sample_name': 'HG00640',
  'bam_drs_id': '58e2964f2a0adbf41ab0e8c7a95e7d0c',
  'acc': 'SRR1596923',
  'run_id': '25b1b213-6fee-4122-a26c-b348c08610fd'}]

In [91]:
for run in json_result:
    status = wesClient.get_task_status(run['run_id'])
    print(("Run {} {}".format(run['run_id'], status)))

Run 9a8df37a-d265-4159-8d64-37244ff8f63a RUNNING
Run 493e58a7-b631-4482-9a3c-b069a0b73cba RUNNING
Run 25b1b213-6fee-4122-a26c-b348c08610fd RUNNING


## Getting the results

In [34]:
runLog = wesClient.get_run_log(my_runs[0])
runLog

{'request': {'tags': {},
  'workflow_params': {'name': 'SAMtools View 1.8 run - 06-29-22 23:50:13',
   'project': 'forei/ismb-tutorial',
   'inputs': {'total_memory_GB': None,
    'coverage_limit': None,
    'count_alignments': True,
    'include_only_read_group': None,
    'remove_duplicates': None,
    'max_insert_size': None,
    'reference_file': {'path': 'drs://cgc-ga4gh-api.sbgenomics.com/62b07ea84e3edb6b1c23c8d5',
     'name': 'Homo_sapiens_assembly19_1000genomes_decoy.fasta',
     'class': 'File'},
    'output_file_path': 'NA18948.txt',
    'alignment_file_url': 'https://1000genomes.s3.amazonaws.com/phase3/data/NA18948/exome_alignment/NA18948.mapped.ILLUMINA.bwa.JPT.exome.20121211.bam'}},
  'workflow_type': 'CWL',
  'workflow_engine_params': {},
  'workflow_url': 'sbg://forei/ismb-tutorial/samtools-view-drsurl-1-8-url'},
 'state': 'COMPLETE',
 'outputs': {'counts': {'path': 'drs://cgc-ga4gh-api.sbgenomics.com/62bce6d14e3edb6b1c423d75',
   'name': 'NA18948.txt',
   'class': 'Fil

Use the Seven Bridges CGC DRS service to retrieve the output file

In [17]:
from  fasp.loc import sbcgcDRSClient
results_DRS_client = sbcgcDRSClient(SB_API_KEY_PATH, 's3')
resultsDRSID = runLog['outputs']['counts']['path']
resultsDRSID = resultsDRSID.split('/')[-1]
print(resultsDRSID)
fileDetails = results_DRS_client.get_object(resultsDRSID)
print(fileDetails)
url = results_DRS_client.get_access_url(resultsDRSID, 's3')

62bce6d14e3edb6b1c423d75
{'id': '62bce6d14e3edb6b1c423d75', 'name': 'NA18948.txt', 'size': 10, 'checksums': [{'type': 'etag', 'checksum': '723977676c726f15f91b1a6c3eb982f8-1'}], 'self_uri': 'drs://cgc-ga4gh-api.sbgenomics.com/62bce6d14e3edb6b1c423d75', 'created_time': '2022-06-29T23:57:05Z', 'updated_time': '2022-06-29T23:57:05Z', 'mime_type': 'application/json', 'access_methods': [{'type': 's3', 'region': 'us-east-1', 'access_id': 'aws-us-east-1'}]}


In [20]:
import requests
import os
def download(url, file_path):
    with open(os.path.expanduser(file_path), "wb") as file:
        response = requests.get(url)
        file.write(response.content)

      

In [None]:
fileDetails = resultsDRS.get_object(resultsDRSID)
fullPath = '~/Downloads/' + fileDetails['name']
print(url)
download(url, fullPath) 

In [35]:
wesClient.get_run_log('4c1d705f-c9bb-4347-818b-a1e84af60255')

{'request': {'tags': {},
  'workflow_params': {'name': 'SAMtools View 1.8 run - 06-29-22 23:50:13',
   'project': 'forei/ismb-tutorial',
   'inputs': {'total_memory_GB': None,
    'coverage_limit': None,
    'count_alignments': True,
    'include_only_read_group': None,
    'remove_duplicates': None,
    'max_insert_size': None,
    'reference_file': {'path': 'drs://cgc-ga4gh-api.sbgenomics.com/62b07ea84e3edb6b1c23c8d5',
     'name': 'Homo_sapiens_assembly19_1000genomes_decoy.fasta',
     'class': 'File'},
    'output_file_path': 'NA18948.txt',
    'alignment_file_url': 'https://1000genomes.s3.amazonaws.com/phase3/data/NA18948/exome_alignment/NA18948.mapped.ILLUMINA.bwa.JPT.exome.20121211.bam'}},
  'workflow_type': 'CWL',
  'workflow_engine_params': {},
  'workflow_url': 'sbg://forei/ismb-tutorial/samtools-view-drsurl-1-8-url'},
 'state': 'COMPLETE',
 'outputs': {'counts': {'path': 'drs://cgc-ga4gh-api.sbgenomics.com/62bce6d14e3edb6b1c423d75',
   'name': 'NA18948.txt',
   'class': 'Fil

In [66]:
import tempfile

def get_sam_view_result(run_id):
    log = wesClient.get_run_log(run_id)
    resultsDRSID = log['outputs']['counts']['path']
    resultsDRSID = resultsDRSID.split('/')[-1]
    url = results_DRS_client.get_access_url(resultsDRSID,'s3')
    with tempfile.NamedTemporaryFile(mode='r+') as file:
        response = requests.get(url)
        file.write(response.text)
        file.seek(0)
        x = file.read()
    return x.strip()

    

In [68]:
for run in json_result:
    count_result = get_sam_view_result(run['run_id']))
    run['count_result'] = count_result


105981107
132813326
135301464
