Support Hive-style partitioning #651

Closed
tomwhite opened this Issue Apr 15, 2015 · 24 comments

@tomwhite
Member

tomwhite commented Apr 15, 2015

It's common to partition sequence data by locus. This change would make it possible to partition genotypes (alignments, etc.) using a Hive-compatible directory structure like chr=M/pos=N, where N is something like floor(position/10^K).

Querying would then be more efficient since the SQL engine would only need to read files in the partitions of interest (typically one partition when doing a point query).
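
To make the proposed layout concrete, here is a minimal sketch (a hypothetical helper, not code from ADAM) of how a record would map to its partition directory for K = 6:

// Hypothetical sketch: a record on chromosome chr at the given position lands
// in the directory chr=<chr>/pos=<floor(position / 10^k)>.
def partitionPath(chr: String, position: Long, k: Int = 6): String = {
  val bucketSize = math.pow(10, k).toLong
  s"chr=$chr/pos=${position / bucketSize}"
}

// partitionPath("22", 16050075L) == "chr=22/pos=16"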

@fnothaft

Member

fnothaft commented Apr 15, 2015

Don't these partitioners satisfy the requirements?

@tomwhite

Member

tomwhite commented Apr 15, 2015

Possibly, although I think there's more work to write out data in a Hive-compatible directory structure. Or have you done that kind of thing already?

@fnothaft

Member

fnothaft commented Apr 15, 2015

Ah, no, haven't done that!

@tomwhite

Member

tomwhite commented Apr 15, 2015

OK, I'll take a look to see how to do that.

@tomwhite

Member

tomwhite commented Apr 16, 2015

Here's an initial go that uses Kite to do partitioning:

master...tomwhite:ADAM-651-hive-partitions

The idea is to add a partition command that takes a partition strategy file defining which fields are used to construct the partitions and how records map to partition values. For example, this partition strategy partitions by chromosome and bucketed position.

This is not quite ready yet, as it depends on a snapshot version of Kite for https://issues.cloudera.org/browse/CDK-986 and https://issues.cloudera.org/browse/CDK-988.

@fnothaft

Member

fnothaft commented Apr 16, 2015

Nice; looks good! I hadn't seen Kite before; I'll need to take more of a look.

@laserson

Contributor

laserson commented Apr 16, 2015

As discussed, let's add command-line options that specify a few different baked-in partition schemes, such as locus-partitioned or sample-partitioned.

@tomwhite

Member

tomwhite commented Apr 21, 2015

I updated my branch to use a partitioner to ensure that each reducer only writes to a small number of Parquet files. This makes sure that the reducers don't run out of memory.

When I tried this on a cluster against the 1000 genomes chr22 file (and a partition range size of 10^6) I got FetchFailedExceptions, as described here: https://issues.apache.org/jira/browse/SPARK-5928. I tried running with ADAM_OPTS='--conf spark.io.compression.codec=lz4' as this was reported to have helped, but the job still failed in the same way. Does anyone have any suggestions for how to get around this problem?
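
For reference, the partitioner described above can be sketched roughly as follows; this is a simplified illustration rather than the code from the branch, and the key shape and parameters are assumptions:

import org.apache.spark.Partitioner

// Simplified sketch: key records by (contig, position) and route each
// (contig, position bucket) pair to its own reducer, so that any single
// reducer writes to only a small number of output partitions.
class LocusPartitioner(contigs: Seq[String], bucketsPerContig: Int, bucketSize: Long)
    extends Partitioner {
  private val contigIndex = contigs.zipWithIndex.toMap
  override def numPartitions: Int = contigs.size * bucketsPerContig
  override def getPartition(key: Any): Int = key match {
    case (contig: String, pos: Long) =>
      val bucket = math.min((pos / bucketSize).toInt, bucketsPerContig - 1)
      contigIndex(contig) * bucketsPerContig + bucket
    case _ => 0
  }
}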

@tomwhite

Member

tomwhite commented Apr 27, 2015

The problem here turns out to be #599: when data is read from Parquet it blows up in the shuffle to such an extent that shuffle blocks exceed 2GB even for modest-sized input splits. For example, one 7.6MB input file resulted in roughly 6.8GB of shuffle data, roughly a 1000x increase in size.

Matt is working on a fix for #599, which will solve this problem.

@fnothaft

Member

fnothaft commented Apr 27, 2015

@tomwhite was that on genotype data?

@tomwhite

Member

tomwhite commented Apr 27, 2015

@fnothaft yes

@fnothaft

Member

fnothaft commented Apr 27, 2015

Out of curiosity, do you know what your shuffle size/performance is if you use GZIP compression for the shuffle (e.g., https://github.com/bigdatagenomics/utils/blob/master/utils-serialization/src/main/scala/org/bdgenomics/utils/serialization/compression/GzipCompressionCodec.scala)?
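
For reference, such a codec would be enabled the same way as the lz4 setting mentioned above, assuming the class is on the driver and executor classpath:

ADAM_OPTS='--conf spark.io.compression.codec=org.bdgenomics.utils.serialization.compression.GzipCompressionCodec'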

@tomwhite

Member

tomwhite commented Apr 27, 2015

Good suggestion; I didn't realize there was a gzip codec for Spark. With gzip, the shuffle data is 478MB, so about a 60x increase rather than 1000x, but it's 3x slower. I need to try this out on a cluster to see if the fetch failure problem is avoided.

@tomwhite

Member

tomwhite commented Apr 28, 2015

I tried running on a cluster with the 1000 genomes chr22 file, but it didn't finish after >2 hours. So I think we need #599.

@fnothaft

Member

fnothaft commented Apr 28, 2015

Agreed; the gzip codec is just a stopgap until the solution to #599 is ready.

@heuermh

Member

heuermh commented Jul 21, 2016

@tomwhite We have a workaround implemented for #599, the Kite issues mentioned above were resolved in version 1.1.0, and your branch looks generic enough that it shouldn't run into too many problems on a rebase.

By eyeball I think the only necessary change would be variant.contig.contigName → variant.contigName.

Might you have some time to update your branch and make a pull request?

@tomwhite

Member

tomwhite commented Jul 22, 2016

@heuermh I would actually favour the Spark dataframes/datasets route for partitioning, since it's better supported than Kite. Also, flattening is no longer necessary since the major Hadoop SQL engines now support nested types (Impala didn't when I wrote my branch).

BTW what's the workaround for #599?

@heuermh

Member

heuermh commented Jul 22, 2016

@tomwhite Thank you for the reply. It is my understanding that we're not going to push too hard in the direction of Spark 2.0 dataframes/datasets until after we get our version 1.0 out. Should we keep this issue open to revisit at that time?

what's the workaround for #599?

We write sequence dictionary, record group, and sample metadata out to disk in Avro format separate from the Parquet files; see merged pull requests #906 leading to #1051.
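
Concretely, the output directory then looks roughly like the following; the metadata file names here are illustrative rather than exact:

myGenotypes.adam/
  _seqdict.avro            # sequence dictionary
  _rgdict.avro             # record groups
  _samples.avro            # sample metadata
  part-r-00000.gz.parquet  # Parquet data files
  ...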

@fnothaft

Member

fnothaft commented Mar 3, 2017

This should be easy to do on top of #1216 or #1391.

@fnothaft fnothaft modified the milestones: 0.24.0, 0.23.0 Mar 3, 2017

@heuermh heuermh added this to Triage in Release 0.23.0 Mar 8, 2017

@fnothaft fnothaft added this to the 0.24.0 milestone May 12, 2017

@fnothaft fnothaft removed this from the 0.23.0 milestone May 12, 2017

@fnothaft fnothaft removed this from Triage in Release 0.23.0 May 12, 2017

@jpdna

Member

jpdna commented Jul 10, 2017

Playing with this at the shell, I find that adding a bucketed directory hierarchy on Parquet write, at the point where we write Parquet from Spark SQL, e.g. at:
https://github.com/fnothaft/adam/blob/issues/1018-dataset-api/adam-core/src/main/scala/org/bdgenomics/adam/rdd/read/AlignmentRecordRDD.scala#L193

such as:

import org.apache.spark.sql.functions.floor

val df = rdd.dataset.toDF()
df.withColumn("posBin", floor(df.col("start") / 10000))
  .write.partitionBy("contigName", "posBin")
  .format("parquet")
  .option("spark.sql.parquet.compression.codec", "gzip")
  .save("jptest33")

seems to produce the desired bucketed output by chromosome and then by 10 kbp chunk, and interestingly it still passes the existing tests.
I'll experiment more and see if we can see a performance improvement when querying a small region by position. Note: Spark 2.1+ required.
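
To check the read side, a query along these lines should let Spark prune the input to just the matching contigName/posBin directories before applying the finer-grained start filter; the path and constants here are illustrative:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().getOrCreate()
import spark.implicits._

// Filters on the partition columns contigName and posBin are resolved from the
// directory names, so only one contigName=.../posBin=... directory is scanned here.
spark.read.parquet("jptest33")
  .filter($"contigName" === "22" && $"posBin" === 1605 &&
    $"start" >= 16050000L && $"start" < 16060000L)
  .count()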

@jpdna

Member

jpdna commented Jul 10, 2017

So it looks like if we add a posBin column to the dataframe, binning into 10,000 bp chunks with floor, we are going to need to add a posBin field to the Avro bdg-formats schemas, e.g. to AlignmentRecord. This could be a price worth paying if there is a performance improvement. Thoughts?

@fnothaft

Member

fnothaft commented Jul 10, 2017

Interesting! Let me think about it for a bit. An alternative would be to force a load through the SQL APIs and to drop the posBin column before converting to RDD form.
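
Roughly, i.e. (just a sketch of the idea, not an existing ADAM API; assumes the shell's SparkSession):

// Sketch: read through Spark SQL, drop the synthetic posBin partition column,
// and only then hand the data back to the RDD-based code paths.
val df = spark.read.parquet("jptest33").drop("posBin")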

@jpdna

Member

jpdna commented Jul 10, 2017

First attempt can be seen here: fnothaft#17

@fnothaft

Member

fnothaft commented Jul 10, 2017

Nice! I will take a deeper look later.
