Permalink
Switch branches/tags
Nothing to show
Find file
Fetching contributors…
Cannot retrieve contributors at this time
executable file 509 lines (413 sloc) 17.4 KB
#!/usr/bin/env python
from __future__ import division
import getpass
import sys
import math
import os
import re
import subprocess
import ConfigParser
run_id = sys.argv[1]
base_dir = sys.argv[2]
run_dir = sys.argv[3]
os.sys.path.insert(0, base_dir + "/tools")
from AutoADAG import *
from Pegasus.DAX3 import *
# globals
ref_files = {}
gff3_files = {}
fpkm_files = []
log_files = []
# read the config file
conf = ConfigParser.SafeConfigParser()
r = conf.read(base_dir + "/osg-gem.conf")
if len(r) != 1:
print("ERROR: Unable to read osg-gem.conf!")
sys.exit(1)
def add_task_files(task_files, dax, job, task_name):
"""
add a set of input files from the task-files dir to a task
"""
if task_name not in task_files:
task_files[task_name] = {}
# add to DAX-level replica catalog
for fname in os.listdir(base_dir + "/task-files/" + task_name):
if fname not in task_files[task_name]:
print("Adding task file for " + task_name + ": " + fname)
task_files[task_name][fname] = File(fname)
task_files[task_name][fname].addPFN( \
PFN("file://" + base_dir + "/task-files/" + task_name + "/" + fname, "local"))
dax.addFile(task_files[task_name][fname])
for f in task_files[task_name]:
job.uses(f, link=Link.INPUT)
def add_ref_files(dax, job, ref_prefix):
"""
add a set of reference input files
"""
if conf.getboolean("config", "star"):
global ref_files
# add to DAX-level replica catalog
for fname in os.listdir(base_dir + "/reference/star_index/"):
if fname not in ref_files:
ref_files[fname] = File(fname)
ref_files[fname].addPFN( \
PFN("file://" + base_dir + "/reference/star_index/" + fname, "local"))
dax.addFile(ref_files[fname])
for f in ref_files:
job.uses(f, link=Link.INPUT)
else:
global ref_files
if ref_prefix not in ref_files:
ref_files[ref_prefix] = {}
# add to DAX-level replica catalog
for fname in os.listdir(base_dir + "/reference/"):
if (not re.search("^" + ref_prefix, fname)) or (re.search("\.gff3$", fname)):
# skip non-matching files
continue
if fname not in ref_files[ref_prefix]:
ref_files[ref_prefix][fname] = File(fname)
ref_files[ref_prefix][fname].addPFN( \
PFN("file://" + base_dir + "/reference/" + fname, "local"))
dax.addFile(ref_files[ref_prefix][fname])
for f in ref_files[ref_prefix]:
job.uses(f, link=Link.INPUT)
def add_gff3_file(dax, job):
"""
adds a gff3 to a job
"""
global gff3_files
rf = None
# add to DAX-level replica catalog
for fname in os.listdir(base_dir + "/reference/"):
if not re.search("\.gff3$", fname):
# skip non-matching files
continue
if fname not in gff3_files:
gff3_files[fname] = File(fname)
gff3_files[fname].addPFN( \
PFN("file://" + base_dir + "/reference/" + fname, "local"))
dax.addFile(gff3_files[fname])
for f in gff3_files:
job.uses(f, link=Link.INPUT)
# should only be one
rf = f
return rf
if conf.getboolean("config", "paired"):
def hisat2(task_files, dax, base_name, part, common_part, forward_file, reverse_file):
# Add job
j = Job(name="hisat2")
j.addArguments(getpass.getuser(), \
run_id, \
conf.get("reference", "reference_prefix"), \
base_name, \
part, \
common_part, "paired")
add_task_files(task_files, dax, j, "hisat2")
add_ref_files(dax, j, conf.get("reference", "reference_prefix"))
j.uses(forward_file, link=Link.INPUT)
j.uses(reverse_file, link=Link.INPUT)
# output files
f1 = File(base_name + "-" + common_part + "-accepted_hits.bam")
j.uses(f1, link=Link.OUTPUT, transfer=False)
#f2 = File(common_part + "-align_summary.txt")
#j.uses(f2, link=Link.OUTPUT, transfer=True)
f3 = File(base_name + "-" + common_part + "-out.txt")
log_files.append(f3)
j.uses(f3, link=Link.OUTPUT, transfer=False)
f4 = File(base_name + "-" + common_part + "-trimmomatic.txt")
j.uses(f4, link=Link.OUTPUT, transfer=False)
log_files.append(f4)
dax.addJob(j)
return f1
def tophat(task_files, dax, base_name, part, common_part, forward_file, reverse_file):
# Add job
j = Job(name="tophat")
j.addArguments(getpass.getuser(), \
run_id, \
conf.get("reference", "reference_prefix"), \
base_name, \
part, \
common_part, "paired")
add_task_files(task_files, dax, j, "tophat")
add_ref_files(dax, j, conf.get("reference", "reference_prefix"))
j.uses(forward_file, link=Link.INPUT)
j.uses(reverse_file, link=Link.INPUT)
# output files
f1 = File(base_name + "-" + common_part + "-accepted_hits.bam")
j.uses(f1, link=Link.OUTPUT, transfer=False)
f2 = File(base_name + "-" + common_part + "-align_summary.txt")
j.uses(f2, link=Link.OUTPUT, transfer=False)
log_files.append(f2)
f3 = File(base_name + "-" + common_part + "-out.txt")
j.uses(f3, link=Link.OUTPUT, transfer=False)
f4 = File(base_name + "-" + common_part + "-trimmomatic.txt")
j.uses(f4, link=Link.OUTPUT, transfer=True)
log_files.append(f4)
dax.addJob(j)
return f1
def star(task_files, dax, base_name, part, common_part, forward_file, reverse_file):
# Add job
j = Job(name="star")
j.addArguments(getpass.getuser(), \
run_id, \
conf.get("reference", "reference_prefix"), \
base_name, \
part, \
common_part, "paired")
add_task_files(task_files, dax, j, "star")
add_ref_files(dax, j, conf.get("reference", "reference_prefix"))
j.uses(forward_file, link=Link.INPUT)
j.uses(reverse_file, link=Link.INPUT)
# output files
f1 = File(base_name + "-" + common_part + "-accepted_hits.bam")
j.uses(f1, link=Link.OUTPUT, transfer=False)
f3 = File(base_name + "-" + common_part + "-out.txt")
log_files.append(f3)
l = File(base_name + "-" + common_part + "-Log.final.out")
j.uses(l, link=Link.OUTPUT, transfer=False)
log_files.append(l)
j.uses(f3, link=Link.OUTPUT, transfer=False)
f4 = File(base_name + "-" + common_part + "-trimmomatic.txt")
j.uses(f4, link=Link.OUTPUT, transfer=False)
log_files.append(f4)
dax.addJob(j)
return f1
elif conf.getboolean("config", "single"):
def hisat2(task_files, dax, base_name, part, common_part, forward_file):
# Add job
j = Job(name="hisat2")
j.addArguments(getpass.getuser(), \
run_id, \
conf.get("reference", "reference_prefix"), \
base_name, \
part, \
common_part, "single")
add_task_files(task_files, dax, j, "hisat2")
add_ref_files(dax, j, conf.get("reference", "reference_prefix"))
j.uses(forward_file, link=Link.INPUT)
# output files
f1 = File(base_name + "-" + common_part + "-accepted_hits.bam")
j.uses(f1, link=Link.OUTPUT, transfer=False)
#f2 = File(common_part + "-align_summary.txt")
#j.uses(f2, link=Link.OUTPUT, transfer=True)
f3 = File(base_name + "-" + common_part + "-out.txt")
log_files.append(f3)
j.uses(f3, link=Link.OUTPUT, transfer=False)
f4 = File(base_name + "-" + common_part + "-trimmomatic.txt")
log_files.append(f4)
j.uses(f4, link=Link.OUTPUT, transfer=False)
dax.addJob(j)
return f1
def tophat(task_files, dax, base_name, part, common_part, forward_file):
# Add job
j = Job(name="tophat")
j.addArguments(getpass.getuser(), \
run_id, \
conf.get("reference", "reference_prefix"), \
base_name, \
part, \
common_part, "single")
add_task_files(task_files, dax, j, "tophat")
add_ref_files(dax, j, conf.get("reference", "reference_prefix"))
j.uses(forward_file, link=Link.INPUT)
# output files
f1 = File(base_name + "-" + common_part + "-accepted_hits.bam")
j.uses(f1, link=Link.OUTPUT, transfer=False)
f2 = File(base_name + "-" + common_part + "-align_summary.txt")
log_files.append(f2)
j.uses(f2, link=Link.OUTPUT, transfer=False)
f3 = File(base_name + "-" + common_part + "-out.txt")
j.uses(f3, link=Link.OUTPUT, transfer=False)
f4 = File(base_name + "-" + common_part + "-trimmomatic.txt")
log_files.append(f4)
j.uses(f4, link=Link.OUTPUT, transfer=False)
dax.addJob(j)
return f1
def star(task_files, dax, base_name, part, common_part, forward_file):
# Add job
j = Job(name="star")
j.addArguments(getpass.getuser(), \
run_id, \
conf.get("reference", "reference_prefix"), \
base_name, \
part, \
common_part, "single")
add_task_files(task_files, dax, j, "star")
add_ref_files(dax, j, conf.get("reference", "reference_prefix"))
j.uses(forward_file, link=Link.INPUT)
# output files
f1 = File(base_name + "-" + common_part + "-accepted_hits.bam")
j.uses(f1, link=Link.OUTPUT, transfer=False)
f3 = File(base_name + "-" + common_part + "-out.txt")
log_files.append(f3)
j.uses(f3, link=Link.OUTPUT, transfer=False)
l = File(base_name + "-" + common_part + "-Log.final.out")
j.uses(l, link=Link.OUTPUT, transfer=False)
log_files.append(l)
f4 = File(base_name + "-" + common_part + "-trimmomatic.txt")
log_files.append(f4)
j.uses(f4, link=Link.OUTPUT, transfer=False)
dax.addJob(j)
return f1
def merge(dax, base_name, in_list):
if len(in_list) == 0:
print("Error - input list to merge is of 0 length!")
sys.exit(1)
# Add job
j = Job(name="merge")
j.addArguments(base_name)
for f in in_list:
j.uses(f, link=Link.INPUT)
f = File(base_name + "-merged.bam")
j.uses(f, link=Link.OUTPUT, transfer=False)
dax.addJob(j)
return f
def cuff(task_files, dax, base_name, merged):
global fpkm_files
# Add job
j = Job(name="cuff")
j.addArguments(base_name)
# inputs
add_task_files(task_files, dax, j, "cuff")
gff3 = add_gff3_file(dax, j)
j.addArguments(gff3)
j.uses(merged, link=Link.INPUT)
# outputs
f = File(base_name + "-merged_counts.fpkm")
fpkm_files.append(f)
j.uses(f, link=Link.OUTPUT, transfer=False)
dax.addJob(j)
return f
def stringtie(task_files, dax, base_name, merged):
global fpkm_files
# Add job
j = Job(name="StringTie")
j.addArguments(base_name)
# inputs
add_task_files(task_files, dax, j, "StringTie")
gff3 = add_gff3_file(dax, j)
j.addArguments(gff3)
j.uses(merged, link=Link.INPUT)
# outputs
f = File(base_name + "-merged_counts.fpkm")
fpkm_files.append(f)
t = File(base_name + "-t_data.ctab")
e = File(base_name + "-e_data.ctab")
i = File(base_name + "-i_data.ctab")
x = File(base_name + "-i2t.ctab")
z = File(base_name + "-e2t.ctab")
j.uses(e, link=Link.OUTPUT, transfer=True)
j.uses(i, link=Link.OUTPUT, transfer=True)
j.uses(f, link=Link.OUTPUT, transfer=False)
j.uses(t, link=Link.OUTPUT, transfer=True)
j.uses(x, link=Link.OUTPUT, transfer=True)
j.uses(z, link=Link.OUTPUT, transfer=True)
dax.addJob(j)
return f
def main():
task_files = {}
# Create a abstract dag
dax = AutoADAG("level-2")
# memory requirements
memory = "2G"
if conf.has_option("config", "memory"):
memory = conf.get("config", "memory")
# Add executables to the DAX-level replica catalog
for exe_name in os.listdir(base_dir + "/tools/"):
exe = Executable(name=exe_name, arch="x86_64", installed=False)
exe.addPFN(PFN("file://" + base_dir + "/tools/" + exe_name, "local"))
if exe_name == "tophat" or exe_name == "hisat2":
exe.addProfile(Profile(Namespace.PEGASUS, "clusters.size", 5))
exe.addProfile(Profile(Namespace.CONDOR, "request_memory", memory))
dax.addExecutable(exe)
# on OSG Connect, has to be the one under public/
if os.path.exists("/stash2"):
data_dir = "/stash2/user/" + getpass.getuser() + "/public/" + run_id + "/data"
else:
data_dir = run_dir + "/data"
# we need a bunch of workflows, and one merge/cuff for each base input
for base_name in os.listdir(data_dir):
parts = []
for part in os.listdir(data_dir + "/" + base_name):
my_data_dir = run_dir + "/data/" + base_name + "/" + part
accepted_hits = []
for in_name in os.listdir(my_data_dir):
# only need the forward file
if not "forward" in in_name:
continue
# use stash urls for the data so we can bypass and grab it directly from
# the jobs (OSG Connect uses stash)
if os.path.exists("/stash2"):
base_url = "http://stash.osgconnect.net/~" + getpass.getuser() + "/" + run_id + \
"/data/" + base_name + "/" + part
site_name = "stash"
else:
base_url = "file://" + run_dir + "/data/" + base_name + "/" + part
site_name = "local"
common_part = in_name
common_part = re.sub(".*-forward\-", "", common_part)
common_part = re.sub("\.gz", "", common_part)
for_file = File(base_name + "-forward-" + common_part)
for_file.addPFN(PFN(base_url + "/" + base_name + "-forward-" + common_part, site_name))
dax.addFile(for_file)
if conf.getboolean("config", "paired"):
rev_file = File(base_name + "-reverse-" + common_part)
rev_file.addPFN(PFN(base_url + "/" + base_name + "-reverse-" + common_part, site_name))
dax.addFile(rev_file)
if conf.getboolean("config", "hisat2") and conf.getboolean("config", "paired"):
out_file = hisat2(task_files, dax, base_name, part, common_part, for_file, rev_file)
elif conf.getboolean("config", "hisat2") and conf.getboolean("config", "single"):
out_file = hisat2(task_files, dax, base_name, part, common_part, for_file)
elif conf.getboolean("config", "tophat2") and conf.getboolean("config", "paired"):
out_file = tophat(task_files, dax, base_name, part, common_part, for_file, rev_file)
elif conf.getboolean("config", "tophat2") and conf.getboolean("config", "single"):
out_file = tophat(task_files, dax, base_name, part, common_part, for_file)
elif conf.getboolean("config", "star") and conf.getboolean("config", "paired"):
out_file = star(task_files, dax, base_name, part, common_part, for_file, rev_file)
elif conf.getboolean("config", "star") and conf.getboolean("config", "single"):
out_file = star(task_files, dax, base_name, part, common_part, for_file)
accepted_hits.append(out_file)
# merge
merged = merge(dax, base_name + "-" + part, accepted_hits)
parts.append(part)
# second level merge
accepted_hits = []
for part in parts:
f = File(base_name + "-" + part + "-merged.bam")
accepted_hits.append(f)
merged = merge(dax, base_name, accepted_hits)
if conf.getboolean("config", "cufflinks"):
# cuff
cuff(task_files, dax, base_name, merged)
else:
# StringTie
stringtie(task_files, dax, base_name, merged)
# merge all the fpkms
j = Job(name="gem-merge")
if conf.getboolean("config", "hisat2"):
j.addArguments("hisat2")
elif conf.getboolean("config", "tophat2"):
j.addArguments("tophat2")
elif conf.getboolean("config", "star"):
j.addArguments("star")
if conf.getboolean("config", "paired"):
j.addArguments("paired")
elif conf.getboolean("config", "single"):
j.addArguments("single")
# inputs
add_task_files(task_files, dax, j, "gem-merge")
for f in fpkm_files:
j.uses(f, link=Link.INPUT)
for g in log_files:
j.uses(g, link=Link.INPUT)
# outputs
f = File("merged_GEM.tab")
g = File("QC_Report.tab")
j.uses(f, link=Link.OUTPUT, transfer=True)
j.uses(g, link=Link.OUTPUT, transfer=True)
dax.addJob(j)
# Write the DAX
f = open(run_dir + "/workflow/level-2.dax", "w")
dax.writeXML(f)
f.close()
main()