/
Snakefile.count_dups
executable file
·191 lines (154 loc) · 6.6 KB
/
Snakefile.count_dups
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
#!/bin/bash
# vim: ft=python
# Execution script for count_well_duplicates.py based on Snakemake.
# Requires Snakemake in your default PATH, and the well_duplicates Python scripts
# to be in the same dir as this script or in your default PATH (remember
# temporary settings of $PATH don't transfer if you spawn jobs over the cluster).
# If $DATADIR is set, reads from that folder, else CWD
# If $WORKDIR is set, works in that folder, else
# ../../runqc/WellDuplicates/`basename $PWD`
# You don't need a cluster environment to run this, but in any case you'll need
# to set $CLUSTER_QUEUE to something, even if you set it to "none".
# Contents >>>
# + Embedded BASH script to bootstrap the workflow
# + Initialisation and configuration
# + Helper functions
# + The rules specific to this workflow
# + More generic rules
"""true" ### Begin shell script part
set -e ; set -u ; set -x
threads=${SNAKE_THREADS:-8}
#You have to set CLUSTER_QUEUE. There is no default now!
#Normally at EG it will be "casava"
#It can be "none" if you want to just run on the local machine.
queue=${CLUSTER_QUEUE}
datadir="${DATADIR:-`pwd`}"
#Set SNAKE_RERUN=1 if you want to enable re-running on an existing workdir
rerun=${SNAKE_RERUN:-0}
#Don't run until there is a RTARead1Complete.txt touch file,
#and a RunInfo.xml
test -e "$datadir"/RTARead1Complete.txt
test -e "$datadir"/Data/Intensities/s.locs
test -e "$datadir"/RunInfo.xml
workdir="${WORKDIR:-$datadir/../../runqc/WellDuplicates/`basename $datadir`}"
workdir="`readlink -f "$workdir"`"
#Make $workdir and in the process ensure the script only runs once.
#Link the $datadir back to the $workdir
if [ "$rerun" != 0 ] ; then set +e ; fi
mkdir "$workdir"
ln -sr "$datadir" "$workdir/datadir"
#Before changing dir, get the real path to this file on the assumption that
#count_well_duplicates.py may well be there.
#I can't just prepend it to the PATH here as it won't carry across to cluster jobs.
scriptdir="`dirname $0`"
## And off we go.
cd /tmp
if [ "${queue}" = none ] ; then
set +e
snakemake -s "$0" -j 1 --config workdir="$workdir" scriptdir="$scriptdir" -- "$@"
else
## Ensure the cluster output is going to the right place.
mkdir -p "$workdir"/sge_output
set +e
#Annoyingly I'm getting intermittent failures, so enable 3 retries as a crude
#workaround.
for try in 1 2 3 ; do
snakemake \
-s "$0" -j $threads -T \
--config workdir="$workdir" scriptdir="$scriptdir" \
-p --jobname "{rulename}.snakejob.{jobid}.sh" \
--drmaa " -q $queue -S /bin/bash -p -10 -V \
-o "$workdir"/sge_output -e "$workdir"/sge_output \
" \
-- "$@"
done
fi
"exit""" ### End of shell script part
#!/usr/bin/env snakemake
from snakemake.utils import format
import xml.etree.ElementTree as ET
#Regular glob() is useful but it can be improved like so.
import os
from glob import glob as _glob
glob = lambda pathname: sorted(_glob(os.path.expanduser(pathname)))
workdir: config['workdir']
#Configuration options as constants
TARGETS_TO_SAMPLE = 2500
#LANES_TO_SAMPLE = "1 2 3 4 5 6 7 8" #see below
READ_LENGTH = 50
LEVELS_TO_SCAN = 5
REPORT_VERBOSE = True
### Calculate some derived options
#Find the scripts if they are in the same folder as this one,
#even if it's not in the default PATH.
if 'scriptdir' in config:
    _PATHSET = 'PATH=\'%s\'":$PATH" ' % config['scriptdir']
else:
    _PATHSET = ''
PREP_INDICES = _PATHSET + "prepare_cluster_indexes.py"
COUNT_WELL_DUPL = _PATHSET + "count_well_duplicates.py"
#Get the run info
run_info_root = ET.parse("datadir/RunInfo.xml").getroot()
# Machine type is only used to determine the highest tile, so I'll make it
# that you can just pass the tile number directly. Note the tiles in the XML
# are not in order so I can't just take the last one!
# Tile text is expected to be "<lane>_<tile>" (TODO confirm for all machine
# types), so split on '_' before taking the max.  Unpacking the raw text
# string into two names would raise ValueError for any tile ID that is not
# exactly two characters long.
LAST_LANE, LAST_TILE = max(te.text.split('_')
                           for te in run_info_root.findall(".//Tiles/Tile"))
# Lanes to sample is now variable since the arrival of Novaseq, so get it from
# RunInfo.xml...
LANES_TO_SAMPLE = range(1, int(LAST_LANE) + 1)
# For most runs we want to start at read 20, but some runs only have 51
# cycles in read1.
num_cycles = int(run_info_root.find("Run/Reads/Read[@Number='1']").get('NumCycles'))
if num_cycles > READ_LENGTH + 20:
    START_POS = 20
else:
    assert num_cycles > READ_LENGTH, \
        "Read 1 has only %d cycles but READ_LENGTH is %d" % (num_cycles, READ_LENGTH)
    START_POS = 0
END_POS = READ_LENGTH + START_POS
### Specific rules
localrules: main, summarize_all_lanes
"""Main rule just defines everything to be generated.
The shell script should have made me a new working folder with datadir
being a symlink to the sequencer output directory.
"""
rule main:
    input: format("{TARGETS_TO_SAMPLE}targets_all_lanes.txt")
# Combine the per-lane reports into one summary file by taking the last
# LEVELS_TO_SCAN+1 lines of each.  NOTE(review): tail with multiple input
# files also prints a "==> filename <==" header per file, which presumably
# serves to label each lane's section — confirm that's intended.
rule summarize_all_lanes:
    output: "{targets}targets_all_lanes.txt"
    input:
        expand( "{{targets}}targets_lane{lane}.txt",
                lane=LANES_TO_SAMPLE )
    shell: "tail -n $(( {LEVELS_TO_SCAN} + 1 )) {input} > {output}"
# Run count_well_duplicates.py for one lane, reading the sequencer data via
# the ./datadir symlink and sampling {targets} cluster positions from the
# prepared cluster list.  Passes the highest tile (-s), the level count (-l)
# and the cycle window (--cycles) derived above.
rule count_well_dupl:
    output: "{targets}targets_lane{lane}.txt"
    input: targfile = "{targets}clusters.list"
    # '-S' switches the script to summary-only output; empty means verbose
    params: summary = '-S' if not REPORT_VERBOSE else ''
    shell:
        "{COUNT_WELL_DUPL} -f {input.targfile} -n {wildcards.targets} -s {LAST_TILE} -r datadir" +
        " -i {wildcards.lane} -l {LEVELS_TO_SCAN} --cycles {START_POS}-{END_POS}" +
        " {params.summary} > {output}"
# Produce the list of cluster indexes to sample, either freshly or via a
# shared cache directory (../cluster_lists) keyed on the md5 of s.locs.
rule prep_indices:
    output: "{targets}clusters.list"
    run:
        #We don't want to re-calculate indices every time, but we don't
        #want to assume to locs files are all identical. So let's have
        #a shared pool of cluster lists based on the md5sum of the s.locs
        slocs = "datadir/Data/Intensities/s.locs"
        if os.path.exists("../cluster_lists"):
            # Cache mode: derive the cache key from this run's s.locs content.
            # (The trailing comma unpacks the single line md5sum emits.)
            md5, = [ l.split()[0] for l in
                     shell("md5sum {slocs}", iterable=True) ]
            cached_list = format("../cluster_lists/{wildcards.targets}clusters_{md5}.list")
            # A ".done" touch-file marks a fully-written list; a .list
            # without it is treated as a partial/failed attempt.
            if not os.path.exists(format("{cached_list}.done")):
                #Make it now. Slight paranoia regarding race condition on the file.
                #If two processes try to generate the .list file at once then one should
                #at least fail with a modicum of grace (noclobber makes the second
                #writer's redirection fail). If PREP_INDICES fails for
                #some reason, future jobs will fail until you fix or delete the partial output.
                shell("set -o noclobber ;" +
                      " {PREP_INDICES} -n {wildcards.targets} -f {slocs} > {cached_list}")
                shell("touch {cached_list}.done")
            # Whether freshly built or cached, the output is a relative symlink
            # into the shared pool.
            shell("ln -sr {cached_list} {output}")
        else:
            #No-cache mode it is, then
            shell("{PREP_INDICES} -n {wildcards.targets} -f {slocs} > {output}")
### Generic rules
# none