
[STREAMING] SPARK-4986 Wait for receivers to deregister and receiver job to terminate #3868

Closed
wants to merge 3 commits

Conversation


@cleaton commented Jan 1, 2015

Spark Streaming does not wait for all receivers to deregister and for the receiver job to terminate before killing the SparkContext, which can cause data loss. This PR fixes that by waiting for all receivers in ReceiverTracker's stop function when a graceful shutdown is requested.

This together with #3857 fixes SPARK-4986.
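For illustration, the wait described above is essentially a bounded poll loop of the following shape. This is only a sketch with made-up names (receiverInfo, pollTimeMs, the concrete timeout values); it is not the exact code in this patch.

import scala.collection.mutable

// Sketch only: wait (bounded by a timeout) for every registered receiver to
// deregister before the SparkContext is torn down. `receiverInfo` stands in
// for the tracker's map of active receivers.
val receiverInfo = mutable.HashMap[Int, String]()   // receiverId -> executor (placeholder)
val stopTimeoutMs = 10000L                          // illustrative timeout
val pollTimeMs = 100L
val deadline = System.currentTimeMillis() + stopTimeoutMs
while (receiverInfo.nonEmpty && System.currentTimeMillis() < deadline) {
  Thread.sleep(pollTimeMs)                          // re-check after a short pause
}
if (receiverInfo.nonEmpty) {
  println("WARN: Timed out waiting for all receivers to deregister")
}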

@AmplabJenkins

Can one of the admins verify this patch?

@tdas
Contributor

tdas commented Jan 4, 2015

Jenkins, this is ok to test

@JoshRosen
Contributor

Jenkins, test this please.

@SparkQA

SparkQA commented Jan 4, 2015

Test build #25037 has started for PR 3868 at commit 8f0b636.

  • This patch merges cleanly.

@SparkQA

SparkQA commented Jan 4, 2015

Test build #25037 has finished for PR 3868 at commit 8f0b636.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@AmplabJenkins

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/25037/

@tdas
Contributor

tdas commented Jan 4, 2015

Could you add a unit test that tests this functionality? Something like this can easily regress silently.

@cleaton
Author

cleaton commented Jan 5, 2015

@tdas yes I think I can, I will take a look at it. Thanks for the feedback.

@@ -18,6 +18,8 @@
package org.apache.spark.streaming.scheduler


import org.apache.spark.streaming.util.TimeoutUtils
Contributor


Minor style-nit, but this import should be grouped with the other Spark ones down near line 31.

@JoshRosen
Contributor

As long as you're planning to do more work on this, I left a round of (kind of nitpicky) style comments (https://cwiki.apache.org/confluence/display/SPARK/Spark+Code+Style+Guide).

@cleaton
Author

cleaton commented Jan 5, 2015

@JoshRosen Thank you for all the comments. I will update accordingly :).
Regarding the waitUntilDone function: yes, I assume there must be something like this in the Scala library already, but I couldn't find it. Using ScalaTest's eventually seems to create a weird dependency. Any suggestions? Thanks.

@JoshRosen
Contributor

Yeah, we can't / shouldn't use ScalaTest's eventually here; I was just noting the similarity. Maybe there's a library that has something like this, but I don't know of one offhand (I checked Scalactic and it's not there).


// Wait until all the received blocks in the network input tracker has
// been consumed by network input DStreams, and jobs have been generated with them
logInfo("Waiting for all received blocks to be consumed for job generation")
while(!hasTimedOut && jobScheduler.receiverTracker.hasUnallocatedBlocks) {
Thread.sleep(pollTime)
if (!TimeoutUtils.waitUntilDone(stopTimeout,
Contributor


Actually, instead of this utility, it might be a better idea to use the clock in this class, which has the waitForTime method. Using that clock lets us check things in unit tests with a manual clock, which might in fact help you implement a unit test for this.
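A self-contained sketch of that idea follows. It defines its own tiny Clock and ManualClock purely for illustration and does not claim to match Spark's actual clock classes.

// Illustration: a clock abstraction makes timed waits deterministic in tests.
trait Clock {
  def currentTime(): Long
  def waitTillTime(targetTime: Long): Long   // block until the clock reaches targetTime
}

class SystemClock extends Clock {
  def currentTime(): Long = System.currentTimeMillis()
  def waitTillTime(targetTime: Long): Long = {
    var now = currentTime()
    while (now < targetTime) {
      Thread.sleep(math.min(targetTime - now, 100L))
      now = currentTime()
    }
    now
  }
}

class ManualClock extends Clock {
  private var time = 0L
  def currentTime(): Long = synchronized { time }
  def advance(ms: Long): Unit = synchronized { time += ms; notifyAll() }
  def waitTillTime(targetTime: Long): Long = synchronized {
    while (time < targetTime) { wait(10) }
    time
  }
}

A stop path that waits via clock.waitTillTime(...) can then be driven deterministically from a unit test by calling advance(...) on a ManualClock instead of sleeping in real time.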

@tdas
Contributor

tdas commented Jan 5, 2015

I think I get the idea, at a high level, of what this patch is trying to do. The key change essentially is in the ReceiverLauncher: the stop is made to use the config spark.streaming.gracefulStopTimeout for tunability. However, this needs to be done carefully so that the behavior of the system is predictable.

In the existing code, if the timeout is set to 10 seconds, context.stop will exit after 10 seconds. So that timeout applies across all the different waits (wait for receiver stop, wait for job generation, wait for job completion). This makes it easy for a developer to understand what is going to happen if he/she sets the configuration to 60 seconds.

The current patch changes this to use the timeout at every level. The problem with that approach is that if the developer sets the timeout to 60 seconds, the system may take up to 60 * 3 = 180 seconds to stop gracefully. That's confusing.

So we need to wait intelligently such that the maximum time the whole system waits is predictable and easy to understand. The current code already does that, but only partially, since the receiver-stop wait is fixed at 10 seconds and is not configurable. Maybe there is a better way of refactoring the code to achieve this; the current patch is not the right way because of the aforementioned reasons.

Here are two approaches.

  1. If processAllReceivedData = true, then JobScheduler.stop() calls ReceiverTracker.stop() and JobGenerator.stop() with timeouts that depend on how much time has passed since JobScheduler.stop() was called. The overall timeout is the configurable parameter, and ReceiverTracker, ReceiverLauncher, and JobGenerator deal with whatever stop timeout has been given to them.
  2. The whole sequence of stop()s is evaluated in a Future (with its own execution context, not the global one), and the configurable timeout is a timed wait on the future. This may simplify code, but introduces another thread in the system. Probably okay to do so.

I suggest we try the second approach. What do you think, @cleaton @JoshRosen ?

@cleaton
Author

cleaton commented Jan 5, 2015

@tdas Thank you for the input.
Yes, the main purpose of this patch is to make ReceiverTracker graceful by waiting for ssc.sparkContext.runJob(tempRDD, ssc.sparkContext.clean(startReceiver)) to terminate and for all receivers to deregister (possibly redundant?). I borrowed the approach used in JobGenerator, and you are right, I forgot to keep timeWhenStopStarted global.

The second approach sounds good to me. It would make it easier to follow the shutdown sequence if it is consolidated in one place.

And for the unit test, my idea is to create a dummy receiver implementation that blocks on shutdown while still producing a fixed number of records.
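Roughly something like this, as a sketch against the public Receiver API (class name and numbers are made up):

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

// Sketch of a test receiver that is deliberately slow to shut down.
class SlowStoppingReceiver extends Receiver[Int](StorageLevel.MEMORY_ONLY) {
  @volatile private var running = true

  def onStart(): Unit = {
    new Thread("slow-stopping-receiver") {
      override def run(): Unit = {
        var i = 0
        while (running && i < 1000) {   // produce a fixed number of records
          store(i)
          i += 1
        }
      }
    }.start()
  }

  def onStop(): Unit = {
    Thread.sleep(5000)   // simulate a slow shutdown
    running = false
  }
}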

Do you think you or someone else working more closely with Spark Streaming should take over this patch? It seems to come down to deciding which approach is best suited for Spark in the long run. I can still try to provide a unit test for this, though.

@tdas
Contributor

tdas commented Jan 5, 2015

I am happy to review the code if you take a pass on implementing (2). I can jump in if things get too hairy. And the plan for the unit test sounds good. The existing unit test does that already; update and/or extend it as necessary.

@cleaton
Author

cleaton commented Jan 5, 2015

OK sounds great. 👍
I can prepare an implementation of (2). Bit busy now, but I think I can have something to review in a week.

Any specific unit test you can suggest for me to take a look at? The existing receiver tests? Thanks :)

@cleaton
Author

cleaton commented Jan 25, 2015

@tdas I've added a unit test now covering slow receiver shutdown. What do you think about the approach now? Thanks

@cleaton
Author

cleaton commented Jan 28, 2015

Jenkins, test this please.


// First, stop receiving
receiverTracker.stop()
val shutdown = Future {
Contributor


shutdown --> shutdownFuture

@tdas
Contributor

tdas commented Jan 28, 2015

This patch is definitely heading in the right direction. But as I was reading it, I realized that it affects the whole shutdown behavior of all the components pretty heavily. Since the whole purpose of this PR was to fix the bug that graceful shutdown does not wait for receivers to stop, I think it is better to split this attempt into two parts.

  1. PR1 - Fixes the bugs with very surgical, local changes. This should probably touch only ReceiverTracker. It will be small enough to include in Spark 1.3, which is coming very soon.
  2. PR2 - Refactors the code like this PR does and makes sure that the overall behavior is what is desired.

For PR2, I was thinking about what the desired behavior should be. This was not documented anywhere in the code, so I thought I would document it first; then verifying whether any PR satisfies the desired behavior becomes easy. Here is what I think the desired behavior is.

Overall, ssc.stop(stopGracefully = false) should allow all the internal components enough time to stop immediately but cleanly, so that all resources of each component are cleared. If this requires taking some minimum amount of time, so be it. But each component is responsible for keeping this time small. For ssc.stop(stopGracefully = true), the overall system should use whatever the configured timeout is. But here too, each component should ensure that all resources are cleared when the stopping thread is interrupted after the timeout. So overall, jobScheduler.stop(graceful = true) should do something like:

if (graceful) {
    // call stop(graceful = true) on all components inside future, and wait for future with timeout
    // if timeout occurred (so graceful stop was not completed), call stop(graceful = false) on all components to clean all resources
} else {
    // call stop(graceful = false) on all components
}

Anyway, to summarize, it would be great if you can submit a more surgical PR1 that affects only ReceiverTracker (and unit tests for it), and then turn this current PR into the refactoring that implements this behavior. I would love to have the surgical fix as part of Spark 1.3.
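As a rough, self-contained sketch of that shape (hypothetical helper names; this is not the actual JobScheduler code):

import java.util.concurrent.Executors
import scala.concurrent.duration._
import scala.concurrent.{Await, ExecutionContext, Future, TimeoutException}

// `stopComponents(graceful)` stands in for stopping receiverTracker, jobGenerator, etc.
def stopAllWithTimeout(stopComponents: Boolean => Unit,
                       graceful: Boolean,
                       timeout: FiniteDuration): Unit = {
  if (graceful) {
    // Run the graceful stop sequence on its own single-thread executor,
    // so the timed wait does not depend on the global execution context.
    val executor = Executors.newSingleThreadExecutor()
    implicit val ec: ExecutionContext = ExecutionContext.fromExecutorService(executor)
    val shutdownFuture = Future { stopComponents(true) }
    try {
      Await.result(shutdownFuture, timeout)
    } catch {
      case _: TimeoutException =>
        // Graceful stop did not finish in time: fall back to a forced stop
        // so that every component still releases its resources.
        stopComponents(false)
    } finally {
      executor.shutdownNow()
    }
  } else {
    stopComponents(false)
  }
}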

@cleaton
Author

cleaton commented Jan 29, 2015

Thank you for the comments, @tdas. I think the diff for this PR has become a bit misleading; it looks like there is more change than there actually is. Here is a summary of the PR in its current state.

    1. Propagate the graceful shutdown information to the receiverTracker / receiverExecutors stop functions.
    2. Add a Future to wrap the whole shutdown sequence.

The second part is needed because, previously, "spark.streaming.gracefulStopTimeout" was used to time out the check of "jobScheduler.receiverTracker.hasUnallocatedBlocks" inside JobGenerator.scala. That timeout is now removed from JobGenerator (only the timeout is removed from the while loop), and the gracefulStopTimeout configuration is instead used for the wrapping Future's timeout.

Other than that, I have not modified the original shutdown logic; it just shows up as removed and re-added in the diff.

My purpose with this patch has always been to achieve something like your PR1. But because of the "spark.streaming.gracefulStopTimeout" setting, it is not so easy to add another place that waits gracefully without also modifying other parts a little.


About the default timeout change from 10x batch duration to 100x batch duration: don't you think 10x is quite small when the batch duration is 1 second or less? 30 seconds might not be enough if a receiver is slow to shut down for some reason.

I think the whole idea of having a "spark.streaming.gracefulStopTimeout" configuration option outside the code is a bit weird. I had no idea there was such an option before I started looking into this code, and I cannot find it mentioned in the official documentation. I believe most developers are unaware of this timeout and just assume graceful == "possibly wait forever and manually force-kill if it is too slow" (at least this was my initial thought).

Isn't it better to expose this timeout from the ssc.stop() function instead, making it more visible for developers and giving them an option to actually wait forever if necessary?

When does the merge window for Spark 1.3 close?

Thanks

@tdas
Contributor

tdas commented Jan 29, 2015

  1. The merge window for features closes this weekend. However, this is a bug fix, so we can get it in early next week as well. But the change has to be surgical.
  2. Default timeout - if batch sizes are less than 1 second, then I see your point. Ideally, the receivers should shut down immediately and stop receiving data, so only a few batches should be generated and executed after the graceful shutdown command has been given. In practice that is probably a wrong assumption. However, it is also confusing if we wait for 100x batches. Most people run ~10s batches, which means the system would wait for 1000 seconds. That's kind of confusing as well if the system is stuck for a while even with the default configuration. So I guess the most desirable thing to do, balancing both short and long batches, is:

i. Default timeout of max(10x batch size, 1 minute).
ii. Log a warning if there is a timeout.
iii. Expose the timeout configuration.

For iii, I do not want to add another parameter to stop(); there is already a large combination of optional parameters in stop(), and each addition gets harder to maintain in the Java and Python APIs. I want to just expose the SparkConf parameter.
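For example, the default could be derived from the batch duration along these lines (a sketch; the conf key spark.streaming.gracefulStopTimeout already exists, everything else here is illustrative):

import org.apache.spark.SparkConf

val conf = new SparkConf()
val batchDurationMs = 10000L   // e.g. a 10-second batch interval
// Default: max(10 x batch duration, 1 minute), as suggested in (i) above.
val defaultTimeoutMs = math.max(10 * batchDurationMs, 60 * 1000L)
// Exposed only via the SparkConf key, not via a new stop() parameter (iii).
val stopTimeoutMs = conf.getLong("spark.streaming.gracefulStopTimeout", defaultTimeoutMs)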

Regarding this patch, I agree that the change is probably not much, but since this behavior of the code is not well tested, I am unsure about doing this so late in the release cycle for 1.3. For example, right now in this patch you are shutting down the threadpool. That would throw an InterruptedException in the thread doing the shutdown. We don't know at which step of the shutdown that exception would be thrown, and which modules will get cleaned up and which won't. We probably need to catch the exception and then call stop(graceful=true) to ensure full cleanup. But such changes are easy to get wrong without proper testing and can cause significant regressions.

So for this release, it would be great if we can do a surgical fix that solves the bug and exposes the timeout publicly, and then have a separate PR that cleans this up with refactoring and proper tests covering all aspects of the behavior.

(BTW, I confess that I could have done a better job when I wrote all of that a year ago :) ).

Does that make sense? Will you be able to do the surgical fix that ensures that graceful stop waits for the receiver launcher job to finish?

@cleaton
Author

cleaton commented Jan 30, 2015

Thanks @tdas

I'll create a new minimal PR with only the receiver shutdown part and the unit test. The main problem then is how to deal with the timeout. The two simplest approaches I can think of are:

    1. Propagate the time it took to shut down the receivers and deduct it from the waiting time inside JobGenerator.scala.
    2. Directly deduct the time by setting the "gracefulStopTimeout" conf parameter to the time left over after successfully stopping the receivers.

What do you think is the best approach for now?
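For illustration, option 1 above would amount to something like this (hypothetical names, not actual Spark code):

// Sketch of option 1: measure how long the receiver shutdown took and
// deduct it from the remaining graceful-stop budget.
val totalTimeoutMs = 60000L                       // configured graceful stop timeout
val stopStartTimeMs = System.currentTimeMillis()
// ... stop the receivers here, e.g. receiverTracker.stop(graceful = true) ...
val elapsedMs = System.currentTimeMillis() - stopStartTimeMs
val remainingTimeoutMs = math.max(totalTimeoutMs - elapsedMs, 0L)
// remainingTimeoutMs would then bound the graceful wait inside JobGenerator.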

@cleaton
Author

cleaton commented Jan 30, 2015

@tdas I have a new branch here: cleaton/spark@apache:branch-1.2...receiverstop

It is still missing the code to handle gracefulStopTimeout, but this diff should be much easier to read.

@tdas
Contributor

tdas commented Feb 2, 2015

Yes, the branch looks good. Can you please submit that as a PR? There are a few minor comments that I want to make. Also, please update your patch to use the Spark master branch (not branch-1.2, as it appears to be from the diff).

@cleaton
Author

cleaton commented Feb 2, 2015

I will rebase to master tomorrow and submit it as a new PR for SPARK-4986. There is still the question of how to handle the gracefulStopTimeout option; any thoughts on that?

Thanks

@tdas
Contributor

tdas commented Feb 2, 2015

It is fine to not address the timeout in that PR. That PR is about correctness: with it, the system will not lose data when gracefully stopped. The time it may take to stop will be up to { [time taken for all the receivers to stop] + [gracefulStopTimeout] }. Ideally it should take up to [gracefulStopTimeout]. Addressing that requires quite a bit of refactoring, which should be the next PR.

@cleaton
Author

cleaton commented Feb 3, 2015

@tdas I have created a new master branch PR. You can find it here: #4338

@tdas
Contributor

tdas commented Feb 3, 2015

Thanks, @cleaton! Could you close this PR and then create a new PR with the new stuff? Also, create a new JIRA saying something like "Make the gracefulStopTimeout more intuitive to use"; the new PR should have that name.

@cleaton closed this Feb 3, 2015