path upgrade, fixes
ryan-williams committed Apr 20, 2017
1 parent 5567d67 commit b61fdd5
Showing 9 changed files with 58 additions and 40 deletions.
2 changes: 1 addition & 1 deletion build.sbt
@@ -2,7 +2,7 @@ organization := "org.hammerlab.adam"
 
 name := sparkName("core")
 
-version := "0.23.0"
+version := "0.23.1-SNAPSHOT"
 
 addSparkDeps
 publishTestJar
2 changes: 1 addition & 1 deletion project/plugins.sbt
@@ -1 +1 @@
-addSbtPlugin("org.hammerlab" % "sbt-parent" % "1.7.5")
+addSbtPlugin("org.hammerlab" % "sbt-parent" % "1.7.7-SNAPSHOT")
@@ -47,10 +47,12 @@ class ADAMBAMOutputFormat[K]
     readSAMHeaderFrom(path, conf)
 
     // now that we have the header set, we need to make a record reader
-    return new KeyIgnoringBAMRecordWriter[K](getDefaultWorkFile(context, ""),
+    new KeyIgnoringBAMRecordWriter[K](
+      getDefaultWorkFile(context, ""),
       header,
       true,
-      context)
+      context
+    )
   }
 }

@@ -84,10 +86,12 @@ class ADAMBAMOutputFormatHeaderLess[K]
     readSAMHeaderFrom(path, conf)
 
     // now that we have the header set, we need to make a record reader
-    return new KeyIgnoringBAMRecordWriter[K](getDefaultWorkFile(context, ""),
+    new KeyIgnoringBAMRecordWriter[K](
+      getDefaultWorkFile(context, ""),
       header,
       false,
-      context)
+      context
+    )
   }
 }

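
Note: both BAM hunks above drop Scala's `return` keyword along with the reformat. As background (not part of the diff): a Scala method's last expression is already its value, and an explicit `return` inside a closure is compiled as a thrown scala.runtime.NonLocalReturnControl, a hazard anywhere a closure may outlive the enclosing method (e.g. on a Spark executor). A minimal illustration:

def firstNegative(xs: Seq[Int]): Option[Int] = {
  // Risky: `return` inside the closure would be non-local, implemented by
  // throwing NonLocalReturnControl to unwind back to firstNegative:
  // xs.foreach(x ⇒ if (x < 0) return Some(x))

  // Idiomatic: the last expression is the result; no `return` needed.
  xs.find(_ < 0)
}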
@@ -17,17 +17,11 @@
  */
 package org.bdgenomics.adam.rdd.read
 
 import htsjdk.samtools.SAMFileHeader
 import org.apache.hadoop.fs.Path
 import org.apache.hadoop.mapreduce.{ OutputFormat, RecordWriter, TaskAttemptContext }
 import org.apache.spark.rdd.InstrumentedOutputFormat
 import org.bdgenomics.adam.instrumentation.Timers
-import org.seqdoop.hadoop_bam.{
-  KeyIgnoringAnySAMOutputFormat,
-  KeyIgnoringSAMRecordWriter,
-  SAMFormat,
-  SAMRecordWritable
-}
+import org.seqdoop.hadoop_bam.{ KeyIgnoringAnySAMOutputFormat, KeyIgnoringSAMRecordWriter, SAMFormat, SAMRecordWritable }
 
 class ADAMSAMOutputFormat[K]
   extends KeyIgnoringAnySAMOutputFormat[K](SAMFormat.valueOf("SAM")) with Serializable {
@@ -44,10 +38,12 @@ class ADAMSAMOutputFormat[K]
     readSAMHeaderFrom(path, conf)
 
     // now that we have the header set, we need to make a record reader
-    return new KeyIgnoringSAMRecordWriter(getDefaultWorkFile(context, ""),
+    new KeyIgnoringSAMRecordWriter(
+      getDefaultWorkFile(context, ""),
       header,
       true,
-      context)
+      context
+    )
   }
 }

@@ -71,10 +67,12 @@ class ADAMSAMOutputFormatHeaderLess[K]
     readSAMHeaderFrom(path, conf)
 
     // now that we have the header set, we need to make a record reader
-    return new KeyIgnoringSAMRecordWriter(getDefaultWorkFile(context, ""),
+    new KeyIgnoringSAMRecordWriter(
+      getDefaultWorkFile(context, ""),
       header,
       false,
-      context)
+      context
+    )
   }
 }

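
Note: these KeyIgnoring* classes are Hadoop OutputFormats, so they get wired into a save through Spark's Hadoop-file API. A hedged sketch of a typical call site follows — it is not code from this commit, and the header config key is whatever HEADER_PATH_KEY resolves to in ADAM (the string below is a placeholder):

import org.apache.hadoop.io.LongWritable
import org.apache.spark.rdd.RDD
import org.seqdoop.hadoop_bam.SAMRecordWritable

def saveSam(records: RDD[(LongWritable, SAMRecordWritable)],
            headerPath: String,
            outputPath: String): Unit = {
  val conf = records.context.hadoopConfiguration
  // Advertise the staged header to getRecordWriter (placeholder key name):
  conf.set("adam.sam.header.path", headerPath)
  records.saveAsNewAPIHadoopFile(
    outputPath,
    classOf[LongWritable],
    classOf[SAMRecordWritable],
    classOf[ADAMSAMOutputFormat[LongWritable]],
    conf
  )
}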
@@ -38,7 +38,7 @@ class ADAMVCFOutputFormat[K]
     val path = new Path(conf.get(HEADER_PATH_KEY))
 
     // read the header file
-    readHeaderFrom(path, FileSystem.get(conf))
+    readHeaderFrom(path, path.getFileSystem(conf))
 
     // return record writer
     new KeyIgnoringVCFRecordWriter[K](
@@ -70,7 +70,7 @@ class ADAMHeaderlessVCFOutputFormat[K]
     val path = new Path(conf.get(HEADER_PATH_KEY))
 
     // read the header file
-    readHeaderFrom(path, FileSystem.get(conf))
+    readHeaderFrom(path, path.getFileSystem(conf))
 
     // return record writer
     new KeyIgnoringVCFRecordWriter[K](
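
Note: the substantive fix in this file is FileSystem.get(conf) → path.getFileSystem(conf). FileSystem.get(conf) returns the filesystem for the configured default (fs.defaultFS), so a header staged at a file:// or s3a:// URI would be looked up on the wrong (typically HDFS) filesystem; path.getFileSystem(conf) resolves the filesystem from the path's own scheme. Both are standard Hadoop APIs:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{ FileSystem, Path }

val conf = new Configuration()          // suppose fs.defaultFS = hdfs://nn:8020
val header = new Path("file:///tmp/header.vcf")

val defaultFs = FileSystem.get(conf)      // HDFS, regardless of header's scheme
val headerFs = header.getFileSystem(conf) // LocalFileSystem, from file://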
@@ -128,6 +128,7 @@ case class VariantContextRDD(rdd: RDD[VariantContext],
   def saveAsVcf(path: Path,
                 asSingleFile: Boolean = false,
                 stringency: ValidationStringency = LENIENT)(implicit factory: Factory) {
+
     val vcfFormat = inferFromFilePath(path.toString)
     assert(vcfFormat == VCF, "BCF not yet supported") // TODO: Add BCF support
 
@@ -152,17 +153,19 @@
     }
 
     // make header
-    val header = new VCFHeader(
-      headerLines.toSet,
-      samples.map(_.getSampleId))
+    val header =
+      new VCFHeader(
+        headerLines.toSet,
+        samples.map(_.getSampleId)
+      )
 
     header.setSequenceDictionary(sequences.toSAMSequenceDictionary)
 
     // write header
     val headPath = path + "_head"
 
     // configure things for saving to disk
     val conf = rdd.context.hadoopConfiguration
     // val fs = headPath.getFileSystem(conf)
 
     // write vcf header
     VCFHeaderUtils.write(
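
Note on `val headPath = path + "_head"` above: this compiles because `path` is no longer a raw Hadoop Path — the commit's path upgrade (see the new `implicit factory: Factory` parameter) apparently swaps in a path wrapper that supports `+` as suffix concatenation. For comparison, the same sidecar naming with a plain Hadoop Path would be:

import org.apache.hadoop.fs.Path

// Hadoop's Path has no `+`; the "_head" sidecar is built via the string form.
def headSidecar(p: Path): Path = new Path(p.toString + "_head")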
@@ -54,18 +54,23 @@ class ParallelFileMergerSuite
   }
 
   test("get the size of several files") {
-    val files = Seq(testFile("unmapped.sam"),
-      testFile("small.sam"))
+    val files =
+      Seq(
+        testFile("unmapped.sam"),
+        testFile("small.sam")
+      )
       .map(new Path(_))
 
     val fileSizes = Seq(29408, 3093)
 
     val fs = FileSystem.get(sc.hadoopConfiguration)
     val (size, sizes) = getFullSize(fs, files)
 
     assert(size === fileSizes.sum.toLong)
 
     sizes.map(_._2)
       .zip(fileSizes)
-      .foreach(p => assert(p._1 === p._2))
+      .foreach(p ⇒ assert(p._1 === p._2))
   }
 
   test("block size must be positive and non-zero when trying to merge files") {
@@ -81,43 +86,51 @@
   }
 
   test("if two files are both below the block size, they should merge into one shard") {
-    val files = Seq(testFile("unmapped.sam"),
-      testFile("small.sam"))
+    val files =
+      Seq(
+        testFile("unmapped.sam"),
+        testFile("small.sam")
+      )
       .map(new Path(_))
 
     val fs = FileSystem.get(sc.hadoopConfiguration)
-    val fileSizesMap = files.map(f => (f, fs.getFileStatus(f).getLen().toInt))
-      .toMap
+    val fileSizesMap =
+      files
+        .map(f ⇒ (f, fs.getFileStatus(f).getLen().toInt))
+        .toMap
 
     val (_, filesWithSizes) = getFullSize(fs, files)
     val merges = generateMerges(Int.MaxValue, filesWithSizes)
     assert(merges.size === 1)
     val (index, paths) = merges.head
     assert(index === 0)
     assert(paths.size === 2)
-    paths.foreach(t => {
+    paths.foreach { t ⇒
       val (file, start, end) = t
       val path = new Path(file)
       assert(start === 0)
       assert(fileSizesMap.contains(path))
 
       val fileSize = fileSizesMap(path)
       assert(end === fileSize - 1)
-    })
+    }
   }
 
   test("merge two files where one is greater than the block size") {
 
     // unmapped.sam -> slightly under 29k
     // small.sam -> 3k
-    val files = Seq(testFile("unmapped.sam"),
-      testFile("small.sam"))
-      .map(new Path(_))
+    val files =
+      Seq(
+        testFile("unmapped.sam"),
+        testFile("small.sam")
+      )
+      .map(new Path(_))
 
     val fs = FileSystem.get(sc.hadoopConfiguration)
     val fileSizesMap =
       files
-        .map(f => (f, fs.getFileStatus(f).getLen().toInt))
+        .map(f ⇒ (f, fs.getFileStatus(f).getLen().toInt))
         .toMap
 
     val (_, filesWithSizes) = getFullSize(fs, files)
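
Note: to make the merge planning these tests exercise concrete, here is a simplified, hypothetical sketch of what a generateMerges-style function computes, under the semantics the assertions imply: whole files packed in order into shards of at most blockSize bytes, each entry carrying an inclusive, 0-indexed (file, start, end) byte range. This is an illustration, not ADAM's implementation (which also splits files larger than a block):

import scala.collection.mutable.ArrayBuffer

def planMerges(blockSize: Long,
               filesWithSizes: Seq[(String, Long)]): Seq[(Int, Seq[(String, Long, Long)])] = {
  require(blockSize > 0, "block size must be positive and non-zero")
  val shards = ArrayBuffer[Seq[(String, Long, Long)]]()
  var current = Vector.empty[(String, Long, Long)]
  var used = 0L
  for ((file, size) <- filesWithSizes) {
    // Start a new shard once this file would overflow the current one.
    if (current.nonEmpty && used + size > blockSize) {
      shards += current
      current = Vector.empty
      used = 0L
    }
    current :+= ((file, 0L, size - 1)) // inclusive end, as the tests assert
    used += size
  }
  if (current.nonEmpty) shards += current
  shards.toSeq.zipWithIndex.map { case (paths, i) ⇒ (i, paths) }
}

With blockSize = Int.MaxValue and the two files above, this yields a single shard, (0, Seq(("unmapped.sam", 0, 29407), ("small.sam", 0, 3092))), matching the assertions in the one-shard test.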
@@ -41,7 +41,7 @@ class FragmentRDDSuite extends ADAMFunSuite {
     val pipedRdd: AlignmentRecordRDD =
       ardd.pipe(
         "python $0",
-        files = Seq(scriptPath.toString)
+        files = Seq(scriptPath.path.toString)
       )
 
     val newRecords = pipedRdd.rdd.count
@@ -707,11 +707,11 @@ class AlignmentRecordRDDSuite
 
     val pipedRdd: AlignmentRecordRDD =
       ardd.pipe(
-        s"/bin/bash $scriptPath",
+        s"/bin/bash ${scriptPath.path}",
         environment =
           Map(
-            "INPUT_PATH" → smallSam.toString,
-            "OUTPUT_PATH" → writePath.toString
+            "INPUT_PATH" → smallSam.path.toString,
+            "OUTPUT_PATH" → writePath.path.toString
           )
       )