[Spark-13374][Streaming] make it possible to create recoverable accumulator for streaming application #11249
Test build #51473 has finished for PR 11249 at commit
 * checkpoint
 * @param name name is required as identity to find corresponding accumulator.
 */
def accumulator[T](initialValue: T, name: String)(implicit param: AccumulatorParam[T])
How about getOrCreateRecoverableAccumulator[T](createFunc: () => T, name: String)(...)? Would that also make it explicit to developers that only accumulators created via this API are recoverable?
Hmm... changing the function name might be a good idea, to explicitly emphasize the recoverable feature. I will change it.
As for the input parameter, I don't think createFunc is necessary here; initialValue should be enough.
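To make the trade-off between the two parameter styles concrete, here is a minimal sketch in plain Scala with no Spark dependency. The object GetOrCreateSketch, both method names, and the Map standing in for recovered checkpoint state are hypothetical and are not part of this PR. The only behavioral difference shown is that a createFunc is invoked only when nothing was recovered, while an initialValue is always computed by the caller up front.

```scala
object GetOrCreateSketch {
  // Assumption: `recovered` stands in for accumulator values read back from a checkpoint.

  // Style proposed in the PR: the caller passes an already computed initial value.
  def withInitialValue[T](recovered: Map[String, Any], name: String, initialValue: T): T =
    recovered.get(name).map(_.asInstanceOf[T]).getOrElse(initialValue)

  // Style suggested in review: the caller passes a function that is only
  // invoked when no recovered value exists for `name`.
  def withCreateFunc[T](recovered: Map[String, Any], name: String, createFunc: () => T): T =
    recovered.get(name).map(_.asInstanceOf[T]).getOrElse(createFunc())
}
```

For cheap initial values such as 0 or an empty collection the two styles behave the same, which is consistent with the argument that initialValue is enough.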
Test build #51511 has finished for PR 11249 at commit
Test build #51746 has finished for PR 11249 at commit
val newInitialValue: T = if (isCheckpointPresent) {
  _cp.trackedAccs.find(_.name == name).map(_.value).getOrElse(initialValue).asInstanceOf[T]
} else initialValue
Nit:
if (...) {
  ...
} else {
  ...
}
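As a rough illustration of the restore-or-initialize lookup under review, written with braces on both branches as the nit asks, the following sketch uses plain Scala only. TrackedAcc, RecoverSketch, and restoreOrInitial are hypothetical stand-ins; the Option models whether a checkpoint is present and is an assumption, not the PR's actual data structure.

```scala
// Hypothetical stand-in for an accumulator entry saved in a checkpoint.
final case class TrackedAcc(name: String, value: Any)

object RecoverSketch {
  // Returns the checkpointed value registered under `name` if a checkpoint
  // exists and contains it; otherwise falls back to `initialValue`.
  def restoreOrInitial[T](
      checkpoint: Option[Seq[TrackedAcc]],
      name: String,
      initialValue: T): T = {
    if (checkpoint.isDefined) {
      checkpoint.get
        .find(_.name == name)
        .map(_.value.asInstanceOf[T]) // unchecked cast: the caller must use a consistent T per name
        .getOrElse(initialValue)
    } else {
      initialValue
    }
  }
}
```

Because the lookup is keyed only by name, two accumulators registered under the same name would silently share a recovered value, which is why the name acts as the accumulator's identity.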
Test build #52033 has finished for PR 11249 at commit
@tdas could you help me review it?
Currently, accumulators are not recoverable from a checkpoint, so if a streaming application is restarted or recovers from a failure, the accumulator's value is lost. I would like to add a new accumulator interface to StreamingContext that makes it possible to create recoverable accumulators for streaming applications.
I have also added an example to demonstrate how to use it:
examples/src/main/scala/org/apache/spark/examples/streaming/RecoverableAccumulator.scala