Permalink

Comparing changes

Choose two branches to see what’s changed or to start a new pull request. If you need to, you can also compare across forks.

Open a pull request

Create a new pull request by comparing changes across two branches. If you need to, you can also compare across forks.
Choose a Base Repository
Choose a base branch
...
Choose a Head Repository
Choose a head branch
Checking mergeability… Don’t worry, you can still create the pull request.
  • 11 commits
  • 16 files changed
  • 0 commit comments
  • 3 contributors
View
@@ -7,6 +7,10 @@ task-files/hisat2/*.ht2
reference/*.fa
reference/*.bt2
reference/*.ht2
+reference/*.txt
+reference/*.gtf
tools/AutoADAG.pyc
osg-gem.conf
+staging
+runs
Binary file not shown.
View
@@ -16,11 +16,14 @@ reference_prefix = chr21-GRCh38
# input2 = DRR046893
#
-input1 = ./Test_data/TEST_1.fastq.gz ./Test_data/TEST_2.fastq.gz
-input2 = DRR046893
+input1 = ./Test_data/SRR4343300.fastq.gz
[config]
+# Memory available to the jobs. This should be roughly 2X the
+# size of the reference genome, rounded up whole GB
+memory = 8 GB
+
# process using TopHat2
tophat2 = False
View
@@ -16,8 +16,8 @@
<site handle="stash" arch="x86_64" os="LINUX">
<directory type="shared-scratch" path="/stash2/user/${USER}/public">
- <!-- file-server operation="get" url="http://stash.osgconnect.net/~${USER}"/ -->
- <file-server operation="get" url="stash:///user/${USER}/public"/>
+ <file-server operation="get" url="http://stash.osgconnect.net/~${USER}"/>
+ <!-- file-server operation="get" url="stash:///user/${USER}/public"/ -->
<file-server operation="put" url="scp://${USER}@login02.osgconnect.net/stash2/user/${USER}/public"/>
</directory>
</site>
@@ -31,8 +31,21 @@
<profile namespace="condor" key="+WantsStashCache" >True</profile>
<!-- disable OSG squid caching for now -->
- <!-- profile namespace="env" key="http_proxy" ></profile>
- <profile namespace="env" key="OSG_SQUID_LOCATION" ></profile -->
+ <profile namespace="env" key="http_proxy" ></profile>
+ <profile namespace="env" key="OSG_SQUID_LOCATION" ></profile>
+ </site>
+
+ <site handle="jetstream-staging" arch="x86_64" os="LINUX">
+ <directory type="shared-scratch" path="${PWD}/staging">
+ <file-server operation="all" url="scp://${USER}@${HOSTNAME}${PWD}/staging"/>
+ </directory>
+ </site>
+
+ <site handle="jetstream-condorpool" arch="x86_64" os="LINUX">
+ <profile namespace="pegasus" key="style" >condor</profile>
+ <profile namespace="condor" key="universe" >vanilla</profile>
+ <profile namespace="condor" key="request_memory" >2 GB</profile>
+ <profile namespace="condor" key="request_disk" >1 GB</profile>
</site>
</sitecatalog>
View
122 submit
@@ -2,49 +2,89 @@
set -e
-# needed for stashcp to be picked up the site catalog for the local site
-module load xrootd
-module load stashcp
+# are we on OSG Connect?
+if [ -e /stash2 ]; then
-export PATH=/home/rynge/software/pegasus-4.7.0dev/bin:$PATH
-#export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:/home/rynge/software/pegasus-4.7.0dev/bin
+ # needed for stashcp to be picked up the site catalog for the local site
+ module load xrootd
+ module load stashcp
+
+ export PATH=/home/rynge/software/pegasus-4.7.0dev/bin:$PATH
+
+ TOPDIR=`pwd`
+
+ export RUN_ID=osg-gem-`date +'%s'`
+
+ export RUN_DIR=/local-scratch/$USER/workflows/$RUN_ID
+ mkdir -p $RUN_DIR/scratch/$RUN_ID/level-2
+  # make the data available over http
+ mkdir -p /stash2/user/$USER/public/$RUN_ID/data
+ ln -s /stash2/user/$USER/public/$RUN_ID/data $RUN_DIR/data
+
+ # generate the site catalog
+ SC=$RUN_DIR/sites.xml
+ envsubst <sites.xml.template >$SC
+
+ # generate the dax
+ export PYTHONPATH=`pegasus-config --python`
+ ./tools/dax-level-1 $RUN_ID $RUN_DIR $RUN_DIR/data
+
+ echo
+ echo "An 'Output's directory will be created within the base of the workflow directory."
+ echo "This directory, $RUN_DIR/outputs"
+ echo "will have a 'merged_GEM.tab' file, an expression vector for each individual file,"
+ echo "and all standard output files from trimmomatic/hisat2 jobs."
+
+ # plan and submit the workflow
+ echo
+ pegasus-plan \
+ -Dpegasus.catalog.site.file=$SC \
+ --conf pegasus.conf \
+ --relative-dir $RUN_ID \
+ --sites condorpool \
+ --staging-site stash \
+ --output-site local \
+ --dir $RUN_DIR/workflow \
+ --dax dax.xml \
+ --submit
+else
+ # jetstream
-TOPDIR=`pwd`
-
-export RUN_ID=osg-gem-`date +'%s'`
-
-export RUN_DIR=/local-scratch/$USER/workflows/$RUN_ID
-mkdir -p $RUN_DIR/scratch/$RUN_ID/level-2
-# make the data availabile over http
-mkdir -p /stash2/user/$USER/public/$RUN_ID/data
-ln -s /stash2/user/$USER/public/$RUN_ID/data $RUN_DIR/data
-
-# generate the site catalog
-SC=$RUN_DIR/sites.xml
-envsubst <sites.xml.template >$SC
-
-# generate the dax
-export PYTHONPATH=`pegasus-config --python`
-./tools/dax-level-1 $RUN_ID $RUN_DIR $RUN_DIR/data
-
-echo
-echo "An 'Output's directory will be created within the base of the workflow directory."
-echo "This directory, $RUN_DIR/outputs"
-echo "will have a 'merged_GEM.tab' file, an expression vector for each individual file,"
-echo "and all standard output files from trimmomatic/hisat2 jobs."
-
-# plan and submit the workflow
-echo
-pegasus-plan \
- -Dpegasus.catalog.site.file=$SC \
- --conf pegasus.conf \
- --relative-dir $RUN_ID \
- --sites condorpool \
- --staging-site stash \
- --output-site local \
- --dir $RUN_DIR/workflow \
- --dax dax.xml \
- --submit
+ TOPDIR=`pwd`
+
+ export RUN_ID=osg-gem-`date +'%s'`
+
+ export RUN_DIR=$PWD/runs/$RUN_ID
+ mkdir -p $RUN_DIR/scratch/$RUN_ID/level-2
+ mkdir -p $RUN_DIR/data
+
+ # generate the site catalog
+ SC=$RUN_DIR/sites.xml
+ envsubst <sites.xml.template >$SC
+
+ # generate the dax
+ export PYTHONPATH=`pegasus-config --python`
+ ./tools/dax-level-1 $RUN_ID $RUN_DIR $RUN_DIR/data
+
+ echo
+ echo "An 'Output's directory will be created within the base of the workflow directory."
+ echo "This directory, $RUN_DIR/outputs"
+ echo "will have a 'merged_GEM.tab' file, an expression vector for each individual file,"
+ echo "and all standard output files from trimmomatic/hisat2 jobs."
+
+ # plan and submit the workflow
+ echo
+ pegasus-plan \
+ -Dpegasus.catalog.site.file=$SC \
+ --conf pegasus.conf \
+ --relative-dir $RUN_ID \
+ --sites jetstream-condorpool \
+ --staging-site jetstream-staging \
+ --output-site local \
+ --dir $RUN_DIR/workflow \
+ --dax dax.xml \
+ --submit
+fi
Binary file not shown.
View
@@ -1,5 +1,10 @@
#!/bin/bash
+# module init required when running on non-OSG resources, and has to sourced
+# before set -e as sometimes it exits non-0 when a module environment is
+# already set up
+. /cvmfs/oasis.opensciencegrid.org/osg/sw/module-init.sh
+
set -e
module load java/8u25
View
@@ -1,5 +1,10 @@
#!/bin/bash
+# module init required when running on non-OSG resources, and has to sourced
+# before set -e as sometimes it exits non-0 when a module environment is
+# already set up
+. /cvmfs/oasis.opensciencegrid.org/osg/sw/module-init.sh
+
set -e
module load java/8u25
View
@@ -90,28 +90,22 @@ for key, inputs in conf.items("inputs"):
# can either be from the filesystem, remote url, or from a SRA download job
forward_file = File("forward-" + str(input_id))
- reverse_file = File("reverse-" + str(input_id))
dax.addFile(forward_file)
- dax.addFile(reverse_file)
urls = inputs.split(" ")
- if len(urls) == 2:
+ if len(urls) == 1 and "fastq" in str(urls):
# user gave use forward/reverse files to use
# are the URLs local?
if not re.search(":", urls[0]):
urls[0] = "file://" + os.path.abspath(urls[0])
- if not re.search(":", urls[1]):
- urls[1] = "file://" + os.path.abspath(urls[1])
forward_file.addPFN(PFN(urls[0], "local"))
- reverse_file.addPFN(PFN(urls[1], "local"))
else:
# SRA input, we need a job first do download and split the data
dl = Job(name="sra-download")
dl.uses(forward_file, link=Link.OUTPUT, transfer=False)
- dl.uses(reverse_file, link=Link.OUTPUT, transfer=False)
- dl.addArguments(inputs, forward_file, reverse_file)
+ dl.addArguments(inputs, forward_file)
#dl.addProfile(Profile("hints", "execution.site", "local"))
dl.addProfile(Profile("dagman", "CATEGORY", "sradownload"))
dax.addJob(dl)
@@ -123,14 +117,8 @@ for key, inputs in conf.items("inputs"):
split1.addProfile(Profile("hints", "execution.site", "local"))
dax.addJob(split1)
- split2 = Job(name="prepare-inputs")
- split2.uses(reverse_file, link=Link.INPUT)
- split2.addArguments(base_dir, reverse_file , data_dir + "/" + str(input_id), str(input_id) + "-reverse")
- split2.addProfile(Profile("hints", "execution.site", "local"))
- dax.addJob(split2)
prepare_jobs.append(split1)
- prepare_jobs.append(split2)
# generate sub workflow
j2 = Job(name="dax-level-2")
@@ -142,13 +130,23 @@ for job in prepare_jobs:
# sub workflow job
j3 = DAX("level-2.dax")
-j3.addArguments("-Dpegasus.catalog.site.file=%s/sites.xml" % (run_dir),
- "--sites", "condorpool",
- "--staging-site", "stash",
- "--output-site", "local",
- "--basename", "level-2",
- "--force",
- "--cleanup", "none")
+# are we on OSG Connect?
+if os.path.exists("/stash2"):
+ j3.addArguments("-Dpegasus.catalog.site.file=%s/sites.xml" % (run_dir),
+ "--sites", "condorpool",
+ "--staging-site", "stash",
+ "--output-site", "local",
+ "--basename", "level-2",
+ "--force",
+ "--cleanup", "none")
+else:
+ j3.addArguments("-Dpegasus.catalog.site.file=%s/sites.xml" % (run_dir),
+ "--sites", "jetstream-condorpool",
+ "--staging-site", "jetstream-staging",
+ "--output-site", "local",
+ "--basename", "level-2",
+ "--force",
+ "--cleanup", "none")
j3.uses(subdax_file, link=Link.INPUT)
dax.addDAX(j3)
dax.depends(parent=j2, child=j3)
View
@@ -95,7 +95,7 @@ def add_gff3_file(dax, job):
return rf
-def hisat2(task_files, dax, base_name, part, common_part, forward_file, reverse_file):
+def hisat2(task_files, dax, base_name, part, common_part, forward_file):
# Add job
j = Job(name="hisat2")
@@ -108,7 +108,6 @@ def hisat2(task_files, dax, base_name, part, common_part, forward_file, reverse_
add_task_files(task_files, dax, j, "hisat2")
add_ref_files(dax, j, conf.get("reference", "reference_prefix"))
j.uses(forward_file, link=Link.INPUT)
- j.uses(reverse_file, link=Link.INPUT)
# output files
f1 = File(base_name + "-" + common_part + "-accepted_hits.bam")
j.uses(f1, link=Link.OUTPUT, transfer=False)
@@ -123,7 +122,7 @@ def hisat2(task_files, dax, base_name, part, common_part, forward_file, reverse_
return f1
-def tophat(task_files, dax, base_name, part, common_part, forward_file, reverse_file):
+def tophat(task_files, dax, base_name, part, common_part, forward_file):
# Add job
j = Job(name="tophat")
@@ -136,7 +135,6 @@ def tophat(task_files, dax, base_name, part, common_part, forward_file, reverse_
add_task_files(task_files, dax, j, "tophat")
add_ref_files(dax, j, conf.get("reference", "reference_prefix"))
j.uses(forward_file, link=Link.INPUT)
- j.uses(reverse_file, link=Link.INPUT)
# output files
f1 = File(base_name + "-" + common_part + "-accepted_hits.bam")
j.uses(f1, link=Link.OUTPUT, transfer=False)
@@ -222,16 +220,25 @@ def main():
# Create a abstract dag
dax = AutoADAG("level-2")
+ # memory requirements
+ memory = "2G"
+ if conf.has_option("config", "memory"):
+ memory = conf.get("config", "memory")
+
# Add executables to the DAX-level replica catalog
for exe_name in os.listdir(base_dir + "/tools/"):
exe = Executable(name=exe_name, arch="x86_64", installed=False)
exe.addPFN(PFN("file://" + base_dir + "/tools/" + exe_name, "local"))
if exe_name == "tophat" or exe_name == "hisat2":
exe.addProfile(Profile(Namespace.PEGASUS, "clusters.size", 5))
+ exe.addProfile(Profile(Namespace.CONDOR, "request_memory", memory))
dax.addExecutable(exe)
- # has to be the one under public/
- data_dir = "/stash2/user/" + getpass.getuser() + "/public/" + run_id + "/data"
+ # on OSG Connect, has to be the one under public/
+ if os.path.exists("/stash2"):
+ data_dir = "/stash2/user/" + getpass.getuser() + "/public/" + run_id + "/data"
+ else:
+ data_dir = run_dir + "/data"
# we need a bunch of workflows, and one merge/cuff for each base input
for base_name in os.listdir(data_dir):
@@ -249,27 +256,28 @@ def main():
if not "forward" in in_name:
continue
- # use stash urls for the data so we can bypass and grab it directly from
- # the jobs
- base_url = "http://stash.osgconnect.net/~" + getpass.getuser() + "/" + run_id + \
- "/data/" + base_name + "/" + part
+ # the jobs (OSG Connect uses stash)
+ if os.path.exists("/stash2"):
+ base_url = "http://stash.osgconnect.net/~" + getpass.getuser() + "/" + run_id + \
+ "/data/" + base_name + "/" + part
+ site_name = "stash"
+ else:
+ base_url = "file://" + run_dir + "/data/" + base_name + "/" + part
+ site_name = "local"
common_part = in_name
common_part = re.sub(".*-forward\-", "", common_part)
common_part = re.sub("\.gz", "", common_part)
for_file = File(base_name + "-forward-" + common_part)
- for_file.addPFN(PFN(base_url + "/" + base_name + "-forward-" + common_part, "stash"))
+ for_file.addPFN(PFN(base_url + "/" + base_name + "-forward-" + common_part, site_name))
dax.addFile(for_file)
- rev_file = File(base_name + "-reverse-" + common_part)
- rev_file.addPFN(PFN(base_url + "/" + base_name + "-reverse-" + common_part, "stash"))
- dax.addFile(rev_file)
if conf.getboolean("config", "hisat2"):
- out_file = hisat2(task_files, dax, base_name, part, common_part, for_file, rev_file)
+ out_file = hisat2(task_files, dax, base_name, part, common_part, for_file)
else:
- out_file = tophat(task_files, dax, base_name, part, common_part, for_file, rev_file)
+ out_file = tophat(task_files, dax, base_name, part, common_part, for_file)
accepted_hits.append(out_file)
# merge
Oops, something went wrong.

No commit comments for this range