diff --git a/build.sbt b/build.sbt
index fd12caf618..cb5f28f26a 100644
--- a/build.sbt
+++ b/build.sbt
@@ -2,7 +2,7 @@
 organization := "org.hammerlab.adam"
 name := sparkName("core")
 
-version := "0.23.0"
+version := "0.23.1-SNAPSHOT"
 
 addSparkDeps
 publishTestJar
diff --git a/project/plugins.sbt b/project/plugins.sbt
index 14c0989e00..cf38390554 100644
--- a/project/plugins.sbt
+++ b/project/plugins.sbt
@@ -1 +1 @@
-addSbtPlugin("org.hammerlab" % "sbt-parent" % "1.7.5")
+addSbtPlugin("org.hammerlab" % "sbt-parent" % "1.7.7-SNAPSHOT")
diff --git a/src/main/scala/org/bdgenomics/adam/rdd/read/ADAMBAMOutputFormat.scala b/src/main/scala/org/bdgenomics/adam/rdd/read/ADAMBAMOutputFormat.scala
index b0380b24b3..4f0fc29ac7 100644
--- a/src/main/scala/org/bdgenomics/adam/rdd/read/ADAMBAMOutputFormat.scala
+++ b/src/main/scala/org/bdgenomics/adam/rdd/read/ADAMBAMOutputFormat.scala
@@ -47,10 +47,12 @@ class ADAMBAMOutputFormat[K]
     readSAMHeaderFrom(path, conf)
 
     // now that we have the header set, we need to make a record reader
-    return new KeyIgnoringBAMRecordWriter[K](getDefaultWorkFile(context, ""),
+    new KeyIgnoringBAMRecordWriter[K](
+      getDefaultWorkFile(context, ""),
       header,
       true,
-      context)
+      context
+    )
   }
 }
 
@@ -84,10 +86,12 @@ class ADAMBAMOutputFormatHeaderLess[K]
     readSAMHeaderFrom(path, conf)
 
     // now that we have the header set, we need to make a record reader
-    return new KeyIgnoringBAMRecordWriter[K](getDefaultWorkFile(context, ""),
+    new KeyIgnoringBAMRecordWriter[K](
+      getDefaultWorkFile(context, ""),
       header,
       false,
-      context)
+      context
+    )
   }
 }
 
diff --git a/src/main/scala/org/bdgenomics/adam/rdd/read/ADAMSAMOutputFormat.scala b/src/main/scala/org/bdgenomics/adam/rdd/read/ADAMSAMOutputFormat.scala
index b035f739f2..8c4828dc1a 100644
--- a/src/main/scala/org/bdgenomics/adam/rdd/read/ADAMSAMOutputFormat.scala
+++ b/src/main/scala/org/bdgenomics/adam/rdd/read/ADAMSAMOutputFormat.scala
@@ -17,17 +17,11 @@
  */
 package org.bdgenomics.adam.rdd.read
 
-import htsjdk.samtools.SAMFileHeader
 import org.apache.hadoop.fs.Path
 import org.apache.hadoop.mapreduce.{ OutputFormat, RecordWriter, TaskAttemptContext }
 import org.apache.spark.rdd.InstrumentedOutputFormat
 import org.bdgenomics.adam.instrumentation.Timers
-import org.seqdoop.hadoop_bam.{
-  KeyIgnoringAnySAMOutputFormat,
-  KeyIgnoringSAMRecordWriter,
-  SAMFormat,
-  SAMRecordWritable
-}
+import org.seqdoop.hadoop_bam.{ KeyIgnoringAnySAMOutputFormat, KeyIgnoringSAMRecordWriter, SAMFormat, SAMRecordWritable }
 
 class ADAMSAMOutputFormat[K]
   extends KeyIgnoringAnySAMOutputFormat[K](SAMFormat.valueOf("SAM")) with Serializable {
@@ -44,10 +38,12 @@ class ADAMSAMOutputFormat[K]
     readSAMHeaderFrom(path, conf)
 
     // now that we have the header set, we need to make a record reader
-    return new KeyIgnoringSAMRecordWriter(getDefaultWorkFile(context, ""),
+    new KeyIgnoringSAMRecordWriter(
+      getDefaultWorkFile(context, ""),
       header,
       true,
-      context)
+      context
+    )
   }
 }
 
@@ -71,10 +67,12 @@ class ADAMSAMOutputFormatHeaderLess[K]
     readSAMHeaderFrom(path, conf)
 
     // now that we have the header set, we need to make a record reader
-    return new KeyIgnoringSAMRecordWriter(getDefaultWorkFile(context, ""),
+    new KeyIgnoringSAMRecordWriter(
+      getDefaultWorkFile(context, ""),
       header,
       false,
-      context)
+      context
+    )
   }
 }
 
diff --git a/src/main/scala/org/bdgenomics/adam/rdd/variant/ADAMVCFOutputFormat.scala b/src/main/scala/org/bdgenomics/adam/rdd/variant/ADAMVCFOutputFormat.scala
index 806d5bc2de..280f21dcb0 100644
--- a/src/main/scala/org/bdgenomics/adam/rdd/variant/ADAMVCFOutputFormat.scala
+++ b/src/main/scala/org/bdgenomics/adam/rdd/variant/ADAMVCFOutputFormat.scala
@@ -38,7 +38,7 @@ class ADAMVCFOutputFormat[K]
     val path = new Path(conf.get(HEADER_PATH_KEY))
 
     // read the header file
-    readHeaderFrom(path, FileSystem.get(conf))
+    readHeaderFrom(path, path.getFileSystem(conf))
 
     // return record writer
     new KeyIgnoringVCFRecordWriter[K](
@@ -70,7 +70,7 @@ class ADAMHeaderlessVCFOutputFormat[K]
     val path = new Path(conf.get(HEADER_PATH_KEY))
 
     // read the header file
-    readHeaderFrom(path, FileSystem.get(conf))
+    readHeaderFrom(path, path.getFileSystem(conf))
 
     // return record writer
     new KeyIgnoringVCFRecordWriter[K](
diff --git a/src/main/scala/org/bdgenomics/adam/rdd/variant/VariantContextRDD.scala b/src/main/scala/org/bdgenomics/adam/rdd/variant/VariantContextRDD.scala
index 5d6b5c8c2d..96d9905ad2 100644
--- a/src/main/scala/org/bdgenomics/adam/rdd/variant/VariantContextRDD.scala
+++ b/src/main/scala/org/bdgenomics/adam/rdd/variant/VariantContextRDD.scala
@@ -128,6 +128,7 @@ case class VariantContextRDD(rdd: RDD[VariantContext],
   def saveAsVcf(path: Path,
                 asSingleFile: Boolean = false,
                 stringency: ValidationStringency = LENIENT)(implicit factory: Factory) {
+
     val vcfFormat = inferFromFilePath(path.toString)
     assert(vcfFormat == VCF, "BCF not yet supported") // TODO: Add BCF support
 
@@ -152,9 +153,12 @@
     }
 
     // make header
-    val header = new VCFHeader(
-      headerLines.toSet,
-      samples.map(_.getSampleId))
+    val header =
+      new VCFHeader(
+        headerLines.toSet,
+        samples.map(_.getSampleId)
+      )
+
     header.setSequenceDictionary(sequences.toSAMSequenceDictionary)
 
     // write header
@@ -162,7 +166,6 @@
 
     // configure things for saving to disk
     val conf = rdd.context.hadoopConfiguration
-//    val fs = headPath.getFileSystem(conf)
 
     // write vcf header
     VCFHeaderUtils.write(
diff --git a/src/test/scala/org/bdgenomics/adam/rdd/ParallelFileMergerSuite.scala b/src/test/scala/org/bdgenomics/adam/rdd/ParallelFileMergerSuite.scala
index 1d6e668155..167ebb6847 100644
--- a/src/test/scala/org/bdgenomics/adam/rdd/ParallelFileMergerSuite.scala
+++ b/src/test/scala/org/bdgenomics/adam/rdd/ParallelFileMergerSuite.scala
@@ -54,18 +54,23 @@ class ParallelFileMergerSuite
   }
 
   test("get the size of several files") {
-    val files = Seq(testFile("unmapped.sam"),
-      testFile("small.sam"))
+    val files =
+      Seq(
+        testFile("unmapped.sam"),
+        testFile("small.sam")
+      )
       .map(new Path(_))
+
     val fileSizes = Seq(29408, 3093)
     val fs = FileSystem.get(sc.hadoopConfiguration)
 
     val (size, sizes) = getFullSize(fs, files)
 
     assert(size === fileSizes.sum.toLong)
+
     sizes.map(_._2)
       .zip(fileSizes)
-      .foreach(p => assert(p._1 === p._2))
+      .foreach(p ⇒ assert(p._1 === p._2))
   }
 
   test("block size must be positive and non-zero when trying to merge files") {
@@ -81,13 +86,18 @@
   }
 
   test("if two files are both below the block size, they should merge into one shard") {
-    val files = Seq(testFile("unmapped.sam"),
-      testFile("small.sam"))
+    val files =
+      Seq(
+        testFile("unmapped.sam"),
+        testFile("small.sam")
+      )
       .map(new Path(_))
 
     val fs = FileSystem.get(sc.hadoopConfiguration)
-    val fileSizesMap = files.map(f => (f, fs.getFileStatus(f).getLen().toInt))
-      .toMap
+    val fileSizesMap =
+      files
+        .map(f ⇒ (f, fs.getFileStatus(f).getLen().toInt))
+        .toMap
 
     val (_, filesWithSizes) = getFullSize(fs, files)
     val merges = generateMerges(Int.MaxValue, filesWithSizes)
@@ -95,7 +105,7 @@
     val (index, paths) = merges.head
     assert(index === 0)
     assert(paths.size === 2)
-    paths.foreach(t => {
+    paths.foreach { t ⇒
       val (file, start, end) = t
       val path = new Path(file)
       assert(start === 0)
@@ -103,21 +113,24 @@
 
       val fileSize = fileSizesMap(path)
       assert(end === fileSize - 1)
-    })
+    }
   }
 
   test("merge two files where one is greater than the block size") {
     // unmapped.sam -> slightly under 29k
     // small.sam -> 3k
-    val files = Seq(testFile("unmapped.sam"),
-      testFile("small.sam"))
-      .map(new Path(_))
+    val files =
+      Seq(
+        testFile("unmapped.sam"),
+        testFile("small.sam")
+      )
+      .map(new Path(_))
 
     val fs = FileSystem.get(sc.hadoopConfiguration)
     val fileSizesMap =
       files
-        .map(f => (f, fs.getFileStatus(f).getLen().toInt))
+        .map(f ⇒ (f, fs.getFileStatus(f).getLen().toInt))
         .toMap
 
     val (_, filesWithSizes) = getFullSize(fs, files)
diff --git a/src/test/scala/org/bdgenomics/adam/rdd/fragment/FragmentRDDSuite.scala b/src/test/scala/org/bdgenomics/adam/rdd/fragment/FragmentRDDSuite.scala
index 6215ee107c..48bacb0048 100644
--- a/src/test/scala/org/bdgenomics/adam/rdd/fragment/FragmentRDDSuite.scala
+++ b/src/test/scala/org/bdgenomics/adam/rdd/fragment/FragmentRDDSuite.scala
@@ -41,7 +41,7 @@ class FragmentRDDSuite extends ADAMFunSuite {
     val pipedRdd: AlignmentRecordRDD =
       ardd.pipe(
         "python $0",
-        files = Seq(scriptPath.toString)
+        files = Seq(scriptPath.path.toString)
       )
 
     val newRecords = pipedRdd.rdd.count
diff --git a/src/test/scala/org/bdgenomics/adam/rdd/read/AlignmentRecordRDDSuite.scala b/src/test/scala/org/bdgenomics/adam/rdd/read/AlignmentRecordRDDSuite.scala
index 35a4f99d21..ca65a1f509 100644
--- a/src/test/scala/org/bdgenomics/adam/rdd/read/AlignmentRecordRDDSuite.scala
+++ b/src/test/scala/org/bdgenomics/adam/rdd/read/AlignmentRecordRDDSuite.scala
@@ -707,11 +707,11 @@ class AlignmentRecordRDDSuite
     val pipedRdd: AlignmentRecordRDD =
      ardd.pipe(
-        s"/bin/bash $scriptPath",
+        s"/bin/bash ${scriptPath.path}",
         environment = Map(
-          "INPUT_PATH" → smallSam.toString,
-          "OUTPUT_PATH" → writePath.toString
+          "INPUT_PATH" → smallSam.path.toString,
+          "OUTPUT_PATH" → writePath.path.toString
         )
       )
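
Note on the `readHeaderFrom` fix in `ADAMVCFOutputFormat.scala`: `FileSystem.get(conf)` always returns the filesystem configured as `fs.defaultFS`, so reading a header from a path on any other filesystem (e.g. a `file://` path while the default is HDFS) resolves against the wrong store; `path.getFileSystem(conf)` resolves the filesystem from the path's own scheme. A minimal standalone sketch of the difference, for illustration only (not part of the patch; the `file:///tmp/header.vcf` path is made up):

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.{ FileSystem, Path }

    object FileSystemLookup {
      def main(args: Array[String]): Unit = {
        val conf = new Configuration()

        // Always the default filesystem (fs.defaultFS), regardless of which
        // path we actually want to read.
        val defaultFs = FileSystem.get(conf)

        // Resolved from the path's own scheme; a file:// (or s3a://, etc.)
        // path gets the right filesystem even when the default is HDFS.
        val path = new Path("file:///tmp/header.vcf")
        val pathFs = path.getFileSystem(conf)

        println(s"default FS: ${defaultFs.getUri}")
        println(s"FS for $path: ${pathFs.getUri}")
      }
    }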