Adding a BamLoader class to have only 1 header parse for multiple ind… #1966

Closed · wants to merge 2 commits
Changes from all commits
270 changes: 142 additions & 128 deletions adam-core/src/main/scala/org/bdgenomics/adam/rdd/ADAMContext.scala
@@ -1502,28 +1502,36 @@ class ADAMContext(@transient val sc: SparkContext) extends Serializable with Log
}

/**
* Returns a [[BamLoader]] on which .loadAll, .loadRegion and .loadRegions can be called.
* These give the same results as loadBam and loadIndexedBam, except that the header is
* parsed once, cached, and not parsed again on subsequent loads.
*
* @see loadBam
* @see loadIndexedBam
* @param pathName The path name to load BAM/CRAM/SAM formatted alignment records from.
* Globs/directories are supported.
* @param stringency The validation stringency to use when validating the
* BAM/CRAM/SAM format header. Defaults to ValidationStringency.STRICT.
* @return Returns a [[BamLoader]] on which .loadAll, .loadRegion and .loadRegions can be called.
*/
def bamLoader(pathName: String,
stringency: ValidationStringency = ValidationStringency.STRICT) = new BamLoader(pathName, stringency)

/**
* Loader exposing .loadAll, .loadRegion and .loadRegions.
* These give the same results as loadBam and loadIndexedBam, except that the header is
* parsed once, cached, and not parsed again on subsequent loads.
*
* @see loadBam
* @see loadIndexedBam
* @param pathName The path name to load BAM/CRAM/SAM formatted alignment records from.
* Globs/directories are supported.
* @param stringency The validation stringency to use when validating the
* BAM/CRAM/SAM format header. Defaults to ValidationStringency.STRICT.
*/
private class BamLoader(pathName: String,
stringency: ValidationStringency = ValidationStringency.STRICT) {
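// Header-derived metadata (the sequence dictionary, read groups and programs)
// is computed once when this loader is constructed and then reused by
// loadAll, loadRegion and loadRegions.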
private val path = new Path(pathName)
private val bamFiles = getFsAndFiles(path)
private val filteredFiles = bamFiles.filter(p => {
val pPath = p.getName
isBamExt(pPath) || pPath.startsWith("part-")
})

@@ -1545,7 +1553,7 @@ class ADAMContext(@transient val sc: SparkContext) extends Serializable with Log
val pgs = loadBamPrograms(samHeader)
Some((sd, rg, pgs))
} catch {
case e: Throwable =>
if (stringency == ValidationStringency.STRICT) {
throw e
} else if (stringency == ValidationStringency.LENIENT) {
@@ -1554,37 +1562,119 @@ class ADAMContext(@transient val sc: SparkContext) extends Serializable with Log
)
}
None
}
}).reduce((kv1, kv2) => {
(kv1._1 ++ kv2._1, kv1._2 ++ kv2._2, kv1._3 ++ kv2._3)
})

/**
* @see loadBam
* @return Returns an AlignmentRecordRDD of all alignment records under pathName.
*/
def loadAll: AlignmentRecordRDD = {
val job = HadoopUtil.newJob(sc)

// this logic is counterintuitive but important.
// hadoop-bam does not filter out .bai files, etc. as such, if we have a
// directory of bam files where all the bams also have bais or md5s etc
// in the same directory, hadoop-bam will barf. if the directory just
// contains bams, hadoop-bam is a-ok! i believe that it is better (perf) to
// just load from a single newAPIHadoopFile call instead of a union across
// files, so we do that whenever possible
val records = if (filteredFiles.length != bamFiles.length) {
sc.union(filteredFiles.map(p => {
sc.newAPIHadoopFile(p.toString, classOf[AnySAMInputFormat], classOf[LongWritable],
classOf[SAMRecordWritable], ContextUtil.getConfiguration(job))
}))
} else {
sc.newAPIHadoopFile(pathName, classOf[AnySAMInputFormat], classOf[LongWritable],
classOf[SAMRecordWritable], ContextUtil.getConfiguration(job))
}
if (Metrics.isRecording) records.instrument() else records
val samRecordConverter = new SAMRecordConverter

AlignmentRecordRDD(records.map(p => samRecordConverter.convert(p._2.get)),
seqDict,
readGroups,
programs)
}

/** Checks that a .bai index exists for every BAM file; throws FileNotFoundException if any are missing. */
private def checkIndexes(): Unit = {
// If pathName is a single file or *.bam, append .bai to find all bam indices.
// Otherwise, pathName is a directory and the entire path must be searched
// for indices.
val indexPath = if (pathName.endsWith(".bam")) {
new Path(pathName.toString.replace(".bam", "*.bai"))
} else {
path
}

val indexFiles = getFsAndFiles(indexPath).filter(p => p.toString.endsWith(".bai"))
.map(r => r.toString)

// look for index files named <pathname>.bam.bai and <pathname>.bai
val missingIndices = bamFiles.filterNot(f => {
indexFiles.contains(f.toString + ".bai") || indexFiles.contains(f.toString.dropRight(3) + "bai")
})

if (!missingIndices.isEmpty) {
throw new FileNotFoundException("Missing indices for BAMs:\n%s".format(missingIndices.mkString("\n")))
}
}

/**
* @see loadIndexedBam
* @param viewRegion The ReferenceRegion we are reading.
* @return Returns an AlignmentRecordRDD of the alignment records overlapping viewRegion.
*/
def loadRegion(viewRegion: ReferenceRegion): AlignmentRecordRDD = loadRegions(List(viewRegion))

/**
* @see loadIndexedBam
* @param viewRegions The ReferenceRegions we are reading.
* @return Returns an AlignmentRecordRDD of the alignment records overlapping viewRegions.
*/
def loadRegions(viewRegions: Iterable[ReferenceRegion]): AlignmentRecordRDD = {
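// Fail fast: every BAM under pathName must have a matching .bai index
// before the path is handed to BAMInputFormat.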
checkIndexes()
val job = HadoopUtil.newJob(sc)
val conf = ContextUtil.getConfiguration(job)
BAMInputFormat.setIntervals(conf, viewRegions.toList.map(r => LocatableReferenceRegion(r)))

val records = sc.newAPIHadoopFile(pathName,
classOf[BAMInputFormat],
classOf[LongWritable],
classOf[SAMRecordWritable],
conf)

if (Metrics.isRecording) records.instrument() else records
val samRecordConverter = new SAMRecordConverter
AlignmentRecordRDD(records.map(p => samRecordConverter.convert(p._2.get)),
seqDict,
readGroups,
programs)
}
}

/**
* Load alignment records from BAM/CRAM/SAM into an AlignmentRecordRDD.
*
* This reads the sequence and record group dictionaries from the BAM/CRAM/SAM file
* header. SAMRecords are read from the file and converted to the
* AlignmentRecord schema.
*
* @param pathName The path name to load BAM/CRAM/SAM formatted alignment records from.
* Globs/directories are supported.
* @param stringency The validation stringency to use when validating the
* BAM/CRAM/SAM format header. Defaults to ValidationStringency.STRICT.
* @return Returns an AlignmentRecordRDD which wraps the RDD of alignment records,
* sequence dictionary representing contigs the alignment records may be aligned to,
* and the record group dictionary for the alignment records if one is available.
*/
def loadBam(
pathName: String,
stringency: ValidationStringency = ValidationStringency.STRICT): AlignmentRecordRDD = LoadBam.time {
bamLoader(pathName, stringency).loadAll
}

/**
@@ -1598,11 +1688,11 @@ class ADAMContext(@transient val sc: SparkContext) extends Serializable with Log
* sequence dictionary representing contigs the alignment records may be aligned to,
* and the record group dictionary for the alignment records if one is available.
*/
def loadIndexedBam(
pathName: String,
viewRegion: ReferenceRegion,
stringency: ValidationStringency = ValidationStringency.STRICT): AlignmentRecordRDD = {
loadIndexedBam(pathName, Iterable(viewRegion), stringency)
}

/**
@@ -1622,83 +1712,7 @@ class ADAMContext(@transient val sc: SparkContext) extends Serializable with Log
pathName: String,
viewRegions: Iterable[ReferenceRegion],
stringency: ValidationStringency = ValidationStringency.STRICT)(implicit s: DummyImplicit): AlignmentRecordRDD = LoadIndexedBam.time {

bamLoader(pathName, stringency).loadRegions(viewRegions)
}

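For reference, here is a minimal usage sketch of the API this patch adds. The path and region coordinates are hypothetical, and ac is assumed to be an ADAMContext in scope. Callers that previously invoked loadBam or loadIndexedBam repeatedly, paying for a header parse on every call, can instead construct a single loader and reuse its cached header:

import org.bdgenomics.adam.models.ReferenceRegion

// Parses the BAM/CRAM/SAM header once, up front.
val loader = ac.bamLoader("hdfs:///data/sample.bam")

// All three loads below reuse the header cached by the loader.
val allReads = loader.loadAll
val chr1Reads = loader.loadRegion(ReferenceRegion("chr1", 0L, 1000000L))
val someReads = loader.loadRegions(Seq(
  ReferenceRegion("chr1", 0L, 1000000L),
  ReferenceRegion("chr2", 0L, 1000000L)))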