Merge pull request #805 from ryan-williams/fnf
Better transform error when file doesn't exist
heuermh committed Aug 24, 2015
2 parents: 4fd82e6 + e671030, commit 9bf6942
Showing 1 changed file with 43 additions and 26 deletions.
adam-core/src/main/scala/org/bdgenomics/adam/rdd/ADAMContext.scala (43 additions, 26 deletions)
@@ -17,6 +17,7 @@
*/
package org.bdgenomics.adam.rdd

+import java.io.FileNotFoundException
import java.util.regex.Pattern
import htsjdk.samtools.SAMFileHeader
import org.apache.avro.Schema
@@ -27,7 +28,7 @@ import org.apache.hadoop.io.{ LongWritable, Text }
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
import org.apache.spark.rdd.RDD
import org.apache.spark.rdd.MetricsContext._
-import org.apache.spark.{ Logging, SparkConf, SparkContext }
+import org.apache.spark.{ Logging, SparkContext }
import org.bdgenomics.adam.converters._
import org.bdgenomics.adam.instrumentation.Timers._
import org.bdgenomics.adam.io._
@@ -49,7 +50,6 @@ import org.apache.parquet.hadoop.ParquetInputFormat
import org.apache.parquet.hadoop.util.ContextUtil
import scala.collection.JavaConversions._
import scala.collection.Map
-import scala.reflect.ClassTag
import htsjdk.samtools.IndexedBamInputFormat

object ADAMContext {
@@ -234,32 +234,49 @@ class ADAMContext(val sc: SparkContext) extends Serializable with Logging {
    filePath: String): RDD[AlignmentRecord] = {

    val path = new Path(filePath)
-    val fs = FileSystem.get(path.toUri, sc.hadoopConfiguration)
-    val bamFiles = if (fs.isDirectory(path)) fs.listStatus(path) else fs.globStatus(path)
-    val (seqDict, readGroups) = bamFiles
-      .map(fs => fs.getPath)
-      .flatMap(fp => {
-        try {
-          // We need to separately read the header, so that we can inject the sequence dictionary
-          // data into each individual Read (see the argument to samRecordConverter.convert,
-          // below).
-          val samHeader = SAMHeaderReader.readSAMHeaderFrom(fp, sc.hadoopConfiguration)
-          log.info(s"Loaded header from $fp")
-          val sd = adamBamDictionaryLoad(samHeader)
-          val rg = adamBamLoadReadGroups(samHeader)
-          Some((sd, rg))
-        } catch {
-          case e: Throwable => {
-            log.error(
-              s"Loading failed for $fp:\n${e.getMessage}\n\t${e.getStackTrace.take(25).map(_.toString).mkString("\n\t")}"
-            )
-            None
+    val fs =
+      Option(
+        FileSystem.get(path.toUri, sc.hadoopConfiguration)
+      ).getOrElse(
+        throw new FileNotFoundException(
+          s"Couldn't find filesystem for ${path.toUri} with Hadoop configuration ${sc.hadoopConfiguration}"
+        )
+      )
+
+    val bamFiles =
+      Option(
+        if (fs.isDirectory(path)) fs.listStatus(path) else fs.globStatus(path)
+      ).getOrElse(
+        throw new FileNotFoundException(
+          s"Couldn't find any files matching ${path.toUri}"
+        )
+      )
+
+    val (seqDict, readGroups) =
+      bamFiles
+        .map(fs => fs.getPath)
+        .flatMap(fp => {
+          try {
+            // We need to separately read the header, so that we can inject the sequence dictionary
+            // data into each individual Read (see the argument to samRecordConverter.convert,
+            // below).
+            val samHeader = SAMHeaderReader.readSAMHeaderFrom(fp, sc.hadoopConfiguration)
+            log.info("Loaded header from " + fp)
+            val sd = adamBamDictionaryLoad(samHeader)
+            val rg = adamBamLoadReadGroups(samHeader)
+            Some((sd, rg))
+          } catch {
+            case e: Throwable => {
+              log.error(
+                s"Loading failed for $fp:\n${e.getMessage}\n\t${e.getStackTrace.take(25).map(_.toString).mkString("\n\t")}"
+              )
+              None
+            }
          }
-        }
-      }).reduce((kv1, kv2) => {
-        (kv1._1 ++ kv2._1, kv1._2 ++ kv2._2)
-      })
+        }).reduce((kv1, kv2) => {
+          (kv1._1 ++ kv2._1, kv1._2 ++ kv2._2)
+        })

    val job = HadoopUtil.newJob(sc)
    val records = sc.newAPIHadoopFile(filePath, classOf[AnySAMInputFormat], classOf[LongWritable],
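For readers skimming the diff: the fix works because Hadoop's FileSystem.globStatus signals a missing path by returning null rather than throwing, which previously surfaced later as an opaque NullPointerException. Below is a minimal standalone sketch of the Option-guard pattern the commit applies; the NullGuardSketch object and filesMatching helper are illustrative names, not part of the commit.

import java.io.FileNotFoundException

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{ FileStatus, FileSystem, Path }

object NullGuardSketch {
  // FileSystem.globStatus can return null when the path does not exist;
  // Option(...) turns that null into a fail-fast, descriptive
  // FileNotFoundException instead of an NPE somewhere downstream.
  def filesMatching(filePath: String, conf: Configuration): Array[FileStatus] = {
    val path = new Path(filePath)
    val fs = FileSystem.get(path.toUri, conf)
    Option(
      if (fs.isDirectory(path)) fs.listStatus(path) else fs.globStatus(path)
    ).getOrElse(
      throw new FileNotFoundException(s"Couldn't find any files matching ${path.toUri}")
    )
  }
}

Calling filesMatching("/no/such/path.bam", conf) fails immediately with the matching-path message, which is the better transform-time error this change is after.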