# Galaxy integration through API from FAIRDOM

* Upload data from list of URLs
* Run workflow


In [1]:
from bioblend import galaxy
from collections import OrderedDict
import json
import time

In [2]:
galaxy_config = {
    'url': 'https://usegalaxy.be',
    'api_key': '707a42add356ba80066c1aafde3c5b9d' # make an account on the Galaxy instance and then User - Preferences - Manage API key
}

In [3]:
fairdom_config = {
    #'investigation' : '8-way RIL population', # Wolfgang
    'investigation' : '8-way_RIL_population',# Frederik
    
    #'study' : 'RIL_8-way_growth_chamber',    
    #'assay' : 'RNA_seq_E-MTAB-3965',         
    'workflow' : 'Salmon_maize_paired' # name of the workflow, assumes there is only one with that name
}

#samples = {
#    'RIL1' : 
#        { 
#            'forward' : 'ftp://ftp.sra.ebi.ac.uk/vol1/fastq/ERR105/002/ERR1059392/ERR1059392_1.fastq.gz',
#            'reverse' : 'ftp://ftp.sra.ebi.ac.uk/vol1/fastq/ERR105/002/ERR1059392/ERR1059392_2.fastq.gz',
#            #'phenotyping' : 'https://floppy.psb.ugent.be/index.php/s/0ZK7y1gZb26BAEg/download'
# }
#}

In [4]:
samples = {
    'SAMPLENAME' : 
        { 
            'STEP1' : 'URL1',
            'STEP2' : 'URL2',
        }
}
print "The list of URLs read from the data file:\n"

for x in samples:
    print (x)
    for y in samples[x]:
        print (y +':'+ samples[x][y])

The list of URLs read from the data file:

SAMPLENAME
STEP2:URL2
STEP1:URL1


## Connecting to Galaxy

* Connection to Galaxy
'url': 'https://usegalaxy.be',
'api_key': 'API_KEY' # make an account on the Galaxy instance and then User - Preferences - Manage API key

* Info needed from FAIRDOM
investigation : 8-way RIL population
workflow : Salmon_maize_paired

* Info on data 

In [5]:
gi = galaxy.GalaxyInstance(url=galaxy_config['url'], key=galaxy_config['api_key'])

In [6]:
print ("Current user's information:\n")
current_user = gi.users.get_current_user()

print ("username:" + current_user["username"])
print ("email:" + current_user["email"])
print ("is_admin:" + str(current_user["is_admin"]))


Current user's information:

username:wolfgang
email:wolfgang.mueller@h-its.org
is_admin:False


## Get the data and copy it to Galaxy

* I created a FAIRDOM data library where we will upload data, getting a link to it
* I need give an accouynt access to allow to write in this folder (feature not a bug)

In [7]:
#I created a FAIRDOM data library where we will upload data, getting a link to it
#I need give an accouynt access to allow to write in this folder (feature not a bug)
library = gi.libraries.get_libraries(name = 'FAIRDOM')

In [8]:
# If a folder with the investigation name already exist, 

root_folder = gi.libraries.get_folders(library[0]['id'])
files = gi.libraries.show_library(library[0]['id'], contents=True)

investigation_present = False

investigation_library = ""

for file in files:
    if file['name'] == ("/" + fairdom_config['investigation']):
        investigation_present = True
        investigation_library = file['id']

if not investigation_present:
    print("A folder with the investigation name doesn't exist, create investigation!")
    investigation_library =  gi.libraries.create_folder(library[0]['id'], fairdom_config['investigation'], description=None)[0]
else:
    investigation_library =  gi.libraries.get_folders(library[0]['id'], name = "/" + fairdom_config['investigation'])[0]
    print("A folder with the investigation name already exist!")



A folder with the investigation name already exist!


In [9]:
uploads = {}
for sample in samples:
    print(sample)
    uploads[sample] = []
    for key, file in samples[sample].items():
            print(file)
            print("\n")
            # does not check if file is present
            file_present = False
            for avail_file in files:
                if avail_file['name'] == ("/" + fairdom_config['investigation'] + "/" + file):
                    print("file found: ")
                    print(avail_file)
                    print("\n")
                    uploads[sample].append(avail_file) 
                    file_present = True
                    break
            if not file_present :
                # this gives url as filename, can be changed through update, not yet implemented
                uploaded_file = gi.libraries.upload_file_from_url(library[0]['id'], 
                     file_url = file, 
                     folder_id=investigation_library['id'], 
                     file_type='fastqsanger.gz',
                     #file_type='auto',                                             
                     #dbkey='?'
                    )
                uploads[sample].append(uploaded_file[0])

SAMPLENAME
URL2


URL1




In [10]:
# to be improved, now waiting for all samples to be uploaded
not_yet_ready = True
errors = False
while not_yet_ready:
    for sample in samples:
        print(sample)
        for upload in uploads[sample]:
            print(gi.libraries.show_dataset(library[0]['id'], upload['id'])['state'])
            if gi.libraries.show_dataset(library[0]['id'], upload['id'])['state'] == 'ok':
                not_yet_ready = False
                # update_library_dataset(library[0]['id'], name="name_to_which_it_needs_to_be_changed")
            elif gi.libraries.show_dataset(library[0]['id'], upload['id'])['state'] == 'error':
                not_yet_ready = False
                errors = True
            else: 
                not_yet_ready = True

    if not_yet_ready:
        print("Waiting for upload")
        time.sleep(60)
            
print("The files are Ready !")

SAMPLENAME
queued
queued
Waiting for upload


KeyboardInterrupt: 

## Run workflow

In [None]:
#assumes a workflow with that name is present for the user
workflows = gi.workflows.get_workflows(name = fairdom_config['workflow'], published=True)
workflow = workflows[0]


In [None]:
invoked_workflows = {}
for sample in samples:
    
    # assuming order forward - reverse and only for the first pair
    inputs = {}
    inputs[0] = { 'src':'ld', 'id':uploads[sample][0]['id'] }
    inputs[1] = { 'src':'ld', 'id':uploads[sample][1]['id'] }

    invoked_workflow = gi.workflows.invoke_workflow(workflow['id'], 
                             inputs=inputs, 
                             import_inputs_to_history=True, 
                             history_name=sample)
    print("invoked workflow...\n")
    print("update time:"+invoked_workflow["update_time"])
    print("history id:"+invoked_workflow["history_id"])
    print("uuid:"+invoked_workflow["uuid"])
    print("state:"+invoked_workflow["state"])
    print("workflow id:"+invoked_workflow["workflow_id"])
    print("model_class:"+invoked_workflow["model_class"])
    print("id:"+invoked_workflow["id"])



    #print(invoked_workflow)
    invoked_workflows[sample] = invoked_workflow

In [None]:
# needs to match annotation of workflow step labels
# assuming downloading one file per step
downloads = {
    'FastQC forward': {
        'type' : 'html_file',
        'filename_postfix' : 'fastqc_fw.zip' # zip file with a html (and other stuff) inside
    }, 
    'FastQC reverse' : {
        'type' : 'text_file', # essentially same output as previous one, but text version
        'filename_postfix' : 'fastqc_rev.txt'
    }, 
    'Salmon' : {
        'type' : 'output_quant',
        'filename_postfix' : 'counts.txt'
    }
}

print "Downloading files:\n"

for x in downloads:
    print (x)
    for y in downloads[x]:
        print (y +':'+ downloads[x][y])

In [None]:
all_ready = False

while not all_ready:
    time.sleep(10)
    all_ready = True
    for sample in samples:
        print (sample)
        filename_prefix = fairdom_config['investigation'] + '_' + sample + '_' # to do: link with sample name, for now hardcoded
        #print ("filename_prefix:"+filename_prefix)
        #print (str(gi.workflows.show_invocation(invoked_workflows[sample]['workflow_id'], invoked_workflows[sample]['id'])['steps']))
        for step in gi.workflows.show_invocation(invoked_workflows[sample]['workflow_id'], invoked_workflows[sample]['id'])['steps']:
            print("steps:")
            #print(step)
            if step['job_id']: # input does not have job_id
                print(gi.jobs.get_state(step['job_id']))
                if gi.jobs.get_state(step['job_id']) == 'ok':
                    # job finished
                    all_ready = True
                else:
                    all_ready = False
                

In [None]:
for sample in samples:
    for step in gi.workflows.show_invocation(invoked_workflows[sample]['workflow_id'], invoked_workflows[sample]['id'])['steps']:
        if step['workflow_step_label'] in downloads:
            outputs = gi.jobs.show_job(step['job_id'])['outputs']
            for output in outputs:
                if output == downloads[step['workflow_step_label']]['type']:
                        #print(step['workflow_step_label'])
                        #print(output)
                        print(filename_prefix + downloads[step['workflow_step_label']]['filename_postfix'])
                        #print (outputs[output]['id'])
                        #print (outputs[output]['src'])
                        #gi.datasets.download_dataset(outputs[output]['id'], file_path=filename_prefix + downloads[step['workflow_step_label']]['filename_postfix'], use_default_filename=False, maxwait=12000)
                        
                        with open( filename_prefix + downloads[step['workflow_step_label']]['filename_postfix'], 'bw') as f:
                            f.write(gi.datasets.download_dataset(outputs[output]['id'], use_default_filename=False, maxwait=12000))