[SPARK-30666][Core][WIP] Reliable single-stage accumulators #27377
Conversation
Can one of the admins verify this patch?
Inviting some contributors of `AccumulatorV2`: @cloud-fan @rxin @HyukjinKwon @zsxwing
I think this is a more specific solution to a more general problem, and I'm not sure it works in all cases.

`AccumulatorMode.Max` doesn't seem to have an analogue for all `Accumulator` types. I don't think it's appropriate for this API.

`AccumulatorMode.Last` doesn't actually know which value was used, does it? It assumes that the last task was the one used by the partition, but I'm not sure that's the case.

I think the core issue is that we send a `CompletionEvent` for tasks even if we don't end up recalculating that partition.

I can see the use case for `ALL` when counting the total number of bytes written out, but for any sort of statistics based on the data, `LAST` seems like the only idea that makes sense. And to make that work properly, I think we need a solution that isn't implemented specially for each `Accumulator`.
```scala
fragmentId.foreach(_fragments(_) = (maxSum, maxCount))
case AccumulatorMode.Last =>
  val (fragmentSum, fragmentCount) =
    fragmentId
```
When is the `fragmentId` `None`?
If `AccumulatorV2.merge` is only used in `DAGScheduler.scala`, then `fragmentId` can be an `Int` rather than an `Option[Int]`.
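To make the suggestion concrete, here is a sketch of the two signatures under discussion. The trait names are hypothetical; only `fragmentId` and the `Option[Int]`-versus-`Int` choice come from this thread:

```scala
import org.apache.spark.util.AccumulatorV2

// Current form in the PR: callers without a partition id pass None.
trait MergeWithOptionalFragment[IN, OUT] {
  def merge(other: AccumulatorV2[IN, OUT], fragmentId: Option[Int]): Unit
}

// Suggested form: viable if DAGScheduler.scala is the only caller, since the
// partition id is always known there.
trait MergeWithFragment[IN, OUT] {
  def merge(other: AccumulatorV2[IN, OUT], fragmentId: Int): Unit
}
```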
Is this mostly for speculative tasks, where we can get duplicated accumulator updates for the same partition?
@cloud-fan This is not restricted to speculative tasks; there is a multitude of reasons why we can get multiple accumulator updates per partition. This should handle any of them (as long as the accumulator updates are deterministic). @databricks-david-lewis can you give an example of a (deterministic) accumulator type that does not work with `Max`?
@EnricoMi Let's say we are aggregating a column of longs, but the values are not all positive. In that case, taking the "max" would not give the right value.
@databricks-david-lewis You are right, in that case `Max` would not give the right value. When an …
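A minimal numeric sketch of why a max-based merge breaks down once values can be negative; the concrete numbers are invented for illustration:

```scala
// A partition holds the rows (3, -5); the true partial sum is -2.
val completeAttempt = 3L + -5L                          // a finished task reports -2
val partialAttempt  = 3L                                // an attempt that only saw the first row
val merged = math.max(completeAttempt, partialAttempt)  // picks 3, but the truth is -2
// With non-negative values a complete sum always dominates any prefix of it,
// so a max-based merge happens to work for counters -- but not for general sums.
```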
Force-pushed from `3001d82` to `6b89ecc`.
@EnricoMi Thank you for continuing to work on this! I appreciate all the time and thought you've put into it. I worry that your solution will lead to lots of duplicated work. Is there some way to move the …

The only exception I can think of is counting the total number of bytes read or written, which is unreliable anyway, because certain failures mean that information never makes it back to the driver.
- removed `Option[]` from the `merge` `fragmentId` argument
- introduced the `First` `AccumulatorMode`
- moved merge mode implementations into methods (`Long` and `Double` accumulators)
- provides traits that implement the modes
- case classes calling into the traits
Implements first accumulator mode only. Provides reliable Long, Double and Collection accumulator implementations.
Force-pushed from `837efaa` to `575f104`.
@databricks-david-lewis I have stripped out the notion of …
I have had a quick chat with @holdenk and we found two use cases where this approach will not work: …

I will look into these, so changing this to WIP. More feedback welcome.
We're closing this PR because it hasn't been updated in a while. This isn't a judgement on the merit of the PR in any way. It's just a way of keeping the PR queue manageable.
What changes were proposed in this pull request?
This PR introduces a reliable accumulator which only merges the first accumulator update from each partition. Together with `AccumulatorMetadata.countFailedValues == false`, this makes accumulator values reliable in the presence of reprocessed partitions.

The current implementation has no means to identify which partition a remote accumulator value is merged from. When partitions are executed multiple times (e.g. re-runs on failure or cache eviction, rerunning actions, or use of a stage in multiple actions), accumulators currently merge the values from all of these executions.
The `ReliableAccumulator` keeps only the first accumulator value per partition. For accumulators registered with `countFailedValues == false`, this yields the accumulator value of a single successful stage, which does not change any more.

The reliable accumulator on the driver uses additional memory on the order of the number of partitions.
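To make the idea concrete, here is a minimal sketch of a "first value per partition wins" accumulator. The class and method names are invented for illustration; this is not the PR's actual `ReliableAccumulator`:

```scala
import scala.collection.mutable
import org.apache.spark.util.AccumulatorV2

// Sketch: a Long-summing accumulator that records at most one merged value
// per partition on the driver, so re-executions cannot over-count.
class FirstPerPartitionSum extends AccumulatorV2[Long, Long] {
  private var partial = 0L                                     // task-side running sum
  private val firstByPartition = mutable.Map.empty[Int, Long]  // driver-side state

  override def isZero: Boolean = partial == 0L && firstByPartition.isEmpty
  override def copy(): FirstPerPartitionSum = {
    val acc = new FirstPerPartitionSum
    acc.partial = partial
    acc.firstByPartition ++= firstByPartition
    acc
  }
  override def reset(): Unit = { partial = 0L; firstByPartition.clear() }
  override def add(v: Long): Unit = partial += v

  // The plain merge has no partition id and therefore cannot deduplicate;
  // merging is routed through the id-aware overload below instead.
  override def merge(other: AccumulatorV2[Long, Long]): Unit =
    throw new UnsupportedOperationException("merge requires a partition id")

  // Only the first update per partition is recorded; later updates for the
  // same partition (re-runs, repeated actions) are ignored.
  def merge(other: AccumulatorV2[Long, Long], partitionId: Int): Unit = other match {
    case o: FirstPerPartitionSum => firstByPartition.getOrElseUpdate(partitionId, o.partial)
    case o => throw new UnsupportedOperationException(s"cannot merge ${o.getClass}")
  }

  // Driver-side memory grows with the number of partitions, as noted above.
  override def value: Long = firstByPartition.values.sum
}
```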
Why are the changes needed?
With the current behaviour, only a very limited class of accumulator use cases can be implemented: those that deliberately count across partition executions. Counting read and written bytes is a good example where this behaviour is desired. Counting the number of rows in a dataset, or the rows that meet a certain condition, cannot be implemented with this behaviour: the accumulator over-counts and thus only provides a pessimistic upper bound on the true value. With this PR, exact, reliable numbers can be extracted.
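The over-counting is easy to reproduce with a plain `LongAccumulator` by running the same action twice; the following spark-shell sketch assumes a local master and invented names:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[2]").appName("overcount").getOrCreate()
val sc = spark.sparkContext

val rows = sc.longAccumulator("rows")  // counts processed rows
val rdd = sc.parallelize(1 to 1000, numSlices = 4).map { x => rows.add(1); x }

rdd.count()
println(rows.value)  // 1000, assuming no task was retried
rdd.count()          // the second action recomputes every partition...
println(rows.value)  // ...so the accumulator now reports 2000
```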
Does this PR introduce any user-facing change?
Yes, it introduces the method `ReliableAccumulator.merge(AccumulatorV2, int)`, which is called by the DAGScheduler to provide the partition id.

How was this patch tested?
Unit tests in the `ReliableAccumulatorSuite`.
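For illustration, this is the kind of property such a suite would check. The snippet reuses the hypothetical `FirstPerPartitionSum` sketched above, not the PR's real classes:

```scala
val driverAcc = new FirstPerPartitionSum
val update = new FirstPerPartitionSum
update.add(5L)

driverAcc.merge(update, partitionId = 0)
driverAcc.merge(update, partitionId = 0)  // duplicate update, e.g. from a re-run task
driverAcc.merge(update, partitionId = 1)

assert(driverAcc.value == 10L)            // each partition counted exactly once
```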