Commit
[ADAM-771] add helper for loading paired fastq files
* generalize validation stringency; FASTQ loading takes it from the command line
* support loading paired FASTQ via loadAlignments
* support setting the record group from the command line
* add a "-concat" flag to merge two AlignmentRecord RDDs

Resolves #771.
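As an illustrative sketch (not part of the commit), the new flags might be exercised with an invocation along these lines; the transform subcommand name, the adam-submit wrapper, and the file names are assumptions:

    adam-submit transform sample_1.fq sample.alignments.adam \
      -paired_fastq sample_2.fq \
      -record_group sample \
      -stringency LENIENT

The -concat flag works similarly, taking the path of a second input (any supported format except paired FASTQ) to merge with <INPUT> before writing <OUTPUT>.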
ryan-williams authored and fnothaft committed Oct 2, 2015
1 parent 3ac54a1 commit 4c9654e
Showing 3 changed files with 152 additions and 37 deletions.
57 changes: 45 additions & 12 deletions adam-cli/src/main/scala/org/bdgenomics/adam/cli/Transform.scala
@@ -50,8 +50,8 @@ class TransformArgs extends Args4jBase with ADAMSaveAnyArgs with ParquetArgs {
var markDuplicates: Boolean = false
@Args4jOption(required = false, name = "-recalibrate_base_qualities", usage = "Recalibrate the base quality scores (ILLUMINA only)")
var recalibrateBaseQualities: Boolean = false
@Args4jOption(required = false, name = "-strict_bqsr", usage = "Run BQSR with strict validation.")
var strictBQSR: Boolean = false
@Args4jOption(required = false, name = "-stringency", usage = "Stringency level for various checks; can be SILENT, LENIENT, or STRICT. Defaults to LENIENT")
var stringency: String = "LENIENT"
@Args4jOption(required = false, name = "-dump_observations", usage = "Local path to dump BQSR observations to. Outputs CSV format.")
var observationsPath: String = null
@Args4jOption(required = false, name = "-known_snps", usage = "Sites-only VCF giving location of known SNPs")
@@ -84,11 +84,19 @@ class TransformArgs extends Args4jBase with ADAMSaveAnyArgs with ParquetArgs {
var forceLoadParquet: Boolean = false
@Args4jOption(required = false, name = "-single", usage = "Saves OUTPUT as single file")
var asSingleFile: Boolean = false
@Args4jOption(required = false, name = "-paired_fastq", usage = "When converting two (paired) FASTQ files to ADAM, pass the path to the second file here.")
var pairedFastqFile: String = null
@Args4jOption(required = false, name = "-record_group", usage = "Set converted FASTQs' record-group names to this value; if empty-string is passed, use the basename of the input file, minus the extension.")
var fastqRecordGroup: String = null
@Args4jOption(required = false, name = "-concat", usage = "Concatenate this file with <INPUT> and write the result to <OUTPUT>")
var concatFilename: String = null
}

class Transform(protected val args: TransformArgs) extends BDGSparkCommand[TransformArgs] with Logging {
val companion = Transform

val stringency = ValidationStringency.valueOf(args.stringency)

def apply(rdd: RDD[AlignmentRecord]): RDD[AlignmentRecord] = {

var adamRecords = rdd
@@ -122,14 +130,11 @@ class Transform(protected val args: TransformArgs) extends BDGSparkCommand[Trans
if (args.recalibrateBaseQualities) {
log.info("Recalibrating base qualities")
val knownSnps: SnpTable = createKnownSnpsTable(sc)
val stringency = if (args.strictBQSR) {
ValidationStringency.STRICT
} else {
ValidationStringency.LENIENT
}
adamRecords = adamRecords.adamBQSR(sc.broadcast(knownSnps),
adamRecords = adamRecords.adamBQSR(
sc.broadcast(knownSnps),
Option(args.observationsPath),
stringency)
stringency
)
}

if (args.coalesce != -1) {
@@ -147,18 +152,46 @@ class Transform(protected val args: TransformArgs) extends BDGSparkCommand[Trans
}

def run(sc: SparkContext) {
this.apply({
val rdd =
if (args.forceLoadBam) {
sc.loadBam(args.inputPath)
} else if (args.forceLoadFastq) {
sc.loadUnpairedFastq(args.inputPath)
sc.loadFastq(args.inputPath, Option(args.pairedFastqFile), Option(args.fastqRecordGroup), stringency)
} else if (args.forceLoadIFastq) {
sc.loadInterleavedFastq(args.inputPath)
} else if (args.forceLoadParquet) {
sc.loadParquetAlignments(args.inputPath)
} else {
sc.loadAlignments(args.inputPath)
sc.loadAlignments(
args.inputPath,
filePath2Opt = Option(args.pairedFastqFile),
recordGroupOpt = Option(args.fastqRecordGroup),
stringency = stringency
)
}

// Optionally load a second RDD and concatenate it with the first.
// Paired-FASTQ loading is avoided here because that wouldn't make sense
// given that it's already happening above.
val concatRddOpt =
Option(args.concatFilename).map(concatFilename =>
if (args.forceLoadBam) {
sc.loadBam(concatFilename)
} else if (args.forceLoadIFastq) {
sc.loadInterleavedFastq(concatFilename)
} else if (args.forceLoadParquet) {
sc.loadParquetAlignments(concatFilename)
} else {
sc.loadAlignments(
concatFilename,
recordGroupOpt = Option(args.fastqRecordGroup)
)
}
)

this.apply(concatRddOpt match {
case Some(concatRdd) => rdd ++ concatRdd
case None => rdd
}).adamSave(args, args.sortReads)
}
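A minimal sketch (not part of the diff) of what the -concat path amounts to; file names and the record-group value are hypothetical, and sc is an existing SparkContext with the ADAMContext implicits imported:

    import org.bdgenomics.adam.rdd.ADAMContext._

    // the primary input plus a second file, merged into one RDD before saving
    val primary = sc.loadAlignments("reads.adam")
    val extra = sc.loadAlignments("more_reads.adam", recordGroupOpt = Some("sample"))
    val merged = primary ++ extra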

42 changes: 36 additions & 6 deletions adam-core/src/main/scala/org/bdgenomics/adam/converters/FastqRecordConverter.scala
@@ -113,30 +113,60 @@ class FastqRecordConverter extends Serializable with Logging {
.build()
}

def convertRead(element: (Void, Text)): AlignmentRecord = {
def convertRead(element: (Void, Text),
recordGroupOpt: Option[String] = None,
setFirstOfPair: Boolean = false,
setSecondOfPair: Boolean = false): AlignmentRecord = {
val lines = element._2.toString.split('\n')
require(lines.length == 4, "Record has wrong format:\n" + element._2.toString)

def trimTrailingReadNumber(readName: String): String = {
if (readName.endsWith("/1")) {
if (setSecondOfPair) {
throw new Exception(
s"Found read name $readName ending in '/1' despite second-of-pair flag being set"
)
}
readName.dropRight(2)
} else if (readName.endsWith("/2")) {
if (setFirstOfPair) {
throw new Exception(
s"Found read name $readName ending in '/2' despite first-of-pair flag being set"
)
}
readName.dropRight(2)
} else {
readName
}
}

// get fields for first read in pair
val readName = lines(0).drop(1)
val readName = trimTrailingReadNumber(lines(0).drop(1))
val readSequence = lines(1)
val readQualities = lines(3)

require(readSequence.length == readQualities.length,
"Read " + readName + " has different sequence and qual length.")

AlignmentRecord.newBuilder()
val builder = AlignmentRecord.newBuilder()
.setReadName(readName)
.setSequence(readSequence)
.setQual(readQualities)
.setReadPaired(false)
.setReadPaired(setFirstOfPair || setSecondOfPair)
.setProperPair(null)
.setReadNum(0)
.setReadNum(
if (setFirstOfPair) 0
else if (setSecondOfPair) 1
else null
)
.setReadNegativeStrand(null)
.setMateNegativeStrand(null)
.setPrimaryAlignment(null)
.setSecondaryAlignment(null)
.setSupplementaryAlignment(null)
.build()

recordGroupOpt.foreach(builder.setRecordGroupName)

builder.build()
}
}
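An illustrative sketch (not part of the diff) of exercising the extended convertRead signature; the read name, sequence, and record-group value are made up:

    import org.apache.hadoop.io.Text
    import org.bdgenomics.adam.converters.FastqRecordConverter

    val converter = new FastqRecordConverter
    // one 4-line FASTQ record; the trailing "/1" is trimmed from the read name
    val chunk = new Text("@read1/1\nACGT\n+\nIIII")
    val record = converter.convertRead(
      (null.asInstanceOf[Void], chunk),
      recordGroupOpt = Some("sample1"),
      setFirstOfPair = true
    )
    // expected: readName "read1", readNum 0, readPaired true, recordGroupName "sample1"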
90 changes: 71 additions & 19 deletions adam-core/src/main/scala/org/bdgenomics/adam/rdd/ADAMContext.scala
@@ -19,15 +19,20 @@ package org.bdgenomics.adam.rdd

import java.io.FileNotFoundException
import java.util.regex.Pattern
import htsjdk.samtools.SAMFileHeader
import htsjdk.samtools.{ IndexedBamInputFormat, SAMFileHeader, ValidationStringency }
import htsjdk.samtools.{ ValidationStringency, SAMFileHeader, IndexedBamInputFormat }
import org.apache.avro.Schema
import org.apache.avro.generic.IndexedRecord
import org.apache.avro.specific.SpecificRecord
import org.apache.hadoop.fs.{ FileSystem, FileStatus, Path }
import org.apache.hadoop.fs.{ FileStatus, FileSystem, Path }
import org.apache.hadoop.io.{ LongWritable, Text }
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
import org.apache.spark.rdd.RDD
import org.apache.parquet.avro.{ AvroParquetInputFormat, AvroReadSupport }
import org.apache.parquet.filter2.predicate.FilterPredicate
import org.apache.parquet.hadoop.ParquetInputFormat
import org.apache.parquet.hadoop.util.ContextUtil
import org.apache.spark.rdd.MetricsContext._
import org.apache.spark.rdd.RDD
import org.apache.spark.{ Logging, SparkContext }
import org.bdgenomics.adam.converters._
import org.bdgenomics.adam.instrumentation.Timers._
@@ -43,15 +48,10 @@ import org.bdgenomics.adam.rich.RichAlignmentRecord
import org.bdgenomics.formats.avro._
import org.bdgenomics.utils.instrumentation.Metrics
import org.bdgenomics.utils.misc.HadoopUtil
import org.seqdoop.hadoop_bam.util.SAMHeaderReader
import org.seqdoop.hadoop_bam._
import org.apache.parquet.avro.{ AvroParquetInputFormat, AvroReadSupport }
import org.apache.parquet.filter2.predicate.FilterPredicate
import org.apache.parquet.hadoop.ParquetInputFormat
import org.apache.parquet.hadoop.util.ContextUtil
import org.seqdoop.hadoop_bam.util.SAMHeaderReader
import scala.collection.JavaConversions._
import scala.collection.Map
import htsjdk.samtools.IndexedBamInputFormat

object ADAMContext {
// Add ADAM Spark context methods
@@ -104,7 +104,7 @@ object ADAMContext {
implicit def setToJavaSet[A](set: Set[A]): java.util.Set[A] = setAsJavaSet(set)
}

import ADAMContext._
import org.bdgenomics.adam.rdd.ADAMContext._

class ADAMContext(val sc: SparkContext) extends Serializable with Logging {

@@ -233,9 +233,7 @@ class ADAMContext(val sc: SparkContext) extends Serializable with Logging {
}

def loadBam(filePath: String): RDD[AlignmentRecord] = {

val path = new Path(filePath)

val fs =
Option(
FileSystem.get(path.toUri, sc.hadoopConfiguration)
@@ -365,8 +363,47 @@ class ADAMContext(val sc: SparkContext) extends Serializable with Logging {
records.flatMap(fastqRecordConverter.convertPair)
}

def loadUnpairedFastq(
filePath: String): RDD[AlignmentRecord] = {
def loadFastq(filePath1: String,
filePath2Opt: Option[String],
recordGroupOpt: Option[String] = None,
stringency: ValidationStringency = ValidationStringency.STRICT): RDD[AlignmentRecord] = {
filePath2Opt match {
case Some(filePath2) => loadPairedFastq(filePath1, filePath2, recordGroupOpt, stringency)
case None => loadUnpairedFastq(filePath1)
}
}

def loadPairedFastq(filePath1: String,
filePath2: String,
recordGroupOpt: Option[String],
stringency: ValidationStringency): RDD[AlignmentRecord] = {
val reads1 = loadUnpairedFastq(filePath1, setFirstOfPair = true)
val reads2 = loadUnpairedFastq(filePath2, setSecondOfPair = true)

stringency match {
case ValidationStringency.STRICT | ValidationStringency.LENIENT =>
val count1 = reads1.cache.count
val count2 = reads2.cache.count

if (count1 != count2) {
val msg = s"Fastq 1 ($filePath1) has $count1 reads, fastq 2 ($filePath2) has $count2 reads"
if (stringency == ValidationStringency.STRICT)
throw new IllegalArgumentException(msg)
else {
// ValidationStringency.LENIENT
logError(msg)
}
}
case ValidationStringency.SILENT =>
}

reads1 ++ reads2
}

def loadUnpairedFastq(filePath: String,
recordGroupOpt: Option[String] = None,
setFirstOfPair: Boolean = false,
setSecondOfPair: Boolean = false): RDD[AlignmentRecord] = {

val job = HadoopUtil.newJob(sc)
val records = sc.newAPIHadoopFile(
@@ -380,7 +417,19 @@ class ADAMContext(val sc: SparkContext) extends Serializable with Logging {

// convert records
val fastqRecordConverter = new FastqRecordConverter
records.map(fastqRecordConverter.convertRead)
records.map(
fastqRecordConverter.convertRead(
_,
recordGroupOpt.map(recordGroup =>
if (recordGroup.isEmpty)
filePath.substring(filePath.lastIndexOf("/") + 1)
else
recordGroup
),
setFirstOfPair,
setSecondOfPair
)
)
}

def loadVcf(filePath: String, sd: Option[SequenceDictionary]): RDD[VariantContext] = {
@@ -467,8 +516,8 @@ class ADAMContext(val sc: SparkContext) extends Serializable with Logging {
.setContig(
Contig.newBuilder()
.setContigName(seqRecord.name)
.setReferenceURL(seqRecord.url.getOrElse(null))
.setContigMD5(seqRecord.md5.getOrElse(null))
.setReferenceURL(seqRecord.url.orNull)
.setContigMD5(seqRecord.md5.orNull)
.setContigLength(seqRecord.length)
.build()
)
@@ -606,7 +655,10 @@ class ADAMContext(val sc: SparkContext) extends Serializable with Logging {

def loadAlignments(
filePath: String,
projection: Option[Schema] = None): RDD[AlignmentRecord] = LoadAlignmentRecords.time {
projection: Option[Schema] = None,
filePath2Opt: Option[String] = None,
recordGroupOpt: Option[String] = None,
stringency: ValidationStringency = ValidationStringency.STRICT): RDD[AlignmentRecord] = LoadAlignmentRecords.time {

if (filePath.endsWith(".sam") ||
filePath.endsWith(".bam")) {
@@ -618,7 +670,7 @@ class ADAMContext(val sc: SparkContext) extends Serializable with Logging {
} else if (filePath.endsWith(".fq") ||
filePath.endsWith(".fastq")) {
log.info("Loading " + filePath + " as unpaired FASTQ and converting to AlignmentRecords. Projection is ignored.")
loadUnpairedFastq(filePath)
loadFastq(filePath, filePath2Opt, recordGroupOpt, stringency)
} else if (filePath.endsWith(".fa") ||
filePath.endsWith(".fasta")) {
log.info("Loading " + filePath + " as FASTA and converting to AlignmentRecords. Projection is ignored.")
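As a usage sketch (not part of the commit; file names and the record-group value are placeholders), the new paired-FASTQ path can be reached either directly through loadFastq or via loadAlignments, again assuming sc is an existing SparkContext:

    import htsjdk.samtools.ValidationStringency
    import org.bdgenomics.adam.rdd.ADAMContext._

    val viaLoadFastq = sc.loadFastq(
      "sample_1.fq",
      Some("sample_2.fq"),
      recordGroupOpt = Some("sample"),
      stringency = ValidationStringency.LENIENT
    )

    // loadAlignments dispatches on the ".fq"/".fastq" extension and forwards the
    // second path, record group, and stringency to loadFastq
    val viaLoadAlignments = sc.loadAlignments(
      "sample_1.fq",
      filePath2Opt = Some("sample_2.fq"),
      recordGroupOpt = Some("sample")
    )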
