Commit 3d58dd3

Merge e560f30 into a970e6f
2 parents a970e6f + e560f30
ryan-williams committed Nov 3, 2014

Showing 6 changed files with 1,353 additions and 2 deletions.
@@ -68,7 +68,8 @@ object ADAMMain extends Logging {
ListDict,
SummarizeGenotypes,
AlleleCount,
BuildInformation
BuildInformation,
View
)
)
)
135 changes: 135 additions & 0 deletions adam-cli/src/main/scala/org/bdgenomics/adam/cli/View.scala
@@ -0,0 +1,135 @@
/**
 * Licensed to Big Data Genomics (BDG) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The BDG licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.bdgenomics.adam.cli

import org.apache.hadoop.mapreduce.Job
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.bdgenomics.adam.rdd.ADAMSaveArgs
import org.bdgenomics.adam.rdd.read.AlignmentRecordContext._
import org.bdgenomics.formats.avro.AlignmentRecord
import org.kohsuke.args4j.{ Argument, Option => Args4jOption }
import org.bdgenomics.adam.rdd.ADAMContext._

class ViewArgs extends Args4jBase with ParquetArgs with ADAMSaveArgs {
  @Argument(required = true, metaVar = "INPUT", usage = "The ADAM, BAM or SAM file to view", index = 0)
  var inputPath: String = null

  @Argument(required = false, metaVar = "OUTPUT", usage = "Location to write output data", index = 1)
  var outputPath: String = null

  @Args4jOption(
    required = false,
    name = "-f",
    metaVar = "N",
    usage = "Restrict to reads that match all of the bits in <N>")
  var matchAllBits: Int = 0

  @Args4jOption(
    required = false,
    name = "-F",
    metaVar = "N",
    usage = "Restrict to reads that match none of the bits in <N>")
  var matchNoBits: Int = 0

  @Args4jOption(
    required = false,
    name = "-c",
    usage = "Print count of matching records, instead of the records themselves")
  var printCount: Boolean = false

  @Args4jOption(
    required = false,
    name = "-o",
    metaVar = "<FILE>",
    usage = "Output to <FILE>; can also pass <FILE> as the second argument")
  var outputPathArg: String = null
}

object View extends ADAMCommandCompanion {
  val commandName = "view"
  val commandDescription = "View certain reads from an alignment-record file."

  def apply(cmdLine: Array[String]): View = {
    val args = Args4j[ViewArgs](cmdLine)
    if (args.outputPath == null && args.outputPathArg != null) {
      args.outputPath = args.outputPathArg
    }
    new View(args)
  }
}

/**
 * The `adam view` command implements some of the functionality of `samtools view`, specifically the -f, -F, -c,
 * and -o options, in an optionally distributed fashion.
 *
 * It is agnostic to its input and output being SAM, BAM, or ADAM files; when printing to stdout it prints SAM.
 */
class View(val args: ViewArgs) extends ADAMSparkCommand[ViewArgs] {
  val companion = View

  type ReadFilter = (AlignmentRecord => Boolean)

  def getFilters(n: Int, ensureValue: Boolean = true): List[ReadFilter] = {
    // For each bit set in n, emit a filter requiring the corresponding flag predicate
    // to equal ensureValue (true for -f, false for -F).
    def getFilter(bit: Int, fn: ReadFilter): Option[ReadFilter] =
      if ((n & bit) > 0)
        Some(
          fn(_) == ensureValue
        )
      else
        None

    // Bit meanings follow the SAM spec: 0x100 is "secondary alignment", 0x200 is
    // "failed vendor quality checks", and 0x400 is "PCR or optical duplicate".
    List(
      getFilter(0x1, _.getReadPaired),
      getFilter(0x2, _.getProperPair),
      getFilter(0x4, !_.getReadMapped),
      getFilter(0x8, !_.getMateMapped),
      getFilter(0x10, _.getReadNegativeStrand),
      getFilter(0x20, _.getMateNegativeStrand),
      getFilter(0x40, _.getFirstOfPair),
      getFilter(0x80, _.getSecondOfPair),
      getFilter(0x100, _.getSecondaryAlignment),
      getFilter(0x200, _.getFailedVendorQualityChecks),
      getFilter(0x400, _.getDuplicateRead),
      getFilter(0x800, _.getSupplementaryAlignment)
    ).flatten
  }
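  // Editor's note (illustration, not part of the commit): `-f 5` (0x1 | 0x4) yields
  // two filters, requiring read.getReadPaired == true and (!read.getReadMapped) == true;
  // a read must pass every filter to be kept. `-F 5` yields the same two predicates
  // tested against false instead.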

  def run(sc: SparkContext, job: Job) = {

    var reads: RDD[AlignmentRecord] = sc.adamLoad(args.inputPath)

    val filters: List[ReadFilter] = getFilters(args.matchAllBits)
    val notFilters: List[ReadFilter] = getFilters(args.matchNoBits, ensureValue = false)
    val allFilters = filters ++ notFilters

    if (allFilters.nonEmpty) {
      reads = reads.filter(read => allFilters.forall(_(read)))
    }

    if (args.outputPath != null) {
      reads.adamAlignedRecordSave(args)
    } else if (args.printCount) {
      println(reads.count())
    } else {
      println(reads.adamSAMString)
    }
  }
}
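For orientation, here is a driver-side usage sketch. This is an editor's addition: the object name, Spark setup, and input path are hypothetical, but the View and Job calls mirror the companion object above and the test suite below.

import org.apache.hadoop.mapreduce.Job
import org.apache.spark.{ SparkConf, SparkContext }
import org.bdgenomics.adam.cli.View

object ViewExample {
  def main(cmdArgs: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("adam-view-example"))
    // Count reads that are unmapped (0x4) but whose mates are mapped (not 0x8),
    // equivalent to `samtools view -c -f 4 -F 8`:
    View(Array("-f", "4", "-F", "8", "-c", "reads.adam")).run(sc, new Job())
  }
}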
135 changes: 135 additions & 0 deletions adam-cli/src/test/scala/org/bdgenomics/adam/cli/ViewSuite.scala
@@ -0,0 +1,135 @@
/**
 * Licensed to Big Data Genomics (BDG) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The BDG licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.bdgenomics.adam.cli

import java.io.File

import org.apache.commons.io.FileUtils
import org.apache.hadoop.mapreduce.Job
import org.bdgenomics.adam.util.SparkFunSuite
import org.bdgenomics.adam.rdd.ADAMContext._

class ViewSuite extends SparkFunSuite {

  val inputSamPath = ClassLoader.getSystemClassLoader.getResource("flag-values.sam").getFile
  val testDirectory = new File(inputSamPath).getParent
  val intermediateAdamPath = "%s/flag-values.adam".format(testDirectory)
  val unmappedAdamPath = "%s/flag-values-unmapped.adam".format(testDirectory)

  def deleteTempDirs(): Unit = {
    FileUtils.deleteDirectory(new File(intermediateAdamPath))
    FileUtils.deleteDirectory(new File(unmappedAdamPath))
  }

  sparkBefore("cleanup directories before run") {
    deleteTempDirs()

    val transform =
      new Transform(
        Args4j[TransformArgs](
          Array(
            inputSamPath,
            intermediateAdamPath
          )
        )
      )

    transform.run(sc, new Job())
  }

  sparkAfter("cleanup directories after run") {
    deleteTempDirs()
  }

  def runView(matchAllBits: Int = -1, matchNoBits: Int = -1): Unit =
    runView(
      if (matchAllBits >= 0) Some(matchAllBits) else None,
      if (matchNoBits >= 0) Some(matchNoBits) else None
    )

  def runView(matchAllBitsOpt: Option[Int], matchNoBitsOpt: Option[Int]): Unit = {

    FileUtils.deleteDirectory(new File(unmappedAdamPath))

    val args: Array[String] =
      (matchAllBitsOpt.toList.flatMap(n => List("-f", n.toString)) ++
        matchNoBitsOpt.toList.flatMap(n => List("-F", n.toString)) :+
        intermediateAdamPath :+
        unmappedAdamPath).toArray

    new View(
      Args4j[ViewArgs](
        args
      )
    ).run(sc, new Job())
  }

sparkTest("-f 0 -F 0 is a no-op") {
runView(0, 0)
assert(sc.adamLoad(unmappedAdamPath).count() == sc.adamLoad(intermediateAdamPath).count())
}

sparkTest("no -f or -F args is a no-op") {
runView()
assert(sc.adamLoad(unmappedAdamPath).count() == sc.adamLoad(intermediateAdamPath).count())
}

sparkTest("only unmapped reads") {
runView(4)

/**
* Of the 4096 (2^12) possible values of the 12 flag-field bits:
*
* - half (2048) have the 0x4 (unmapped read) flag set, which we are filtering around in this test case.
* - only 1/4 of those (512) have neither the "secondary alignment" (0x100) nor the "supplementary alignment"
* (0x800) flags set (HTSJDK doesn't allow them to be set if 0x4 is set, because that wouldn't make sense).
* - half (256) have the "paired" flag (0x1) set and half (256) don't:
* 1. of the 256 that do, 3/4ths (192) of them have at least one of {"first in template" (0x40), "second in
* template" (0x80)} set, and are therefore valid.
* 2. of those that don't, 1/32nd (8) of them (those with none of {"proper pair" (0x2), "mate unmapped" (0x8),
* "mate reversed" (0x20), "first in template" (0x40), "last in template" (0x80)} set) are valid.
* - 192 and 8 from 1. and 2. above make for 200 total.
*/
assert(sc.adamLoad(unmappedAdamPath).count() == 200)
}

sparkTest("only mapped reads") {
runView(matchNoBits = 4)
// 800 here is the rest of the 1000-read input file that was not counted in the 200 above.
assert(sc.adamLoad(unmappedAdamPath).count() == 800)
}

  /**
   * Of the 4096 possible flag values:
   *
   * - 1/4 (1024) have 0x4 set and 0x8 *not* set.
   * - 1/4 of those (256) have neither 0x100 nor 0x800 set, which aren't allowed on unmapped reads.
   *   1. of the 128 "paired" reads, 3/4ths (96) have 0x40 or 0x80 set, and are valid.
   *   2. of the 128 "unpaired" reads, none are valid: unpaired unmapped reads in the test data carry 0x8,
   *      so -F 8 excludes all of them.
   * - total: 96 reads, from 1.
   */
  sparkTest("unmapped reads with mapped mates") {
    runView(4, 8)
    assert(sc.adamLoad(unmappedAdamPath).count() == 96)
  }

  // 104 is the complement of the previous case's 96 within the 200 reads counted in the "unmapped" case above.
  sparkTest("unmapped reads with unmapped mates") {
    runView(12)
    assert(sc.adamLoad(unmappedAdamPath).count() == 104)
  }
}
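The flag arithmetic in the comments above can be sanity-checked by brute force. The following standalone sketch is an editor's addition, not part of this commit; it assumes exactly the validity rules described in those comments (no secondary/supplementary flags on unmapped reads, paired reads need a first/last-in-template bit, unpaired reads carry no pairing bits, and 0x8 mirrors 0x4 on unpaired reads). Under those rules, all of the suite's expected counts fall out:

object FlagMath extends App {
  // A 12-bit flag value is "valid" if the generated test file would contain it.
  def valid(f: Int): Boolean = {
    val paired = (f & 0x1) != 0
    val unmapped = (f & 0x4) != 0
    // secondary (0x100) and supplementary (0x800) are disallowed on unmapped reads
    val secondarySupplementaryOk = !unmapped || (f & (0x100 | 0x800)) == 0
    val pairingOk =
      if (paired) (f & (0x40 | 0x80)) != 0 // first and/or last in template
      else (f & (0x2 | 0x20 | 0x40 | 0x80)) == 0 && ((f & 0x8) != 0) == unmapped
    secondarySupplementaryOk && pairingOk
  }
  val flags = (0 until 4096).filter(valid)
  println(flags.size)                         // 1000: reads in flag-values.sam
  println(flags.count(f => (f & 0x4) != 0))   // 200: unmapped
  println(flags.count(f => (f & 0x4) == 0))   // 800: mapped
  println(flags.count(f => (f & 0xc) == 0x4)) // 96: unmapped, mate mapped
  println(flags.count(f => (f & 0xc) == 0xc)) // 104: unmapped, mate unmapped
}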
@@ -17,8 +17,10 @@
*/
package org.bdgenomics.adam.rdd.read

import java.io.StringWriter

import org.seqdoop.hadoop_bam.SAMRecordWritable
import htsjdk.samtools.SAMFileHeader
import htsjdk.samtools.{ ValidationStringency, SAMTextHeaderCodec, SAMTextWriter, SAMFileHeader }
import org.apache.hadoop.io.LongWritable
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.SparkContext._
@@ -94,6 +96,26 @@ class AlignmentRecordRDDFunctions(rdd: RDD[AlignmentRecord])
maybeSaveBam(args) || maybeSaveFastq(args) || { rdd.adamParquetSave(args); true }
}

  def adamSAMString: String = {
    // convert the reads to SAM records and pull out the shared header
    val (convertRecords: RDD[SAMRecordWritable], header: SAMFileHeader) = rdd.adamConvertToSAM()

    val records = convertRecords.coalesce(1, shuffle = true).collect()

    val samHeaderCodec = new SAMTextHeaderCodec
    samHeaderCodec.setValidationStringency(ValidationStringency.SILENT)

    // write the header, then each record, into a single in-memory SAM text buffer
    val samStringWriter = new StringWriter()
    samHeaderCodec.encode(samStringWriter, header)

    val samWriter: SAMTextWriter = new SAMTextWriter(samStringWriter)
    records.foreach(record => samWriter.writeAlignment(record.get))

    samStringWriter.toString
  }
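  // Editor's note: adamSAMString coalesces to a single partition and collects every
  // record to the driver, so it is only appropriate for small result sets (e.g.
  // `adam view` printing to stdout), not for materializing large RDDs as text.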

  /**
   * Saves an RDD of ADAM read data into the SAM/BAM format.
   *
