
[SPARK-6691][Streaming][WIP] Add a dynamic RateLimiter for Spark Streaming #5385

Closed
wants to merge 7 commits

Conversation

jerryshao
Contributor

This proposal adds a dynamic RateLimiter for both Spark Streaming's receiver-based and direct input streams. The details can be found in SPARK-6691.
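For readers skimming this thread, here is a minimal sketch of the general idea, not the patch's actual code: a receiver-side rate limiter built on Guava's RateLimiter whose permit rate can be retuned at runtime from driver feedback. The class name, the update path, and the feedback signal are illustrative assumptions.

```scala
import com.google.common.util.concurrent.{RateLimiter => GuavaRateLimiter}

// Hypothetical sketch, not the code from this PR: a rate limiter whose
// records-per-second limit can be adjusted while the receiver is running.
class DynamicRateLimiter(initialRatePerSec: Double) {
  private val limiter = GuavaRateLimiter.create(initialRatePerSec)

  // Block until one more record may be pushed into the block generator.
  def waitToPush(): Unit = limiter.acquire()

  // Called when the driver reports a new desired ingestion rate; the feedback
  // signal (e.g. the processing rate of recent batches) is an assumption here.
  def updateRate(newRatePerSec: Double): Unit =
    if (newRatePerSec > 0) limiter.setRate(newRatePerSec)
}
```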

@SparkQA

SparkQA commented Apr 7, 2015

Test build #29781 has finished for PR 5385 at commit d2382dc.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.
  • This patch does not change any dependencies.

@SparkQA

SparkQA commented Apr 7, 2015

Test build #29787 has finished for PR 5385 at commit 180dfdf.

  • This patch fails MiMa tests.
  • This patch merges cleanly.
  • This patch adds no public classes.
  • This patch does not change any dependencies.

@SparkQA

SparkQA commented Apr 8, 2015

Test build #29843 has finished for PR 5385 at commit 82ae607.

  • This patch fails MiMa tests.
  • This patch merges cleanly.
  • This patch adds no public classes.
  • This patch does not change any dependencies.

@jerryshao
Contributor Author

Jenkins, retest this please.

@SparkQA

SparkQA commented Apr 9, 2015

Test build #29926 has finished for PR 5385 at commit 82ae607.

  • This patch fails MiMa tests.
  • This patch merges cleanly.
  • This patch adds no public classes.
  • This patch does not change any dependencies.

@SparkQA

SparkQA commented Apr 9, 2015

Test build #29927 has finished for PR 5385 at commit b576d9e.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.
  • This patch does not change any dependencies.

@andrewor14
Contributor

@jerryshao is this still WIP? Maybe @tdas has some thoughts on this. An alternative by the way would be to implement a version of dynamic allocation that integrates well with streaming.

@harishreedharan
Contributor

On that note, I don't think a whole lot should change between Spark Streaming and core Spark for dynamic allocation. The main concern is the timeouts: we don't want to lose blocks that have not been processed yet, or ones that are required for windowing. I am wondering whether it makes sense to add some way of "pinning" blocks so that their executors don't get removed.

I guess this is alleviated to a very large extent with the direct Kafka DStream, which should work out of the box with dynamic allocation: no data is actually downloaded until it is ready to be processed, and since the download happens within tasks, the ExecutorAllocationManager won't kill an executor while it is downloading data.
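For context, the direct Kafka DStream mentioned above is created roughly like this (Spark 1.3+ with the Kafka 0.8 integration); the broker address and topic name are placeholders:

```scala
import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

val ssc = new StreamingContext(new SparkConf().setAppName("direct-kafka-example"), Seconds(5))
// Placeholder broker list and topic set.
val kafkaParams = Map("metadata.broker.list" -> "broker1:9092")
val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaParams, Set("events"))
// Offsets are tracked by the driver; the records themselves are fetched inside
// the tasks that process each batch, so no receiver executor is involved.
```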

@andrewor14
Contributor

Well, there are at least two other important considerations:

  • We should use the batch queue instead of the task queue
  • We should never kill executors with receivers

The first is important because dynamic allocation currently doesn't really do anything in most streaming workloads. The second is crucial because we don't want dynamic allocation to disrupt the basic function of the application. I actually think this will be a non-trivial amount of work, in addition to ensuring that we don't lose the blocks needed for windowing.
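A rough sketch of the batch-queue idea above, assuming we scale on scheduling delay via a StreamingListener; the class name, thresholds, and scale-up amount are illustrative, and this is not a concrete implementation proposal:

```scala
import org.apache.spark.SparkContext
import org.apache.spark.streaming.scheduler.{StreamingListener, StreamingListenerBatchCompleted}

// Hypothetical: request more executors when batches wait too long in the
// queue, i.e. when execution is falling behind receiving.
class QueueAwareScaler(sc: SparkContext, maxDelayMs: Long, executorsToAdd: Int)
  extends StreamingListener {

  override def onBatchCompleted(batch: StreamingListenerBatchCompleted): Unit = {
    val schedulingDelayMs = batch.batchInfo.schedulingDelay.getOrElse(0L)
    if (schedulingDelayMs > maxDelayMs) {
      // Scale up based on the batch queue rather than the task queue.
      // Any scale-down path would still need to skip executors hosting receivers.
      sc.requestExecutors(executorsToAdd)
    }
  }
}
```

Such a listener would be registered with ssc.addStreamingListener(...); the block-pinning concern raised above for windowed data is not addressed by this sketch.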

@harishreedharan
Contributor

On Thursday, June 18, 2015, andrewor14 notifications@github.com wrote:

Well, there are at least two other important considerations:

  • We should use the batch queue instead of the task queue

Not sure what you mean here - can you explain a bit? In the receiver case the partition count depends on the amount of data received, while in the direct stream case it is fixed, so we'd have to rethink this since the task count is constant.

  • We should never kill executors with receivers

This is not an issue, since the receiver is basically executed as a long-running task.


Thanks,
Hari

@jerryshao
Contributor Author

Hi @andrewor14, this is a kind of proposal to show a back pressure mechanism for flow control. If this solution is accepted, we will still need to do a lot of refactoring work; if there are any better solutions, I will close it.

Besides, this solution is a little different from dynamic allocation: dynamic allocation is used for better resource control, whereas here it is used for better flow control specifically in the streaming area (especially for receiver-based streams). I think the current dynamic allocation may not address this problem well.

@andrewor14
Contributor

@jerryshao Yes, TD and I also discussed this recently. Both of us agree that we should have some kind of solution for handling back pressure in streaming. Some notion of flow control is good, but we could additionally solve this with dynamic allocation. I think moving forward it makes sense to have both solutions, because flow control is necessary if we cannot acquire more resources in the cluster.

@andrewor14
Contributor

@harishreedharan Yes, the receiver is a long-running task, and so we don't want to disrupt it. According to TD, the main reason the batch queue builds up is that execution can't catch up with receiving, not that we have too few receivers. I think it's simplest to limit dynamic allocation in streaming to adding/removing executors without receivers.

Not sure what you mean here - can you explain a bit? In the receiver case the partition count depends on the amount of data received, while in the direct stream case it is fixed, so we'd have to rethink this since the task count is constant.

Having a long batch queue means execution can't keep up with receiving, so we need more executors to speed up execution. Note that in streaming the scheduler's task queue may fluctuate quite a bit, because the batch interval may not be long enough for the timeouts to elapse continuously; i.e., the task queue probably won't be sustained across batches. Maybe @tdas can explain this better.

@andrewor14
Contributor

@tdas @dragos is this patch still relevant after the latest back pressure work?

@dragos
Contributor

dragos commented Sep 2, 2015

I'm afraid not. The corresponding Jira ticket was closed as a duplicate by @tdas on July 27, so we should close this PR as well. @jerryshao, do you think there's something in your approach that would make sense to port to the existing backpressure implementation?
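For reference, the existing backpressure implementation referred to here is enabled with a single configuration flag in Spark 1.5+; the static rate caps shown alongside it predate that work and are optional:

```scala
import org.apache.spark.SparkConf

val conf = new SparkConf()
  // Built-in backpressure (the work that superseded this PR), Spark 1.5+.
  .set("spark.streaming.backpressure.enabled", "true")
  // Optional static upper bounds, independent of backpressure:
  .set("spark.streaming.receiver.maxRate", "10000")          // records/sec per receiver
  .set("spark.streaming.kafka.maxRatePerPartition", "1000")  // records/sec per partition (direct stream)
```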

@jerryshao
Contributor Author

OK, thanks a lot. I will port it if it still makes sense.

@jerryshao jerryshao closed this Sep 2, 2015