In [11]:
import sevenbridges as sbg
from sevenbridges.errors import SbgError
from sevenbridges.http.error_handlers import rate_limit_sleeper, maintenance_sleeper
import sys
import re
import pdb
import concurrent.futures
from requests import request
import json
# Build the client configuration from the 'turbo' profile.
config = sbg.Config(profile='turbo')
# Shared API client used by every cell below. The two imported sleeper handlers
# are registered as error handlers (presumably they wait and retry on rate
# limiting / platform maintenance -- confirm against the sevenbridges docs).
api = sbg.Api(config=config, error_handlers=[rate_limit_sleeper, maintenance_sleeper])

## Create example bash calls from workflow
Works best with a workflow (WF) task that has not been restarted.

In [2]:
# Dump the command line of every completed job in one task to a TSV file,
# ordered by job start time, collapsing scattered jobs into a single row group.
project = "d3b-bixu/dev-rnaseq-snv"
task_id = "8b37cb5b-ef56-4f3b-a7c9-87a89b07171f"
out_path = "/Users/brownm28/Documents/2020-Apr-8_RNAseq_snv_dev/2020-06-16_gatk4.tsv"
# task_id = "3c20cc8e-18d7-43f2-bc2c-4a76d38a88f8"
task = api.tasks.get(task_id)
# jobs: start time -> {step name -> command line}
jobs = {}
# temp: scatter step name -> number of scattered jobs seen for that step
temp = {}
for job in task.get_execution_details().jobs:
    if job.status != "COMPLETED":
        continue
    cmd = job.command_line
    if cmd is None:
        cmd = "embedded script or task retry"
        sys.stderr.write("WARN: Job " + job.name + " had null cmd\n")
    check = job.name.split('_')
    if check[-1] == "s":
        # Names ending in "_s" are treated as scatter jobs; the step name is the
        # job name with its last two "_"-separated parts removed.
        key = "_".join(check[:-2])
        if key not in temp:
            # Only the first job of a scatter is recorded; the rest are counted.
            # setdefault keeps steps already stored under the same start time.
            jobs.setdefault(job.start_time, {})[key] = cmd
            temp[key] = 1
        else:
            temp[key] += 1
    else:
        jobs.setdefault(job.start_time, {})[job.name] = cmd

# Open the output only once all API calls succeeded, and always close it.
with open(out_path, "w") as out_file:
    out_file.write("Step\tType\tNum scatter\tCommand\n")
    for rtime in sorted(jobs):
        for key, step_cmd in jobs[rtime].items():
            rtype = "run step"
            sct = "NA"
            if temp.get(key, 0) > 1:
                rtype = "scatter"
                sct = str(temp[key])
            # A multi-line command becomes one output row per line.
            for cmd in step_cmd.split('\n'):
                out_file.write(key + "\t" + rtype + "\t" + sct + "\t" + cmd + "\n")


## Convert tsv to markdown table

In [3]:
import sys

# Convert the TSV written above into an aligned markdown table.

# max desired col width
max_w = 200
tsv_path = "/Users/brownm28/Documents/2020-Apr-8_RNAseq_snv_dev/2020-06-16_gatk4.tsv"
md_path = "/Users/brownm28/Documents/2020-Apr-8_RNAseq_snv_dev/2020-06-16_gatk4.md"


def _md_row(cells, widths):
    """Format one table row, left-justifying each cell to its column width.

    Cells beyond the last known column are written unpadded.
    """
    padded = [cell.ljust(widths[j]) if j < len(widths) else cell for j, cell in enumerate(cells)]
    return " | " + " | ".join(padded) + " |\n"


# Read everything up front; the context manager closes the input even on error.
with open(tsv_path) as tsv_in:
    # A literal "|" would end the markdown cell early, so it is escaped.
    # Empty lines carry no row and are skipped.
    data = [
        line.rstrip('\n').replace('|', '\\|').split('\t')
        for line in tsv_in
        if line.rstrip('\n')
    ]

if not data:
    sys.stderr.write("WARN: no rows found in " + tsv_path + ", nothing written\n")
else:
    n_col = len(data[0])
    # Rows shorter than the header get empty trailing cells.
    data = [row + [""] * (n_col - len(row)) for row in data]
    # Column width: header length (at least 3 so the delimiter row always has
    # dashes), widened by data cells up to max_w. Longer cells simply overflow.
    max_len = [max(len(item), 3) for item in data[0]]
    for row in data[1:]:
        for j in range(n_col):
            max_len[j] = max(max_len[j], min(len(row[j]), max_w))
    with open(md_path, "w") as out_md:
        # print header first
        out_md.write(_md_row(data[0], max_len))
        out_md.write(_md_row(["-" * width for width in max_len], max_len))
        for row in data[1:]:
            out_md.write(_md_row(row, max_len))

## Get run times

### Get run times by step

In [2]:
def get_job_run_time(task, phrase):
    """Collect cost and run times for one task whose name matches ``phrase``.

    Args:
        task: task object exposing ``id``, ``name``, ``price.amount``,
            ``start_time``, ``end_time`` and ``get_execution_details()``.
        phrase: regular expression searched for in ``task.name``.

    Returns:
        ``[]`` when the task name does not match ``phrase``.
        On success, the 5-tuple ``(task id, task name, cost, task run time in
        hours, steps)``, where cost and run time are strings and ``steps`` is a
        list of ``[job name, run time in hours]`` for every COMPLETED job.
        ``[exception, task id]`` when reading the task details failed.
    """
    if not re.search(phrase, task.name):
        return []
    try:
        data = []
        for job in task.get_execution_details().jobs:
            if job.status != "COMPLETED":
                sys.stderr.write("Skipping job likely killed due to spot instance kill for " + job.name + " from task " + task.id + "\n")
            else:
                # total_seconds() keeps whole days; .seconds would wrap at 24 h.
                job_hours = (job.end_time - job.start_time).total_seconds() / 3600
                data.append([job.name, str(job_hours)])
        task_hours = (task.end_time - task.start_time).total_seconds() / 3600
        return task.id, task.name, str(task.price.amount), str(task_hours), data
    except Exception as e:
        # Reported as a 2-item list so the caller can tell it from a result.
        return [e, task.id]


In [3]:
# Write per-task cost / run time and per-step run times for every completed
# task in the project whose name matches `phrase`.
project = "kfdrc-harmonization/sd-bhjxbdqk-06"
phrase = "RNAfusion-"
cost_dir = "/Users/brownm28/Documents/2020-Apr-8_RNAseq_snv_dev/cost_est/"

tasks = api.tasks.query(project=project, status="COMPLETED").all()
x = 0
m = 100
# The context managers close both reports even when the run aborts early.
with open(cost_dir + "actual_cost.txt", "w") as actual_out, \
        open(cost_dir + "step_run_times.txt", "w") as step_run, \
        concurrent.futures.ThreadPoolExecutor(16) as executor:
    # Column order matches what get_job_run_time returns: id first, then name.
    actual_out.write("Task ID\tTask name\tCost\tRun Time in hours\n")
    step_run.write("Run step\tRun time in hours\n")
    results = {executor.submit(get_job_run_time, task, phrase): task for task in tasks}
    for future in concurrent.futures.as_completed(results):
        outcome = future.result()
        if len(outcome) > 2:
            # Full result: (id, name, cost, hours, per-step run times)
            if x % m == 0:
                sys.stderr.write("Processed " + str(x) + " valid tasks\n")
            actual_out.write("\t".join(outcome[0:4]) + "\n")
            for step in outcome[4]:
                step_run.write("\t".join(step) + "\n")
            x += 1
        elif len(outcome) == 2:
            # Failure marker: [exception, task id]
            sys.stderr.write(str(outcome[0]) + "\tFailed processing task ID " + outcome[1] + "\n")
            # sys.exit raises SystemExit everywhere; the bare exit() helper does
            # not stop the loop when run inside an IPython kernel.
            sys.exit(1)


Skipping job likely killed due to spot instance kill for star_fusion from task 90520afe-23e8-408e-a6d1-7a4307bff6cb
Skipping job likely killed due to spot instance kill for star_fusion from task ad4166df-eb20-4403-9e62-a434fc023946
Skipping job likely killed due to spot instance kill for pizzly from task b75638c2-fdc0-4321-8320-b7b3b7b6940e
Skipping job likely killed due to spot instance kill for star_fusion from task cf2e69a6-c9b8-47d3-92a6-52c41f4cbc2a
Skipping job likely killed due to spot instance kill for star_fusion from task c011ab43-7a02-4892-981d-5f897de739ac
Skipping job likely killed due to spot instance kill for arriba_fusion from task b6276343-2100-44c2-afc7-23a45ba141cf
Skipping job likely killed due to spot instance kill for pizzly from task c9cff929-f971-41b1-90b0-bf0a3e40f2fb
Skipping job likely killed due to spot instance kill for star_fusion from task 5d9f1b03-f375-4363-bfaf-304fadb3ee6f
Skipping job likely killed due to spot instance kill for star_fusion from task 4

## Tag source files

In [7]:
def tag_file(info, header):
    """Set metadata on one platform file from a manifest row.

    Args:
        info: one tab-separated manifest line; column 0 is the file ID.
        header: manifest header split into column names. Columns from index 3
            onward are used as metadata keys, with the value taken from the
            same column of ``info``.

    Raises:
        Exception: whatever the lookup or save raised, after it has been logged
            to stderr. The notebook's captured output shows that the
            "No relevant changes were detected" message from saving is also
            reported this way.
    """
    try:
        meta = info.rstrip('\n').split('\t')
        f_obj = api.files.get(meta[0])
        # Index-based on purpose: a row shorter than the header fails loudly.
        f_obj.metadata = {header[i]: meta[i] for i in range(3, len(header))}
        f_obj.save()
    except Exception as e:
        sys.stderr.write(str(e) + "\n")
        sys.stderr.write("Could not process " + info)
        # Re-raise instead of exit(1): this runs in a worker thread, where exit
        # cannot stop the program and would replace the real error stored on
        # the future.
        raise

In [8]:
# Tag every file listed in the manifest, 16 at a time.
project = "d3b-bixu/rs-vpf5jbc3-cov-irt-controlled-access-study"
manifest_path = "/Users/brownm28/Documents/2020-Apr-8_RNAseq_snv_dev/manifests/covwc_to_tag.txt"
x = 1
m = 250
n_failed = 0
with open(manifest_path) as manifest:
    head = next(manifest)
    header = head.rstrip("\n").split("\t")
    with concurrent.futures.ThreadPoolExecutor(16) as executor:
        results = {executor.submit(tag_file, line, header): line for line in manifest}
        for result in concurrent.futures.as_completed(results):
            # tag_file logs the details itself; here we only count failures.
            # exception() returns the stored error without raising it.
            if result.exception() is not None:
                n_failed += 1
            if x % m == 0:
                sys.stderr.write('Processed ' + str(x) + ' files\n')
                sys.stderr.flush()
            x += 1
if n_failed:
    sys.stderr.write(str(n_failed) + " of " + str(len(results)) + " files could not be tagged\n")


No relevant changes were detected in order to update the resource on the server.
No relevant changes were detected in order to update the resource on the server.
Could not process 5eebb671e4b0a6d31133e684	COVWC-20200312-P2-E02-P.all-reads_Aligned.sortedByCoord.out.bam	d3b-bixu/rs-vpf5jbc3-cov-irt-controlled-access-study	COVWC-20200312-P2-E02-P	Positive
No relevant changes were detected in order to update the resource on the server.
Could not process 5eebb672e4b0a6d31133e6a7	COVWC-20200312-P2-E02-P.all-reads_ReadsPerGene.out.tab	d3b-bixu/rs-vpf5jbc3-cov-irt-controlled-access-study	COVWC-20200312-P2-E02-P	Positive
No relevant changes were detected in order to update the resource on the server.
Could not process 5eebb672e4b0a6d31133e693	COVWC-20200312-P2-E02-P.all-reads_Chimeric.out.junction	d3b-bixu/rs-vpf5jbc3-cov-irt-controlled-access-study	COVWC-20200312-P2-E02-P	Positive
No relevant changes were detected in order to update the resource on the server.
Could not process 5eebb673e4b0a6d

No relevant changes were detected in order to update the resource on the server.
Could not process 5eebb67ae4b0a6d31133e7d1	COVWC-20200313-P4-G01-N.all-reads_ReadsPerGene.out.tab	d3b-bixu/rs-vpf5jbc3-cov-irt-controlled-access-study	COVWC-20200313-P4-G01-N	Negative


## Set up GATK4 RNAseq WF Tasks

In [12]:
def get_gatk_refs(api, project):
    """Fetch the fixed reference inputs shared by every drafted task.

    Args:
        api: client whose ``files.get(file_id)`` returns a file object.
        project: accepted for call compatibility; not used by this function.

    Returns:
        dict with keys ``reference_fasta``, ``reference_dict``, ``knownsites``
        (list of four files), ``call_bed_file``, ``dbsnp_vcf`` and the literal
        ``tool_name`` value ``'STAR_GATK4'``.

    Raises:
        SystemExit: with code 1 if any file lookup fails, after logging the
            error to stderr.
    """
    known_site_ids = [
        '5eecd8c3e4b0efd899f4ae44',
        '5eecd846e4b0efd899f4ae27',
        '5eecd846e4b0efd899f4ae26',
        '5eecd846e4b0efd899f4ae21',
    ]
    try:
        ref_dict = {
            'reference_fasta': api.files.get('5eebc5d1e4b0a6d311357eb9'),
            'reference_dict': api.files.get('5eecb14ae4b0efd899f474da'),
            'knownsites': [api.files.get(file_id) for file_id in known_site_ids],
            'call_bed_file': api.files.get('5eebdeece4b0efd899f43eaa'),
            'dbsnp_vcf': api.files.get('5eecd846e4b0efd899f4ae24'),
            'tool_name': 'STAR_GATK4',
        }
    except Exception as e:
        sys.stderr.write(str(e) + "\nFailed to get REFS\n")
        # sys.exit always raises; the bare exit() helper can return inside an
        # IPython kernel, which would hand back an incomplete dict.
        sys.exit(1)
    return ref_dict

In [13]:
def draft_task(in_file):
    """Create one draft (not started) task for a manifest row.

    Reads the module-level ``ref_obj``, ``api``, ``project`` and ``app_name``.

    Args:
        in_file: one tab-separated manifest line; column 0 is the input file ID
            and column 3 is appended to the task name.

    Raises:
        Exception: whatever the lookup, creation or save raised, after it has
            been logged to stderr.
    """
    try:
        # Start from a copy of the shared reference inputs.
        input_dict = dict(ref_obj)
        info = in_file.rstrip('\n').split('\t')
        input_dict['STAR_sorted_genomic_bam'] = api.files.get(info[0])
        task_name = "GATK RNAseq SNV: " + info[3]
        task = api.tasks.create(name=task_name, project=project, app=app_name, inputs=input_dict, run=False)
        # The task ID only exists after creation, so the basename is set in a
        # second step and saved.
        task.inputs['output_basename'] = task.id
        task.save()
    except Exception as e:
        sys.stderr.write(str(e) + "\nfailed to set up task for " + in_file)
        # Re-raise instead of exit(1): this runs in a worker thread, where exit
        # cannot stop the program and would replace the real error stored on
        # the future.
        raise

In [15]:
# Draft one GATK RNAseq SNV task per manifest row, 16 at a time.
project = 'd3b-bixu/rs-vpf5jbc3-cov-irt-controlled-access-study'
app_name = project + "/d3b-gatk-rnaseq-snv-wf"
manifest_path = "/Users/brownm28/Documents/2020-Apr-8_RNAseq_snv_dev/manifests/bams_for_gatk_to_run.tsv"
ref_obj = get_gatk_refs(api, project)
x = 1
m = 250
n_failed = 0
with open(manifest_path) as manifest:
    # Skip the header row.
    next(manifest)
    with concurrent.futures.ThreadPoolExecutor(16) as executor:
        results = {executor.submit(draft_task, line): line for line in manifest}
        for result in concurrent.futures.as_completed(results):
            # draft_task logs the details itself; here we only count failures.
            # exception() returns the stored error without raising it.
            if result.exception() is not None:
                n_failed += 1
            if x % m == 0:
                sys.stderr.write('Processed ' + str(x) + ' tasks\n')
                sys.stderr.flush()
            x += 1
if n_failed:
    sys.stderr.write(str(n_failed) + " of " + str(len(results)) + " tasks could not be drafted\n")
