add hive style partitioning for contigName #1620
Conversation
Test FAILed. Build result: FAILURE. Jenkins checked out revision 95dbf5a (origin/pr/1620/merge) and triggered the ADAM-prb matrix (Hadoop 2.3.0/2.6.0, Scala 2.10/2.11, Spark 1.6.1/2.1.0, centos); all 8 configurations completed with result FAILURE.
Test FAILed. Build result: FAILURE. Jenkins checked out revision 71e9796 (origin/pr/1620/merge) and triggered the ADAM-prb matrix (Hadoop 2.3.0/2.6.0, Scala 2.10/2.11, Spark 1.6.1/2.1.0, centos); all 8 configurations completed with result FAILURE.
I've loaded a chr20 BAM file for a sample from 1000 Genomes. Hive-partitioned dataset output to Parquet with contig/1 MB bins seems to be working fine for both writing and reading. Each 1-megabase bin (1..60) under contig 20 has one Parquet file in it. Example queries which are working are below:
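The queries themselves were truncated above, so here is a minimal sketch of the kind of query described, in the spark-shell; the path and bin values are illustrative, not from the original comment. Spark discovers contigName and posBin as partition columns from the directory layout, so these filters prune whole directories before any scan.

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().getOrCreate()
val reads = spark.read.parquet("chr20.alignments.adam")

// touch a single 1 Mb bin on contig 20
reads.filter("contigName = '20' and posBin = 25").count()

// a 3 Mb slice spanning bins 10..12, refined by the actual coordinates
reads.filter("contigName = '20' and posBin >= 10 and posBin <= 12 and start >= 10000000 and end <= 13000000").count()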
Next, I will do some performance testing to see whether retrieval of various range sizes is faster or slower through the SQL path and the contigName/posBin columns compared to: @fnothaft - note: the problem with the posBin column that I thought I'd encountered earlier appears not to be a problem after all. I think I previously was not actually using the dataset I had added that posBin column to - happily, it seems to just work now.
Some preliminary results: parsing the original BAM file and writing to partitioned Parquet takes 2.5 minutes, while the existing non-partitioned Parquet write takes 3.5 minutes, so, surprisingly to me, there is a bit of gain on write too. The improvement in region filtering time for Hive-style partitioned data can, I think, be attributed to eliminating the scan time needed to touch and reject partitions based on min/max in the original Parquet method. Increasing the number of executors relative to the number of partitions of the input dataset should reduce the difference between the methods, in the limit where this scan becomes fully parallel. Single nodes or small clusters should see the most gains from the Hive partition strategy, though there seems to be little downside even on a larger cluster. I'm writing up the tests so far in detail, and adding more. @akmorrow13 - I look forward to seeing whether this will be useful for retrieving region slices with lower latency for visualization in Mango.
Can someone point out to me in the Jenkins output why my tests are failing, and suggest fixes? This is successfully building with tests when I run
@jpdna as an aside, I was thinking about the
In that case, do we need to have a flag, or otherwise explicitly detect when loading from Hive-partitioned files? Right now, when reading, the partitioning is just auto-detected, I believe.
Yeah, my thought is that we'd either touch some file (e.g.,
Test FAILed. |
I reverted the pom back to Scala 2.10 / Spark 1.x as @fnothaft suggested, but everything still fails in Jenkins. Suggestions?
Jenkins, retest this please. |
@jpdna it was a jenkins glitch, I've just kicked off another test. |
Test PASSed. |
I've implemented this
Test PASSed. |
Test PASSed. |
Ready for initial review by others. I added a parameter to
This is nice because, in the absence of explicitly enabling hivePartitioning here, behavior should not be altered. As discussed above, the previous commit includes a todo:
Thanks @jpdna! This looks like a great start! I dropped a few nits inline; the File -> Hadoop Path/FileSystem change will need to be propagated across many files. One thing I was noticing is that I'm not sure that the correct behavior happens if you save an
@@ -1777,8 +1777,15 @@ class ADAMContext(@transient val sc: SparkContext) extends Serializable with Log

(optPredicate, optProjection) match {
  case (None, None) => {
    ParquetUnboundAlignmentRecordRDD(sc, pathName, sd, rgd, pgs)
    val hiveFlagFile = new File(pathName, "_hivePartitioned")
fnothaft
Aug 8, 2017
Member
This will only work for a file that is stored on a "local" (read: mounted POSIX compatible) file system. I'd suggest factoring this out into a function:

private def checkHiveFlag(pathName: String): Boolean = {
  val hivePath = new Path(pathName, "_hivePartitioned")
  val fs = hivePath.getFileSystem(sc.hadoopConfiguration)
  fs.exists(hivePath)
}
disableDictionaryEncoding: Boolean = false) {
disableDictionaryEncoding: Boolean = false,
enableHivePartitioning: Boolean = false,
hivePartitioningGenomicBinSize: Int = 1000000) {
fnothaft
Aug 8, 2017
Member
I would roll enableHivePartitioning and hivePartitioningGenomicBinSize into one parameter, optHivePartitioningBinSize.
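A sketch of the signature that suggestion implies, assuming nothing beyond the names in the review; None keeps the current non-partitioned behavior, Some(binSize) enables partitioning:

def saveAsParquet(
  filePath: String,
  disableDictionaryEncoding: Boolean = false,
  optHivePartitioningBinSize: Option[Int] = None): Unit = {
  optHivePartitioningBinSize match {
    case Some(binSize) => () // partitioned write: bin start positions into binSize-bp buckets, then partitionBy("contigName", "posBin")
    case None => () // current behavior: plain Parquet write, unchanged
  }
}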
.option("spark.sql.parquet.compression.codec", compressCodec.toString.toLowerCase()) | ||
.save(filePath) | ||
|
||
val hiveFlagFile = new File(filePath, "_hivePartitioned") |
fnothaft
Aug 8, 2017
Member
These will also need to be rewritten to use either the Hadoop FileSystem API or the java.nio libraries.
@fnothaft can you elaborate a bit here? Are you suggesting that instead of calling
As a first high level review, I think all references to "Hive" should be dropped. Then I'm in agreement with @fnothaft in that boolean flags make babies cry. I would prefer methods
Should the marker file be called
I assume any new command line options will conflict with
Finally, is it possible to implement this as an external Maven module? Feels invasive to have to modify/extend all of the
As best as I can tell - because I cut and pasted it anyhow - the code in the *RDD classes implementing the overridden function saveAsParquet is the same in every *RDD class of ours, and I am wondering if the implementation can be factored out into the base class or elsewhere to be more DRY. If so, maybe it won't be so invasive. I am not sure how to go about making this functionality a separate module, and if it works well, I'd like to think partitioning Parquet like this may become the standard mode when saving from a Dataset, as so far I don't see much of a performance penalty. I'll proceed with the changes suggested above, still within the current classes, unless others suggest another way forward.
-1. This is very reasonable functionality to have in the GenomicRDDs. Just to clarify, in case there's a misunderstanding: this functionality isn't only usable with Hive; the "Hive" nomenclature comes from the fact that Hive was the tool that introduced this type of partitioning scheme.
+1!
Agreed; in the Spark docs it is referred to as "[an] approach used in systems like Hive." Thus I don't see a need to mention Hive anywhere in field, method, or file names.
Feels like there may be pathological cases where this falls down, say lots of unaligned reads, but perhaps it will be no worse than the default.
update wrt last push:
todo:
Test FAILed. Build result: FAILURE. Jenkins checked out revision 98154a7 (origin/pr/1620/merge) and triggered the ADAM-prb matrix (Hadoop 2.6.2/2.7.3, Scala 2.10/2.11, Spark 1.6.3/2.2.0, centos); all 8 configurations completed with result FAILURE.
Test FAILed. Build result: FAILURE. Jenkins checked out revision 86e20a6 (origin/pr/1620/merge); the four Spark 2.2.0 matrix builds completed with result SUCCESS, while the four Spark 1.6.3 builds completed with result FAILURE.
If you know a better way to set up a Dataset API query using dataset columns than building up this string, as done here: https://github.com/jpdna/adam/blob/217183e397aa97a67043cec8fe155dd3df2e19bf/adam-core/src/main/scala/org/bdgenomics/adam/rdd/ADAMContext.scala#L1808, I'd welcome suggestions.
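Not from the PR, but one possible alternative to string concatenation is Spark's Column API: build one predicate per region and OR them together. A sketch, with the contigName/posBin column names from this PR and a hypothetical helper name; it assumes regions is non-empty.

import org.apache.spark.sql.Column
import org.apache.spark.sql.functions.col
import org.bdgenomics.adam.models.ReferenceRegion

def regionsToPredicate(regions: Iterable[ReferenceRegion], partitionSize: Int = 1000000): Column = {
  regions.map { r =>
    col("contigName") === r.referenceName &&
      col("posBin") >= r.start / partitionSize &&
      col("posBin") <= r.end / partitionSize &&
      col("end") > r.start && col("start") < r.end
  }.reduce(_ || _) // OR across regions
}

// usage: genotypes.dataset.where(regionsToPredicate(regions))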
@@ -2105,6 +2139,21 @@ class ADAMContext(@transient val sc: SparkContext) extends Serializable with Log
  }
}

def loadPartitionedParquetGenotypes(pathName: String): GenotypeRDD = {
akmorrow13
Oct 11, 2017
Contributor
Is there a reason there is no regions parameter here?
@@ -2138,6 +2187,35 @@ class ADAMContext(@transient val sc: SparkContext) extends Serializable with Log
  }
}

def loadPartitionedParquetVariants(pathName: String, regions: Option[Iterable[ReferenceRegion]] = None, partitionSize: Int = 1000000): VariantRDD = {
akmorrow13
Oct 11, 2017
Contributor
Is there any interest in combining this functionality into the existing load functions? It may be nice so the user does not have to know whether the files are partitioned when loading.
jpdna
Oct 16, 2017
Author
Member
I looked at that earlier, using a flag file to detect whether the input was partitioned, but the trouble is that the existing load functions have predicate and projection parameters that we don't currently have a way to translate directly into something Spark SQL/Datasets can use.
fnothaft
Nov 9, 2017
Member
I think it's fine if the predicate/projection parameters can't be translated over. In the main loadParquet* functions, what I would do is:
- Check for the partitionedParquetFlag
- If the flag is set, check if predicate/projection are provided. If projection is provided, log a warning. If predicate is provided, throw an exception.
- If we didn't throw an exception, call loadPartitionedParquet* (a sketch of this flow follows).
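A minimal sketch of that dispatch, not the PR's code; checkPartitionedParquetFlag and loadPartitionedParquetGenotypes mirror names from this PR, while the surrounding signature and the log call are illustrative.

def loadGenotypes(
  pathName: String,
  optPredicate: Option[org.apache.parquet.filter2.predicate.FilterPredicate] = None,
  optProjection: Option[org.apache.avro.Schema] = None): GenotypeRDD = {
  if (checkPartitionedParquetFlag(pathName)) {
    // predicates cannot be pushed into the Dataset read path, so fail loudly
    require(optPredicate.isEmpty, "Cannot apply a Parquet predicate to a partitioned Parquet store.")
    if (optProjection.isDefined) {
      log.warn("Projection is ignored when loading partitioned Parquet; the full schema is read.")
    }
    loadPartitionedParquetGenotypes(pathName)
  } else {
    loadParquetGenotypes(pathName, optPredicate = optPredicate, optProjection = optProjection)
  }
}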
I've done some single-node benchmarking using a version of Mango that relies on this PR and uses a partitioned alignment dataset. https://docs.google.com/presentation/d/1p6nA_vhydW2J2O7iFTJbsohbC_FAFEhpqfmzNtb8jRU/edit?usp=sharing
Awesome @jpdna! Can you make a PR in Mango so we can track these numbers there?
Test FAILed. Build result: FAILURE. Jenkins checked out revision 88d64fd (origin/pr/1620/merge); the four Spark 2.2.0 matrix builds completed with result SUCCESS, while the four Spark 1.6.3 builds completed with result FAILURE.
A couple of misc comments. Looks good in general; I'm really excited about where this is going. Can you add Scaladoc to all new methods?
@@ -18,15 +18,10 @@
package org.bdgenomics.adam.rdd

import java.io.{ File, FileNotFoundException, InputStream }
fnothaft
Nov 9, 2017
Member
Can you remove this whitespace?
VCFHeaderLine,
VCFInfoHeaderLine
}
import htsjdk.variant.vcf.{ VCFCompoundHeaderLine, VCFFormatHeaderLine, VCFHeader, VCFHeaderLine, VCFInfoHeaderLine }
fnothaft
Nov 9, 2017
Member
Can you undo this? We break at 80 chars or 4 items.
RDDBoundNucleotideContigFragmentRDD
}
import org.bdgenomics.adam.projections.{ FeatureField, Projection }
import org.bdgenomics.adam.rdd.contig.{ DatasetBoundNucleotideContigFragmentRDD, NucleotideContigFragmentRDD, ParquetUnboundNucleotideContigFragmentRDD, RDDBoundNucleotideContigFragmentRDD }
fnothaft
Nov 9, 2017
Member
Can you break this line? We indent at 80 chars/4 imports.
RDDBoundAlignmentRecordRDD
}
import org.bdgenomics.adam.rdd.fragment.{ DatasetBoundFragmentRDD, FragmentRDD, ParquetUnboundFragmentRDD, RDDBoundFragmentRDD }
import org.bdgenomics.adam.rdd.read.{ AlignmentRecordRDD, DatasetBoundAlignmentRecordRDD, ParquetUnboundAlignmentRecordRDD, RDDBoundAlignmentRecordRDD, RepairPartitions }
fnothaft
Nov 9, 2017
Member
Can you break this line? We indent at 80 chars/4 imports.
NucleotideContigFragment => NucleotideContigFragmentProduct, | ||
Variant => VariantProduct | ||
} | ||
import org.bdgenomics.adam.sql.{ AlignmentRecord => AlignmentRecordProduct, Feature => FeatureProduct, Fragment => FragmentProduct, Genotype => GenotypeProduct, NucleotideContigFragment => NucleotideContigFragmentProduct, Variant => VariantProduct } |
fnothaft
Nov 9, 2017
Member
Can you break this line? We indent at 80 chars/4 imports.
val genotypes = ParquetUnboundGenotypeRDD(sc, pathName, sd, samples, headers)

val datasetBoundGenotypeRDD: GenotypeRDD = regions match {
fnothaft
Nov 9, 2017
Member
Instead of having regions: Option[Iterable[ReferenceRegion]] and running:

regions match {
  case Some(x) => DatasetBoundGenotypeRDD(genotypes.dataset.filter(referenceRegionsToDatasetQueryString(x)))
  case None => DatasetBoundGenotypeRDD(genotypes.dataset)
}

change the type of regions to Iterable[ReferenceRegion] and then apply the region filters by running:

regions.foldLeft(genotypes)((g, r) => g.transformDataset(_.filter(referenceRegionsToDatasetQueryString(Iterable(r)))))
jpdna
Nov 10, 2017
Author
Member
I think I understand: you want to use the more elegant foldLeft over a potentially empty regions list, with the default being the unfiltered dataset if regions is empty. However, I'm not sure that the code you suggest implements the OR logic that is intended for this region filter - won't the consecutive applications of filter as the list is folded result in AND logic? I'm inclined to leave the clunky regions match code for now if we think it is correct, as it seems to work.
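For what it's worth, both goals can be reconciled by folding the predicate rather than folding filter() calls - a sketch, not the PR's code; regionToColumn stands in for whatever builds a per-region predicate:

import org.apache.spark.sql.{ Column, DataFrame }
import org.apache.spark.sql.functions.lit
import org.bdgenomics.adam.models.ReferenceRegion

// Start from false and OR each region's predicate in, so the combined filter
// is region1 OR region2 OR ...; an empty regions list filters nothing out.
def filterByRegions(df: DataFrame,
                    regions: Iterable[ReferenceRegion],
                    regionToColumn: ReferenceRegion => Column): DataFrame = {
  if (regions.isEmpty) df
  else df.where(regions.foldLeft(lit(false))((acc, r) => acc || regionToColumn(r)))
}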
@@ -2904,4 +2939,29 @@ class ADAMContext(@transient val sc: SparkContext) extends Serializable with Log
loadParquetFragments(pathName, optPredicate = optPredicate, optProjection = optProjection)
}
}

def writePartitionedParquetFlag(filePath: String): Boolean = {
fnothaft
Nov 9, 2017
Member
This should be private (and go in GenomicRDD.scala, methinks?)
fs.createNewFile(path)
}

def checkPartitionedParquetFlag(filePath: String): Boolean = {
fnothaft
Nov 9, 2017
Member
This should be private.
jpdna
Nov 14, 2017
Author
Member
I actually find a need to use this from the Mango library, in order to determine whether a dataset is partitioned and so whether Mango should use the partitioned read functions, since we needed to keep them separate. Perhaps a better option would be some wrapper function like maybeReadPartitioned, but for now I prefer to keep it public and give client code a chance to explicitly check.
fs.exists(path)
}

def referenceRegionsToDatasetQueryString(x: Iterable[ReferenceRegion], partitionSize: Int = 1000000): String = {
fnothaft
Nov 9, 2017
Member
This should be private.
}

def referenceRegionsToDatasetQueryString(x: Iterable[ReferenceRegion], partitionSize: Int = 1000000): String = {
  var regionQueryString = "(contigName=" + "\'" + x.head.referenceName.replaceAll("chr", "") + "\' and posBin >= \'" +
fnothaft
Nov 9, 2017
Member
Why are we replacing "chr"s?
Test FAILed. Build result: FAILURE. Jenkins checked out revision 8fca515 (origin/pr/1620/merge); the four Spark 2.2.0 matrix builds completed with result SUCCESS, while the four Spark 1.6.3 builds completed with result FAILURE.
Test FAILed. Build result: FAILURE. Jenkins checked out revision df2c07b (origin/pr/1620/merge); all 8 matrix configurations completed with result FAILURE.
Test FAILed. Build result: FAILURE. Jenkins checked out revision 41a271c (origin/pr/1620/head); the four Spark 2.2.0 matrix builds completed with result SUCCESS, while the four Spark 1.6.3 builds completed with result FAILURE.
Test FAILed. Build result: FAILURE. Jenkins checked out revision 8f6c0be (origin/pr/1620/head); all 8 matrix configurations completed with result FAILURE.
heuermh
commented on adam-core/src/main/scala/org/bdgenomics/adam/rdd/ADAMContext.scala
in 41a271c
Nov 28, 2017
This is not correct, even for human (e.g.,
Test FAILed. Build result: FAILURE. Jenkins checked out revision bea3dfb (origin/pr/1620/head); all 8 matrix configurations completed with result FAILURE.
@@ -2476,4 +2478,29 @@ abstract class AvroGenomicRDD[T <% IndexedRecord: Manifest, U <: Product, V <: A
def saveAsParquet(filePath: java.lang.String) {
  saveAsParquet(new JavaSaveArgs(filePath))
}

def writePartitionedParquetFlag(filePath: String): Boolean = {
  val path = new Path(filePath, "_isPartitionedByStartPos")
akmorrow13
Dec 12, 2017
Contributor
Would there be a way to write a _partitioned file with the fields it is partitioned by, instead of hard coding to start position?
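A sketch of that idea, not part of this PR: record the partitioning scheme inside the flag file rather than in its name. The file name, field layout, and helper are all hypothetical.

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

def writePartitionMetadata(dirPath: String,
                           partitionedBy: Seq[String],
                           binSize: Int,
                           conf: Configuration): Unit = {
  val metaPath = new Path(dirPath, "_partitionedBy")
  val fs = metaPath.getFileSystem(conf)
  val out = fs.create(metaPath)
  try {
    // e.g. "fields=contigName,posBin;binSize=1000000"
    out.writeUTF(s"fields=${partitionedBy.mkString(",")};binSize=$binSize")
  } finally {
    out.close()
  }
}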
Closing this PR in favor of:
The relevant changes to review are in file:
AlignmentRecordRDD.scala
the other changes are caused by our scripts to change to Spark 2 / Scala 2.11.

This PR implements Hive-style partitioning by contigName for AlignmentRecords when written to Parquet through the Dataset/SparkSQL write path, as described here. The output directory when the dataset is saved to Parquet looks like the following, where the Parquet files for each contigName are written inside the contigName=N directories.

Note: in the future we will add another layer of hierarchy, with 10-megabase bins within each chromosome as further subdirectories.
As per the discussion in #651, such binning should allow more efficient predicate pushdown of range queries than we may currently get from Parquet.
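For concreteness, the bin for a read is just its start coordinate integer-divided by the bin size (1 Mb is the default elsewhere in this PR; the values below are illustrative):

val partitionSize = 1000000L
def posBin(start: Long): Long = start / partitionSize
// a read starting at 25,731,998 lands under .../contigName=20/posBin=25/
assert(posBin(25731998L) == 25L)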
Below we describe how to write and read back this data in the spark-shell, and discuss an error which currently occurs when reading back directly as an RDD.
Load data from SAM file, convert to Dataset, and save as Hive-partitioned parquet
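The original snippet was lost in extraction; a hedged reconstruction of this step, with illustrative paths:

import org.bdgenomics.adam.rdd.ADAMContext._

val reads = sc.loadAlignments("sample.chr1.bam")
reads.dataset
  .write
  .partitionBy("contigName")
  .parquet("myPartitionedSparkSQLParquetDir")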
Read back in from disk as a Dataset
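Again a hedged reconstruction; in the Spark 2 spark-shell, Spark infers the contigName partition column from the directory names, and it is populated correctly on this path:

val df = spark.read.parquet("myPartitionedSparkSQLParquetDir")
df.filter("contigName = '1'").show(5)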
Read back in directly as an RDD (this results in an incorrect contigName=null)
PROBLEM: Note that above, when read as an RDD, the contigName field is null. This is incorrect; the contigName should be 1 here.
We can get the correct RDD[org.bdgenomics.formats.avro.AlignmentRecord] through another way, by reading as a dataset and then converting to an RDD (sketch below):
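A hedged sketch of that workaround, mapping the sql product back to the Avro class via its toAvro method:

import spark.implicits._

val ds = spark.read.parquet("myPartitionedSparkSQLParquetDir")
  .as[org.bdgenomics.adam.sql.AlignmentRecord]
val avroRdd = ds.rdd.map(_.toAvro) // RDD[org.bdgenomics.formats.avro.AlignmentRecord], contigName intact

Conclusion: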
When using Hive-style partitioning as implemented in this PR, the direct RDD read path sc.loadAlignments("myPartitionedSparkSQLParquetDir").rdd will result in contigName erroneously being null, because the direct RDD reading code is not handling the Hive-style partitioning correctly. My guess is that the RDD read path just isn't fully partition-aware, and/or that when partitioned Parquet files are written by Spark SQL, the information that is redundant with the partitioning may be nulled out.
The cost of having to read Hive-partitioned Parquet data in as a Dataset and then convert to an RDD seems like a reasonable price to pay if indeed we find enough other benefits to Hive-style partitioning, but we then need to disallow the direct RDD read path so that users don't hit this contigName=null error.