
[SPARK-32597][CORE] Tune Event Drop in Async Event Queue#29413

Closed
SaurabhChawla100 wants to merge 5 commits into apache:master from SaurabhChawla100:SPARK-32597

Conversation

@SaurabhChawla100 (Contributor) commented Aug 12, 2020

What changes were proposed in this pull request?

We have seen events dropped in Spark in several scenarios, leaving the Spark application in an inconsistent state (sometimes the application ends up hung).

For example, this can happen when a large number of tasks are processed in parallel: the producer thread keeps adding events to the queue while the consumer thread drains them at a slower rate, so the queue reaches its maximum size and events are dropped.

There are times when, if the queue were a little larger (say 10 or 20 percent above the existing size), the events could still be processed and the drop avoided at that point in time.

In the current architecture, the size of the event queue is configured at the start of the application by setting spark.scheduler.listenerbus.eventqueue.capacity. Once this is set, the queue is a fixed-size LinkedBlockingQueue whose capacity cannot be changed at runtime to accommodate some extra events before they are dropped.
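For context, a hedged illustration of how that fixed capacity is set today (the value is only a placeholder, not a recommendation):

```scala
import org.apache.spark.SparkConf

// The capacity is read once at startup; it cannot be changed for a running application.
val conf = new SparkConf()
  .set("spark.scheduler.listenerbus.eventqueue.capacity", "20000")
```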

This Jira adds support for a VariableLinkedBlockingQueue in order to tune when events are dropped.

VariableLinkedBlockingQueue -> https://www.rabbitmq.com/releases/rabbitmq-java-client/v3.5.4/rabbitmq-java-client-javadoc-3.5.4/com/rabbitmq/client/impl/VariableLinkedBlockingQueue.html

As part of this PR, we add support for VariableLinkedBlockingQueue:

  1. Add the queues where event drops are seen to spark.set.optmized.event.queue, e.g.
    set --conf spark.set.optmized.event.queue=appStatus,executorManagement -> these two queues will then be
    backed by a VariableLinkedBlockingQueue whose size can be increased at run time.
    By default the value is an empty string, so all queues keep the old LinkedBlockingQueue behavior.
  2. Set the threshold by which the queue size may be increased -> spark.set.optmized.event.queue.threshold.
  3. The capacity of a queue is increased only if driverMemoryUsed is less than 90 percent of the max heap size.

With a VariableLinkedBlockingQueue, when the queue reaches the size initialized at the start of the job, it tries to grow up to the threshold (spark.set.optmized.event.queue.threshold), preventing the event drop at that point in time.
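A minimal sketch of the idea, not the actual patch: the class below and its names (GrowableEventQueue, post, memoryHeadroomOk) are hypothetical, and the 90 percent heap check simply mirrors the proposal above.

```scala
import com.rabbitmq.client.impl.VariableLinkedBlockingQueue
import org.apache.spark.scheduler.SparkListenerEvent

// Hypothetical wrapper: grow the queue toward a configured threshold before dropping events.
class GrowableEventQueue(initialCapacity: Int, thresholdFraction: Double) {
  private val queue = new VariableLinkedBlockingQueue[SparkListenerEvent](initialCapacity)
  private val maxCapacity = initialCapacity + (initialCapacity * thresholdFraction).toInt
  @volatile private var currentCapacity = initialCapacity

  // Assumed guard: only grow while driver heap usage stays below ~90% of the max heap.
  private def memoryHeadroomOk: Boolean = {
    val rt = Runtime.getRuntime
    (rt.totalMemory() - rt.freeMemory()).toDouble / rt.maxMemory() < 0.9
  }

  // Returns false when the event must be dropped, as AsyncEventQueue does today.
  def post(event: SparkListenerEvent): Boolean = {
    if (queue.offer(event)) {
      true
    } else if (currentCapacity < maxCapacity && memoryHeadroomOk) {
      currentCapacity = maxCapacity
      queue.setCapacity(currentCapacity) // grow once, up to the threshold, then retry
      queue.offer(event)
    } else {
      false // a real implementation would also shrink back to initialCapacity once drained
    }
  }
}
```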

Why are the changes needed?

Event drops can leave a Spark application in an inconsistent state. With a VariableLinkedBlockingQueue, when the queue reaches the size initialized at the start of the job it can grow up to the threshold (spark.set.optmized.event.queue.threshold), preventing the event drop at that point in time, i.e. tuning event drops in the async event queue.

Does this PR introduce any user-facing change?

No

How was this patch tested?

Added unit tests and tested the functionality using spark-submit.

@AmplabJenkins

Can one of the admins verify this patch?

@SaurabhChawla100 (Contributor, Author)

cc @vanzin, @cloud-fan, @dongjoon-hyun, @zsxwing for review

@Ngone51 (Member) commented Aug 12, 2020

cc @tgravescs @jiangxb1987 @squito

@tgravescs (Contributor)

So one question here: you are seeing dropped events that cause hangs in the application code? Which queue were they in?

@tgravescs (Contributor)

So I definitely get the point here, and it would definitely be nice to handle this better somehow. But if you are setting it to the queue size plus some threshold, then the application has to have the memory to handle that, so how is that different from just setting the queue size to be larger? The one benefit is the user doesn't have to try to pick an exact size, but the downside is they don't necessarily know how it affects memory, so ideally you add memory for the worst case anyway.

The reason I asked which queue dropped is that normally the event log queue fills up, but that should only affect the history. If the executor management queue filled up it could affect dynamic allocation, which is definitely bad, but in my opinion we should change that not to rely on events. The other queues ideally don't affect the running application, but I am wondering if something does.

@Ngone51 (Member) left a comment


I am wondering if we don't want to drop any events, why don't we just set the capacity to Integer.MAX_VALUE? The LinkedBlockingQueue doesn't really allocate that much memory at the beginning.

And I find there's concern about it:

// Cap the capacity of the queue so we get an explicit error (rather than an OOM exception) if
// it's perpetually being added to more quickly than it's being drained.

But I doubt it now. Because we never throw such explicit error but only drop events when offer returns false, right?

And even if we want to throw the error, OOM is still possible when users set a huge capacity, right?

Another thought is, the default value(10000) of spark.scheduler.listenerbus.eventqueue.capacity is probably too small.

@SaurabhChawla100 (Contributor, Author) commented Aug 12, 2020

So one question here: you are seeing dropped events that cause hangs in the application code? Which queue were they in?

We have seen events dropped from the executorManagement queue, the appStatus queue, and the eventLog queue.

There are scenarios with long-running jobs in Zeppelin and Jupyter notebooks where the Spark application runs until the notebook is stopped, or where a validation checks whether the application is idle based on whether any job is running. If the jobEnd event is dropped from the appStatus queue, the notebook thinks Spark is still running the job, leaving the Spark application in a hung state even though nothing has been running for a long period of time.

We have also seen cases where stage end, stage start, and task info events were dropped from the executorManagement queue, affecting the decisions for upscaling and downscaling executors.

@SaurabhChawla100 (Contributor, Author)

So I definitely get the point here, and it would definitely be nice to handle this better somehow. But if you are setting it to the queue size plus some threshold, then the application has to have the memory to handle that, so how is that different from just setting the queue size to be larger? The one benefit is the user doesn't have to try to pick an exact size, but the downside is they don't necessarily know how it affects memory, so ideally you add memory for the worst case anyway.

The reason I asked which queue dropped is that normally the event log queue fills up, but that should only affect the history. If the executor management queue filled up it could affect dynamic allocation, which is definitely bad, but in my opinion we should change that not to rely on events. The other queues ideally don't affect the running application, but I am wondering if something does.

The main issue is that there is no fixed queue size that works for all Spark jobs, and once the size is set at the start of the application, the LinkedBlockingQueue can hold at most that many events when full; after that, events are dropped. The idea here is to handle this scenario more gracefully: the queue size can be increased when events overflow, and once the overflow is over the size can be set back to the initial capacity.

This is handled per queue: if there is a problem with one queue (e.g. the appStatus queue), only that queue's size changes at runtime.
There is also a future plan to make this logic depend on driver memory, taking the decision to increase the queue size based on a threshold of the driver's max heap size. This can prevent OOM.

@SaurabhChawla100 (Contributor, Author)

I am wondering if we don't want to drop any events, why don't we just set the capacity to Integer.MAX_VALUE? The LinkedBlockingQueue doesn't really allocate that much memory at the beginning.

And I find there's concern about it:

// Cap the capacity of the queue so we get an explicit error (rather than an OOM exception) if
// it's perpetually being added to more quickly than it's being drained.

But I doubt it now. Because we never throw such explicit error but only drop events when offer returns false, right?

And even if we want to throw the error, OOM is still possible when users set a huge capacity, right?

Another thought is, the default value(10000) of spark.scheduler.listenerbus.eventqueue.capacity is probably too small.

I am wondering if we don't want to drop any events, why don't we just set the capacity to Integer.MAX_VALUE? The LinkedBlockingQueue doesn't really allocate that much memory at the beginning. - This would cause the driver to OOM for long-running jobs if a large number of events arrive at one point in time, since these events take driver memory while they are in the queue.

But I doubt it now. Because we never throw such explicit error but only drop events when offer returns false, right? - Yes, that's right.

And even if we want to throw the error, OOM is still possible when users set a huge capacity, right? - Generally, for production Spark jobs, when we set a huge queue capacity we also set a higher driver memory than was used previously.

Another thought is, the default value(10000) of spark.scheduler.listenerbus.eventqueue.capacity is probably too small. - Yes, this is very small; with this size we have seen lots of dropped events.

@Ngone51 (Member) commented Aug 12, 2020

I am wondering if we don't want to drop any events, why don't we just set the capacity to Integer.MAX_VALUE? The LinkedBlockingQueue doesn't really allocate that much memory at the beginning.

This would cause the driver to OOM for long-running jobs if a large number of events arrive at one point in time, since these events take driver memory while they are in the queue.

Could VariableLinkedBlockingQueue avoid OOM in that case?

@SaurabhChawla100 (Contributor, Author)

I am wondering if we don't want to drop any events, why don't we just set the capacity to Integer.MAX_VALUE? The LinkedBlockingQueue doesn't really allocate that much memory at the beginning.

This would cause the driver to OOM for long-running jobs if a large number of events arrive at one point in time, since these events take driver memory while they are in the queue.

Could VariableLinkedBlockingQueue avoid OOM in that case?

With VariableLinkedBlockingQueue, we increase the queue size by a certain percentage threshold at runtime, which is less aggressive than using Integer.MAX_VALUE. I am also planning to add a validation of the Spark driver memory before increasing the size of the VariableLinkedBlockingQueue.

@Ngone51 (Member) commented Aug 12, 2020

With VariableLinkedBlockingQueue, we increase the queue size by a certain percentage threshold at runtime, which is less aggressive than using Integer.MAX_VALUE. I am also planning to add a validation of the Spark driver memory before increasing the size of the VariableLinkedBlockingQueue.

IIUC, LinkedBlockingQueue doesn't really allocate all the memory at once when the capacity is set to Integer.MAX_VALUE.
It also increases memory usage gradually at runtime.

@SaurabhChawla100 (Contributor, Author)

With VariableLinkedBlockingQueue, we increase the queue size by a certain percentage threshold at runtime, which is less aggressive than using Integer.MAX_VALUE. I am also planning to add a validation of the Spark driver memory before increasing the size of the VariableLinkedBlockingQueue.

IIUC, LinkedBlockingQueue doesn't really allocate all the memory at once when the capacity is set to Integer.MAX_VALUE.
It also increases memory usage gradually at runtime.

That's right, memory is allocated once an event is present in the queue. But when a huge number of events flow in and the capacity is Integer.MAX_VALUE, there is no control on the number of events in the event queue; it can hold a very large number of events (maybe 100k or more), so there is a chance of OOM if the size is not controlled and driver memory is not sufficient.

VariableLinkedBlockingQueue is similar to LinkedBlockingQueue, with the added ability to set the size of the queue at runtime. So we can make use of VariableLinkedBlockingQueue and adjust the size based on various conditions.

@tgravescs (Contributor)

Sorry, I'm still not seeing any difference here versus increasing the size of the current queue. If neither really allocates memory for the entire amount until runtime, then either way you have to set the driver memory to the maximum amount used. Why not set the queue size to size + size * spark.set.optmized.event.queue.threshold?
If you look at the driver memory used, I don't think that is very reliable. It can change very quickly.

@SaurabhChawla100 (Contributor, Author) commented Aug 13, 2020

Sorry, I'm still not seeing any difference here versus increasing the size of the current queue. If neither really allocates memory for the entire amount until runtime, then either way you have to set the driver memory to the maximum amount used. Why not set the queue size to size + size * spark.set.optmized.event.queue.threshold?
If you look at the driver memory used, I don't think that is very reliable. It can change very quickly.

If you look at the driver memory used, I don't think that is very reliable. It can change very quickly. - Yes, agreed, the used driver memory can change quickly. That is why I added the validation of a 90 percent threshold on used driver memory. We can make it 95 percent, or make it configurable through a conf instead of hard-coding it. With VariableLinkedBlockingQueue we can then change the size of the queue at runtime.

OK, if we take VariableLinkedBlockingQueue and the driver memory validation out of consideration, then I can think of the two approaches below:

  1. Set the queue size to size + size * spark.set.optmized.event.queue.threshold -> The problem with this approach is that events can still be dropped after increasing the size if incoming events arrive faster than the consumer can drain them. In that case critical events can still be dropped from the queue (executorManagement queue, appStatus queue, etc.). But it does help in the scenario where there is a high rate of events at one point in time, and increasing by spark.set.optmized.event.queue.threshold can prevent the queue from overflowing.

  2. Make the queue unbounded -> The only problem with this approach is the driver getting OOM. But the events may be really important, and we cannot afford to drop them while running the Spark job. For example:
    a) Some users cannot afford to drop events from the appStatus queue, as in the case of notebooks (Jupyter/Zeppelin).
    b) Other users want dynamic allocation of resources (upscaling/downscaling) to be consistent.
    c) Other users want all events in the event log file, which they later feed to ML models to analyse their Spark jobs.
    This approach requires higher driver memory, but some Spark jobs do not need that much driver memory. We cannot make a high driver memory the default, since some applications run fine with the default driver memory of 1 GB.

We can consider making any such approach per queue, based on user requirements, as I have already done in this PR with the conf (spark.set.optmized.event.queue) for VariableLinkedBlockingQueue at the start of the application.

Happy to discuss any other ideas for tuning these queues.

@Ngone51 @tgravescs

@tgravescs (Contributor)

We can consider making any such approach per queue, based on user requirements, as I have already done in this PR with the conf (spark.set.optmized.event.queue) for VariableLinkedBlockingQueue at the start of the application.

I don't get your point here. It can already be configured on a per-queue basis (set "spark.scheduler.listenerbus.eventqueue.$name.capacity"), and if the user sets spark.set.optmized.event.queue, why not just set spark.scheduler.listenerbus.eventqueue.$name.capacity to be larger, to match whatever the driver memory is set at?
I'm not necessarily against a change here, and I get the issues with dropping events, but this just feels like extra code to do what the user can already do. If I'm missing something and you have a specific use case, please explain in more detail.
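For reference, a hedged sketch of that existing per-queue knob (queue names are taken from this thread; the capacity values are only placeholders, not recommendations):

```scala
import org.apache.spark.SparkConf

// Enlarging specific queues up front with the existing per-queue capacity configs.
val conf = new SparkConf()
  .set("spark.scheduler.listenerbus.eventqueue.appStatus.capacity", "30000")
  .set("spark.scheduler.listenerbus.eventqueue.executorManagement.capacity", "30000")
```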

So the executor management queue is definitely a problem, and if we keep seeing issues, personally I think we should integrate the dynamic allocation manager into the scheduler and get away from using messages. For the app status queue, I hadn't thought about things like Zeppelin using those messages, but that makes sense; in a normal app they shouldn't affect the run.

The only time I've seen issues with these queues is generally when dynamic allocation is enabled, you are running a huge number of tasks, dynamic allocation is requesting lots of executors, and task time is low. Basically at startup, when we request executors, we get messages from them starting plus task start and end messages all at the same time. I'd be curious to hear what conditions are causing this for you. Part of it is to make sure we haven't regressed and something is taking a long time to process events. It's not ideal, but we have fixed certain components that did this and it has helped in the past.

I don't really see a solution to this problem unless you can change it dynamically and reliably get the memory of the driver. The problem is that (depending on GC and tunings) the heap will be used up to a certain point even if it didn't strictly need to be, and I don't think getting those metrics is really reliable. I know in the past things like the runtime stats don't increment all the time; a certain amount has to be allocated. The other problem is that if the heap is heavily used you can't dynamically grow, so it goes back to the user to increase the heap size, at which point they might as well increase the queue size. So even this is not ideal.

Another thing we could do, if certain events are critical, is to look at putting them into their own queue. Perhaps another question is: is your driver simply running out of CPU to process these fast enough?

@itskals commented Aug 13, 2020

I was thinking that although Spark has many queues, in many cases not all queues are used to the same level at the same time. I mean, when some queues are heavily used, others might be consuming their events faster.

If this is true, instead of having separate, rigid sizes for each queue, what if there were a pool from which a queue can borrow event slots, returning them to the pool once the events are processed?

This pool would not hold memory itself; it is just a counter (probably atomic). Say for a driver memory of X GB we allocate N event slots to be shared by all queues; N is then the pool size.
When an event needs to be enqueued, the queue asks the pool whether a slot can be used. If yes (based on the current usage), the queue enqueues the event; if no, the event is dropped.

The idea is a middle ground between a fixed queue size and an unbounded queue: queues are not statically bound to a size and have more flexibility to grow, but there is a soft high-water mark (N) beyond which they cannot grow.
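A rough sketch of that pool as a shared atomic counter (the EventSlotPool name and its API are hypothetical, nothing like it exists in Spark):

```scala
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical shared pool: N event slots, borrowed by every queue before enqueueing.
class EventSlotPool(totalSlots: Int) {
  private val available = new AtomicInteger(totalSlots)

  // Try to borrow one slot; returns false when the pool is exhausted (the event is dropped).
  def tryAcquire(): Boolean = {
    var current = available.get()
    while (current > 0) {
      if (available.compareAndSet(current, current - 1)) return true
      current = available.get()
    }
    false
  }

  // Return a slot once the consumer thread has processed the event.
  def release(): Unit = available.incrementAndGet()
}
```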

Let me know what you think... @SaurabhChawla100 @Ngone51 @tgravescs

@tgravescs (Contributor)

Part of the problem with that is that some queues have higher priority, which is why they were split apart. You really want the executor management queue to work when dynamic allocation is on, so you wouldn't want to risk another queue using all of its capacity. If dynamic allocation is off it's not as big a deal and you could "share" that capacity. Beyond that, I don't really know the stats on the number of messages the queues usually see. It would make configuring individual queues much harder, but I don't know how many people do that.

@Ngone51 (Member) commented Aug 13, 2020

It would make configuring individual queues much harder, but I don't know how many people do that.

Much harder? IIUC, if users have some experience-based stats for the queues of their applications, I guess they could set the individual queues more accurately, and we wouldn't need such a "pool" at all.

@SaurabhChawla100 (Contributor, Author) commented Aug 13, 2020

I don't get your point here. It can already be configured on a per-queue basis (set "spark.scheduler.listenerbus.eventqueue.$name.capacity"), and if the user sets spark.set.optmized.event.queue, why not just set spark.scheduler.listenerbus.eventqueue.$name.capacity to be larger, to match whatever the driver memory is set at? I'm not necessarily against a change here, and I get the issues with dropping events, but this just feels like extra code to do what the user can already do. If I'm missing something and you have a specific use case, please explain in more detail.

Yes, I agree we can use "spark.scheduler.listenerbus.eventqueue.$name.capacity" to set the value. The whole idea is to make the queue perform well when it overflows the initial capacity (i.e. the value set via spark.scheduler.listenerbus.eventqueue.$name.capacity): some additional threshold capacity helps reduce event drops, and events are only dropped once the threshold is also exceeded. In the best case the burst is absorbed by the threshold capacity and no event is dropped. To handle this I used VariableLinkedBlockingQueue, so that at runtime the initial capacity can be increased with some validation that takes driver memory into consideration.

This could also be done with LinkedBlockingQueue by adding the extra threshold to the queue size when creating the queue at the start of the Spark application.

It is also true that "there is no fixed queue size that works for all Spark jobs, or even for the same Spark job run on different input sets from day to day". The extra threshold helps in the best case where the initial queue size is set reasonably, and it prevents abrupt behaviour of the Spark application due to event drops on the critical queues (appStatus queue, executorManagement queue, etc.).

If something fails or behaves abruptly even after this extra threshold, then some manual effort to tune the queue size is needed, just as it is today with "spark.scheduler.listenerbus.eventqueue.$name.capacity". If we rely only on "spark.scheduler.listenerbus.eventqueue.$name.capacity", we need to change the queue size every time the application misbehaves, which requires more manual effort, even for the same job, compared to the cases already handled by the threshold.

This is a real-world multi-tenant scenario: multiple Spark applications run on the same Spark cluster with a fixed maximum number of nodes and a fixed amount of cores and memory. Some of the abrupt behaviour we have seen in this setup:

  1. With dynamic allocation enabled, an event drop in the executor management queue can prevent one Spark application from downscaling its executors even though they have been idle for a long time. That application holds resources while another application needing more resources cannot get them, since the cluster has a limited amount available.

  2. We have also seen the scenario where a Zeppelin notebook depends on the appStatus queue, and an event drop prevented the Spark application from shutting down after the idle timeout even though no Spark jobs had run for some time. Here too resources were held: the driver plus the configured minimum number of executors, which could have been used by another Spark application.

There are many other scenarios where event drops can impact a Spark application.

This is just to reduce, to some extent, the manual intervention of changing the conf.

Another thing we could do, if certain events are critical, is to look at putting them into their own queue. Perhaps another question is: is your driver simply running out of CPU to process these fast enough?

  • Completely Agree on this.

@tgravescs (Contributor)

Yeah, so you have described the effects of dropping the events, which I know. What I want to know is why the events were dropped in your cases.
I'm not sure of the circumstances in which you are running or seeing these issues; is this just individuals running their own clusters? If it's a centralized cluster, why not have some cluster defaults and just raise the default values there? This is essentially just doing that.

@itskals commented Aug 14, 2020

Much harder? IIUC, if users have some experience-based stats for the queues of their applications, I guess they could set the individual queues more accurately, and we wouldn't need such a "pool" at all.

@Ngone51 if configuring the event queue sizes is that easy, then I am fine. My opinion is that it is a bit hard to arrive at the right number and may need trial and error. I guessed it would be easier to configure one number than three or four, and that some dynamism like this PR would help. Anyway, thanks.

@Ngone51 (Member) commented Aug 14, 2020

@itskals Ah, no. I was not saying that configuring individual event queue sizes is easy; I was just confused by @tgravescs's comment.

@tgravescs (Contributor)

I was just saying that if you have one pool you take from, you can't control it per queue, unless of course you add some sort of per-queue minimum. In most of the applications I've seen, the executor management queue is the most important, as filling it up will mess up dynamic allocation and can result in application hangs, wasted resources, etc. The other queues generally just drop events and lose history information. Obviously the use cases here for Jupyter/Zeppelin might be different.

@SaurabhChawla100 (Contributor, Author) commented Aug 17, 2020

Yeah, so you have described the effects of dropping the events, which I know. What I want to know is why the events were dropped in your cases.

There are scenarios where a stage has a large number of tasks and all of them complete in a very short time, filling the queue very quickly and causing events to be dropped after a while from those queues (the executorManagement queue, the appStatus queue, and even the event log queue). For us, executorManagement and appStatus matter most, since we have dynamic allocation enabled and most of the jobs run from notebooks (Jupyter/Zeppelin).

I'm not sure of the circumstances in which you are running or seeing these issues; is this just individuals running their own clusters? If it's a centralized cluster, why not have some cluster defaults and just raise the default values there? This is essentially just doing that.

We are already setting some values in the cluster defaults. But as I said earlier, "there is no fixed queue size that works for all Spark jobs, or even for the same Spark job run on different input sets from day to day".
That is why we are looking for some dynamism in this process and for some way to reduce the human effort of changing queue sizes. IMO, changing the driver memory (increasing/decreasing it) is easier than setting queue sizes (there are multiple queues), and an application failing with OOM is easier to debug than the impact of event drops, since event drops show up in many ways (application hangs, wasted resources, etc.).

@tgravescs @Ngone51 @itskals

@tgravescs (Contributor)

We are already setting some values in the cluster defaults. But as I said earlier, "there is no fixed queue size that works for all Spark jobs, or even for the same Spark job run on different input sets from day to day".

So what do you plan to set these configs to? Unlimited? If there is no size that works for all applications, how does this help? You are adding a config that adds a set size to a set-size config.

Let's say I make my event queue size 10000 for the entire cluster and my spark.set.optmized.event.queue.threshold is 10%: 10000 * 10% = 1000, so my event queue size is 11000.
If users are already changing the event queue size from 10000 to, say, 30000, then there is no reason they can't make it 33000 (the same as having spark.set.optmized.event.queue.threshold=10%).

@SaurabhChawla100 (Contributor, Author)

Let's say I make my event queue size 10000 for the entire cluster and my spark.set.optmized.event.queue.threshold is 10%: 10000 * 10% = 1000, so my event queue size is 11000.
If users are already changing the event queue size from 10000 to, say, 30000, then there is no reason they can't make it 33000 (the same as having spark.set.optmized.event.queue.threshold=10%).

The idea here is that whatever initial capacity is set, there is some threshold on top of it that can prevent event drops in the best case, so less human effort goes into changing the queue capacity conf.

If users are already changing the event queue size from 10000 to, say, 30000, then there is no reason they can't make it 33000 (the same as having spark.set.optmized.event.queue.threshold=10%). - Yes, they can, but only on the next run, after they have seen some abrupt behaviour (application hung, resources wasted). But what if, thanks to this extra threshold, there is no event drop at all? That extra size (33000 with 10%, or 36000 with 20%) might mean the user has to change the conf less frequently.

Anyway, if we are the only ones impacted by this event drop problem and others are fine with setting it manually, then I believe the current behaviour is fine.

We can revisit this problem if there is demand for it in the future.

Thank you everyone for the valuable feedback on this issue.

@tgravescs (Contributor)

Yes, they can, but only on the next run, after they have seen some abrupt behaviour (application hung, resources wasted). But what if, thanks to this extra threshold, there is no event drop at all? That extra size (33000 with 10%, or 36000 with 20%) might mean the user has to change the conf less frequently.

I don't understand this: how does it prevent the user from changing a conf? They either change the percentage to 20% or the event queue size to 36000. Either way they change a conf.

@itskals commented Aug 18, 2020

I don't understand this: how does it prevent the user from changing a conf? They either change the percentage to 20% or the event queue size to 36000. Either way they change a conf.

@tgravescs what I understand from the discussion here is: let us assume 30,000 is a good number in many cases for a certain workload, but in some scenarios, like a slight change in the input data pattern, the number of events generated increases. In that case, this 20% can be helpful: it will hopefully absorb the extra burden without losing events and without the user manually changing the value to 36,000. (Seen another way: "instead of changing the value twice, change two values once".)

Another thing I see is that the queue size need not grow to the configured value (20%) in one step; it can be gradual and multi-step, and it can shrink back to the original value once the event queue is back to "acceptable" usage.
I also see that the additional queue size is memory-pressure aware and best effort, in which case setting the value higher, say 100%, might not do harm.

Anyway, it would be good if the system detected the need to increase the queue size and did it automatically, while being resource aware and giving the user the flexibility to control it if needed.

@Ngone51 @SaurabhChawla100

@Ngone51 (Member) commented Aug 18, 2020

In that case, this 20% can be helpful.

The problem is: why would you set it to 20% in the first place? Why not 10% or 30%? If someone knows exactly that they need 20% more, I think they could also just set it to 36,000, especially if they have no background on this PR.

Instead of just increasing the queue size (by a percentage or a fixed number), I'd prefer to improve the utilization of the current queues. The idea of a "pool" might be a good inspiration, and as @tgravescs mentioned above, if we worry about some queues we could set minimum sizes to avoid aggressive "sharing". But the most important thing here is that we need to investigate the statistics of the queues of common applications and see whether the "pool" could really help ease the problem.

@SaurabhChawla100 (Contributor, Author)

What I understand from the discussion here is: let us assume 30,000 is a good number in many cases for a certain workload, but in some scenarios, like a slight change in the input data pattern, the number of events generated increases. In that case, this 20% can be helpful: it will hopefully absorb the extra burden without losing events and without the user manually changing the value to 36,000. (Seen another way: "instead of changing the value twice, change two values once".)

@itskals - Thanks for explaining this in detail.

The problem is: why would you set it to 20% in the first place? Why not 10% or 30%? If someone knows exactly that they need 20% more, I think they could also just set it to 36,000, especially if they have no background on this PR.

We could use a default value like 10% for the conf. Suppose the user has fixed the capacity at 30,000 and there is a slight increase in load while processing the job, with 32,000 tasks completing in a small interval of time, overflowing the queue even at 30,000. With this extra default threshold of 10%, i.e. 33,000, those event drops are prevented, the job is protected from abrupt behaviour due to event drops, and the user is saved from changing the value from 30,000 to some other number as frequently.

This is a best-case example, and the proposal is to handle this best case, which prevents the event drop.

@mridulm (Contributor) commented Aug 18, 2020

Queue capacity is an approximation used to bound memory usage in the event queues, and it tends to be conservative: event loss is tolerable (except for DRA), but a driver OOM causes the entire app to fail.

@SaurabhChawla100 I understand the motivation, but if the extra x% needed to avoid dropping events could have been predicted, it could just as well have been used to bump up the queue capacity to begin with, along with a corresponding increase in driver memory.

Since a reasonably simple solution already exists (leveraging the queue capacity configs), and based on the use cases discussed in this PR, I agree with @tgravescs: I don't see a good reason to introduce the additional complexity.

@cloud-fan (Contributor)

I think we can't 100% guarantee that no events are dropped; whatever we do is best effort. That said, the consumer side of the event queue should not be sensitive to event drops. If your use case is to check whether a Spark application is still running jobs, can we query the Spark driver status via the REST API directly?

I get the intention of this PR to make the queue size more dynamic, since the peak number of events is unpredictable. But I don't see how this patch solves that, as users still need to predict it and set the new config.

@itskals commented Aug 18, 2020

I think the direction for us is to agree on whether this is a problem worth solving. A few questions from my side which may help:

  1. Is the impact of event drops critical to the reliability of Spark?
    [Note: by Spark here I mean the base offering: core + dynamic allocation + app status, etc.]
  2. How common is the event drop issue, in terms of complaints received, in your experience?
  3. What is the general remedy offered? (It could be the general strategy Databricks, or your company, offers its customers.)

For the solutions part [if the issue is critical to solve]:

  1. What should be the general direction for solving such problems? [auto-detect => auto-adjust => auto-heal, etc.]
  2. How much user involvement is acceptable?

If we have a quorum, we can decide the next steps.

@tgravescs (Contributor)

From my experience:

  1. For dynamic allocation it definitely is, but in my opinion the right solution there is to pull the dynamic allocation manager into the core scheduler and not rely on the messages and listener queues. I have not seen direct issues with the others.
  2. In my previous experience it was very uncommon, and adjusting queue sizes worked. But I had also increased the default queue size, along with the default driver memory, at the cluster config level.

@github-actions

We're closing this PR because it hasn't been updated in a while. This isn't a judgement on the merit of the PR in any way. It's just a way of keeping the PR queue manageable.
If you'd like to revive this PR, please reopen it and ask a committer to remove the Stale tag!

github-actions bot added the Stale label Nov 27, 2020
github-actions bot closed this Nov 28, 2020
