KAFKA-3720: Change TimeoutException to BufferExhaustedException when no memory can be allocated for a record within max.block.ms #8399
Conversation
…ncrease buffer-exhausted-records metric when no memory can be allocated for a record within max.block.ms.
```diff
 /**
  * This exception is thrown if the producer is in non-blocking mode and the rate of data production exceeds the rate at
  * which data can be sent for long enough for the allocated buffer to be exhausted.
  */
-public class BufferExhaustedException extends KafkaException {
+public class BufferExhaustedException extends TimeoutException {
```
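Since `BufferExhaustedException` now extends `TimeoutException`, any existing `catch (TimeoutException e)` clause keeps catching buffer exhaustion. A minimal stand-alone sketch of that compatibility (the two exception classes here are simplified stand-ins for the real ones in `org.apache.kafka.common.errors`, not the actual Kafka code):

```java
// Simplified stand-ins for the Kafka exception classes, just to
// illustrate why the inheritance change preserves compatibility.
class TimeoutException extends RuntimeException {
    TimeoutException(String message) { super(message); }
}

class BufferExhaustedException extends TimeoutException {
    BufferExhaustedException(String message) { super(message); }
}

public class CompatibilityDemo {
    // Simulates pre-existing client code that only knows TimeoutException.
    static String handle(RuntimeException e) {
        try {
            throw e;
        } catch (TimeoutException te) {
            // The subclass is caught by the existing TimeoutException clause.
            return "timeout-handler caught: " + te.getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println(handle(new BufferExhaustedException("buffer full")));
    }
}
```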
Could you add a comment explaining this inheritance? This change is for keeping compatibility.
Done
Is this for backwards compatibility? It seems to me that part of the reason is to allow users to handle all timeout-related exceptions with one catch clause.
> Is this for backwards compatibility?

Client code that catches TimeoutException to handle memory issues would break if BufferExhaustedException did not extend TimeoutException.
It is a bit of a chicken-and-egg question, I guess. The main reason for adding it in this case was compatibility (at least to my mind), but what you describe is a very welcome side effect.
Happy to add a clarification to the comment, of course, if we feel this makes sense.
For another thing, the purpose of this PR is totally different from the issue title (Remove BufferExhaustedException from doSend() in KafkaProducer). Please update the title of the issue after this PR is accepted.
Also, please trace the related #1417 and notice the significant comment there.
WDYT?
Hi @chia7712, thanks for your comment! I've added a comment on why the exception extends TimeoutException and generally updated the javadoc for that class a little. Renaming the jira should not be an issue. Regarding your comment on whether to catch and rethrow the exception or wrap it in the Future, I've written my thoughts on that in the jira covering this PR. In short, I agree that it would make sense to rethrow, but that could potentially break user implementations, as it changes the current behavior.
… used to test two different failure scenarios that both used to throw TimeoutException, but now one of them throws BufferExhaustedException.
@soenkeliebau In the original PR (#1417), there was a discussion whether this exception should be thrown within the future or not. What's your reasoning for choosing the option you did?
@ijuma My main reason was that I wanted to keep the current behavior as it is. Granted, the number of people this might affect is probably extremely small, but still...
@soenkeliebau Is that true? We currently throw
That is true, but the code in that catch block can currently only be triggered by metadata refresh taking too long. If we rethrow, we now add a second scenario in which that block might be executed, while bypassing the catch (TimeoutException e) block further down that has been tailored to this scenario. Something like this, for example (full disclosure, I haven't tested it):

```java
Future<RecordMetadata> sendResult = null;
try {
    sendResult = producer.send(new ProducerRecord<>(topic, null, "test"));
} catch (TimeoutException e) {
    System.out.println("Metadata refresh took too long!");
    // ...
}
// ...
producer.flush();
try {
    sendResult.get();
} catch (TimeoutException e) {
    System.out.println("Couldn't allocate memory for record within max.block.ms");
} catch (Exception e) {
    e.printStackTrace();
}
```

We'd have the same issue with Callbacks: all code looking for TimeoutExceptions in a Callback would be bypassed if we rethrow. I freely admit that I'm probably harping on about fringe cases here! If everybody else thinks this is a non-issue, I am happy to accept that!
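One mechanical consequence of the new subclass relationship, sketched here with simplified stand-in exception classes (my own illustration, not code from the PR): code that wants to treat buffer exhaustion differently from other timeouts must catch the more specific type first, because a catch (TimeoutException e) clause now also matches BufferExhaustedException.

```java
// Stand-ins for the Kafka exception classes, for illustration only.
class TimeoutException extends RuntimeException {
    TimeoutException(String m) { super(m); }
}

class BufferExhaustedException extends TimeoutException {
    BufferExhaustedException(String m) { super(m); }
}

public class CatchOrderDemo {
    // The subclass must be caught before its parent, otherwise the
    // TimeoutException clause would swallow BufferExhaustedException too.
    static String classify(RuntimeException e) {
        try {
            throw e;
        } catch (BufferExhaustedException bee) {
            return "buffer-exhausted";
        } catch (TimeoutException te) {
            return "timeout";
        }
    }

    public static void main(String[] args) {
        System.out.println(classify(new BufferExhaustedException("full")));
        System.out.println(classify(new TimeoutException("metadata")));
    }
}
```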
@soenkeliebau Metadata refresh taking too long and allocation being blocked for too long are pretty similar. The timeout config for both is even the same (

@hachikuji @junrao Do you have any thoughts on this?
@ijuma As I said, I agree with you that this should throw in the foreground, I just wanted to be conservative about any chance of breaking user code. Maybe too conservative :) If we make sure to mention this in the notable changes and not include it in a bugfix release, that should be fine, I think.
@soenkeliebau : Thanks for the PR. Looks good overall. Just a couple of comments below.
```diff
@@ -947,6 +947,8 @@ private void throwIfProducerClosed() {
             // for other exceptions throw directly
         } catch (ApiException e) {
             log.debug("Exception occurred during message send:", e);
+            if (e instanceof BufferExhaustedException)
+                this.metrics.sensor("buffer-exhausted-records").record();
```
Could we do this inside RecordAccumulator, which is where the "buffer-exhausted-records" sensor is defined?
@junrao, counter-proposal, if I may: could we move the sensor definition to BufferPool and record the metric there, as that is where the exception is originally thrown?
In RecordAccumulator we would have to define a new catch block just for this purpose, whereas BufferPool has a dedicated conditional ready to use.
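To make that suggestion concrete, here is a heavily simplified sketch of recording the metric inside the pool, right where the exception originates. The field and method names are illustrative only; the real BufferPool records on a Metrics sensor rather than a plain counter, and actually blocks up to maxTimeToBlockMs before giving up.

```java
import java.nio.ByteBuffer;

// Illustrative stand-in for org.apache.kafka.common.errors.BufferExhaustedException.
class BufferExhaustedException extends RuntimeException {
    BufferExhaustedException(String m) { super(m); }
}

// Heavily simplified sketch of a buffer pool that counts exhaustion
// events itself instead of relying on an instanceof check in the caller.
public class BufferPoolSketch {
    private long availableMemory;
    private int bufferExhaustedRecords = 0;  // stand-in for the sensor

    BufferPoolSketch(long totalMemory) { this.availableMemory = totalMemory; }

    ByteBuffer allocate(int size, long maxTimeToBlockMs) {
        if (size > availableMemory) {
            // Record the metric at the point where the exception is thrown.
            bufferExhaustedRecords++;
            throw new BufferExhaustedException(
                "Failed to allocate memory within " + maxTimeToBlockMs + " ms.");
        }
        availableMemory -= size;
        return ByteBuffer.allocate(size);
    }

    int exhaustedCount() { return bufferExhaustedRecords; }

    public static void main(String[] args) {
        BufferPoolSketch pool = new BufferPoolSketch(2);
        pool.allocate(1, 10);
        try {
            pool.allocate(2, 10);
        } catch (BufferExhaustedException e) {
            System.out.println("exhausted events recorded: " + pool.exhaustedCount());
        }
    }
}
```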
Yes, that seems reasonable.
I've moved things around to this effect.
ping @junrao
```diff
@@ -947,6 +947,8 @@ private void throwIfProducerClosed() {
             // for other exceptions throw directly
         } catch (ApiException e) {
             log.debug("Exception occurred during message send:", e);
+            if (e instanceof BufferExhaustedException)
```
It's fine to keep compatibility for now. However, I do wonder if we should just throw ApiException to the caller since the producer can block for max.block.ms. This needs a KIP discussion.
I haven't looked at it, but do all ApiExceptions that may be caught here observe max.block.ms?
Or do you mean just throw ApiException in case the buffer is exhausted?
I was thinking that all retriable errors should observe max.block.ms, if they don't already. For non-retriable errors, just throw the exception immediately.
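As a rough sketch of that policy idea ("retriable errors observe max.block.ms, non-retriable errors throw immediately") — purely illustrative, this is the KIP-level concept, not Kafka code, and all names here are made up:

```java
// Illustrative sketch: retry retriable errors until a max.block.ms-style
// deadline, let any non-retriable exception propagate immediately.
public class RetryPolicySketch {
    interface Attempt { void run() throws Exception; }

    static class RetriableException extends Exception {
        RetriableException(String m) { super(m); }
    }

    static void runWithDeadline(Attempt attempt, long maxBlockMs) throws Exception {
        long deadline = System.currentTimeMillis() + maxBlockMs;
        while (true) {
            try {
                attempt.run();
                return;
            } catch (RetriableException e) {
                if (System.currentTimeMillis() >= deadline)
                    throw e;  // observed the deadline, give up
                Thread.sleep(1);  // back off briefly before retrying
            }
            // any non-retriable exception propagates immediately
        }
    }

    public static void main(String[] args) throws Exception {
        runWithDeadline(() -> System.out.println("first try succeeded"), 100);
    }
}
```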
I'll do some digging on this topic. But if I am not misunderstanding you, we can handle that in a separate change and merge this for now, right?
…here the exception originates. Reordered imports in PlaintextProducerSendTest.
@soenkeliebau : Thanks for the updated PR. One more comment below.
clients/src/main/java/org/apache/kafka/clients/producer/internals/BufferPool.java
@soenkeliebau : Just a couple of more comments on unit tests.
```java
public void testBufferExhaustedExceptionIsThrown() throws Exception {
    BufferPool pool = new BufferPool(2, 1, metrics, time, metricGroup);
    pool.allocate(1, maxBlockTimeMs);
    pool.allocate(2, maxBlockTimeMs);
```
This waits on a real Timer, so waiting 2 secs in a unit test is too long. Perhaps try 10ms?
Happy to do that, of course. The maxBlockTimeMs affects the entire class of tests though, so I'd like to move that change to a separate PR, to make it easier to trace in case tests become unstable because of it.
I ran a couple thousand tests with different values, and for me testBlockTimeout became unstable with a value of 10ms. As far as I can tell, we are betting on a race condition in line 188: we allocate three bytes, start delayed deallocations, wait a little, and then hope that at least one deallocation took place by the time we check. That works with 2000 ms, but apparently sometimes breaks with 10 ms.
It would be an easy fix to change the condition to 7 instead of 8, but I'm not sure how much actual worth that assertion has after that.
Happy to discuss this further, but maybe we can first agree on whether a new PR makes sense. I think it does, tbh.
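As a side note on making such timeout tests deterministic (my own sketch, not something this PR changes): waiting on a controllable clock instead of wall-clock time removes the race entirely, since "sleeping" advances virtual time instantly.

```java
import java.util.concurrent.atomic.AtomicLong;

// Minimal controllable clock for tests: sleep() advances virtual time
// instantly, so a timeout test never races against the wall clock.
public class FakeClock {
    private final AtomicLong nowMs = new AtomicLong(0);

    public long milliseconds() { return nowMs.get(); }

    public void sleep(long ms) { nowMs.addAndGet(ms); }

    // Example check: has a deadline derived from this clock expired?
    public boolean expired(long deadlineMs) { return nowMs.get() >= deadlineMs; }

    public static void main(String[] args) {
        FakeClock clock = new FakeClock();
        long deadline = clock.milliseconds() + 2000;
        clock.sleep(2000);  // takes no real time at all
        System.out.println("deadline expired: " + clock.expired(deadline));
    }
}
```

With something like this, a test can "wait" 2000 virtual milliseconds without the run taking any real time, regardless of the maxBlockTimeMs value chosen.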
```diff
@@ -171,14 +181,14 @@ public void testBlockTimeout() throws Exception {
         try {
             pool.allocate(10, maxBlockTimeMs);
             fail("The buffer allocated more memory than its maximum value 10");
-        } catch (TimeoutException e) {
+        } catch (BufferExhaustedException e) {
```
Similar to the above, perhaps reduce maxBlockTimeMs to 10ms?
thanks @junrao - I answered in your related comment above.
clients/src/main/java/org/apache/kafka/clients/producer/internals/BufferPool.java
@soenkeliebau : Thanks for the reply. I agree that we can address maxBlockTimeMs in a separate PR. Could you file a jira for that? For this PR, LGTM
Thanks for your help, Jun!
@soenkeliebau did you file the JIRA @junrao mentioned above? It would be good to reference it here.
@ijuma apologies for the delayed response. I did indeed open that pull request and an accompanying issue. The PR has since been merged, but it appears I forgot to close the ticket; I'll do that now.
Change TimeoutException to BufferExhaustedException when no memory can be allocated for a record within max.block.ms
Refactored BufferExhaustedException to be a subclass of TimeoutException, so that existing code which catches TimeoutException keeps working.
Added handling to count these exceptions in the "buffer-exhausted-records" metric.
Test Strategy
There were existing test cases covering this behavior, which I refactored.
I then added an extra case to check that the expected exception is actually thrown, which the existing tests did not cover.