[ADAM-651] Implement Hive-style partitioning by genomic range of Parquet backed datasets #1922
Conversation
I believe that all comments from #1911 have been addressed. @akmorrow13 @heuermh @fnothaft please review.
Test FAILed. Build result: FAILURE. Jenkins fetched the PR, checked out revision a7c47ba (origin/pr/1922/merge), and all four matrix builds (Hadoop 2.6.2 and 2.7.3 × Scala 2.10 and 2.11, Spark 2.2.1, centos) completed with result FAILURE.
I just double checked that e9e85a2 passes tests for me locally on amp-bdg-master; I don't know why it is failing on Jenkins above.
fixed typo; updated scaladoc; changed var partitioning info in DatasetBoundGenomicDataset to val and used override in constructor; whitespace
Test PASSed.
Pinging for comment here.
Any action items for @jpdna here?
* @param pathName The path name to load alignment records from.
*   Globs/directories are supported.
* @param regions Optional list of genomic regions to load.
* @param optQueryBinNumLookback Number of partitions to lookback to find beginning of an overlapping
heuermh
Mar 5, 2018
Member
optQueryBinNumLookback → optLookbackPartitions?
val partitionedBinSize = getPartitionedBinSize(pathName)
val reads = loadParquetAlignments(pathName)

val datasetBoundAlignmentRecordRDD = if (regions.nonEmpty) {
heuermh
Mar 5, 2018
Member
Could this be rewritten as

val alignments = DatasetBoundAlignmentRecordRDD(reads.dataset,
  reads.sequences,
  reads.recordGroups,
  reads.processingSteps,
  true,
  Some(partitionedBinSize),
  optLookbackPartitions
)
if (regions.nonEmpty) alignments.filterByOverlappingRegions(regions) else alignments
*/
private def getPartitionedBinSize(pathName: String): Int = {

val partitionSizes = getFsAndFilesWithFilter(pathName, new FileFilter("_isPartitionedByStartPos")).map(f => {
heuermh
Mar 5, 2018
Member
_isPartitionedByStartPos → _partitionedByStartPos
* If a glob is used, all directories within the glob must be partitioned, and must have been saved
* using the same partitioned bin size. Behavior is undefined if this requirement is not met.
*/
private def getPartitionedBinSize(pathName: String): Int = {
heuermh
Mar 5, 2018
Member
getPartitionedBinSize → getPartitionBinSize
val partitionSizes = getFsAndFilesWithFilter(pathName, new FileFilter("_isPartitionedByStartPos")).map(f => {
  val is = f.getFileSystem(sc.hadoopConfiguration).open(f)
  val partitionSize = is.readInt
  is.close()
heuermh
Mar 5, 2018
Member
Might want to wrap this IO block in try-with-resources. I hacked up TryWith.scala in another repo, and was thinking of bringing it over for #1719.
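A loan-pattern helper along the lines of that TryWith could look like the sketch below. This is an assumption about its shape, not the actual file from the other repo; the name `TryWith` and the `AutoCloseable` bound are guesses.

```scala
import scala.util.{Failure, Success, Try}

// Hypothetical sketch of a TryWith loan-pattern helper: open a resource,
// apply f to it, and close the resource whether or not f throws.
object TryWith {
  def apply[C <: AutoCloseable, R](resource: => C)(f: C => R): Try[R] =
    Try(resource).flatMap { res =>
      try {
        val result = Success(f(res))
        res.close()
        result
      } catch {
        case e: Exception =>
          Try(res.close()) // close quietly; surface the original failure
          Failure(e)
      }
    }
}
```

The `readInt`/`close` block above would then become something like `TryWith(f.getFileSystem(sc.hadoopConfiguration).open(f))(_.readInt)`, with the close guaranteed even if the read throws.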
getPartitionedBinSize(pathName)
true
} catch {
  case e: FileNotFoundException => false
heuermh
Mar 5, 2018
Member
Try/catch is probably not the most appropriate here; one could get clever with Success/Failure... I'm not that clever so I'll have to think a bit to come up with a good suggestion.
fnothaft
Mar 5, 2018
Member
I think this is OK for now, but agree that we should revisit it later.
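One Success/Failure shape this could take is below. A sketch only: `getPartitionBinSize` here is a stub standing in for the real ADAMContext helper, under the assumption that it throws FileNotFoundException when the flag file is absent.

```scala
import java.io.FileNotFoundException
import scala.util.Try

// Stub for the real helper (assumption): reads the bin size from the flag
// file, or throws FileNotFoundException when the path is not partitioned.
def getPartitionBinSize(pathName: String): Int =
  if (pathName.endsWith(".partitioned")) 1000000
  else throw new FileNotFoundException(pathName)

// Try.isSuccess replaces the explicit try/catch-on-FileNotFoundException.
def isPartitioned(pathName: String): Boolean =
  Try(getPartitionBinSize(pathName)).isSuccess
```

One caveat of this form: `Try` catches any non-fatal exception, not just FileNotFoundException, which is slightly broader than the original catch clause.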
* @param filePath Path to save the file at.
*/
private def writePartitionedParquetFlag(filePath: String, partitionSize: Int): Unit = {
  val path = new Path(filePath, "_isPartitionedByStartPos")
heuermh
Mar 5, 2018
Member
_isPartitionedByStartPos → _partitionedByStartPos
val fs: FileSystem = path.getFileSystem(rdd.context.hadoopConfiguration)
val f = fs.create(path)
f.writeInt(partitionSize)
f.close()
heuermh
Mar 5, 2018
Member
Might want to wrap this IO block in try-with-resources, see above.
sequences: SequenceDictionary) extends NucleotideContigFragmentRDD
sequences: SequenceDictionary,
override val isPartitioned: Boolean = true,
override val optPartitionedBinSize: Option[Int] = Some(1000000),
heuermh
Mar 5, 2018
Member
optPartitionedBinSize → optPartitionBinSize
sequences: SequenceDictionary,
override val isPartitioned: Boolean = true,
override val optPartitionedBinSize: Option[Int] = Some(1000000),
override val optQueryLookbackNum: Option[Int] = Some(1)) extends NucleotideContigFragmentRDD
heuermh
Mar 5, 2018
Member
optQueryLookbackNum → optLookbackPartitions
@@ -131,6 +131,8 @@ class TransformAlignmentsArgs extends Args4jBase with ADAMSaveAnyArgs with Parqu
  var storageLevel: String = "MEMORY_ONLY"
  @Args4jOption(required = false, name = "-disable_pg", usage = "Disable writing a new @PG line.")
  var disableProcessingStep = false
  @Args4jOption(required = false, name = "-save_as_dataset", usage = "EXPERIMENTAL: Use the provided bin size in base pairs to save the data partitioned by genomic range bins using Hive-style partitioning.")
heuermh
Mar 5, 2018
Member
How about two args, a -partition_by_start_pos Boolean flag and a -partition_bin_size Int with a default value?
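To see how the two flags would interact, here is a toy parser (Args4j itself elided) showing the shape of that suggestion: a Boolean switch plus a bin size that falls back to a default. The field names and the 1,000,000 bp default are assumptions, not decisions from this PR.

```scala
// Toy sketch of the two-option form: -partition_by_start_pos switches the
// feature on, -partition_bin_size overrides an assumed 1,000,000 bp default.
case class PartitionArgs(partitionByStartPos: Boolean = false,
                         partitionBinSize: Int = 1000000)

def parse(args: Array[String]): PartitionArgs = {
  val byStartPos = args.contains("-partition_by_start_pos")
  val idx = args.indexOf("-partition_bin_size")
  val binSize =
    if (idx >= 0 && idx + 1 < args.length) args(idx + 1).toInt
    else 1000000
  PartitionArgs(byStartPos, binSize)
}
```

This keeps the common case (`-partition_by_start_pos` alone) terse while still letting users tune the bin size independently.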
Few small changes, but very close!
maybeSort(maybeCoalesce(genotypes)).saveAsParquet(args)
if (args.partitionedBinSize > 0) {
  if (genotypes.sequences.isEmpty) {
    log.warn("This dataset is not aligned and therefore will not benefit from being saved as a partitioned dataset")
fnothaft
Mar 5, 2018
Member
Nit: Genotypes are aligned by definition, no? I don't have a strong preference as to what we do here, but this warning just reads a bit odd.
@@ -3051,4 +3242,44 @@ class ADAMContext(@transient val sc: SparkContext) extends Serializable with Log
  loadParquetFragments(pathName, optPredicate = optPredicate, optProjection = optProjection)
}
}

/**
 * Return integer size of partitions if the specified path of Parquet + Avro files is partitioned.
fnothaft
Mar 5, 2018
Member
Suggest clarifying from:

Return integer size of partitions if the specified path of Parquet + Avro files is partitioned.

to:

Return length of partitions in base pairs if file is partitioned.

Otherwise, it is unclear as to whether this returns the number of partitions or the size of a single partition (and in the latter case, what the "size" means).
jpdna
Mar 6, 2018
Author
Member
Done
Test PASSed.
@@ -131,6 +131,10 @@ class TransformAlignmentsArgs extends Args4jBase with ADAMSaveAnyArgs with Parqu
  var storageLevel: String = "MEMORY_ONLY"
  @Args4jOption(required = false, name = "-disable_pg", usage = "Disable writing a new @PG line.")
  var disableProcessingStep = false
  @Args4jOption(required = false, name = "-partition_by_start_pos", usage = "EXPERIMENTAL: Save the data partitioned by genomic range bins based on start pos using Hive-style partitioning.")
@@ -74,7 +76,7 @@ private[rdd] object GenomicRDD {
 * @see pipe
 *
 * @param cmd Command to replace references in.
 * @param files List of paths to files.
 * @param files List of paths to files.f
heuermh
Mar 6, 2018
Member
small typo
Test FAILed. Build result: ABORTED. Jenkins checked out revision 281c7f9 (origin/pr/1922/merge); the 2.6.2/2.10, 2.6.2/2.11, and 2.7.3/2.10 builds (Spark 2.2.1, centos) completed with result SUCCESS, and the 2.7.3/2.11 build completed with result ABORTED.
Jenkins, retest this please.
Test PASSed.
Close in favor of #1948.
Replaces #1911
Implements Hive-style partitioning by genomic range of Parquet-backed datasets, improving the latency of region filtering for genomic-coordinate-based data types.
Refactored such that the filterByOverlappingRegions function has been pulled up to the DatasetBoundGenomicDataset trait.
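The bin arithmetic behind this partitioning scheme (and the lookback parameter discussed in the review) can be sketched as follows. The 1,000,000 bp bin size and lookback of 1 match the defaults visible in the diff, but the helper names here are illustrative, not ADAM's API.

```scala
// Each record lands in bin start / binSize; Hive-style output then has one
// directory per bin value, so a region query only reads matching bins.
// The lookback widens the scan to earlier bins, catching records that
// begin before the query region but overlap it.
val binSize = 1000000L

def bin(start: Long): Long = start / binSize

def binsForQuery(qStart: Long, qEnd: Long, lookback: Int = 1): Seq[Long] =
  math.max(0L, bin(qStart) - lookback) to bin(qEnd)
```

For example, a query over positions 2,500,000-3,500,000 touches bins 2 and 3 directly, plus bin 1 via the lookback, leaving all other bins unread.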