Investigate sorted join in dataset api #1223

Closed
jpdna opened this Issue Oct 24, 2016 · 21 comments

jpdna commented Oct 24, 2016

Related to #1216

It looks like the DataFrame/Dataset API has, or will have, some built-in support for sorted join operations:
https://issues.apache.org/jira/browse/SPARK-12394

Needs further investigation to see whether this is useful for ADAM's goals of persisting and joining pre-sorted data.

jpdna commented Oct 24, 2016

@devin-petersohn
This answer claims that sortedness will be preserved by "bucketing" in Spark 2.0 SparkSQL:
https://forums.databricks.com/questions/7717/how-can-i-preserve-a-sorted-table.html

More on this:
https://issues.apache.org/jira/browse/SPARK-11512

This sort-merge-bucket join definitely seems to be a real thing, at least in SparkSQL. I'd sure like to see or make a simple demo of it in action.
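For reference, a minimal sketch of what such a demo might look like with the Spark 2.x DataFrameWriter bucketing API. Table names, column names, and the bucket count below are made up, and whether the exchange/sort is actually elided at join time depends on the Spark version and configuration:

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder().appName("bucketed-sorted-join-demo").getOrCreate()
    import spark.implicits._

    // Toy tables keyed by (contig, start); names and contents are hypothetical.
    val reads = Seq(("chr1", 100L, "r1"), ("chr1", 200L, "r2")).toDF("contig", "start", "readId")
    val features = Seq(("chr1", 100L, "exon1")).toDF("contig", "start", "featureId")

    // Persist both sides bucketed and sorted on the join keys, so that a later
    // equi-join can be planned as a sort-merge join without re-shuffling or re-sorting.
    reads.write.bucketBy(16, "contig", "start").sortBy("contig", "start").saveAsTable("reads_bucketed")
    features.write.bucketBy(16, "contig", "start").sortBy("contig", "start").saveAsTable("features_bucketed")

    val joined = spark.table("reads_bucketed")
      .join(spark.table("features_bucketed"), Seq("contig", "start"))
    joined.explain() // ideally shows SortMergeJoin with no Exchange on either side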

jpdna commented Oct 24, 2016

I guess the trouble is that we likely want an interval join, not a join on exact key matches.

devin-petersohn commented Oct 25, 2016

The current build's ShuffleRegionJoin uses a bucketing method.

The new method in #1216 that I am working on is a bit different. Instead of using bucketing, if we know that one of the RDDs is sorted, we just repartition the other one to match the regions on each partition of the sorted RDD. We move things around in sorted blocks rather than individually, to maintain the sort post-shuffle (and possibly improve the runtime of the shuffle). We can make some optimizations on this model, but I think the core idea (maintaining the sort through the shuffle) should outperform something like this, because we have state information about the RDD.

fnothaft commented Oct 25, 2016

OOC, what do you mean by "sorted blocks rather than individually"? Just wanted to check my understanding.

devin-petersohn commented Oct 25, 2016

Once we know where the data on a partition is going, we can easily split the partition into "blocks" (lists) such that each block is still sorted and everything in it is going to the same destination. To merge the blocks on arrival, we simply order them by source partition number (there are actually a number of things we could sort on that would give the same outcome), so the sort is at most over the number of partitions.

The general idea here is that, because we want a global sort, the first (or last, or some combination of the two) n - k tuples will be shuffled, where n is the number of tuples on a partition and k is the number of tuples that are staying on that node.

This will hopefully improve performance over sending tuples individually and sorting after the shuffle. It will likely be data-dependent, but that is my next test.
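To make the idea concrete, here is a rough sketch of shuffling whole sorted blocks and re-ordering them by source partition on arrival. This is not the actual #1216 code; the destinationFor function and all names are hypothetical, and it assumes the input RDD is globally sorted with destinations assigned monotonically by key (e.g. from sampled range bounds):

    import org.apache.spark.Partitioner
    import org.apache.spark.rdd.RDD

    def blockShuffle[K, V](
        sorted: RDD[(K, V)],
        targetPartitions: Int,
        destinationFor: K => Int): RDD[(K, V)] = {

      // 1. Cut each (already sorted) partition into contiguous runs ("blocks") that
      //    share a destination, tagging each block with its source partition index.
      //    Note: toSeq materializes the partition in memory; fine for a sketch.
      val blocks = sorted.mapPartitionsWithIndex { (sourceIdx, iter) =>
        iter.toSeq
          .groupBy { case (k, _) => destinationFor(k) } // runs are contiguous because input is sorted
          .iterator
          .map { case (dest, run) => (dest, (sourceIdx, run)) }
      }

      // 2. Shuffle whole blocks, not individual records.
      val byDestination = new Partitioner {
        override def numPartitions: Int = targetPartitions
        override def getPartition(key: Any): Int = key.asInstanceOf[Int]
      }
      val shuffled = blocks.partitionBy(byDestination)

      // 3. On arrival, order the (at most numPartitions) blocks by source partition;
      //    each block is internally sorted, so concatenation restores the sort.
      shuffled.mapPartitions { iter =>
        iter.map { case (_, block) => block }.toSeq.sortBy(_._1).iterator.flatMap(_._2)
      }
    }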

devin-petersohn commented Oct 26, 2016

@fnothaft To further explain: the problem with repartitioning a sorted RDD is that there is no way of maintaining the sort if individual tuples are sent; another sort must be done on each partition in that case. I have worked to reduce the sort cost (because some sort is required no matter what) such that the worst-case sort is over the number of partitions. The worst case will only be hit if you are repartitioning everything onto a single node.

fnothaft commented Oct 26, 2016

Ah, thanks for explaining it further! That makes a fair bit of sense to me.

ryan-williams commented Oct 26, 2016

I'm missing context here, but a few possibly-related things:

  • an order-preserving repartition of an RDD (whether it starts out sorted or not) is possible like this:

      rdd
        .zipWithIndex
        .map(_.swap)
        .sortByKey(numPartitions = n)
        .values

    cf. magic-rdds.

  • Guacamole uses the .repartitionAndSortWithinPartitions API to send reads to designated partitions and have them end up sorted by start coordinate.

  • If you have control over the upstream sortByKey, passing the later-desired numPartitions there could also be a good optimization.

Sorry if these are not relevant to the problem at hand here; I'd be curious to see an example of the expected inputs and outputs if so!
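For illustration, a minimal sketch of the repartitionAndSortWithinPartitions pattern mentioned above. The key layout, partitioner, and types are hypothetical, not Guacamole's actual code:

    import scala.reflect.ClassTag
    import org.apache.spark.Partitioner
    import org.apache.spark.rdd.RDD

    // Key each record by (designated partition, start coordinate). The partitioner
    // routes on the first element; the implicit Ordering on the tuple makes each
    // partition arrive sorted by start coordinate.
    class DesignatedPartitioner(override val numPartitions: Int) extends Partitioner {
      def getPartition(key: Any): Int = key.asInstanceOf[(Int, Long)]._1
    }

    def toDesignatedSortedPartitions[R: ClassTag](
        keyedReads: RDD[((Int, Long), R)],
        numPartitions: Int): RDD[((Int, Long), R)] =
      keyedReads.repartitionAndSortWithinPartitions(new DesignatedPartitioner(numPartitions))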

devin-petersohn commented Oct 26, 2016

@ryan-williams That is definitely relevant to the conversation.

The code you wrote to repartition a sorted RDD would definitely work; however, it would have to perform a sort on each partition's data once it all arrives, because there is no guarantee that the data will arrive in sorted order. My work is just an optimization that decreases the amount of sorting done once all the data has arrived.

I think I will need to create some kind of animation to illustrate what is happening. It is difficult to put into words.

ryan-williams commented Oct 26, 2016

Cool, I think I understand what you're proposing: on each partition P, group the elements that are going to each other partition Q (into an Array, say), shuffle those Arrays, and concatenate them in order of their source partition P on the other side?

Or are you trying to do something else/further in terms of recognizing which data will not even end up on a different partition than they started on?

devin-petersohn commented Oct 26, 2016

Currently, the repartitioner doesn't do anything special with the data that isn't moving. I will have to dig deeper into Spark's repartitionBy method to see if there is any optimization that we can do on that front.

ryan-williams commented Oct 26, 2016

Off-hand, I'm pretty bearish on the potential for optimizations based on a presumption that some data will stay on the same executor (or node, or partition index), having attempted to make some optimizations based on that in the past.

The short story is that Spark very much does not guarantee any relationship between the {node, executor} that any two tasks run on; I think some wishful assumptions related to that on our part led to some of the staggering amount of headache on #676; specifically, #676 (comment) feels relevant.

So, is my first paragraph above what you are imagining? Or is something more restricted to a "join" context in play here? Is there a small code sample of the naive/"before" case that you're trying to optimize that I could look at?

devin-petersohn commented Oct 26, 2016

This functionality won't be specific to joins, but I am doing work to optimize the joins. There are a lot of optimizations that we can do with the knowledge that RDDs are sorted (#1216). I just pushed my most recent work:

https://github.com/devin-petersohn/adam/blob/partitioner/adam-core/src/main/scala/org/bdgenomics/adam/rdd/SortedGenomicRDD.scala#L62

I tried to adequately document and comment things, so please let me know if something isn't clear.

ryan-williams commented Oct 27, 2016

Cool; if you want any comments on specific pieces of the code, let me know where to do that (e.g. on a PR you could open).

One comment, and I am happy to be proven wrong about this: I would think the evenlyRepartition function you pointed me to above would perform worse than the equivalent .map(_ -> null).sortByKey(partitions).values.

At first glance it triggers at least 5 Spark jobs (collect, zipWithIndex (= 2), partitionBy, collect, with a couple of .maps left over for the next stage), the first 4 of which each require a full pass over the data, and several of which pull all the data into memory at once.

Sorry if I am missing something, but given some of our past Spark woes I have developed some heuristics that this runs afoul of, so I'm happy to discuss further if you like / if I'm not totally missing the mark here!
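For concreteness, a sketch of the one-shuffle alternative being compared against, using a hypothetical record type keyed by genomic coordinate:

    import org.apache.spark.rdd.RDD

    // Hypothetical record type; field names are made up.
    case class Read(contig: String, start: Long, name: String)

    // One shuffle: sortByKey range-partitions into `parts` partitions and leaves
    // each partition sorted; .values then drops the coordinate keys. (In the
    // `.map(_ -> null)` variant quoted above, where the record itself is the key,
    // `.keys` would recover the elements instead.)
    def sortRepartition(reads: RDD[Read], parts: Int): RDD[Read] =
      reads
        .map(r => (r.contig, r.start) -> r)
        .sortByKey(numPartitions = parts)
        .values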

devin-petersohn commented Oct 27, 2016

It might perform worse; that is one of the things I am testing. As it is now, it is definitely not perfect or optimal. There are a couple of things that I think could be better, but it has to work first.

There are other considerations as well, though, such as data skew. If this is an expensive up-front cost that saves a lot of downstream time, we can live with that too.

jpdna commented Oct 27, 2016

May be tangential, but I remain a fan of ideas around both broadcast and map-side joins using mapPartitions and data access to another source from inside that mapPartitions call. That way we do work parallelized by partition, but otherwise escape the main Spark framework, if appropriate, once we have what's needed in each partition.
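As a rough illustration of the broadcast flavor of this idea (all types and names below are hypothetical; a real implementation would index the annotations, e.g. with an interval tree, or use ADAM's region join machinery):

    import org.apache.spark.SparkContext
    import org.apache.spark.rdd.RDD

    case class Region(contig: String, start: Long, end: Long)

    def overlaps(a: Region, b: Region): Boolean =
      a.contig == b.contig && a.start < b.end && b.start < a.end

    // Broadcast the small annotation set (e.g. exons) and do the "join" as a
    // per-partition scan; the large read/genotype RDD is never shuffled.
    def broadcastRegionFilter[T](
        sc: SparkContext,
        big: RDD[(Region, T)],
        annotations: Seq[Region]): RDD[(Region, T)] = {
      val bcast = sc.broadcast(annotations)
      big.mapPartitions { iter =>
        val anns = bcast.value
        iter.filter { case (region, _) => anns.exists(overlaps(_, region)) }
      }
    }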


fnothaft commented Oct 27, 2016

Broadcast joins, yes, but I look at broadcast joins as a way to eliminate shuffles when joining against a low cardinality table. If you're doing a map-side join where you are doing data access outside of Spark inside a mapPartitions call, you're really fighting against both the Spark machinery and the RDD-based abstractions that we provide in the ADAM core. For context, I'll quote what I'd written recently in an email:

TINSTAAFL; if you move the load into the map, you still have to do a remote call/read if the data you are accessing is not on the node you're running on. If Spark can schedule the logically co-partitioned tasks on the same node, it will. There are a variety of reasons (data placement, being blocked during scheduling, etc.) that Spark will not do that. You get the illusion of control if you move the load into the mapPartitions call, but actually, all you're doing is fighting against Spark's scheduler, which knows where data is stored (for data in HDFS, that is).

Also, the performance costs at play here depend entirely on what your network looks like, how evenly data is distributed in HDFS, how long your tasks are, etc. In https://arxiv.org/pdf/1507.03325.pdf / https://amplab.cs.berkeley.edu/wp-content/uploads/2016/08/Kira-19.pdf, we saw decreased end-to-end performance on astronomy workloads when task locality went from 92% to 99%, because the delay-scheduling penalty to get the additional 7% read locality was longer than the cost of doing a remote read from HDFS.

jpdna commented Oct 27, 2016

Broadcast joins, yes, but I look at Broadcast joins as a way to eliminate shuffles when joining against a low cardinality table

Thanks for your feedback @fnothaft

Would you say the set of all exons in the genome would be such a low cardinality table you'd consider for broadcast? I guess the type of "join" I'm thinking of is any one that involves joining a 10-1000s+ GB read/genotype dataset to a 1-20 MB, or maybe even up to 500 MB, genomic annotation dataset. This would account for 99% of all traditional genome annotation "track" data and the reference genome itself. My thinking here is very focused on the use case of joining to and filtering on genome features (genes, exons, transcription factor binding sites).

My main point is to make sure we consider broadcast (and a more efficient one, as discussed in #1224) when appropriate - which may be most genome feature annotation. If the data fits comfortably on my laptop, it can probably be broadcast.

If we can solve the sorting/shuffle problem and ensure that such a join to a small RDD is cheap, then fine - otherwise, broadcast-then-join type methods are a good way to control adding in this annotation data prior to a filter, while avoiding shuffles/sorts.

As for the map-side data access, I'd argue that data locality doesn't matter much, as the data being joined is small anyway and thus by definition will likely NOT be co-located with a large distributed genotype/read dataset. I'm not sure what other costs there are to this within-partition data access. I see the within-partition data access as more of a "targeted broadcast" (I guess an oxymoron) that is a minor optimization if you know what external data the partition needs exactly, but you don't want to do an RDD-level join.

fnothaft commented Oct 28, 2016

Would you say the set of all exons in the genome would be such a low cardinality table you'd consider for broadcast? I guess the type of "join" I'm thinking of is any one that involves joining a 10-1000s+ GB read/genotype dataset to a 1-20 MB, or maybe even up to 500 MB, genomic annotation dataset. This would account for 99% of all traditional genome annotation "track" data and the reference genome itself. My thinking here is very focused on the use case of joining to and filtering on genome features (genes, exons, transcription factor binding sites).

I consider broadcast joins appropriate for RDDs <10 GB in size (i.e., the size of a dbSNP sites-only VCF). Obviously, this depends on the heap space on each machine, but we routinely broadcast an ~O(10 GB) table for BQSR.

As for the map-side data access, I'd argue that data locality doesn't matter much, as the data being joined is small anyway and thus by definition will likely NOT be co-located with a large distributed genotype/read dataset. I'm not sure what other costs there are to this within-partition data access. I see the within-partition data access as more of a "targeted broadcast" (I guess an oxymoron) that is a minor optimization if you know what external data the partition needs exactly, but you don't want to do an RDD-level join.

I guess I just don't really see any wins to this approach. If the data is so small that locality is irrelevant, then it's so small that the costs of doing a collect followed by a broadcast are trivial. If that is the case, then both approaches (map-side load vs. broadcast join) have equivalent performance, but the map-side load sacrifices the join abstraction and is less general (you need novel map-side load code for each data type/source you want to load from). I would also add that you aren't going to get the partition-level data locality that is necessary for:

that is a minor optimization if you know what external data the partition needs exactly

unless you do a sort first, and if you've already sorted your data, a sort-merge join is likely to beat the performance of a broadcast join anyway. Thoughts? Let me know if I'm missing something.
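In Dataset/DataFrame terms, the broadcast-join alternative can be expressed with the broadcast hint (column names here are made up; ADAM's RDD-level broadcast region join is the analogous operation for interval overlap rather than exact keys):

    import org.apache.spark.sql.DataFrame
    import org.apache.spark.sql.functions.broadcast

    // The small feature table is collected and broadcast by Spark; the large read
    // table is never shuffled. For interval overlap, the join condition would be
    // an overlap predicate instead of key equality.
    def annotateReads(reads: DataFrame, features: DataFrame): DataFrame =
      reads.join(broadcast(features), Seq("contig", "start"))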

fnothaft commented Mar 3, 2017

Realistically, I don't think we'd be able to make progress on this without involving the spark-sql team. Our approach requires a bit more logic beyond a plain sort-merge equijoin. I motion that we close this ticket for now in favor of #1216/#1324; any objections?

fnothaft commented Mar 3, 2017

Actually, I'm just going to close this for now. We can reopen at a later date, as needed.

fnothaft closed this Mar 3, 2017
