KAFKA-6120: RecordCollector should not retry sending #4148
Conversation
Call for review @bbejeck @dguy @guozhangwang
retest this please
Utils.sleep(SEND_RETRY_BACKOFF);
    }
});
} catch (final TimeoutException e) {
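For reference, a rough sketch of the retry loop this PR removes, reconstructed from the hunk above; the constant MAX_SEND_ATTEMPTS and the concrete backoff value are assumptions for illustration, not the literal Streams code:

```java
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.errors.StreamsException;

final class RetryLoopSketch {
    private static final int MAX_SEND_ATTEMPTS = 3;      // assumed name and value
    private static final long SEND_RETRY_BACKOFF = 100L; // constant name from the hunk above; value assumed

    // On a TimeoutException from send(), back off and retry, up to MAX_SEND_ATTEMPTS times.
    static void sendWithRetries(final Producer<byte[], byte[]> producer,
                                final ProducerRecord<byte[], byte[]> record,
                                final Callback callback) {
        for (int attempt = 1; attempt <= MAX_SEND_ATTEMPTS; attempt++) {
            try {
                producer.send(record, callback);
                return;
            } catch (final TimeoutException e) {
                if (attempt == MAX_SEND_ATTEMPTS) {
                    throw new StreamsException("failed to send record after " + MAX_SEND_ATTEMPTS + " attempts", e);
                }
                Utils.sleep(SEND_RETRY_BACKOFF);
            }
        }
    }
}
```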
I agree that we should get rid of the retry logic and rely only on the producer internals for retries. One caveat of this change, though, is that if we did hit the issue that KIP-91 tries to solve, Streams would be less resilient than it is today (previously we could tolerate up to 3X downtime, while now it is only X downtime), since increasing max.block.ms will not help here.
Just FYI to @xvrl
Would it not help if max.block.ms was set to a reasonably large value?
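Something like the following would be one way to do that from the Streams side. This is only an illustrative sketch of the suggestion, not part of this PR: the application id, bootstrap servers, and the 300-second value are arbitrary placeholders.

```java
import java.util.Properties;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.streams.StreamsConfig;

final class MaxBlockMsConfigSketch {
    // Raise the embedded producer's max.block.ms through the Streams config.
    static Properties config() {
        final Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "example-app");       // placeholder
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        props.put(StreamsConfig.producerPrefix(ProducerConfig.MAX_BLOCK_MS_CONFIG), 300_000L); // example: 5 minutes
        return props;
    }
}
```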
@guozhangwang can you explain how this affects our ability to withstand downtime? You mentioned 3X vs. X downtime; what currently determines the value of X?
Actually it's not exactly 3X vs. X. Here is the difference:
Assuming the broker is down: without this PR, the producer first throws a TimeoutException after request.timeout for the records in its accumulator queue; that exception gets caught here and we retry sending, and on each retry the send blocks for up to max.block.ms (since the queue is full) and then throws the TimeoutException again, up to three times. So the total time we can endure the broker being down is request.timeout + 3 * max.block.ms.
With this PR it would be just request.timeout.
Note that the issue itself only happens if we do not yet know the destination leader of the partition when the broker goes down, so it is not guaranteed to be hit.
I thought the call to send() will never throw a TimeoutException after request.timeout has passed. The TimeoutException will be provided in the callback handler and Streams will throw it later on. And this behavior does not change with this PR. One would always need to increase the producer config retries to be more resilient to this scenario.
Please correct me if I am wrong.
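For clarity, here is a minimal sketch of the callback-based error propagation described above: the producer reports a failed batch through the callback, and the collector rethrows it on a later call. The class, field, and method names are illustrative, not the actual RecordCollectorImpl code.

```java
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.streams.errors.StreamsException;

final class CallbackErrorPropagationSketch {
    private final Producer<byte[], byte[]> producer;
    private volatile KafkaException sendException;

    CallbackErrorPropagationSketch(final Producer<byte[], byte[]> producer) {
        this.producer = producer;
    }

    void send(final ProducerRecord<byte[], byte[]> record) {
        checkForException(); // surface an error captured by an earlier callback
        producer.send(record, new Callback() {
            @Override
            public void onCompletion(final RecordMetadata metadata, final Exception exception) {
                if (exception != null) {
                    // e.g. a TimeoutException from a failed batch arrives here,
                    // not as an exception thrown by send() itself
                    sendException = new StreamsException("failed to send record", exception);
                }
            }
        });
    }

    void checkForException() {
        if (sendException != null) {
            throw sendException;
        }
    }
}
```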
The TimeoutException that we catch here should only originate from the scenario where the producer queue is full and max.block.ms has passed.
@mjsax That is right: the TimeoutException from Sender#failBatch() is returned as the callback's exception, which will only be thrown on the next call, and retries will not help here. So it is really max.block.ms vs. 3 * max.block.ms.
Currently this config's default is 60 seconds and Streams does not override it. So the effect is that, if we do hit the issue that KIP-91 is solving, the resilience is 60 seconds vs. 180 seconds.
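Back-of-the-envelope version of those numbers, using the 60-second max.block.ms default mentioned above (an illustrative sketch only; the class name is made up):

```java
final class ResilienceWindowSketch {
    public static void main(final String[] args) {
        final long maxBlockMs = 60_000L; // producer default, per the comment above
        final int oldSendAttempts = 3;   // attempts made by the retry loop this PR removes

        System.out.println("with this PR:    ~" + maxBlockMs / 1000 + "s");                     // 60s
        System.out.println("without this PR: ~" + (oldSendAttempts * maxBlockMs) / 1000 + "s"); // 180s
    }
}
```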
final RecordCollector collector = new RecordCollectorImpl(
new MockProducer(cluster, true, new DefaultPartitioner(), byteArraySerializer, byteArraySerializer) {
@Override
public synchronized Future<RecordMetadata> send(final ProducerRecord record, final Callback callback) {
throw new TimeoutException();
throw new KafkaException();
Hmm, I'm wondering how we succeeded in this test case, since in the code above the send() call is only wrapped in a catch for TimeoutException? Note that we only set the KafkaException in the callback, while here we throw the exception directly. And in fact, you changed the expected exception from StreamsException to KafkaException in line 128 above.
As Damian pointed out, it should still be StreamsException as the expected exception, and we need to wrap this exception with an additional catch block.
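Something along these lines would be one way to add that catch block; this is a sketch only, with assumed method and message names rather than the actual patch:

```java
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.streams.errors.StreamsException;

final class WrapSendExceptionSketch {
    // Wrap exceptions thrown synchronously by Producer#send() into a
    // StreamsException, so callers (and the test) still see StreamsException.
    static void trySend(final Producer<byte[], byte[]> producer,
                        final ProducerRecord<byte[], byte[]> record,
                        final Callback callback) {
        try {
            producer.send(record, callback);
        } catch (final TimeoutException e) {
            // full producer buffer and max.block.ms elapsed
            throw new StreamsException("timed out while sending record", e);
        } catch (final KafkaException e) {
            throw new StreamsException("failed to send record", e);
        }
    }
}
```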
@SuppressWarnings("unchecked") | ||
@Test(expected = StreamsException.class) | ||
public void shouldThrowStreamsExceptionAfterMaxAttempts() { | ||
@Test(expected = KafkaException.class) |
Should this be StreamsException?
Updated this.
FAILURE
SUCCESS
SUCCESS
Merged to trunk.