
Commit

Merge aee7c32 into 9ea870c
fnothaft committed Mar 9, 2018
2 parents 9ea870c + aee7c32 commit 0367f44
Showing 21 changed files with 549 additions and 18 deletions.
@@ -131,6 +131,10 @@ class TransformAlignmentsArgs extends Args4jBase with ADAMSaveAnyArgs with Parqu
var storageLevel: String = "MEMORY_ONLY"
@Args4jOption(required = false, name = "-disable_pg", usage = "Disable writing a new @PG line.")
var disableProcessingStep = false
@Args4jOption(required = false, name = "-partition_by_start_pos", usage = "Save the data partitioned by genomic range bins based on start position, using Hive-style partitioning.")
var partitionByStartPos: Boolean = false
@Args4jOption(required = false, name = "-partition_bin_size", usage = "Partition bin size used in Hive-style partitioning. Defaults to 1 Mbp (1,000,000 base pairs).")
var partitionedBinSize = 1000000

var command: String = null
}
@@ -562,8 +566,15 @@ class TransformAlignments(protected val args: TransformAlignmentsArgs) extends B
mergedSd
}

outputRdd.save(args,
isSorted = args.sortReads || args.sortLexicographically)
if (args.partitionByStartPos) {
if (outputRdd.sequences.isEmpty) {
log.warn("This dataset is not aligned and therefore will not benefit from being saved as a partitioned dataset")
}
outputRdd.saveAsPartitionedParquet(args.outputPath, partitionSize = args.partitionedBinSize)
} else {
outputRdd.save(args,
isSorted = args.sortReads || args.sortLexicographically)
}
}

private def createKnownSnpsTable(sc: SparkContext): SnpTable = {
@@ -68,6 +68,12 @@ class TransformGenotypesArgs extends Args4jBase with ADAMSaveAnyArgs with Parque
@Args4jOption(required = false, name = "-stringency", usage = "Stringency level for various checks; can be SILENT, LENIENT, or STRICT. Defaults to STRICT.")
var stringency: String = "STRICT"

@Args4jOption(required = false, name = "-partition_by_start_pos", usage = "Save the data partitioned by genomic range bins based on start position, using Hive-style partitioning.")
var partitionByStartPos: Boolean = false

@Args4jOption(required = false, name = "-partition_bin_size", usage = "Partition bin size used in Hive-style partitioning. Defaults to 1 Mbp (1,000,000 base pairs).")
var partitionedBinSize = 1000000

// must be defined due to ADAMSaveAnyArgs, but unused here
var sortFastqOutput: Boolean = false
}
@@ -135,7 +141,12 @@ class TransformGenotypes(val args: TransformGenotypesArgs)
if (args.outputPath.endsWith(".vcf")) {
maybeSort(maybeCoalesce(genotypes.toVariantContexts)).saveAsVcf(args)
} else {
maybeSort(maybeCoalesce(genotypes)).saveAsParquet(args)
if (args.partitionByStartPos) {
maybeSort(maybeCoalesce(genotypes)).saveAsPartitionedParquet(args.outputPath, partitionSize = args.partitionedBinSize)
} else {
maybeSort(maybeCoalesce(genotypes)).saveAsParquet(args)
}
}
}
}
185 changes: 185 additions & 0 deletions adam-core/src/main/scala/org/bdgenomics/adam/rdd/ADAMContext.scala
@@ -1867,6 +1867,40 @@ class ADAMContext(@transient val sc: SparkContext) extends Serializable with Log
}
}

/**
* Load a path name in range-binned partitioned Parquet + Avro format into an AlignmentRecordRDD.
*
* @note The sequence dictionary is read from an Avro file stored at
* pathName/_seqdict.avro and the record group dictionary is read from an
* Avro file stored at pathName/_rgdict.avro. These files are pure Avro,
* not Parquet + Avro.
*
* @param pathName The path name to load alignment records from.
* Globs/directories are supported.
* @param regions Optional list of genomic regions to load.
* @param optLookbackPartitions Number of partitions to look back to find the beginning of an overlapping
* region when using the filterByOverlappingRegions function on the returned dataset.
* Defaults to one partition.
* @return Returns an AlignmentRecordRDD.
*/
def loadPartitionedParquetAlignments(pathName: String,
regions: Iterable[ReferenceRegion] = Iterable.empty,
optLookbackPartitions: Option[Int] = Some(1)): AlignmentRecordRDD = {

val partitionBinSize = getPartitionBinSize(pathName)
val reads = loadParquetAlignments(pathName)
val alignmentsDatasetBound = DatasetBoundAlignmentRecordRDD(reads.dataset,
reads.sequences,
reads.recordGroups,
reads.processingSteps,
true,
Some(partitionBinSize),
optLookbackPartitions
)

if (regions.nonEmpty) alignmentsDatasetBound.filterByOverlappingRegions(regions) else alignmentsDatasetBound
}

/**
* Load unaligned alignment records from interleaved FASTQ into an AlignmentRecordRDD.
*
@@ -2250,6 +2284,35 @@ class ADAMContext(@transient val sc: SparkContext) extends Serializable with Log
}
}

/**
* Load a path name in range-binned partitioned Parquet + Avro format into a GenotypeRDD.
*
* @param pathName The path name to load genotypes from.
* Globs/directories are supported.
* @param regions Optional list of genomic regions to load.
* @param optLookbackPartitions Number of partitions to look back to find the beginning of an overlapping
* region when using the filterByOverlappingRegions function on the returned dataset.
* Defaults to one partition.
* @return Returns a GenotypeRDD.
*/
def loadPartitionedParquetGenotypes(pathName: String,
regions: Iterable[ReferenceRegion] = Iterable.empty,
optLookbackPartitions: Option[Int] = Some(1)): GenotypeRDD = {

val partitionedBinSize = getPartitionBinSize(pathName)
val genotypes = loadParquetGenotypes(pathName)
val genotypesDatasetBound = DatasetBoundGenotypeRDD(genotypes.dataset,
genotypes.sequences,
genotypes.samples,
genotypes.headerLines,
true,
Some(partitionedBinSize),
optLookbackPartitions
)

if (regions.nonEmpty) genotypesDatasetBound.filterByOverlappingRegions(regions) else genotypesDatasetBound
}

/**
* Load a path name in Parquet + Avro format into a VariantRDD.
*
@@ -2283,6 +2346,34 @@ class ADAMContext(@transient val sc: SparkContext) extends Serializable with Log
}
}

/**
* Load a path name in range-binned partitioned Parquet + Avro format into a VariantRDD.
*
* @param pathName The path name to load variants from.
* Globs/directories are supported.
* @param regions Optional list of genomic regions to load.
* @param optLookbackPartitions Number of partitions to look back to find the beginning of an overlapping
* region when using the filterByOverlappingRegions function on the returned dataset.
* Defaults to one partition.
* @return Returns a VariantRDD.
*/
def loadPartitionedParquetVariants(pathName: String,
regions: Iterable[ReferenceRegion] = Iterable.empty,
optLookbackPartitions: Option[Int] = Some(1)): VariantRDD = {

val partitionedBinSize = getPartitionBinSize(pathName)
val variants = loadParquetVariants(pathName)
val variantsDatasetBound = DatasetBoundVariantRDD(variants.dataset,
variants.sequences,
variants.headerLines,
true,
Some(partitionedBinSize),
optLookbackPartitions
)

if (regions.nonEmpty) variantsDatasetBound.filterByOverlappingRegions(regions) else variantsDatasetBound
}

/**
* Load nucleotide contig fragments from FASTA into a NucleotideContigFragmentRDD.
*
@@ -2599,6 +2690,33 @@ class ADAMContext(@transient val sc: SparkContext) extends Serializable with Log
}
}

/**
* Load a path name in range-binned partitioned Parquet + Avro format into a FeatureRDD.
*
* @param pathName The path name to load features from.
* Globs/directories are supported.
* @param regions Optional list of genomic regions to load.
* @param optLookbackPartitions Number of partitions to look back to find the beginning of an overlapping
* region when using the filterByOverlappingRegions function on the returned dataset.
* Defaults to one partition.
* @return Returns a FeatureRDD.
*/
def loadPartitionedParquetFeatures(pathName: String,
regions: Iterable[ReferenceRegion] = Iterable.empty,
optLookbackPartitions: Option[Int] = Some(1)): FeatureRDD = {

val partitionedBinSize = getPartitionBinSize(pathName)
val features = loadParquetFeatures(pathName)
val featureDatasetBound = DatasetBoundFeatureRDD(features.dataset,
features.sequences,
true,
Some(partitionedBinSize),
optLookbackPartitions
)

if (regions.nonEmpty) featureDatasetBound.filterByOverlappingRegions(regions) else featureDatasetBound
}

/**
* Load a path name in Parquet + Avro format into a NucleotideContigFragmentRDD.
*
@@ -2631,6 +2749,33 @@ class ADAMContext(@transient val sc: SparkContext) extends Serializable with Log
}
}

/**
* Load a path name in range-binned partitioned Parquet + Avro format into a NucleotideContigFragmentRDD.
*
* @param pathName The path name to load nucleotide contig fragments from.
* Globs/directories are supported.
* @param regions Optional list of genomic regions to load.
* @param optLookbackPartitions Number of partitions to look back to find the beginning of an overlapping
* region when using the filterByOverlappingRegions function on the returned dataset.
* Defaults to one partition.
* @return Returns a NucleotideContigFragmentRDD.
*/
def loadPartitionedParquetContigFragments(pathName: String,
regions: Iterable[ReferenceRegion] = Iterable.empty,
optLookbackPartitions: Option[Int] = Some(1)): NucleotideContigFragmentRDD = {

val partitionedBinSize = getPartitionBinSize(pathName)
val contigs = loadParquetContigFragments(pathName)
val contigsDatasetBound = DatasetBoundNucleotideContigFragmentRDD(contigs.dataset,
contigs.sequences,
true,
Some(partitionedBinSize),
optLookbackPartitions
)

if (regions.nonEmpty) contigsDatasetBound.filterByOverlappingRegions(regions) else contigsDatasetBound
}

/**
* Load a path name in Parquet + Avro format into a FragmentRDD.
*
@@ -3052,4 +3197,44 @@ class ADAMContext(@transient val sc: SparkContext) extends Serializable with Log
loadParquetFragments(pathName, optPredicate = optPredicate, optProjection = optProjection)
}
}

/**
* Return length of partitions in base pairs if the specified path of Parquet + Avro files is partitioned.
*
* @param pathName Path in which to look for partitioned flag.
* @return Length of the partitions in base pairs, if the files are partitioned.
*
* If a glob is used, all directories within the glob must be partitioned and must have been saved
* using the same partition bin size. Behavior is undefined if this requirement is not met.
*/
private def getPartitionBinSize(pathName: String): Int = {

val partitionSizes = getFsAndFilesWithFilter(pathName, new FileFilter("_partitionedByStartPos")).map(f => {
val is = f.getFileSystem(sc.hadoopConfiguration).open(f)
val partitionSize = is.readInt
is.close()
partitionSize
}).toSet

require(partitionSizes.nonEmpty, "Input Parquet files (%s) are not partitioned.".format(pathName))
require(partitionSizes.size == 1, "Found multiple partition sizes (%s).".format(partitionSizes.mkString(", ")))
partitionSizes.head
}

/**
* Return true if the specified path of Parquet + Avro files is partitioned.
*
* @param pathName Path in which to look for partitioned flag.
* @return True if the specified path of Parquet + Avro files is partitioned.
* Behavior is undefined if some paths in the glob contain a _partitionedByStartPos flag file and some do not.
*/
def isPartitioned(pathName: String): Boolean = {
try {
getPartitionBinSize(pathName)
true
} catch {
case e: FileNotFoundException => false
}
}
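
A sketch of guarding the load on this flag, so that unpartitioned inputs fall back to the regular Parquet loader (the path is hypothetical):

val path = "sample.alignments.adam"
val alignments = if (sc.isPartitioned(path)) {
  sc.loadPartitionedParquetAlignments(path)
} else {
  sc.loadAlignments(path)
}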

}
@@ -28,14 +28,15 @@ import htsjdk.variant.vcf.{
VCFHeaderLineType
}
import org.apache.avro.generic.IndexedRecord
import org.apache.hadoop.fs.Path
import org.apache.hadoop.fs.{ FSDataOutputStream, FileSystem, Path }
import org.apache.parquet.hadoop.metadata.CompressionCodecName
import org.apache.spark.SparkFiles
import org.apache.spark.api.java.JavaRDD
import org.apache.spark.api.java.function.{ Function => JFunction, Function2 }
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{ DataFrame, Dataset, SQLContext }
import org.apache.spark.sql.functions._
import org.apache.spark.storage.StorageLevel
import org.bdgenomics.adam.instrumentation.Timers._
import org.bdgenomics.adam.models.{
@@ -62,6 +63,7 @@ import scala.collection.JavaConversions._
import scala.math.min
import scala.reflect.ClassTag
import scala.reflect.runtime.universe.TypeTag
import scala.util.Try

private[rdd] class JavaSaveArgs(var outputPath: String,
var blockSize: Int = 128 * 1024 * 1024,
@@ -2204,6 +2206,35 @@ trait DatasetBoundGenomicDataset[T, U <: Product, V <: GenomicDataset[T, U, V]]
transformDataset(_.unpersist())
}

val isPartitioned: Boolean
val optPartitionBinSize: Option[Int]
val optLookbackPartitions: Option[Int]

private def referenceRegionsToDatasetQueryString(regions: Iterable[ReferenceRegion]): String = {

if (Try(dataset("positionBin")).isSuccess) {
regions.map(r =>
s"(contigName=\'${r.referenceName}\' and positionBin >= " +
s"\'${(scala.math.floor(r.start / optPartitionBinSize.get).toInt) - optLookbackPartitions.get}\'" +
s" and positionBin < \'${(scala.math.floor(r.end / optPartitionBinSize.get).toInt + 1)}\'" +
s" and (end > ${r.start} and start < ${r.end}))"
).mkString(" or ")
} else { // if no positionBin field is found then construct query without bin optimization
regions.map(r =>
s"(contigName=\'${r.referenceName}\' " +
s"and (end > ${r.start} and start < ${r.end}))")
.mkString(" or ")
}
}

override def filterByOverlappingRegions(querys: Iterable[ReferenceRegion]): V = {
if (isPartitioned) {
transformDataset((d: Dataset[U]) =>
d.filter(referenceRegionsToDatasetQueryString(querys)))
} else {
super[GenomicDataset].filterByOverlappingRegions(querys)
}
}
}
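
To make the bin arithmetic concrete: for a query on chr1:1,234,000-1,236,000 with the default 1,000,000 bp bin size and a lookback of one partition, the predicate built above is equivalent to (whitespace added for readability):

(contigName='chr1' and positionBin >= '0' and positionBin < '2'
 and (end > 1234000 and start < 1236000))

since floor(1234000 / 1000000) - 1 = 0 and floor(1236000 / 1000000) + 1 = 2.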

/**
@@ -2875,4 +2906,43 @@ abstract class AvroGenomicRDD[T <% IndexedRecord: Manifest, U <: Product, V <: A
def saveAsParquet(filePath: java.lang.String) {
saveAsParquet(new JavaSaveArgs(filePath))
}

/**
* Save partition size into the partitioned Parquet flag file.
*
* @param filePath Path to save the file at.
* @param partitionSize Partition bin size, in base pairs, used in Hive-style partitioning.
*
*/
private def writePartitionedParquetFlag(filePath: String, partitionSize: Int): Unit = {
val path = new Path(filePath, "_partitionedByStartPos")
val fs: FileSystem = path.getFileSystem(rdd.context.hadoopConfiguration)
val f = fs.create(path)
f.writeInt(partitionSize)
f.close()
}

/**
* Saves this RDD to disk in range-binned partitioned Parquet + Avro format.
*
* @param filePath Path to save the file at.
* @param compressCodec Name of the compression codec to use.
* @param partitionSize Size of the partitions used when writing Parquet, in base pairs. Defaults to 1,000,000 (1 Mbp).
*/
def saveAsPartitionedParquet(filePath: String,
compressCodec: CompressionCodecName = CompressionCodecName.GZIP,
partitionSize: Int = 1000000) {
log.warn("Saving directly as Hive-partitioned Parquet from SQL. " +
"Options other than compression codec are ignored.")
val df = toDF()
df.withColumn("positionBin", floor(df("start") / partitionSize))
.write
.partitionBy("contigName", "positionBin")
.format("parquet")
.option("spark.sql.parquet.compression.codec", compressCodec.toString.toLowerCase())
.save(filePath)
writePartitionedParquetFlag(filePath, partitionSize)
saveMetadata(filePath)
}
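
A usage sketch (input path and bin size are illustrative): load a BAM and save it as a range-binned Parquet dataset with 500 kbp bins.

val reads = sc.loadAlignments("sample.bam")
reads.saveAsPartitionedParquet("sample.alignments.adam",
  partitionSize = 500000)

Because positionBin = floor(start / partitionSize), a read starting at position 12,345,678 lands in bin 24, and Spark's partitionBy writes a Hive-style layout such as sample.alignments.adam/contigName=chr1/positionBin=24/part-*.parquet.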

}
@@ -137,7 +137,10 @@ case class ParquetUnboundNucleotideContigFragmentRDD private[rdd] (

case class DatasetBoundNucleotideContigFragmentRDD private[rdd] (
dataset: Dataset[NucleotideContigFragmentProduct],
sequences: SequenceDictionary) extends NucleotideContigFragmentRDD
sequences: SequenceDictionary,
override val isPartitioned: Boolean = true,
override val optPartitionBinSize: Option[Int] = Some(1000000),
override val optLookbackPartitions: Option[Int] = Some(1)) extends NucleotideContigFragmentRDD
with DatasetBoundGenomicDataset[NucleotideContigFragment, NucleotideContigFragmentProduct, NucleotideContigFragmentRDD] {

lazy val rdd: RDD[NucleotideContigFragment] = dataset.rdd.map(_.toAvro)
