Commit 04b601e

use hidden .adam/.{sq,rg}dict.avro paths for Avro-formatted dictionaries, fixes bigdatagenomics#945

heuermh committed Mar 24, 2016
1 parent 2e843de commit 04b601e
Showing 4 changed files with 28 additions and 62 deletions.
@@ -459,8 +459,8 @@ class ADAMContext(@transient val sc: SparkContext) extends Serializable with Logging
    * if the reads are aligned, and the record group dictionary for the reads
    * if one is available.
    * @note The sequence dictionary is read from an avro file stored at
-   * filePath.seqdict and the record group dictionary is read from an
-   * avro file stored at filePath.rgdict. These files are pure avro,
+   * filePath/.seqdict.avro and the record group dictionary is read from an
+   * avro file stored at filePath/.rgdict.avro. These files are pure avro,
    * not Parquet.
    * @see loadAlignments
    */
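With this change the two dictionary files move from sibling paths (filePath.seqdict, filePath.rgdict) to hidden files inside the Parquet directory itself. For a dataset saved to sample.adam, the resulting layout looks roughly like this (the part-file name is illustrative):

    sample.adam/
        _SUCCESS
        part-r-00000.gz.parquet
        .seqdict.avro    <- sequence dictionary, pure Avro binary
        .rgdict.avro     <- record group dictionary, pure Avro binary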
@@ -471,9 +471,9 @@ class ADAMContext(@transient val sc: SparkContext) extends Serializable with Logging

     // load from disk
     val rdd = loadParquet[AlignmentRecord](filePath, predicate, projection)
-    val avroSd = loadAvro[Contig]("%s.seqdict".format(filePath),
+    val avroSd = loadAvro[Contig]("%s/.seqdict.avro".format(filePath),
       Contig.SCHEMA$)
-    val avroRgd = loadAvro[RecordGroupMetadata]("%s.rgdict".format(filePath),
+    val avroRgd = loadAvro[RecordGroupMetadata]("%s/.rgdict.avro".format(filePath),
       RecordGroupMetadata.SCHEMA$)

     // convert avro to sequence dictionary
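loadAvro is a private ADAMContext helper whose body is outside this diff. A minimal sketch of the pattern, assuming the helper simply materializes every record on the driver through the Hadoop FileSystem API (the name loadAvroSketch and its signature are illustrative, not ADAM's actual code):

    import org.apache.avro.Schema
    import org.apache.avro.file.DataFileStream
    import org.apache.avro.specific.{ SpecificDatumReader, SpecificRecordBase }
    import org.apache.hadoop.fs.Path
    import org.apache.spark.SparkContext

    import scala.collection.JavaConverters._

    // hypothetical stand-in for ADAM's private loadAvro helper: open the file
    // (local or HDFS) through the Hadoop FileSystem API and decode every record
    // with Avro's specific reader, materializing the small result on the driver
    def loadAvroSketch[T <: SpecificRecordBase](sc: SparkContext,
                                                pathName: String,
                                                schema: Schema): Seq[T] = {
      val path = new Path(pathName)
      val fs = path.getFileSystem(sc.hadoopConfiguration)
      val in = fs.open(path)
      try {
        val stream = new DataFileStream[T](in, new SpecificDatumReader[T](schema))
        stream.iterator.asScala.toVector // materialize before the stream closes
      } finally {
        in.close()
      }
    }

Since the dictionaries are small driver-side metadata rather than distributed data, reading them eagerly like this (instead of as an RDD) is the natural fit.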
@@ -162,9 +162,9 @@ class AlignmentRecordRDDFunctions(rdd: RDD[AlignmentRecord])
    *
    * The RDD is written as a directory of Parquet files, with
    * Parquet configuration described by the input param args.
-   * The provided sequence dictionary is written at args.outputPath.seqdict
+   * The provided sequence dictionary is written at args.outputPath/.seqdict.avro
    * while the provided record group dictionary is written at
-   * args.outputPath.rgdict. These two files are written as Avro binary.
+   * args.outputPath/.rgdict.avro. These two files are written as Avro binary.
    *
    * @param args Save configuration arguments.
    * @param sd Sequence dictionary describing the contigs these reads are
@@ -178,25 +178,26 @@ class AlignmentRecordRDDFunctions(rdd: RDD[AlignmentRecord])
   def saveAsParquet(args: ADAMSaveAnyArgs,
                     sd: SequenceDictionary,
                     rgd: RecordGroupDictionary) = {
-    // convert sequence dictionary and record group dictionaries to avro form
+
+    // save rdd itself as parquet
+    rdd.adamParquetSave(args)
+
+    // then convert sequence dictionary and record group dictionaries to avro form
     val contigs = sd.records
       .map(SequenceRecord.toADAMContig)
       .toSeq
     val rgMetadata = rgd.recordGroups
       .map(_.toMetadata)

-    // write the sequence dictionary and record group dictionary to disk
-    saveAvro("%s.seqdict".format(args.outputPath),
+    // and write the sequence dictionary and record group dictionary to disk
+    saveAvro("%s/.seqdict.avro".format(args.outputPath),
       rdd.context,
       Contig.SCHEMA$,
       contigs)
-    saveAvro("%s.rgdict".format(args.outputPath),
+    saveAvro("%s/.rgdict.avro".format(args.outputPath),
       rdd.context,
       RecordGroupMetadata.SCHEMA$,
       rgMetadata)
-
-    // save rdd itself as parquet
-    rdd.adamParquetSave(args)
   }

   /**
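Note the reordering: rdd.adamParquetSave(args) now runs before the dictionaries are written, presumably because the hidden files live inside the output directory and Hadoop's output committer refuses to run if that directory already exists, so the Parquet job must create it first. saveAvro is the write-side counterpart of loadAvro, also outside this diff; a minimal sketch under the same assumptions (the name saveAvroSketch is illustrative):

    import org.apache.avro.Schema
    import org.apache.avro.file.DataFileWriter
    import org.apache.avro.specific.{ SpecificDatumWriter, SpecificRecordBase }
    import org.apache.hadoop.fs.Path
    import org.apache.spark.SparkContext

    // hypothetical stand-in for ADAM's private saveAvro helper: write a small
    // collection of records as a single pure-Avro binary file from the driver
    def saveAvroSketch[T <: SpecificRecordBase](pathName: String,
                                                sc: SparkContext,
                                                schema: Schema,
                                                records: Seq[T]): Unit = {
      val path = new Path(pathName)
      val fs = path.getFileSystem(sc.hadoopConfiguration)
      val writer = new DataFileWriter[T](new SpecificDatumWriter[T](schema))
      try {
        writer.create(schema, fs.create(path))
        records.foreach(writer.append)
      } finally {
        writer.close() // flushes and closes the underlying output stream
      }
    }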
@@ -24,50 +24,8 @@ import org.bdgenomics.adam.rdd.ADAMContext._
 import org.bdgenomics.adam.rdd.TestSaveArgs
 import org.bdgenomics.adam.util.{ ParquetLogger, ADAMFunSuite }
 import org.bdgenomics.formats.avro.AlignmentRecord
-import org.scalatest.BeforeAndAfter

-class FieldEnumerationSuite extends ADAMFunSuite with BeforeAndAfter {
-
-  var readsFilepath: String = null
-  var readsParquetFile: File = null
-
-  sparkBefore("fieldenumerationsuite_before") {
-    ParquetLogger.hadoopLoggerLevel(Level.SEVERE)
-
-    readsFilepath = resourcePath("reads12.sam")
-    val file = new File(readsFilepath)
-
-    readsParquetFile = new File(file.getParentFile, "test_reads12_parquet.adam")
-
-    // Erase old test files, if they exist.
-    if (readsParquetFile.exists())
-      cleanParquet(readsParquetFile)
-
-    // Convert the reads12.sam file into a parquet file
-    val rRdd = sc.loadBam(readsFilepath)
-    val bamReads = rRdd.rdd
-    val sd = rRdd.sequences
-    val rgd = rRdd.recordGroups
-    bamReads.saveAsParquet(TestSaveArgs(readsParquetFile.getAbsolutePath), sd, rgd)
-  }
-
-  after {
-    cleanParquet(readsParquetFile)
-  }
-
-  /**
-   * We can't just file.delete() a parquet "file", since it's often a directory.
-   * @param dir The directory (or, possibly, file) to delete
-   */
-  def cleanParquet(dir: File) {
-    if (dir.isDirectory) {
-      dir.listFiles().foreach(file =>
-        file.delete())
-      dir.delete()
-    } else {
-      dir.delete()
-    }
-  }
+class FieldEnumerationSuite extends ADAMFunSuite {

   test("Empty projections are illegal") {
     intercept[AssertionError] {
@@ -76,21 +34,26 @@ class FieldEnumerationSuite extends ADAMFunSuite with BeforeAndAfter
   }

   sparkTest("Simple projection on Read works") {
+    val readsFilepath = resourcePath("reads12.sam")
+    val readsParquetFilepath = tmpFile("reads12.adam")

-    val p1 = Projection(AlignmentRecordField.readName)
-
-    val reads1: RDD[AlignmentRecord] = sc.loadAlignments(readsParquetFile.getAbsolutePath, projection = Some(p1))
+    // Convert the reads12.sam file into a parquet file
+    val rRdd = sc.loadBam(readsFilepath)
+    val bamReads = rRdd.rdd
+    val sd = rRdd.sequences
+    val rgd = rRdd.recordGroups
+    bamReads.saveAsParquet(TestSaveArgs(readsParquetFilepath), sd, rgd)

+    val p1 = Projection(AlignmentRecordField.readName)
+    val reads1: RDD[AlignmentRecord] = sc.loadAlignments(readsParquetFilepath, projection = Some(p1))
     assert(reads1.count() === 200)

     val first1 = reads1.first()
     assert(first1.getReadName.toString === "simread:1:26472783:false")
     assert(first1.getReadMapped === false)

     val p2 = Projection(AlignmentRecordField.readName, AlignmentRecordField.readMapped)
-
-    val reads2: RDD[AlignmentRecord] = sc.loadAlignments(readsParquetFile.getAbsolutePath, projection = Some(p2))
-
+    val reads2: RDD[AlignmentRecord] = sc.loadAlignments(readsParquetFilepath, projection = Some(p2))
     assert(reads2.count() === 200)

     val first2 = reads2.first()
@@ -17,6 +17,8 @@
    */
 package org.bdgenomics.adam.util

+import java.io.File
+
 import java.nio.file.Files

 import com.google.common.io.Resources
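The new java.io.File import in this test-utility file presumably backs the tmpFile helper that the refactored FieldEnumerationSuite calls above, replacing the old sparkBefore/after/cleanParquet machinery. The helper's body is not part of this diff; a plausible sketch, assuming it returns a path inside a freshly created temporary directory:

    import java.io.File
    import java.nio.file.Files

    // plausible sketch of a tmpFile test helper: hand back an absolute path
    // inside a freshly created temporary directory, so each test writes to its
    // own location and no manual cleanParquet-style cleanup is needed
    def tmpFile(path: String): String = {
      val tempDir = Files.createTempDirectory("FieldEnumerationSuite").toFile
      new File(tempDir, path).getAbsolutePath
    }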
