diff --git a/adam-core/src/main/scala/org/bdgenomics/adam/rdd/ADAMContext.scala b/adam-core/src/main/scala/org/bdgenomics/adam/rdd/ADAMContext.scala
index e82dbc2979..346cab34a9 100644
--- a/adam-core/src/main/scala/org/bdgenomics/adam/rdd/ADAMContext.scala
+++ b/adam-core/src/main/scala/org/bdgenomics/adam/rdd/ADAMContext.scala
@@ -17,6 +17,7 @@
  */
 package org.bdgenomics.adam.rdd
 
+import java.io.FileNotFoundException
 import java.util.regex.Pattern
 import htsjdk.samtools.SAMFileHeader
 import org.apache.avro.Schema
@@ -27,7 +28,7 @@ import org.apache.hadoop.io.{ LongWritable, Text }
 import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
 import org.apache.spark.rdd.RDD
 import org.apache.spark.rdd.MetricsContext._
-import org.apache.spark.{ Logging, SparkConf, SparkContext }
+import org.apache.spark.{ Logging, SparkContext }
 import org.bdgenomics.adam.converters._
 import org.bdgenomics.adam.instrumentation.Timers._
 import org.bdgenomics.adam.io._
@@ -49,7 +50,6 @@ import org.apache.parquet.hadoop.ParquetInputFormat
 import org.apache.parquet.hadoop.util.ContextUtil
 import scala.collection.JavaConversions._
 import scala.collection.Map
-import scala.reflect.ClassTag
 import htsjdk.samtools.IndexedBamInputFormat
 
 object ADAMContext {
@@ -234,32 +234,49 @@ class ADAMContext(val sc: SparkContext) extends Serializable with Logging {
     filePath: String): RDD[AlignmentRecord] = {
 
     val path = new Path(filePath)
-    val fs = FileSystem.get(path.toUri, sc.hadoopConfiguration)
-    val bamFiles = if (fs.isDirectory(path)) fs.listStatus(path) else fs.globStatus(path)
-    val (seqDict, readGroups) = bamFiles
-      .map(fs => fs.getPath)
-      .flatMap(fp => {
-        try {
-          // We need to separately read the header, so that we can inject the sequence dictionary
-          // data into each individual Read (see the argument to samRecordConverter.convert,
-          // below).
-          val samHeader = SAMHeaderReader.readSAMHeaderFrom(fp, sc.hadoopConfiguration)
-          log.info(s"Loaded header from $fp")
-          val sd = adamBamDictionaryLoad(samHeader)
-          val rg = adamBamLoadReadGroups(samHeader)
-          Some((sd, rg))
-        } catch {
-          case e: Throwable => {
-            log.error(
-              s"Loading failed for $fp:n${e.getMessage}\n\t${e.getStackTrace.take(25).map(_.toString).mkString("\n\t")}"
-            )
-            None
+
+    val fs =
+      Option(
+        FileSystem.get(path.toUri, sc.hadoopConfiguration)
+      ).getOrElse(
+        throw new FileNotFoundException(
+          s"Couldn't find filesystem for ${path.toUri} with Hadoop configuration ${sc.hadoopConfiguration}"
+        )
+      )
+
+    val bamFiles =
+      Option(
+        if (fs.isDirectory(path)) fs.listStatus(path) else fs.globStatus(path)
+      ).getOrElse(
+        throw new FileNotFoundException(
+          s"Couldn't find any files matching ${path.toUri}"
+        )
+      )
+
+    val (seqDict, readGroups) =
+      bamFiles
+        .map(fs => fs.getPath)
+        .flatMap(fp => {
+          try {
+            // We need to separately read the header, so that we can inject the sequence dictionary
+            // data into each individual Read (see the argument to samRecordConverter.convert,
+            // below).
+            val samHeader = SAMHeaderReader.readSAMHeaderFrom(fp, sc.hadoopConfiguration)
+            log.info("Loaded header from " + fp)
+            val sd = adamBamDictionaryLoad(samHeader)
+            val rg = adamBamLoadReadGroups(samHeader)
+            Some((sd, rg))
+          } catch {
+            case e: Throwable => {
+              log.error(
+                s"Loading failed for $fp:\n${e.getMessage}\n\t${e.getStackTrace.take(25).map(_.toString).mkString("\n\t")}"
+              )
+              None
+            }
           }
-        }
-      }).reduce((kv1, kv2) => {
-        (kv1._1 ++ kv2._1, kv1._2 ++ kv2._2)
-      })
+        }).reduce((kv1, kv2) => {
+          (kv1._1 ++ kv2._1, kv1._2 ++ kv2._2)
+        })
 
     val job = HadoopUtil.newJob(sc)
     val records = sc.newAPIHadoopFile(filePath, classOf[AnySAMInputFormat], classOf[LongWritable],
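
Note on the Option(...).getOrElse(throw ...) pattern this patch introduces: parts of Hadoop's FileSystem API signal "nothing found" by returning null rather than throwing (globStatus, for instance, when the glob matches no files), which previously surfaced much later as an opaque NullPointerException. Wrapping each call in Option(...) maps that null to None, and getOrElse then fails fast with a FileNotFoundException naming the offending path. A minimal, self-contained sketch of the same idiom (glob and globOrThrow are hypothetical names, not part of ADAM or Hadoop):

    import java.io.FileNotFoundException

    object NullGuardSketch {

      // Hypothetical stand-in for a Java API such as Hadoop's FileSystem.globStatus,
      // which returns null (rather than throwing) when nothing matches.
      def glob(pattern: String): Array[String] =
        if (pattern.startsWith("hdfs://")) Array(s"$pattern/part-00000") else null

      // Option(...) turns the possible null into None; getOrElse then fails fast
      // with a descriptive exception instead of an NPE at some later use site.
      def globOrThrow(pattern: String): Array[String] =
        Option(glob(pattern)).getOrElse(
          throw new FileNotFoundException(s"Couldn't find any files matching $pattern")
        )

      def main(args: Array[String]): Unit = {
        println(globOrThrow("hdfs://data/reads").mkString(", ")) // prints the match
        globOrThrow("no-such-path")                              // throws FileNotFoundException
      }
    }

Because getOrElse takes its default argument by name, the throw is evaluated only when the Option is None; on the happy path no exception is ever constructed.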
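
The (seqDict, readGroups) aggregation at the end of the hunk is unchanged in substance: each BAM header contributes a (sequence dictionary, read groups) pair, and reduce unions the pairs across files. A sketch of that shape, with Set[String] standing in for ADAM's dictionary types (which, as the diff shows, also merge with ++):

    object MergeHeadersSketch {
      def main(args: Array[String]): Unit = {
        // One (sequence dictionary, read groups) pair per BAM file, modelled
        // here as sets of names.
        val perFile = Seq(
          (Set("chr1", "chr2"), Set("rg1")),
          (Set("chr2", "chr3"), Set("rg2"))
        )

        // Same shape as the patch's reduce: pairwise union of both halves.
        val (seqDict, readGroups) =
          perFile.reduce((kv1, kv2) => (kv1._1 ++ kv2._1, kv1._2 ++ kv2._2))

        println(seqDict)    // Set(chr1, chr2, chr3)
        println(readGroups) // Set(rg1, rg2)
      }
    }

One caveat: reduce throws UnsupportedOperationException on an empty collection, and an empty but non-null listing would get past the new FileNotFoundException guards, so the code still relies on at least one file matching.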