Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added additional arguments to GenomicRDD.pipe() #1758

Closed
wants to merge 7 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion adam-apis/pom.xml
Expand Up @@ -33,7 +33,7 @@
As explained here: http://stackoverflow.com/questions/1660441/java-flag-to-enable-extended-serialization-debugging-info
The second option allows us better debugging for serialization-based errors.
-->
<argLine>-Xmx1024m -Dsun.io.serialization.extendedDebugInfo=true</argLine>
<argLine>-Xmx1024m -Dsun.io.serialization.extendedDebugInfo=true -Djava.io.tmpdir=/tmp/adamTestMvnNTasIKA</argLine>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

remove these

<stdout>F</stdout>
</configuration>
<executions>
Expand Down
2 changes: 1 addition & 1 deletion adam-cli/pom.xml
Expand Up @@ -60,7 +60,7 @@
As explained here: http://stackoverflow.com/questions/1660441/java-flag-to-enable-extended-serialization-debugging-info
The second option allows us better debugging for serialization-based errors.
-->
<argLine>-Xmx1024m -Dsun.io.serialization.extendedDebugInfo=true</argLine>
<argLine>-Xmx1024m -Dsun.io.serialization.extendedDebugInfo=true -Djava.io.tmpdir=/tmp/adamTestMvnNTasIKA</argLine>
<stdout>F</stdout>
</configuration>
<executions>
Expand Down
2 changes: 1 addition & 1 deletion adam-codegen/pom.xml
Expand Up @@ -66,7 +66,7 @@
As explained here: http://stackoverflow.com/questions/1660441/java-flag-to-enable-extended-serialization-debugging-info
The second option allows us better debugging for serialization-based errors.
-->
<argLine>-Xmx1024m -Dsun.io.serialization.extendedDebugInfo=true</argLine>
<argLine>-Xmx1024m -Dsun.io.serialization.extendedDebugInfo=true -Djava.io.tmpdir=/tmp/adamTestMvnNTasIKA</argLine>
<stdout>F</stdout>
</configuration>
<executions>
Expand Down
2 changes: 1 addition & 1 deletion adam-core/pom.xml
Expand Up @@ -33,7 +33,7 @@
As explained here: http://stackoverflow.com/questions/1660441/java-flag-to-enable-extended-serialization-debugging-info
The second option allows us better debugging for serialization-based errors.
-->
<argLine>-Xmx1024m -Dsun.io.serialization.extendedDebugInfo=true</argLine>
<argLine>-Xmx1024m -Dsun.io.serialization.extendedDebugInfo=true -Djava.io.tmpdir=/tmp/adamTestMvnNTasIKA</argLine>
<stdout>F</stdout>
</configuration>
<executions>
Expand Down
22 changes: 13 additions & 9 deletions adam-core/src/main/scala/org/bdgenomics/adam/rdd/GenomicRDD.scala
Expand Up @@ -464,11 +464,13 @@ trait GenomicRDD[T, U <: GenomicRDD[T, U]] extends Logging {
def pipe[X, Y <: GenomicRDD[X, Y], V <: InFormatter[T, U, V]](cmd: String,
files: Seq[String] = Seq.empty,
environment: Map[String, String] = Map.empty,
flankSize: Int = 0)(implicit tFormatterCompanion: InFormatterCompanion[T, U, V],
xFormatter: OutFormatter[X],
convFn: (U, RDD[X]) => Y,
tManifest: ClassTag[T],
xManifest: ClassTag[X]): Y = {
flankSize: Int = 0,
repartitionInput: Boolean = true,
filterOutput: Boolean = true)(implicit tFormatterCompanion: InFormatterCompanion[T, U, V],
xFormatter: OutFormatter[X],
convFn: (U, RDD[X]) => Y,
tManifest: ClassTag[T],
xManifest: ClassTag[X]): Y = {

// TODO: support broadcasting files
files.foreach(f => {
Expand All @@ -484,7 +486,10 @@ trait GenomicRDD[T, U <: GenomicRDD[T, U]] extends Logging {
val bins = GenomeBins(totalLength / rdd.partitions.size, seqLengths)

// if the input rdd is mapped, then we need to repartition
val partitionedRdd = if (sequences.records.size > 0) {
val partitionedRdd = if (sequences.records.isEmpty ||
!repartitionInput) {
rdd
} else {
// get region covered, expand region by flank size, and tag with bins
val binKeyedRdd = rdd.flatMap(r => {

Expand All @@ -506,8 +511,6 @@ trait GenomicRDD[T, U <: GenomicRDD[T, U]] extends Logging {
.repartitionAndSortWithinPartitions(
ManualRegionPartitioner(bins.numBins))
.values
} else {
rdd
}

// are we in local mode?
Expand Down Expand Up @@ -561,7 +564,8 @@ trait GenomicRDD[T, U <: GenomicRDD[T, U]] extends Logging {

// if the original rdd was aligned and the final rdd is aligned, then we must filter
if (newRdd.sequences.isEmpty ||
sequences.isEmpty) {
sequences.isEmpty ||
!filterOutput) {
newRdd
} else {
def filterPartition(idx: Int, iter: Iterator[X]): Iterator[X] = {
Expand Down