KAFKA-2837: fix transient failure of kafka.api.ProducerBounceTest > testBrokerFailure #648
Conversation
2015-12-01
2015-12-04 (KAFKA-2893)
2015-12-09
```scala
val numServers = 2

val overridingProps = new Properties()
overridingProps.put(KafkaConfig.AutoCreateTopicsEnableProp, false.toString)
overridingProps.put(KafkaConfig.MessageMaxBytesProp, serverMessageMaxBytes.toString)
```
Why do you need to remove this config?
@ZoneMayor Thanks for looking into this. I think this reasoning makes sense. In the current producer version we have already deprecated METADATA_FETCH_TIMEOUT_CONFIG and BLOCK_ON_BUFFER_FULL_CONFIG, so in order to eliminate this issue instead of just reducing its likelihood, we could choose to not set either of these two, and instead set MAX_BLOCK_MS_CONFIG to Long.MAX_VALUE so that the producer will block forever when there is not enough memory.
👍 The reasoning also makes sense to me.
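For illustration, a minimal sketch of the configuration described above (the broker address is a placeholder; the config key names are the standard producer ones corresponding to the constants mentioned):

```scala
import java.util.Properties

// Sketch of the suggested config: the deprecated settings
// metadata.fetch.timeout.ms and block.on.buffer.full are simply left
// unset, and max.block.ms is pinned to Long.MaxValue so the producer
// blocks indefinitely when the buffer is full.
val producerProps = new Properties()
producerProps.put("bootstrap.servers", "localhost:9092") // placeholder
producerProps.put("max.block.ms", Long.MaxValue.toString)
```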
2015-12-10: merged …into trunk-KAFKA-2837
```diff
@@ -455,7 +455,7 @@ object TestUtils extends Logging {
    */
   def createNewProducer(brokerList: String,
                         acks: Int = -1,
-                        metadataFetchTimeout: Long = 3000L,
+                        maxBlockMs: Long = Long.MaxValue,
                         blockOnBufferFull: Boolean = true,
```
Seems this variable is not used any more, could we remove it from createNewProducer as well?
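A hedged sketch of what the helper could look like after also dropping the unused `blockOnBufferFull` parameter; this is illustrative only (the helper name `newProducerProps` and the reduced parameter list are not the actual TestUtils code):

```scala
import java.util.Properties

// Illustrative only, not the actual TestUtils code: the helper with
// blockOnBufferFull removed and maxBlockMs defaulting to Long.MaxValue.
def newProducerProps(brokerList: String,
                     acks: Int = -1,
                     maxBlockMs: Long = Long.MaxValue): Properties = {
  val props = new Properties()
  props.put("bootstrap.servers", brokerList)
  props.put("acks", acks.toString)
  props.put("max.block.ms", maxBlockMs.toString)
  props
}
```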
LGTM. Merged to trunk.
@guozhangwang I don't understand the reasoning for setting `maxBlockMs` to `Long.MaxValue`.
@ijuma Yeah you are right. Setting it to MaxValue is not a good solution actually. I will submit a follow-up patch shortly. |
I can reproduce this transient failure; it seldom happens.
The code is like below:
```scala
// rolling bounce brokers
for (i <- 0 until numServers) {
  for (server <- servers) {
    server.shutdown()
    server.awaitShutdown()
    server.startup()
    Thread.sleep(2000)
  }
}
```
The brokers keep rolling-restarting while the producer keeps sending messages.
In every loop, the producer waits for the election of a partition leader; but if the election is slow, more messages get buffered in the RecordAccumulator's BufferPool.
The buffer limit is set to 30000 bytes, so a TimeoutException("Failed to allocate memory within the configured max blocking time") shows up when the pool runs out of memory.
Since every broker restart sleeps for 2000 ms, this transient failure seldom happens; but the shorter I make the sleeping period, the bigger the chance that it does.
For example, if the broker acting as controller is restarted, it takes time to elect a new controller first and then the partition leaders, which leaves even more messages blocked in the producer's RecordAccumulator BufferPool.
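The failure mode above can be illustrated with a simplified model (this is a toy semaphore-backed pool, not Kafka's actual BufferPool): memory is a fixed pool, and an allocation that cannot be satisfied within the max blocking time fails with the same TimeoutException message.

```scala
import java.util.concurrent.{Semaphore, TimeUnit, TimeoutException}

// Toy model of a bounded producer buffer: a slow leader election keeps
// memory allocated, and the next allocation times out instead of
// blocking forever.
class ToyBufferPool(totalBytes: Int, maxBlockMs: Long) {
  private val permits = new Semaphore(totalBytes)

  def allocate(bytes: Int): Unit =
    if (!permits.tryAcquire(bytes, maxBlockMs, TimeUnit.MILLISECONDS))
      throw new TimeoutException(
        "Failed to allocate memory within the configured max blocking time")

  def release(bytes: Int): Unit = permits.release(bytes)
}
```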
In this fix, I just enlarge the producer's buffer size to 1 MB.
@guozhangwang, could you give some comments?
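As a sketch, the fix described above amounts to raising the producer's total buffer (the standard `buffer.memory` setting) from the test's 30000 bytes to 1 MB, so that messages accumulated during a slow leader election no longer exhaust the pool:

```scala
import java.util.Properties

// Sketch of the fix: enlarge the producer's buffer from 30000 bytes
// to 1 MB (1048576 bytes).
val props = new Properties()
props.put("buffer.memory", (1024 * 1024).toString)
```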