
SPARK-3461. Support external groupByKey using repartitionAndSortWithinPartitions #3198

Closed

Conversation

@sryza (Contributor) commented Nov 11, 2014

This is a WIP. It still needs tests and probably a better name for the transformation, but I wanted validation on the approach first.
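A minimal sketch of the approach, with hypothetical names (`groupByKeySorted` is illustrative, not the name used in this patch): shuffle once with `repartitionAndSortWithinPartitions`, then slice each partition's sorted stream into per-key groups without ever materializing a whole group.

```scala
import scala.reflect.ClassTag
import org.apache.spark.HashPartitioner
import org.apache.spark.SparkContext._
import org.apache.spark.rdd.RDD

// Hypothetical helper showing the shape of the approach: one shuffle via
// repartitionAndSortWithinPartitions, then the sorted stream of each
// partition is cut into (key, values) groups, streamed lazily.
def groupByKeySorted[K: Ordering: ClassTag, V: ClassTag](
    rdd: RDD[(K, V)], numPartitions: Int): RDD[(K, Iterator[V])] = {
  val sorted = rdd.repartitionAndSortWithinPartitions(
    new HashPartitioner(numPartitions))
  sorted.mapPartitions { iter =>
    val buffered = iter.buffered
    new Iterator[(K, Iterator[V])] {
      def hasNext: Boolean = buffered.hasNext
      def next(): (K, Iterator[V]) = {
        val key = buffered.head._1
        // Values are streamed lazily; each group must be fully consumed
        // before advancing to the next one -- the access-pattern question
        // debated below.
        val values = new Iterator[V] {
          def hasNext: Boolean = buffered.hasNext && buffered.head._1 == key
          def next(): V = buffered.next()._2
        }
        (key, values)
      }
    }
  }
}
```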

@SparkQA commented Nov 11, 2014

Test build #23181 has started for PR 3198 at commit 891ef83.

  • This patch merges cleanly.

@davies (Contributor) commented Nov 11, 2014

As we discussed in PR #1977, if we keep the same semantics as groupByKey(), users should be able to access the results in any pattern, as before (such as fetching the keys and values, putting them into a list, then returning them). Also, the returned RDD should be cacheable.

In order to do this, we should have an ExternalIterator (similar to ExternalSorter), which will spill the values to disk when they are too large to hold in memory.

In the meanwhile, we should minimize the overhead of this; it should have performance similar to the current implementation for small and medium datasets.

If we just add this as a new API, then we may still need to fix groupByKey() in the future to support hot keys (or to support joins on hot keys).
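A toy sketch of the spilling idea, under made-up names (`SpillableBuffer`, a fixed element-count threshold); Spark's real ExternalSorter tracks memory through a shuffle memory manager rather than counting elements:

```scala
import java.io._
import scala.collection.mutable.ArrayBuffer

// Toy sketch: values stay in memory up to a threshold and overflow to a
// temp file beyond it. Unlike a once-only iterator, iterator() can be
// called any number of times, so caching and re-iteration still work.
class SpillableBuffer[T](maxInMemory: Int) extends Iterable[T] {
  private val inMemory = new ArrayBuffer[T]
  private var spillFile: File = null
  private var spilledCount = 0
  private var out: ObjectOutputStream = null

  def append(value: T): Unit = {
    if (inMemory.size < maxInMemory) {
      inMemory += value
    } else {
      if (out == null) {
        spillFile = File.createTempFile("spill", ".bin")
        spillFile.deleteOnExit()
        out = new ObjectOutputStream(
          new BufferedOutputStream(new FileOutputStream(spillFile)))
      }
      out.writeObject(value.asInstanceOf[AnyRef]) // overflow goes to disk
      spilledCount += 1
    }
  }

  // Must be called before iterating if anything was spilled.
  def doneAppending(): Unit = if (out != null) out.flush()

  def iterator: Iterator[T] = {
    val disk = if (spillFile == null) Iterator.empty else {
      val in = new ObjectInputStream(
        new BufferedInputStream(new FileInputStream(spillFile)))
      // Toy code: the input stream is left for GC to close.
      Iterator.fill(spilledCount)(in.readObject().asInstanceOf[T])
    }
    inMemory.iterator ++ disk
  }
}
```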

@davies (Contributor) commented Nov 11, 2014

@sryza Could you help review #1977?

@SparkQA commented Nov 11, 2014

Test build #23181 has finished for PR 3198 at commit 891ef83.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@AmplabJenkins

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/23181/

@sryza (Contributor, Author) commented Nov 11, 2014

Will take a look at #1977.

I believe that the most common uses for groupByKey, like writing out partitioned tables, involve iterating over each group a single time and without backtracking to previous groups. For this access pattern, we can achieve better performance by avoiding the extra serialization/deserialization and trips to disk that an ExternalIterator would require.

I understand that we can't change the semantics of groupByKey, and agree that adding spilling there could be useful. But if we can add a transformation that supports the common case without the extra I/O overhead, I think that's equally worthwhile.
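A sketch of that single-pass pattern, assuming the hypothetical `groupByKeySorted` from above plus made-up `events` and `openWriterFor` stand-ins:

```scala
// Each group is consumed exactly once, in key order, and streamed straight
// to output, so no group ever has to fit in memory.
groupByKeySorted(events, numPartitions = 100).foreachPartition { groups =>
  groups.foreach { case (date, records) =>
    val writer = openWriterFor(date)         // hypothetical: one output per key
    try records.foreach(r => writer.write(r)) finally writer.close()
  }
}
```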

@pwendell (Contributor) commented Nov 11, 2014

Hey Sandy - so one open question here is how this would work if .cache() is called on the returned RDD. I sort of punted on that in the original JIRA but I think it might complicate the design.

@mateiz (Contributor) commented Nov 11, 2014

Hey Sandy, unfortunately, I agree with Davies that this should support multiple accesses to the RDD in potentially different patterns. The problem is that your Iterable is not actually an Iterable: it can only be iterated once. Here are examples of where it will break:

  • Caching -- if you cache this RDD, you'll keep these once-only-iterable objects in the cache, which will then show no data the next time you read them
  • Even if the RDD itself is not cached, anything produced by a map(), filter(), etc. on it might be cached, leading to the same problem
  • Joins -- if you join() this RDD with another one, you might see the same value in multiple key-value pairs even without doing caching

We should design a solution for these cases that allows the iterables to be reused multiple times. It's annoying that it would have to spill to disk, but that's better than giving the iterables these semantics. Users are already super-confused because hadoopFile reuses Writable objects.
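The contract violation is easy to reproduce outside Spark; a once-only "Iterable" goes empty after its first traversal, which is exactly what a cache hit would expose (toy snippet):

```scala
// Legal to construct, but violates the Iterable contract: iterator()
// keeps returning the same underlying, already-consumed iterator.
val once: Iterable[Int] = new Iterable[Int] {
  private val underlying = Iterator(1, 2, 3)
  def iterator: Iterator[Int] = underlying
}

println(once.toList) // List(1, 2, 3) -- the first pass consumes the data
println(once.toList) // List()        -- what a second read from cache would see
```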

@mateiz (Contributor) commented Nov 11, 2014

Another thing to consider is whether the Iterables returned should be Serializable; obviously serialization will fail if they're too big, but it could work for small ones. Pretty much all the other types we return are serializable, meaning you can chain together any Spark operations on serializable data, so it would be weird to get something here that you can't shuffle or collect afterwards.
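For instance (sketch; `grouped` and `interesting` are stand-ins for the proposed transformation's result and some user predicate), even this ordinary chain has to serialize the values to ship them:

```scala
// collect() serializes every (key, values) pair to send it to the driver;
// a non-serializable values Iterable fails here with
// java.io.NotSerializableException, even though the code looks innocuous.
val sample = grouped                          // RDD[(K, Iterable[V])]
  .filter { case (k, _) => interesting(k) }   // hypothetical predicate
  .collect()
```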

@sryza (Contributor, Author) commented Nov 11, 2014

All good points. Will close this for now.

Longer term, it worries me that Spark wouldn't be able to provide an operator that gives comparable performance to what other ETL-focused frameworks like MR or Tez can do. The hit is not just extra I/O, but memory and GC pressure from keeping a large group in memory.

It seems like there must be a general solution to this and the hadoopFile problem that would allow us to unroll single-sequential-access collections before they're serialized or cached.
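One speculative shape such a solution could take (nothing like this exists in Spark; all names here are made up): a collection that streams on the normal path but unrolls itself into an array the moment anything tries to serialize it, e.g. for caching or a shuffle.

```scala
import java.io.{IOException, ObjectOutputStream}

// Speculative sketch: stream values on the normal single-pass path, but
// unroll into an array if the collection is about to be serialized, so
// later consumers see a real, re-iterable Iterable. Only safe when
// serialization happens before the one-shot stream has been consumed.
class UnrollOnSerialize[T](@transient private val makeIter: () => Iterator[T])
    extends Iterable[T] with Serializable {
  private var unrolled: Array[AnyRef] = null

  def iterator: Iterator[T] =
    if (unrolled != null) unrolled.iterator.map(_.asInstanceOf[T])
    else makeIter() // single sequential pass; caller must not backtrack

  @throws(classOf[IOException])
  private def writeObject(out: ObjectOutputStream): Unit = {
    if (unrolled == null) {
      unrolled = makeIter().map(_.asInstanceOf[AnyRef]).toArray
    }
    out.defaultWriteObject() // writes the unrolled array, drops makeIter
  }
}
```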

sryza closed this Nov 11, 2014
@davies (Contributor) commented Nov 11, 2014

I think the approach in PR #1977 already addresses all of these access patterns without introducing serialization overhead until the dataset cannot be held in memory. So I want to merge that PR first, then do similar things in Scala.
