## Run a WES workflow on Kids First DRS files found via the NCPI FHIR server

This notebook demonstrates how to use the NCPI FHIR server to find patients and files from the Familial Leukemia study. The query returns DRS IDs, which allow the files to be accessed via the Kids First Data Resource Center DRS server. The workflow is run on the Seven Bridges Cancer Genomics Cloud WES server.

The FHIR server is accessed through the fhir-py Python client.

### Set up the FHIR client
Access is via a cookie obtained as described [here](https://github.com/NIH-NCPI/ncpi-api-fhir-service).

In [1]:
from fhirpy import SyncFHIRClient
import os
import json


# Dev instance of the NCPI FHIR service.
endpoint = 'https://ncpi-api-fhir-service-dev.kidsfirstdrc.org'

# The JSON file holds the access cookie(s); its contents are sent with every
# request as extra HTTP headers.
full_cookie_path = os.path.expanduser('~/.keys/ncpi_fhir_cookie.json')
with open(full_cookie_path) as cookie_file:
    cookies = json.load(cookie_file)

client = SyncFHIRClient(endpoint, extra_headers=cookies)

In [3]:
studyid = 577137
# Query the study's ResearchSubject resources; limit(10) keeps this demo
# to at most 10 subjects.
resources = client.resources('ResearchSubject').search(study=studyid).limit(10)
subjects = resources.fetch()
print("# of subjects found {}:{}".format(studyid, len(subjects)))

# of subjects found 577137:10


## Retrieve phenotypic data for each subject from FHIR
Save the results to a DataFrame

In [5]:
import pandas as pd
# Build one row per subject with its clinical-status observations and its
# conditions (display text plus every coding code), then load into a DataFrame.
mySubjects = []
for s in subjects:
    sjson = s.serialize()
    # FHIR reference string, e.g. 'Patient/576466'.
    patientID = sjson['individual']['reference']
    thisSubject = {"patientID": patientID}

    # Clinical-status observations -> ClinStatus1, ClinStatus2, ...
    observations = client.resources('Observation').search(subject=patientID).fetch_all()
    csn = 0
    for o in observations:
        ojson = o.serialize()
        # Guarded lookups: observations whose code has no 'text' are skipped
        # instead of raising KeyError.
        if (ojson.get('code') or {}).get('text') == 'Clinical status':
            csn += 1
            thisSubject['ClinStatus{}'.format(csn)] = (
                (ojson.get('valueCodeableConcept') or {}).get('text'))

    # Conditions -> Cndtn<n>_codetext plus one Cndtn<n>_code<m> per coding.
    conditions = client.resources('Condition').search(subject=patientID).fetch_all()
    for cn, c in enumerate(conditions, start=1):
        ccode = c.serialize().get('code') or {}
        thisSubject['Cndtn{}_codetext'.format(cn)] = ccode.get('text')
        for cdn, cd in enumerate(ccode.get('coding', []), start=1):
            thisSubject['Cndtn{}_code{}'.format(cn, cdn)] = cd.get('code')

    mySubjects.append(thisSubject)

df = pd.DataFrame(mySubjects)



In [6]:
# Display the per-subject phenotype table (one row per subject).
df

Unnamed: 0,patientID,ClinStatus1,ClinStatus2,Cndtn1_codetext,Cndtn1_code1,Cndtn1_code2
0,Patient/576466,Alive,Alive,,,
1,Patient/576465,Alive,Alive,ALL,MONDO:0004967,NCIT:C3167
2,Patient/576467,Alive,Alive,,,
3,Patient/576473,Alive,Alive,NHL,MONDO:0018908,NCIT:C3211
4,Patient/576468,Alive,Alive,HL,MONDO:0004952,NCIT:C9357
5,Patient/576474,Alive,Alive,Hodgkins Lymphoma,MONDO:0004952,NCIT:C9357
6,Patient/576470,Alive,Alive,Hodgkin's Lymphoma,MONDO:0004620,NCIT:C24182
7,Patient/576469,Alive,Alive,,,
8,Patient/576472,Alive,Alive,,,
9,Patient/576450,Reported Unknown,Reported Unknown,HL,MONDO:0004952,NCIT:C9357


Use DRSMetaResolver to direct requests for DRS URIs to the appropriate DRS server.

In [7]:
from fasp.loc import DRSMetaResolver
# Resolver that routes each DRS URI to the DRS server hosting it; on creation
# it searches the GA4GH registry for DRS services (see output below).
drsClient = DRSMetaResolver()

Searching the GA4GH registry for org.ga4gh:drs services


Define a function that retrieves, from the FHIR server, the DRS URI of a subject's file in a given format.

In [8]:
def getFiles(client, subject_id, docFormat):
    '''Return the attachment URL (DRS URI) of a subject's document in a given format.

    DocumentReference resources for ``subject_id`` are fetched and filtered
    client-side on ``content[0].format.display == docFormat``.

    Args:
        client: FHIR client (fhirpy SyncFHIRClient) used for the query.
        subject_id: FHIR reference of the subject, e.g. 'Patient/576466'.
        docFormat: format display value to match, e.g. 'bam'.

    Returns:
        The ``content[0].attachment.url`` of the last matching document that
        has a URL, or None if no document matches.
    '''
    # Filtering server-side with format__text=docFormat works for certain
    # studies/patients but not others, so query by subject only and filter here.
    documents = (
        client.resources('DocumentReference')
        .search(subject=subject_id)
        .limit(1000)
        .fetch_all()
    )
    print("# of documents for subject {} :{}".format(subject_id, len(documents)))

    retval = None
    for d in documents:
        content = d.serialize().get('content') or []
        if not content:
            # Documents without content cannot carry a file reference.
            continue
        first = content[0]
        if (first.get('format') or {}).get('display') != docFormat:
            continue
        url = (first.get('attachment') or {}).get('url')
        # Only overwrite with a real URL, so a later URL-less match does not
        # discard an earlier valid one.
        if url:
            retval = url
    return retval

Iterate over each subject to submit the corresponding bam file to the Seven Bridges WES server. The workflow will execute SAMtools stats on each file.

In [11]:
from fasp.workflow import sbcgcWESClient
from fasp.runner import FASPRunner
import datetime

faspRunner = FASPRunner()
runNote = 'KF Leukemia bam files retrieved from NCPI FHIR Server and computed on SB'

# Set up WES Client for WES server of our choice
settings = faspRunner.settings
wesClient = sbcgcWESClient(settings['SevenBridgesProject'])

# Loop-invariant values recorded with every run in the FASP run log.
via = 'WES'
note = 'WES samtools on KF Leukemia'

run_ids = []

# Submit one run per subject that has a bam file; log every subject.
for s in subjects:
    sjson = s.serialize()
    patientID = sjson['individual']['reference']
    # DRS URIs stored in the NCPI FHIR server include the host prefix
    # (drs://<host>/<id>); the full URI is passed to the meta-resolver.
    drsuri = getFiles(client, patientID, 'bam')

    if drsuri:
        print("subject={}, DRS URI={}".format(patientID, drsuri))
        # Call the DRS server for the object's details; only the size is kept.
        objInfo = drsClient.getObject2(drsuri)
        fileSize = objInfo['size']
        # Resolve the DRS URI to an 's3' access URL.
        url = drsClient.getAccessURL2(drsuri, 's3')
        # Run the pipeline on the file at that URL.
        outfile = "{}.txt".format(sjson['id'])
        pipeline_id = wesClient.runWorkflow(url, outfile)
        print('submitted:{}'.format(pipeline_id))
    else:
        print("No bam for subject={}".format(patientID))
        pipeline_id = None
        fileSize = None
        # Reset explicitly. Otherwise the log entry reuses the previous
        # subject's output name, or raises NameError if this is the first subject.
        outfile = None

    runTime = datetime.datetime.now().strftime("%m/%d/%Y, %H:%M:%S")
    faspRunner.logRun(runTime, via, note, pipeline_id, outfile, str(fileSize),
                      client, drsClient, wesClient)
    run_ids.append({"patientID": patientID, "run_id": pipeline_id})

runs_df = pd.DataFrame(run_ids)
runs_df


Running /var/folders/wz/jjbxsnr13v7dkw2jkbpmqd_dly65gq/T/ipykernel_39426/1934627716.py
# of documents for subject Patient/576466 :8
subject=Patient/576466, DRS URI=drs://data.kidsfirstdrc.org/e6eecb29-1ae4-4f65-ae83-9ecf1c496de1
id:e6eecb29-1ae4-4f65-ae83-9ecf1c496de1
sending to: kfDRSClient
https://cgc-ga4gh-api.sbgenomics.com/ga4gh/wes/v1
submitted:4f874352-27fa-4573-9b68-a5394bdf0e0c
# of documents for subject Patient/576465 :8
subject=Patient/576465, DRS URI=drs://data.kidsfirstdrc.org/a57f1be2-d756-4f47-810a-6455e7431aba
id:a57f1be2-d756-4f47-810a-6455e7431aba
sending to: kfDRSClient
https://cgc-ga4gh-api.sbgenomics.com/ga4gh/wes/v1
submitted:f9e75d07-9ce1-47bc-b5a7-34d4554e2903
# of documents for subject Patient/576467 :8
subject=Patient/576467, DRS URI=drs://data.kidsfirstdrc.org/f7448bea-ea5c-4e91-86e2-1aa41c3dc5c4
id:f7448bea-ea5c-4e91-86e2-1aa41c3dc5c4
sending to: kfDRSClient
https://cgc-ga4gh-api.sbgenomics.com/ga4gh/wes/v1
submitted:27b3d723-c811-4559-9fe2-f926b248e48b
# of

Unnamed: 0,patientID,run_id
0,Patient/576466,4f874352-27fa-4573-9b68-a5394bdf0e0c
1,Patient/576465,f9e75d07-9ce1-47bc-b5a7-34d4554e2903
2,Patient/576467,27b3d723-c811-4559-9fe2-f926b248e48b
3,Patient/576473,1c0cdacb-2b3f-4612-b9d8-c5dce9d60b5a
4,Patient/576468,e24917d7-035c-4459-b7da-58110f914e9d
5,Patient/576474,aca24b6e-5fa7-4ba2-b7f4-142db68a0991
6,Patient/576470,1f64713f-a679-4e29-a049-5400e7f89030
7,Patient/576469,87459b70-3e01-4bf2-85a3-e39df8941a53
8,Patient/576472,
9,Patient/576450,60bf9c82-a7f8-4bb5-9482-33c157da3f70


Check status of each run via the WES server

In [21]:
# Query the WES server for the status of each submitted run and record it in
# a new 'status' column; subjects without a run are reported and skipped.
for index, row in runs_df.iterrows():
    # pd.notna treats both None and NaN as "no run". The former '!= None'
    # test let NaN through and would query the server with a NaN id.
    if pd.notna(row['run_id']):
        runs_df.at[index, "status"] = wesClient.getTaskStatus(row['run_id'])
    else:
        print('no run for {}'.format(row['patientID']))
runs_df

no run for Patient/576472


Unnamed: 0,patientID,run_id,status
0,Patient/576466,4f874352-27fa-4573-9b68-a5394bdf0e0c,COMPLETE
1,Patient/576465,f9e75d07-9ce1-47bc-b5a7-34d4554e2903,COMPLETE
2,Patient/576467,27b3d723-c811-4559-9fe2-f926b248e48b,COMPLETE
3,Patient/576473,1c0cdacb-2b3f-4612-b9d8-c5dce9d60b5a,COMPLETE
4,Patient/576468,e24917d7-035c-4459-b7da-58110f914e9d,COMPLETE
5,Patient/576474,aca24b6e-5fa7-4ba2-b7f4-142db68a0991,COMPLETE
6,Patient/576470,1f64713f-a679-4e29-a049-5400e7f89030,COMPLETE
7,Patient/576469,87459b70-3e01-4bf2-85a3-e39df8941a53,COMPLETE
8,Patient/576472,,
9,Patient/576450,60bf9c82-a7f8-4bb5-9482-33c157da3f70,COMPLETE


Define functions to download a file and to extract results from a SAMtools stats output file.

In [22]:
import requests
import os
def download(url, file_path, timeout=300):
    '''Download a file from a URL to a local file path.

    Args:
        url: HTTP(S) URL to fetch, e.g. an access URL obtained from DRS.
        file_path: destination path; ``~`` is expanded.
        timeout: seconds to wait for the server before giving up.

    Raises:
        requests.HTTPError: if the server returns an error status, so an
            error page is never saved as if it were the requested file.
    '''
    # Issue the request before opening the destination, so a failed request
    # leaves no empty or error-page file behind. Stream the body in chunks to
    # avoid holding large files in memory.
    with requests.get(url, stream=True, timeout=timeout) as response:
        response.raise_for_status()
        with open(os.path.expanduser(file_path), "wb") as file:
            for chunk in response.iter_content(chunk_size=1 << 16):
                file.write(chunk)
        
def getStats(filePath, statsList):
    '''Extract summary values from a SAMtools stats results file.

    Only lines starting with ``SN`` are read. Each is split on tabs as
    ``SN<TAB>name:<TAB>value``; any further tab-separated fields are ignored.

    Args:
        filePath: path to the SAMtools stats output file.
        statsList: stat names to extract, without the trailing colon,
            e.g. ['insert size average'].

    Returns:
        dict mapping each requested stat name found in the file to its value
        as a string. Requested names not present are omitted.
    '''
    wanted = set(statsList)
    retDict = {}
    # 'with' closes the file; the previous version leaked the handle.
    with open(filePath, "r") as f:
        for line in f:
            if not line.startswith('SN'):
                continue
            parts = line.split('\t')
            # Skip malformed SN lines that have no value field.
            if len(parts) < 3:
                continue
            statName = parts[1].split(':')[0]
            if statName in wanted:
                retDict[statName] = parts[2].rstrip()
    return retDict
 

For the completed workflows retrieve the results file and extract the required result values. Add the result values to a DataFrame.

In [23]:
from fasp.loc import sbcgcDRSClient
# For each completed run, download its SAMtools stats output, extract the
# required values and collect them (one dict per run) into a DataFrame.
statsList = []
# Loop-invariant: SAMtools stats summary fields to extract.
statsRequired = ['insert size average', 'insert size standard deviation']
for index, row in runs_df.iterrows():
    # pd.notna rejects both None and NaN run ids.
    if pd.notna(row['run_id']) and row['status'] == 'COMPLETE':
        log = wesClient.GetRunLog(row['run_id'])
        results = log['outputs']['statistics']
        # Resolve the results file's DRS URI to an 's3' access URL.
        url = drsClient.getAccessURL2(results['path'], 's3')
        # The name comes from the server; keep only the base name so the
        # download cannot be written outside the current directory.
        fileName = os.path.basename(results['name'])
        download(url, fileName)
        try:
            stats = getStats(fileName, statsRequired)
        finally:
            # Remove the downloaded file even if parsing fails.
            os.remove(fileName)
        stats['run_id'] = row['run_id']
        stats['patientID'] = row['patientID']
        statsList.append(stats)
stats_df = pd.DataFrame(statsList)
stats_df

Unnamed: 0,insert size average,insert size standard deviation,run_id,patientID
0,360.3,112.1,4f874352-27fa-4573-9b68-a5394bdf0e0c,Patient/576466
1,378.4,116.0,f9e75d07-9ce1-47bc-b5a7-34d4554e2903,Patient/576465
2,365.1,131.9,27b3d723-c811-4559-9fe2-f926b248e48b,Patient/576467
3,346.4,107.9,1c0cdacb-2b3f-4612-b9d8-c5dce9d60b5a,Patient/576473
4,350.7,113.6,e24917d7-035c-4459-b7da-58110f914e9d,Patient/576468
5,371.8,98.6,aca24b6e-5fa7-4ba2-b7f4-142db68a0991,Patient/576474
6,357.5,100.7,1f64713f-a679-4e29-a049-5400e7f89030,Patient/576470
7,338.3,102.0,87459b70-3e01-4bf2-85a3-e39df8941a53,Patient/576469
8,291.8,130.0,60bf9c82-a7f8-4bb5-9482-33c157da3f70,Patient/576450


In [28]:
# Outer join on the subject reference, keeping subjects that lack stats
# (e.g. no bam file) as well as any stats without phenotype data.
final_df = df.merge(stats_df, how="outer", on="patientID")
# run_id is bookkeeping only. ClinStatus2 repeats ClinStatus1 for every
# subject in the phenotype table shown above.
final_df = final_df.drop(columns=['run_id', 'ClinStatus2'])

final_df

Unnamed: 0,patientID,ClinStatus1,Cndtn1_codetext,Cndtn1_code1,Cndtn1_code2,insert size average,insert size standard deviation
0,Patient/576466,Alive,,,,360.3,112.1
1,Patient/576465,Alive,ALL,MONDO:0004967,NCIT:C3167,378.4,116.0
2,Patient/576467,Alive,,,,365.1,131.9
3,Patient/576473,Alive,NHL,MONDO:0018908,NCIT:C3211,346.4,107.9
4,Patient/576468,Alive,HL,MONDO:0004952,NCIT:C9357,350.7,113.6
5,Patient/576474,Alive,Hodgkins Lymphoma,MONDO:0004952,NCIT:C9357,371.8,98.6
6,Patient/576470,Alive,Hodgkin's Lymphoma,MONDO:0004620,NCIT:C24182,357.5,100.7
7,Patient/576469,Alive,,,,338.3,102.0
8,Patient/576472,Alive,,,,,
9,Patient/576450,Reported Unknown,HL,MONDO:0004952,NCIT:C9357,291.8,130.0
