-
Notifications
You must be signed in to change notification settings - Fork 17
/
Bedtools.scala
193 lines (160 loc) · 7.16 KB
/
Bedtools.scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
/**
* Licensed to Big Data Genomics (BDG) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The BDG licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.bdgenomics.cannoli.cli
import org.apache.hadoop.fs.{ FileSystem, Path }
import org.apache.spark.SparkContext
import org.bdgenomics.adam.projections.{ FeatureField, Projection }
import org.bdgenomics.adam.rdd.ADAMContext._
import org.bdgenomics.adam.rdd.ADAMSaveAnyArgs
import org.bdgenomics.adam.rdd.feature.{
FeatureRDD,
BEDInFormatter,
BEDOutFormatter
}
import org.bdgenomics.adam.sql.{ Feature => FeatureProduct }
import org.bdgenomics.cannoli.builder.CommandBuilders
import org.bdgenomics.formats.avro.Feature;
import org.bdgenomics.utils.cli._
import org.bdgenomics.utils.misc.Logging
import org.kohsuke.args4j.{ Argument, Option => Args4jOption }
import scala.collection.JavaConversions._
/**
* Bedtools function arguments.
*/
class BedtoolsFnArgs extends Args4jBase {
// Path for bedtools intersect -a; leave null to pipe features in via stdin.
@Args4jOption(required = false, name = "-a", usage = "Bedtools intersect -a option. One of {-a,-b} should be left unspecified to accept piped input.")
var a: String = null
// Path for bedtools intersect -b; leave null to pipe features in via stdin.
// Exactly one of {a, b} must be set (enforced in BedtoolsFn.apply).
@Args4jOption(required = false, name = "-b", usage = "Bedtools intersect -b option. One of {-a,-b} should be left unspecified to accept piped input.")
var b: String = null
// Passes -sorted to bedtools; caller must guarantee chromosome/start ordering.
@Args4jOption(required = false, name = "-sorted", usage = "Bedtools intersect -sorted option. Inputs must be sorted by chromosome and then by start position.")
var sorted: Boolean = false
// Executable name or path used when not running in a container.
@Args4jOption(required = false, name = "-executable", usage = "Path to the Bedtools executable. Defaults to bedtools.")
var executable: String = "bedtools"
// Container image; only consulted when useDocker or useSingularity is true.
@Args4jOption(required = false, name = "-image", usage = "Container image to use. Defaults to quay.io/biocontainers/bedtools:2.27.1--0.")
var image: String = "quay.io/biocontainers/bedtools:2.27.1--0"
// Prefix the launch command with sudo.
@Args4jOption(required = false, name = "-sudo", usage = "Run via sudo.")
var sudo: Boolean = false
// Ship the -a/-b file to executors with SparkFiles instead of relying on a
// shared filesystem path.
@Args4jOption(required = false, name = "-add_files", usage = "If true, use the SparkFiles mechanism to distribute files to executors.")
var addFiles: Boolean = false
// Launch bedtools inside Docker.
@Args4jOption(required = false, name = "-use_docker", usage = "If true, uses Docker to launch Bedtools.")
var useDocker: Boolean = false
// Launch bedtools inside Singularity.
@Args4jOption(required = false, name = "-use_singularity", usage = "If true, uses Singularity to launch Bedtools.")
var useSingularity: Boolean = false
}
/**
* Bedtools wrapper as a function FeatureRDD → FeatureRDD,
* for use in cannoli-shell or notebooks.
*
* <code>
* val args = new BedtoolsFnArgs()
* args.b = "foo.bed"
* args.useDocker = true
* val features = ...
* val pipedFeatures = new BedtoolsFn(args, sc).apply(features)
* </code>
*
* @param args Bedtools function arguments.
* @param sc Spark context.
*/
class BedtoolsFn(
    val args: BedtoolsFnArgs,
    sc: SparkContext) extends CannoliFn[FeatureRDD, FeatureRDD](sc) with Logging {

  /**
   * Pipe the given features through bedtools intersect.
   *
   * Exactly one of args.a / args.b must be set; the unset side is fed from
   * stdin by the pipe. Throws IllegalArgumentException otherwise.
   *
   * @param features Features to pipe to bedtools.
   * @return Features read back from bedtools output.
   */
  override def apply(features: FeatureRDD): FeatureRDD = {
    val optA = Option(args.a)
    val optB = Option(args.b)
    require(optA.size + optB.size == 1,
      "Strictly one of {-a,-b} should be left unspecified to accept piped input.")

    // Exactly one option is defined (checked above), so head is safe.
    // Was `.get(0)`, which only compiled via the deprecated JavaConversions
    // implicit conversion to java.util.List.
    val file = List(optA, optB).flatten.head

    // Builder methods mutate/return the builder; no reassignment occurs,
    // so a val suffices.
    val builder = CommandBuilders.create(args.useDocker, args.useSingularity)
      .setExecutable(args.executable)
      .add("intersect")
      .add("-a")
      // "$0" is substituted with the SparkFiles-distributed path on executors.
      .add(optA.fold("stdin")(f => if (args.addFiles) "$0" else absolute(f)))
      .add("-b")
      .add(optB.fold("stdin")(f => if (args.addFiles) "$0" else absolute(f)))

    if (args.sorted) builder.add("-sorted")
    if (args.addFiles) builder.addFile(file)

    if (args.useDocker || args.useSingularity) {
      builder
        .setImage(args.image)
        .setSudo(args.sudo)
        // Mount the directory containing the side input into the container.
        .addMount(if (args.addFiles) "$root" else root(file))
    }

    log.info("Piping {} to bedtools with command: {} files: {}",
      features, builder.build(), builder.getFiles())

    implicit val tFormatter = BEDInFormatter
    implicit val uFormatter = new BEDOutFormatter
    features.pipe[Feature, FeatureProduct, FeatureRDD, BEDInFormatter](
      cmd = builder.build(),
      files = builder.getFiles()
    )
  }
}
/**
 * Companion providing the command name, description, and factory used by
 * the cannoli command line dispatcher.
 */
object Bedtools extends BDGCommandCompanion {
  val commandName = "bedtools"
  val commandDescription = "ADAM Pipe API wrapper for Bedtools intersect."

  // Parse the raw arguments with args4j and wrap them in a command instance.
  def apply(cmdLine: Array[String]) =
    new Bedtools(Args4j[BedtoolsArgs](cmdLine))
}
/**
* Bedtools command line arguments.
*/
class BedtoolsArgs extends BedtoolsFnArgs with ADAMSaveAnyArgs with ParquetArgs {
// Input path; format inferred from extension, Parquet otherwise.
@Argument(required = true, metaVar = "INPUT", usage = "Location to pipe features from (e.g., .bed, .gff/.gtf, .gff3, .interval_list, .narrowPeak). If extension is not detected, Parquet is assumed.", index = 0)
var inputPath: String = null
// Output path; format inferred from extension, Parquet otherwise.
@Argument(required = true, metaVar = "OUTPUT", usage = "Location to pipe features to. If extension is not detected, Parquet is assumed.", index = 1)
var outputPath: String = null
// When reading Parquet, project only the six BED-relevant fields.
@Args4jOption(required = false, name = "-limit_projection", usage = "If input is Parquet, limit to BED format-only fields by projection.")
var limitProjection: Boolean = false
// Minimum partition count for text input; default _ means 0, i.e. unset.
@Args4jOption(required = false, name = "-partitions", usage = "Number of partitions to use when loading a text file.")
var partitions: Int = _
// Merge output into a single file instead of one part-file per partition.
@Args4jOption(required = false, name = "-single", usage = "Saves OUTPUT as single file.")
var asSingleFile: Boolean = false
@Args4jOption(required = false, name = "-disable_fast_concat", usage = "Disables the parallel file concatenation engine.")
var disableFastConcat: Boolean = false
@Args4jOption(required = false, name = "-defer_merging", usage = "Defers merging single file output.")
var deferMerging: Boolean = false
// must be defined due to ADAMSaveAnyArgs, but unused here
var sortFastqOutput: Boolean = false
}
/**
* Bedtools command line wrapper.
*/
class Bedtools(protected val args: BedtoolsArgs) extends BDGSparkCommand[BedtoolsArgs] with Logging {
  val companion = Bedtools

  /**
   * Load features from INPUT, pipe them through bedtools intersect via
   * [[BedtoolsFn]], and save the result to OUTPUT.
   *
   * @param sc Spark context.
   */
  override def run(sc: SparkContext): Unit = {
    // BED carries only these six fields; projecting trims Parquet reads
    // when -limit_projection is set.
    val projection = Projection(
      FeatureField.contigName,
      FeatureField.start,
      FeatureField.end,
      FeatureField.name,
      FeatureField.score,
      FeatureField.strand
    )

    // args.partitions defaults to 0 (declared `= _`), so Option(args.partitions)
    // would always be Some(0) and never None; only forward a minimum partition
    // count when the user actually supplied a positive one.
    val optMinPartitions =
      if (args.partitions > 0) Some(args.partitions) else None

    val features = sc.loadFeatures(
      args.inputPath,
      optMinPartitions = optMinPartitions,
      optProjection = if (args.limitProjection) Some(projection) else None
    )

    val pipedFeatures = new BedtoolsFn(args, sc).apply(features)

    pipedFeatures.save(args.outputPath,
      asSingleFile = args.asSingleFile,
      disableFastConcat = args.disableFastConcat)
  }
}