Use .adam/_{seq,rg}dict.avro paths for Avro-formatted dictionaries #978

Merged: 1 commit, merged on Mar 29, 2016
ADAMContext.scala

@@ -459,8 +459,8 @@ class ADAMContext(@transient val sc: SparkContext) extends Serializable with Log
    * if the reads are aligned, and the record group dictionary for the reads
    * if one is available.
    * @note The sequence dictionary is read from an avro file stored at
-   * filePath.seqdict and the record group dictionary is read from an
-   * avro file stored at filePath.rgdict. These files are pure avro,
+   * filePath/_seqdict.avro and the record group dictionary is read from an
+   * avro file stored at filePath/_rgdict.avro. These files are pure avro,
    * not Parquet.
    * @see loadAlignments
    */
@@ -471,9 +471,9 @@ class ADAMContext(@transient val sc: SparkContext) extends Serializable with Log
 
     // load from disk
     val rdd = loadParquet[AlignmentRecord](filePath, predicate, projection)
-    val avroSd = loadAvro[Contig]("%s.seqdict".format(filePath),
+    val avroSd = loadAvro[Contig]("%s/_seqdict.avro".format(filePath),
       Contig.SCHEMA$)
-    val avroRgd = loadAvro[RecordGroupMetadata]("%s.rgdict".format(filePath),
+    val avroRgd = loadAvro[RecordGroupMetadata]("%s/_rgdict.avro".format(filePath),
       RecordGroupMetadata.SCHEMA$)
 
     // convert avro to sequence dictionary
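Note on the underscore prefix: Hadoop input formats skip files whose names begin with `_` or `.`, so `_seqdict.avro` and `_rgdict.avro` can sit inside the `.adam` directory without being picked up by the Parquet reader that scans it for part files. As a minimal sketch of what lands on disk, here is one way to read the sequence dictionary back with plain avro-java outside of Spark; the local path is hypothetical, and ADAM's own `loadAvro` goes through the Hadoop `FileSystem` API instead:

```scala
import java.io.File
import org.apache.avro.file.DataFileReader
import org.apache.avro.specific.SpecificDatumReader
import org.bdgenomics.formats.avro.Contig
import scala.collection.JavaConverters._

// _seqdict.avro is a plain Avro container file holding one Contig record
// per reference sequence; _rgdict.avro is the same idea for
// RecordGroupMetadata records.
val reader = new DataFileReader[Contig](
  new File("reads.adam/_seqdict.avro"),
  new SpecificDatumReader[Contig](Contig.SCHEMA$))
val contigs = reader.iterator.asScala.toList
reader.close()
contigs.foreach(contig => println(contig.getContigName))
```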
AlignmentRecordRDDFunctions.scala

@@ -162,9 +162,9 @@ class AlignmentRecordRDDFunctions(rdd: RDD[AlignmentRecord])
    *
    * The RDD is written as a directory of Parquet files, with
    * Parquet configuration described by the input param args.
-   * The provided sequence dictionary is written at args.outputPath.seqdict
+   * The provided sequence dictionary is written at args.outputPath/_seqdict.avro
    * while the provided record group dictionary is written at
-   * args.outputPath.rgdict. These two files are written as Avro binary.
+   * args.outputPath/_rgdict.avro. These two files are written as Avro binary.
    *
    * @param args Save configuration arguments.
    * @param sd Sequence dictionary describing the contigs these reads are
@@ -178,25 +178,26 @@ class AlignmentRecordRDDFunctions(rdd: RDD[AlignmentRecord])
   def saveAsParquet(args: ADAMSaveAnyArgs,
                     sd: SequenceDictionary,
                     rgd: RecordGroupDictionary) = {
-    // convert sequence dictionary and record group dictionaries to avro form
+
+    // save rdd itself as parquet
+    rdd.adamParquetSave(args)
+
+    // then convert sequence dictionary and record group dictionaries to avro form
     val contigs = sd.records
       .map(SequenceRecord.toADAMContig)
       .toSeq
     val rgMetadata = rgd.recordGroups
       .map(_.toMetadata)
 
-    // write the sequence dictionary and record group dictionary to disk
-    saveAvro("%s.seqdict".format(args.outputPath),
+    // and write the sequence dictionary and record group dictionary to disk
+    saveAvro("%s/_seqdict.avro".format(args.outputPath),
       rdd.context,
       Contig.SCHEMA$,
       contigs)
-    saveAvro("%s.rgdict".format(args.outputPath),
+    saveAvro("%s/_rgdict.avro".format(args.outputPath),
       rdd.context,
       RecordGroupMetadata.SCHEMA$,
       rgMetadata)
-
-    // save rdd itself as parquet
-    rdd.adamParquetSave(args)
   }
 
   /**
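The diff also reorders saveAsParquet so that rdd.adamParquetSave(args) runs first, presumably so the output directory exists before the Avro files are written inside it. Taken together with the loading change above, the round trip looks like the following sketch (paths are hypothetical; these are the same calls the test change below exercises):

```scala
// Load a BAM; the result bundles the reads with the sequence and
// record group dictionaries.
val rRdd = sc.loadBam("/tmp/reads12.sam")

// Writes Parquet part files into /tmp/reads12.adam first, then
// _seqdict.avro and _rgdict.avro inside that same directory.
rRdd.rdd.saveAsParquet(TestSaveArgs("/tmp/reads12.adam"), rRdd.sequences, rRdd.recordGroups)

// Loading the directory back recovers the reads; the dictionaries are
// read from /tmp/reads12.adam/_seqdict.avro and _rgdict.avro.
val reads = sc.loadAlignments("/tmp/reads12.adam")
```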
FieldEnumerationSuite.scala

@@ -24,50 +24,8 @@ import org.bdgenomics.adam.rdd.ADAMContext._
 import org.bdgenomics.adam.rdd.TestSaveArgs
 import org.bdgenomics.adam.util.{ ParquetLogger, ADAMFunSuite }
 import org.bdgenomics.formats.avro.AlignmentRecord
-import org.scalatest.BeforeAndAfter
 
-class FieldEnumerationSuite extends ADAMFunSuite with BeforeAndAfter {
-
-  var readsFilepath: String = null
-  var readsParquetFile: File = null
-
-  sparkBefore("fieldenumerationsuite_before") {
-    ParquetLogger.hadoopLoggerLevel(Level.SEVERE)
-
-    readsFilepath = resourcePath("reads12.sam")
-    val file = new File(readsFilepath)
-
-    readsParquetFile = new File(file.getParentFile, "test_reads12_parquet.adam")
-
-    // Erase old test files, if they exist.
-    if (readsParquetFile.exists())
-      cleanParquet(readsParquetFile)
-
-    // Convert the reads12.sam file into a parquet file
-    val rRdd = sc.loadBam(readsFilepath)
-    val bamReads = rRdd.rdd
-    val sd = rRdd.sequences
-    val rgd = rRdd.recordGroups
-    bamReads.saveAsParquet(TestSaveArgs(readsParquetFile.getAbsolutePath), sd, rgd)
-  }
-
-  after {
-    cleanParquet(readsParquetFile)
-  }
-
-  /**
-   * We can't just file.delete() a parquet "file", since it's often a directory.
-   * @param dir The directory (or, possibly, file) to delete
-   */
-  def cleanParquet(dir: File) {
-    if (dir.isDirectory) {
-      dir.listFiles().foreach(file =>
-        file.delete())
-      dir.delete()
-    } else {
-      dir.delete()
-    }
-  }
+class FieldEnumerationSuite extends ADAMFunSuite {
 
   test("Empty projections are illegal") {
     intercept[AssertionError] {
@@ -76,21 +34,26 @@ class FieldEnumerationSuite extends ADAMFunSuite with BeforeAndAfter
   }
 
   sparkTest("Simple projection on Read works") {
+    val readsFilepath = resourcePath("reads12.sam")
+    val readsParquetFilepath = tmpFile("reads12.adam")
 
-    val p1 = Projection(AlignmentRecordField.readName)
-
-    val reads1: RDD[AlignmentRecord] = sc.loadAlignments(readsParquetFile.getAbsolutePath, projection = Some(p1))
+    // Convert the reads12.sam file into a parquet file
+    val rRdd = sc.loadBam(readsFilepath)
+    val bamReads = rRdd.rdd
+    val sd = rRdd.sequences
+    val rgd = rRdd.recordGroups
+    bamReads.saveAsParquet(TestSaveArgs(readsParquetFilepath), sd, rgd)
+
+    val p1 = Projection(AlignmentRecordField.readName)
+    val reads1: RDD[AlignmentRecord] = sc.loadAlignments(readsParquetFilepath, projection = Some(p1))
     assert(reads1.count() === 200)
 
     val first1 = reads1.first()
     assert(first1.getReadName.toString === "simread:1:26472783:false")
     assert(first1.getReadMapped === false)
 
     val p2 = Projection(AlignmentRecordField.readName, AlignmentRecordField.readMapped)
-
-    val reads2: RDD[AlignmentRecord] = sc.loadAlignments(readsParquetFile.getAbsolutePath, projection = Some(p2))
-
+    val reads2: RDD[AlignmentRecord] = sc.loadAlignments(readsParquetFilepath, projection = Some(p2))
     assert(reads2.count() === 200)
 
     val first2 = reads2.first()
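A side benefit of the new layout shows up in this test rewrite: with the dictionaries inside the output directory, writing to a tmpFile path suffices, and the BeforeAndAfter scaffolding and cleanParquet helper deleted above are no longer needed. A quick illustration of where the files land under each scheme (hypothetical paths and assertions, not part of the PR):

```scala
import java.io.File

// New scheme: both dictionaries sit inside the output directory,
// alongside the Parquet part files.
val outDir = new File("reads12.adam")
assert(new File(outDir, "_seqdict.avro").exists())
assert(new File(outDir, "_rgdict.avro").exists())

// Old scheme: the metadata was written to sibling paths outside the
// directory, e.g. reads12.adam.seqdict and reads12.adam.rgdict, which a
// recursive delete of reads12.adam would have left behind.
```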