Permalink
Switch branches/tags
Nothing to show
Find file
Fetching contributors…
Cannot retrieve contributors at this time
executable file 268 lines (212 sloc) 9.23 KB
#!/usr/bin/env python
from __future__ import division
from AutoADAG import *
from Pegasus.DAX3 import *
import sys
import math
import os
import re
import subprocess
import ConfigParser
base_dir = os.getcwd()
run_id = sys.argv[1]
run_dir = sys.argv[2]
data_dir = sys.argv[3]
# read the config file
conf = ConfigParser.SafeConfigParser()
r = conf.read("osg-gem.conf")
if len(r) != 1:
print("ERROR: Unable to read osg-gem.conf!")
sys.exit(1)
# validate the settings in the config file
if not conf.getboolean("config", "hisat2") and not conf.getboolean("config", "tophat2") and not conf.getboolean("config", "star"):
print("Please enable either hisat2 or tophat2 or star in the config file")
sys.exit(1)
if conf.getboolean("config", "hisat2") and conf.getboolean("config", "tophat2") and conf.getboolean("config", "star"):
print("Please enable only one of hisat2 or tophat2 or star in the config file")
sys.exit(1)
if conf.getboolean("config", "hisat2") and conf.getboolean("config", "tophat2") and not conf.getboolean("config", "star"):
print("Please enable only one of hisat2 and tophat2 in the config file")
sys.exit(1)
if conf.getboolean("config", "hisat2") and not conf.getboolean("config", "tophat2") and conf.getboolean("config", "star"):
print("Please enable only one of hisat2 and star in the config file")
sys.exit(1)
if not conf.getboolean("config", "hisat2") and conf.getboolean("config", "tophat2") and conf.getboolean("config", "star"):
print("Please enable only one of tophat2 and star in the config file")
sys.exit(1)
if not conf.getboolean("config", "stringtie") and not conf.getboolean("config", "cufflinks"):
print("Please enable either stringtie or cufflinks in the config file")
sys.exit(1)
if conf.getboolean("config", "stringtie") and conf.getboolean("config", "cufflinks"):
print("Please enable only one of stringtie and cufflinks in the config file")
sys.exit(1)
# making sure the user specified a reference set
print("\nAdding reference files ...")
count = 0
if conf.getboolean("config", "star"):
for fname in os.listdir(base_dir + "/reference/star_index/"):
print(" " + fname)
count += 1
if count == 0:
print("ERROR: Unable to find reference files in the reference/star_index directory" +
" with the specified prefix: " + conf.get("reference", "reference_prefix"))
sys.exit(1)
print("\nAdding gff3 file ...")
count = 0
for fname in os.listdir(base_dir + "/reference/star_index"):
if re.search("\.gff3$", fname):
print(" " + fname)
count += 1
if count != 1:
print("ERROR: Unable to find one, and only one, gff3 file in the reference/ directory")
sys.exit(1)
else:
print("\nAdding reference files ...")
count = 0
for fname in os.listdir(base_dir + "/reference/"):
if re.search("^" + conf.get("reference", "reference_prefix"), fname):
print(" " + fname)
count += 1
if count == 0:
print("ERROR: Unable to find reference files in the reference/ directory" +
" with the specified prefix: " + conf.get("reference", "reference_prefix"))
sys.exit(1)
print("\nAdding gff3 file ...")
count = 0
for fname in os.listdir(base_dir + "/reference/"):
if re.search("\.gff3$", fname):
print(" " + fname)
count += 1
if count != 1:
print("ERROR: Unable to find one, and only one, gff3 file in the reference/ directory")
sys.exit(1)
# Create a abstract dag
dax = AutoADAG("gem")
# email notificiations for when the state of the workflow changes
dax.invoke('all', base_dir + "/email-notify")
# Add executables to the DAX-level replica catalog
for exe_name in os.listdir("./tools/"):
exe = Executable(name=exe_name, arch="x86_64", installed=False)
exe.addPFN(PFN("file://" + base_dir + "/tools/" + exe_name, "local"))
dax.addExecutable(exe)
subdax_file = File("level-2.dax")
subdax_file.addPFN(PFN("file://%s/workflow/level-2.dax" % (run_dir), "local"))
dax.addFile(subdax_file)
print(" ")
prepare_jobs = []
input_id = 0
if conf.getboolean("config", "paired"):
for key, inputs in conf.items("inputs"):
input_id += 1
inputs = inputs.strip()
print("Found input: " + inputs)
# can either be from the filesystem, remote url, or from a SRA download job
forward_file = File("forward-" + str(input_id))
reverse_file = File("reverse-" + str(input_id))
dax.addFile(forward_file)
dax.addFile(reverse_file)
urls = inputs.split(" ")
if len(urls) == 2:
# user gave use forward/reverse files to use
# are the URLs local?
if not re.search(":", urls[0]):
urls[0] = "file://" + os.path.abspath(urls[0])
if not re.search(":", urls[1]):
urls[1] = "file://" + os.path.abspath(urls[1])
forward_file.addPFN(PFN(urls[0], "local"))
reverse_file.addPFN(PFN(urls[1], "local"))
else:
# SRA input, we need a job first do download and split the data
dl = Job(name="sra-download")
dl.uses(forward_file, link=Link.OUTPUT, transfer=False)
dl.uses(reverse_file, link=Link.OUTPUT, transfer=False)
dl.addArguments(inputs, forward_file, reverse_file, "paired")
#dl.addProfile(Profile("hints", "execution.site", "local"))
dl.addProfile(Profile("dagman", "CATEGORY", "sradownload"))
dax.addJob(dl)
# set up split jobs
split1 = Job(name="prepare-inputs")
split1.uses(forward_file, link=Link.INPUT)
split1.addArguments(base_dir, forward_file, data_dir + "/" + str(input_id), str(input_id) + "-forward")
split1.addProfile(Profile("hints", "execution.site", "local"))
dax.addJob(split1)
split2 = Job(name="prepare-inputs")
split2.uses(reverse_file, link=Link.INPUT)
split2.addArguments(base_dir, reverse_file , data_dir + "/" + str(input_id), str(input_id) + "-reverse")
split2.addProfile(Profile("hints", "execution.site", "local"))
dax.addJob(split2)
prepare_jobs.append(split1)
prepare_jobs.append(split2)
numsamples = str(input_id)
elif conf.getboolean("config", "single"):
for key, inputs in conf.items("inputs"):
input_id += 1
inputs = inputs.strip()
print("Found input: " + inputs)
# can either be from the filesystem, remote url, or from a SRA download job
forward_file = File("forward-" + str(input_id))
dax.addFile(forward_file)
urls = inputs.split(" ")
if len(urls) == 1 and "fastq" in str(urls):
# user gave use forward/reverse files to use
# are the URLs local?
if not re.search(":", urls[0]):
urls[0] = "file://" + os.path.abspath(urls[0])
forward_file.addPFN(PFN(urls[0], "local"))
else:
# SRA input, we need a job first do download and split the data
dl = Job(name="sra-download")
dl.uses(forward_file, link=Link.OUTPUT, transfer=False)
dl.addArguments(inputs, forward_file, "single")
#dl.addProfile(Profile("hints", "execution.site", "local"))
dl.addProfile(Profile("dagman", "CATEGORY", "sradownload"))
dax.addJob(dl)
# set up split jobs
split1 = Job(name="prepare-inputs")
split1.uses(forward_file, link=Link.INPUT)
split1.addArguments(base_dir, forward_file, data_dir + "/" + str(input_id), str(input_id) + "-forward")
split1.addProfile(Profile("hints", "execution.site", "local"))
dax.addJob(split1)
prepare_jobs.append(split1)
numsamples = str(input_id)
# generate sub workflow
j2 = Job(name="dax-level-2")
j2.addArguments(run_id, base_dir, run_dir)
j2.addProfile(Profile("hints", "execution.site", "local"))
dax.addJob(j2)
for job in prepare_jobs:
dax.depends(parent=job, child=j2)
# sub workflow job
j3 = DAX("level-2.dax")
# are we on OSG Connect?
if os.path.exists("/stash2"):
j3.addArguments("-Dpegasus.catalog.site.file=%s/sites.xml" % (run_dir),
"--sites", "condorpool",
"--staging-site", "stash",
"--output-site", "local",
"--basename", "level-2",
"--force",
"--cleanup", "none")
else:
j3.addArguments("-Dpegasus.catalog.site.file=%s/sites.xml" % (run_dir),
"--sites", "jetstream-condorpool",
"--staging-site", "jetstream-staging",
"--output-site", "local",
"--basename", "level-2",
"--force",
"--cleanup", "none")
j3.uses(subdax_file, link=Link.INPUT)
dax.addDAX(j3)
dax.depends(parent=j2, child=j3)
# organize output files for input into ballgown
if conf.getboolean("config", "stringtie"):
outpath = str(run_dir) + '/outputs'
gown = Job(name="ballgown-wrapper")
gown.addArguments(outpath, numsamples)
gown.addProfile(Profile("hints", "execution.site", "local"))
dax.addJob(gown)
dax.depends(parent=j3, child=gown)
# Write the DAX to stdout
f = open("dax.xml", "w")
dax.writeXML(f)
f.close()