Address PR comments - part 1
jpdna authored and Paschall committed Jan 23, 2018
1 parent 3249d97 commit 40d43b8
Showing 6 changed files with 108 additions and 125 deletions.
205 changes: 92 additions & 113 deletions adam-core/src/main/scala/org/bdgenomics/adam/rdd/ADAMContext.scala
@@ -18,20 +18,15 @@
package org.bdgenomics.adam.rdd

import java.io.{ File, FileNotFoundException, InputStream }

import htsjdk.samtools.{ SAMFileHeader, SAMProgramRecord, ValidationStringency }
import htsjdk.samtools.util.Locatable
import htsjdk.variant.vcf.{
VCFHeader,
VCFCompoundHeaderLine,
VCFFormatHeaderLine,
VCFHeaderLine,
VCFInfoHeaderLine
}
import htsjdk.variant.vcf.{ VCFCompoundHeaderLine, VCFFormatHeaderLine, VCFHeader, VCFHeaderLine, VCFInfoHeaderLine }
import org.apache.avro.Schema
import org.apache.avro.file.DataFileStream
import org.apache.avro.generic.{ GenericDatumReader, GenericRecord, IndexedRecord }
import org.apache.avro.specific.{ SpecificDatumReader, SpecificRecord, SpecificRecordBase }
import org.apache.hadoop.fs.{ FileSystem, Path, PathFilter }
import org.apache.hadoop.fs.{ FileStatus, FileSystem, Path, PathFilter }
import org.apache.hadoop.io.{ LongWritable, Text }
import org.apache.hadoop.io.compress.CompressionCodecFactory
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
@@ -42,72 +37,30 @@ import org.apache.parquet.hadoop.util.ContextUtil
import org.apache.spark.SparkContext
import org.apache.spark.rdd.MetricsContext._
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{ SparkSession, Dataset }
import org.apache.spark.sql.{ Dataset, SparkSession }
import org.bdgenomics.adam.converters._
import org.bdgenomics.adam.instrumentation.Timers._
import org.bdgenomics.adam.io._
import org.bdgenomics.adam.models._
import org.bdgenomics.adam.projections.{
FeatureField,
Projection
}
import org.bdgenomics.adam.rdd.contig.{
DatasetBoundNucleotideContigFragmentRDD,
NucleotideContigFragmentRDD,
ParquetUnboundNucleotideContigFragmentRDD,
RDDBoundNucleotideContigFragmentRDD
}
import org.bdgenomics.adam.projections.{ FeatureField, Projection }
import org.bdgenomics.adam.rdd.contig.{ DatasetBoundNucleotideContigFragmentRDD, NucleotideContigFragmentRDD, ParquetUnboundNucleotideContigFragmentRDD, RDDBoundNucleotideContigFragmentRDD }
import org.bdgenomics.adam.rdd.feature._
import org.bdgenomics.adam.rdd.fragment.{
DatasetBoundFragmentRDD,
FragmentRDD,
ParquetUnboundFragmentRDD,
RDDBoundFragmentRDD
}
import org.bdgenomics.adam.rdd.read.{
AlignmentRecordRDD,
DatasetBoundAlignmentRecordRDD,
RepairPartitions,
ParquetUnboundAlignmentRecordRDD,
RDDBoundAlignmentRecordRDD
}
import org.bdgenomics.adam.rdd.fragment.{ DatasetBoundFragmentRDD, FragmentRDD, ParquetUnboundFragmentRDD, RDDBoundFragmentRDD }
import org.bdgenomics.adam.rdd.read.{ AlignmentRecordRDD, DatasetBoundAlignmentRecordRDD, ParquetUnboundAlignmentRecordRDD, RDDBoundAlignmentRecordRDD, RepairPartitions }
import org.bdgenomics.adam.rdd.variant._
import org.bdgenomics.adam.rich.RichAlignmentRecord
import org.bdgenomics.adam.sql.{
AlignmentRecord => AlignmentRecordProduct,
Feature => FeatureProduct,
Fragment => FragmentProduct,
Genotype => GenotypeProduct,
NucleotideContigFragment => NucleotideContigFragmentProduct,
Variant => VariantProduct
}
import org.bdgenomics.adam.sql.{ AlignmentRecord => AlignmentRecordProduct, Feature => FeatureProduct, Fragment => FragmentProduct, Genotype => GenotypeProduct, NucleotideContigFragment => NucleotideContigFragmentProduct, Variant => VariantProduct }
import org.bdgenomics.adam.util.FileExtensions._
import org.bdgenomics.adam.util.{
GenomeFileReader,
ReferenceContigMap,
ReferenceFile,
SequenceDictionaryReader,
TwoBitFile
}
import org.bdgenomics.formats.avro.{
AlignmentRecord,
Contig,
Feature,
Fragment,
Genotype,
NucleotideContigFragment,
ProcessingStep,
RecordGroup => RecordGroupMetadata,
Sample,
Variant
}
import org.bdgenomics.adam.util.{ GenomeFileReader, ReferenceContigMap, ReferenceFile, SequenceDictionaryReader, TwoBitFile }
import org.bdgenomics.formats.avro.{ AlignmentRecord, Contig, Feature, Fragment, Genotype, NucleotideContigFragment, ProcessingStep, Sample, Variant, RecordGroup => RecordGroupMetadata }
import org.bdgenomics.utils.instrumentation.Metrics
import org.bdgenomics.utils.io.LocalFileByteAccess
import org.bdgenomics.utils.misc.{ HadoopUtil, Logging }
import org.json4s.DefaultFormats
import org.json4s.jackson.JsonMethods._
import org.seqdoop.hadoop_bam._
import org.seqdoop.hadoop_bam.util._

import scala.collection.JavaConversions._
import scala.collection.mutable.ArrayBuffer
import scala.reflect.ClassTag
@@ -1369,7 +1322,7 @@ class ADAMContext(@transient val sc: SparkContext) extends Serializable with Log
}
fs.listStatus(path, filter)
} else {
val paths = fs.globStatus(path)
val paths: Array[FileStatus] = fs.globStatus(path)
if (paths == null || paths.isEmpty) {
throw new FileNotFoundException(
s"Couldn't find any files matching ${path.toUri}"
@@ -1874,10 +1827,9 @@ class ADAMContext(@transient val sc: SparkContext) extends Serializable with Log
* @param pathName The path name to load alignment records from.
* Globs/directories are supported.
* @param regions Optional list of genomic regions to load.
* @param addChrPrefix Flag to add "chr" prefix to contigs
* @return Returns an AlignmentRecordRDD.
*/
def loadPartitionedParquetAlignments(pathName: String, regions: Option[Iterable[ReferenceRegion]] = None): AlignmentRecordRDD = {
def loadPartitionedParquetAlignments(pathName: String, regions: Iterable[ReferenceRegion] = Iterable.empty): AlignmentRecordRDD = {

require(checkPartitionedParquetFlag(pathName),
"Input Parquet files (%s) are not partitioned.".format(pathName))
@@ -1891,10 +1843,15 @@ class ADAMContext(@transient val sc: SparkContext) extends Serializable with Log
val pgs = loadAvroPrograms(pathName)
val reads: AlignmentRecordRDD = ParquetUnboundAlignmentRecordRDD(sc, pathName, sd, rgd, pgs)

val datasetBoundAlignmentRecordRDD: AlignmentRecordRDD = regions match {
case Some(x) => DatasetBoundAlignmentRecordRDD(reads.dataset.filter(referenceRegionsToDatasetQueryString(x)), reads.sequences, reads.recordGroups, reads.processingSteps)
case _ => DatasetBoundAlignmentRecordRDD(reads.dataset, reads.sequences, reads.recordGroups, reads.processingSteps)
}
val datasetBoundAlignmentRecordRDD =
if (regions.nonEmpty) {
DatasetBoundAlignmentRecordRDD(reads.dataset.filter(referenceRegionsToDatasetQueryString(regions)),
reads.sequences,
reads.recordGroups,
reads.processingSteps)
} else {
DatasetBoundAlignmentRecordRDD(reads.dataset, reads.sequences, reads.recordGroups, reads.processingSteps)
}
datasetBoundAlignmentRecordRDD
}
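
As a usage sketch of the loader above (hypothetical path and coordinates; assumes the `ADAMContext` implicits are imported so the loaders are available on a `SparkContext`):

```scala
import org.apache.spark.SparkContext
import org.bdgenomics.adam.models.ReferenceRegion
import org.bdgenomics.adam.rdd.ADAMContext._

def loadOverlapping(sc: SparkContext) = {
  // load only reads overlapping two regions; the predicate built by
  // referenceRegionsToDatasetQueryString is pushed into the Dataset filter
  val regions = Iterable(ReferenceRegion("chr1", 100000L, 200000L),
                         ReferenceRegion("chr2", 500000L, 600000L))
  sc.loadPartitionedParquetAlignments("sample.alignments.adam", regions)
}

// with the new Iterable.empty default, omitting regions loads everything:
// sc.loadPartitionedParquetAlignments("sample.alignments.adam")
```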

@@ -2287,10 +2244,9 @@ class ADAMContext(@transient val sc: SparkContext) extends Serializable with Log
* @param pathName The path name to load alignment records from.
* Globs/directories are supported.
* @param regions Optional list of genomic regions to load.
* @param addChrPrefix Flag to add "chr" prefix to contigs
* @return Returns a GenotypeRDD.
*/
def loadPartitionedParquetGenotypes(pathName: String, regions: Option[Iterable[ReferenceRegion]] = None): GenotypeRDD = {
def loadPartitionedParquetGenotypes(pathName: String, regions: Iterable[ReferenceRegion] = Iterable.empty): GenotypeRDD = {
require(checkPartitionedParquetFlag(pathName),
"Input Parquet files (%s) are not partitioned.".format(pathName))
// load header lines
@@ -2302,11 +2258,15 @@ class ADAMContext(@transient val sc: SparkContext) extends Serializable with Log

val genotypes = ParquetUnboundGenotypeRDD(sc, pathName, sd, samples, headers)

val datasetBoundGenotypeRDD: GenotypeRDD = regions match {
case Some(x) => DatasetBoundGenotypeRDD(genotypes.dataset
.filter(referenceRegionsToDatasetQueryString(x)), genotypes.sequences, genotypes.samples, genotypes.headerLines)
case _ => DatasetBoundGenotypeRDD(genotypes.dataset, genotypes.sequences, genotypes.samples, genotypes.headerLines)
}
val datasetBoundGenotypeRDD =
if (regions.nonEmpty) {
DatasetBoundGenotypeRDD(genotypes.dataset.filter(referenceRegionsToDatasetQueryString(regions)),
genotypes.sequences,
genotypes.samples,
genotypes.headerLines)
} else {
DatasetBoundGenotypeRDD(genotypes.dataset, genotypes.sequences, genotypes.samples, genotypes.headerLines)
}
datasetBoundGenotypeRDD
}

@@ -2349,11 +2309,10 @@ class ADAMContext(@transient val sc: SparkContext) extends Serializable with Log
* @param pathName The path name to load alignment records from.
* Globs/directories are supported.
* @param regions Optional list of genomic regions to load.
* @param addChrPrefix Flag to add "chr" prefix to contigs
* @return Returns a VariantRDD.
*/

def loadPartitionedParquetVariants(pathName: String, regions: Option[Iterable[ReferenceRegion]] = None): VariantRDD = {
def loadPartitionedParquetVariants(pathName: String, regions: Iterable[ReferenceRegion] = Iterable.empty): VariantRDD = {
require(checkPartitionedParquetFlag(pathName),
"Input Parquet files (%s) are not partitioned.".format(pathName))
val sd = loadAvroSequenceDictionary(pathName)
@@ -2363,11 +2322,14 @@ class ADAMContext(@transient val sc: SparkContext) extends Serializable with Log

val variants = ParquetUnboundVariantRDD(sc, pathName, sd, headers)

val datasetBoundVariantRDD: VariantRDD = regions match {
case Some(x) => DatasetBoundVariantRDD(variants.dataset
.filter(referenceRegionsToDatasetQueryString(x)), variants.sequences, headers)
case _ => DatasetBoundVariantRDD(variants.dataset, variants.sequences, headers)
}
val datasetBoundVariantRDD =
if (regions.nonEmpty) {
DatasetBoundVariantRDD(variants.dataset.filter(referenceRegionsToDatasetQueryString(regions)),
variants.sequences,
headers)
} else {
DatasetBoundVariantRDD(variants.dataset, variants.sequences, headers)
}
datasetBoundVariantRDD
}

@@ -2693,24 +2655,22 @@ class ADAMContext(@transient val sc: SparkContext) extends Serializable with Log
* @param pathName The path name to load alignment records from.
* Globs/directories are supported.
* @param regions Optional list of genomic regions to load.
* @param addChrPrefix Flag to add "chr" prefix to contigs
* @return Returns a FeatureRDD.
*/

def loadPartitionedParquetFeatures(pathName: String, regions: Option[Iterable[ReferenceRegion]] = None): FeatureRDD = {
def loadPartitionedParquetFeatures(pathName: String, regions: Iterable[ReferenceRegion] = Iterable.empty): FeatureRDD = {
require(checkPartitionedParquetFlag(pathName),
"Input Parquet files (%s) are not partitioned.".format(pathName))
val sd = loadAvroSequenceDictionary(pathName)
val features = ParquetUnboundFeatureRDD(sc, pathName, sd)

val datasetBoundFeatureRDD: FeatureRDD = regions match {
case Some(x) => DatasetBoundFeatureRDD(features.dataset
.filter(referenceRegionsToDatasetQueryString(x)), features.sequences)
case _ => DatasetBoundFeatureRDD(features.dataset, features.sequences)
}

val datasetBoundFeatureRDD =
if (regions.nonEmpty) {
DatasetBoundFeatureRDD(features.dataset.filter(referenceRegionsToDatasetQueryString(regions)),
features.sequences)
} else {
DatasetBoundFeatureRDD(features.dataset, features.sequences)
}
datasetBoundFeatureRDD

}

/**
@@ -2751,21 +2711,23 @@ class ADAMContext(@transient val sc: SparkContext) extends Serializable with Log
* @param pathName The path name to load alignment records from.
* Globs/directories are supported.
* @param regions Optional list of genomic regions to load.
* @param addChrPrefix Flag to add "chr" prefix to contigs
* @return Returns a NucleotideContigFragmentRDD.
*/
def loadPartitionedParquetFragments(pathName: String, regions: Option[Iterable[ReferenceRegion]] = None): NucleotideContigFragmentRDD = {
def loadPartitionedParquetFragments(pathName: String, regions: Iterable[ReferenceRegion] = Iterable.empty): NucleotideContigFragmentRDD = {
require(checkPartitionedParquetFlag(pathName),
"Input Parquet files (%s) are not partitioned.".format(pathName))
val sd = loadAvroSequenceDictionary(pathName)
val nucleotideContigFragments = ParquetUnboundNucleotideContigFragmentRDD(sc, pathName, sd)

val datasetBoundNucleotideContigFragmentRDD: NucleotideContigFragmentRDD = regions match {
case Some(x) => DatasetBoundNucleotideContigFragmentRDD(nucleotideContigFragments.dataset
.filter(referenceRegionsToDatasetQueryString(x)), nucleotideContigFragments.sequences)
case _ => DatasetBoundNucleotideContigFragmentRDD(nucleotideContigFragments.dataset, nucleotideContigFragments.sequences)
}

val datasetBoundNucleotideContigFragmentRDD =
if (regions.nonEmpty) {
DatasetBoundNucleotideContigFragmentRDD(nucleotideContigFragments
.dataset.filter(referenceRegionsToDatasetQueryString(regions)),
nucleotideContigFragments.sequences)
} else {
DatasetBoundNucleotideContigFragmentRDD(nucleotideContigFragments.dataset,
nucleotideContigFragments.sequences)
}
datasetBoundNucleotideContigFragmentRDD
}

@@ -3192,39 +3154,56 @@ class ADAMContext(@transient val sc: SparkContext) extends Serializable with Log
}

/**
* Test if Parquet files are partitioned
* Return true if the specified path of Parquet + Avro files is partitioned.
*
* @param filePath Path in which to look for partitioned flag.
* @return Return True if partitioned flag found, False otherwise.
* @return Return true if the specified path of Parquet + Avro files is partitioned.
* Behavior is undefined if some paths in the glob contain the partitioned flag and some do not.
*/

def checkPartitionedParquetFlag(filePath: String): Boolean = {
val path = new Path(filePath, "_isPartitionedByStartPos")
val fs = path.getFileSystem(sc.hadoopConfiguration)
fs.exists(path)

val files: Array[Path] = getFsAndFilesWithFilter(filePath, new FileFilter("_isPartitionedByStartPos"))

// if the getFsAndFilesWithFilter call succeeds without an exception, then
// _isPartitionedByStartPos was found, so we return true
return true

}
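
Behavioral note, with an illustrative caller-side sketch: since the check now goes through getFsAndFilesWithFilter, a path without the flag file surfaces as the FileNotFoundException thrown above rather than a `false` return. A caller wanting the old boolean behavior could catch it (hypothetical helper, not part of this change):

```scala
import java.io.FileNotFoundException

import org.bdgenomics.adam.rdd.ADAMContext

// recover the "false" answer the previous fs.exists(path) version gave
def isPartitioned(ac: ADAMContext, pathName: String): Boolean =
  try {
    ac.checkPartitionedParquetFlag(pathName)
  } catch {
    case _: FileNotFoundException => false
  }
```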

/**
* Returns a query string used to filter a dataset based on an input list of ReferenceRegion
* Returns a query string used to filter a dataset based on zero or more ReferenceRegions.
*
* @param regions list of regions to include in query
* @param partitionSize size of genomic range partitions used when writing parquet
* @param addChrPrefix add the prefix "chr" to the contig names - use if the original loaded data and sequence
* dictionary used "chr" prefix
* @return
* @param regions Zero or more regions to include in a query.
* @param partitionSize Size of the partitions used when writing Parquet, in base pairs. Defaults to 1000000.
* @return Returns a query string used to filter a dataset based on zero or more ReferenceRegions.
*/

def referenceRegionsToDatasetQueryString(regions: Iterable[ReferenceRegion], partitionSize: Int = 1000000): String = {

var regionQueryString = "(contigName=" + "\'" + regions.head.referenceName + "\' and posBin >= \'" +
scala.math.floor(regions.head.start / partitionSize).toInt + "\' and posBin < \'" + (scala.math.floor(regions.head.end / partitionSize).toInt + 1) + "\' and start >= " + regions.head.start + " and end <= " + regions.head.end + ")"
println("This is input regions: ")
regions.foreach(x => println(x))

val regionQueryString = regions.map(r => "(contigName=" + "\'" + r.referenceName + "\' and positionBin >= \'" +
scala.math.floor(r.start / partitionSize).toInt + "\' and positionBin < \'" +
(scala.math.floor(r.end / partitionSize).toInt + 1) +
"\' and start >= " + r.start + " and end <= " + r.end + ")")
.mkString(" or ")

var regionQueryString_old = "(contigName=" + "\'" + regions.head.referenceName + "\' and positionBin >= \'" +
scala.math.floor(regions.head.start / partitionSize).toInt + "\' and positionBin < \'" + (scala.math.floor(regions.head.end / partitionSize).toInt + 1) + "\' and start >= " + regions.head.start + " and end <= " + regions.head.end + ")"
if (regions.size > 1) {
regions.foreach((i) => {
regionQueryString = regionQueryString + " or " + "(contigName=" + "\'" +
i.referenceName + "\' and posBin >= \'" + scala.math.floor(i.start / partitionSize).toInt + "\' and posBin < \'" + (scala.math.floor(i.end / partitionSize).toInt + 1) + "\' and start >= " + i.start + " and end <= " + i.end + ")"
regionQueryString_old = regionQueryString_old + " or " + "(contigName=" + "\'" +
i.referenceName + "\' and positionBin >= \'" + scala.math.floor(i.start / partitionSize).toInt + "\' and positionBin < \'" + (scala.math.floor(i.end / partitionSize).toInt + 1) + "\' and start >= " + i.start + " and end <= " + i.end + ")"
})
}

println("new region QueryStirng: " + regionQueryString)
println("old region Query Stirng: " + regionQueryString_old)

regionQueryString
}

}
}
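
To make the bin arithmetic in referenceRegionsToDatasetQueryString concrete, a worked example with an illustrative region and the default partitionSize of 1000000 (`ac` is assumed to be an ADAMContext in scope):

```scala
// for ReferenceRegion("chr1", 2500000L, 3500000L):
//   floor(2500000 / 1000000)     = 2   (first 1 Mbp bin the region can touch)
//   floor(3500000 / 1000000) + 1 = 4   (exclusive upper bin bound)
// the generated Dataset filter string is therefore:
//   (contigName='chr1' and positionBin >= '2' and positionBin < '4'
//    and start >= 2500000 and end <= 3500000)
val query = ac.referenceRegionsToDatasetQueryString(
  Iterable(ReferenceRegion("chr1", 2500000L, 3500000L)))
```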
18 changes: 11 additions & 7 deletions adam-core/src/main/scala/org/bdgenomics/adam/rdd/GenomicRDD.scala
@@ -2519,27 +2519,31 @@ abstract class AvroGenomicRDD[T <% IndexedRecord: Manifest, U <: Product, V <: A
saveAsParquet(new JavaSaveArgs(filePath))
}

/**
* Writes the flag file "_isPartitionedByStartPos" into the output directory,
* marking it as saved in range binned partitioned Parquet + Avro format.
*
* @param filePath Path of the partitioned Parquet + Avro directory to flag.
*/
def writePartitionedParquetFlag(filePath: String): Boolean = {
val path = new Path(filePath, "_isPartitionedByStartPos")
val fs = path.getFileSystem(toDF().sqlContext.sparkContext.hadoopConfiguration)
val fs = path.getFileSystem(rdd.context.hadoopConfiguration)
fs.createNewFile(path)
}

def saveAsPartitionedParquet(filePath: String,
compressCodec: CompressionCodecName = CompressionCodecName.GZIP,
partitionSize: Int = 1000000) {
private[rdd] def saveAsPartitionedParquet(filePath: String,
compressCodec: CompressionCodecName = CompressionCodecName.GZIP,
partitionSize: Int = 1000000) {
log.warn("Saving directly as Hive-partitioned Parquet from SQL. " +
"Options other than compression codec are ignored.")
val df = toDF()

df.withColumn("posBin", floor(df("start") / partitionSize))
df.withColumn("positionBin", floor(df("start") / partitionSize))
.write
.partitionBy("contigName", "posBin")
.partitionBy("contigName", "positionBin")
.format("parquet")
.option("spark.sql.parquet.compression.codec", compressCodec.toString.toLowerCase())
.save(filePath)
writePartitionedParquetFlag(filePath)
//rdd.context.writePartitionedParquetFlag(filePath)
saveMetadata(filePath)
}
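
A round-trip sketch of the save path above (hypothetical paths; `featureRdd` is an assumed FeatureRDD and `sc` a SparkContext with the ADAMContext implicits in scope; note that saveAsPartitionedParquet is now private[rdd], so as written this compiles only from code inside the org.bdgenomics.adam.rdd package):

```scala
// save: writes Parquet partitioned by contigName and positionBin, then the
// _isPartitionedByStartPos flag file, then the Avro metadata sidecars
featureRdd.saveAsPartitionedParquet("features.partitioned.adam",
                                    partitionSize = 1000000)

// reload, pushing a region filter down into the partitioned layout
val chr1Features = sc.loadPartitionedParquetFeatures(
  "features.partitioned.adam",
  Iterable(ReferenceRegion("chr1", 0L, 1000000L)))
```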

@@ -925,7 +925,7 @@ class FeatureRDDSuite extends ADAMFunSuite {
assert(rdd3.dataset.count === 4)
}

sparkTest("load paritioned parquet to sql, save, re-read from avro") {
sparkTest("load partitioned parquet to sql, save, re-read from avro") {
def testMetadata(fRdd: FeatureRDD) {
val sequenceRdd = fRdd.addSequence(SequenceRecord("aSequence", 1000L))
assert(sequenceRdd.sequences.containsRefName("aSequence"))
