-
Notifications
You must be signed in to change notification settings - Fork 13.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
KAFKA-4761: Fix producer regression handling small or zero batch size #2545
Conversation
Refer to this link for build results (access rights to CI server needed): |
Refer to this link for build results (access rights to CI server needed): |
d20a5d3
to
54528fa
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Would it make sense to add the original reproducing test case? https://issues.apache.org/jira/secure/attachment/12852383/KafkaProducerTest.java
Refer to this link for build results (access rights to CI server needed): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM. Thanks @hachikuji
LGTM too. As @apurvam mentioned It would be great if the unit test in the JIRA can be added too. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM. I asked a question to clarify the cause of the original bug and why this is a correct fix. It is more for documentation and my own learning.
@@ -376,7 +390,7 @@ public boolean isClosed() { | |||
} | |||
|
|||
public boolean isFull() { | |||
return isClosed() || this.writeLimit <= estimatedBytesWritten(); | |||
return isClosed() || (this.numRecords > 0 && this.writeLimit <= estimatedBytesWritten()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I assume that writeLimit
is based of batch size?
So, in cases where writeLimit == 0
, even an empty batch would be considered full, triggering the NPE on send?
Is that correct?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, correct. The difference from the old code is really in the hasRoomFor
method, which now uses isFull()
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we add a comment here?
@apurvam @vahidhashemian The integration test I've added effectively does the same thing, right? |
@hachikuji, yes it does. Ignore my previous comment. |
@guozhangwang Thanks for reviewing. I'll merge to trunk and 0.10.2 shortly. |
@hachikuji Yes, sorry I missed the last file you updated. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the PR, left a few minor comments, looks good otherwise.
@@ -376,7 +390,7 @@ public boolean isClosed() { | |||
} | |||
|
|||
public boolean isFull() { | |||
return isClosed() || this.writeLimit <= estimatedBytesWritten(); | |||
return isClosed() || (this.numRecords > 0 && this.writeLimit <= estimatedBytesWritten()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we add a comment here?
@@ -308,6 +308,10 @@ public static void isValidClusterId(String clusterId) { | |||
assertEquals(toList(it1.iterator()), toList(it2.iterator())); | |||
} | |||
|
|||
public static <T> List<T> toList(Iterable<? extends T> iterable) { | |||
return toList(iterable.iterator()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: one of the checkEquals
can maybe use this method now.
@@ -175,6 +175,29 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness { | |||
sendAndVerifyTimestamp(producer, TimestampType.CREATE_TIME) | |||
} | |||
|
|||
protected def sendAndVerify(producer: KafkaProducer[Array[Byte], Array[Byte]], numRecords: Int = numRecords) { | |||
val partition = new Integer(0) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is there a reason why we're not just saying val partition = 0
?
val record = new ProducerRecord(topic, partition, "key".getBytes, "value".getBytes) | ||
(record, producer.send(record)) | ||
} | ||
producer.close(20000L, TimeUnit.MILLISECONDS) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we allow this timeout to be passed as a parameter (with a default) as it may need to be different depending on numRecords
?
TestUtils.createTopic(zkUtils, topic, 1, 2, servers) | ||
|
||
val recordAndFutures = for (i <- 1 to numRecords) yield { | ||
val record = new ProducerRecord(topic, partition, "key".getBytes, "value".getBytes) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Would it not be a little better to include i
in the key and value from a debugging perspective?
Refer to this link for build results (access rights to CI server needed): |
Refer to this link for build results (access rights to CI server needed): |
Refer to this link for build results (access rights to CI server needed): |
Refer to this link for build results (access rights to CI server needed): |
Refer to this link for build results (access rights to CI server needed): |
Refer to this link for build results (access rights to CI server needed): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the updates, LGTM.
Author: Jason Gustafson <jason@confluent.io> Reviewers: Apurva Mehta <apurva.1618@gmail.com>, Vahid Hashemian <vahidhashemian@us.ibm.com>, Ismael Juma <ismael@juma.me.uk>, Guozhang Wang <wangguoz@gmail.com> Closes #2545 from hachikuji/KAFKA-4761 (cherry picked from commit 3b36d5c) Signed-off-by: Jason Gustafson <jason@confluent.io>
Thanks for the fast fix! |
@smoczarski Awesome, thanks for the update! |
Author: Jason Gustafson <jason@confluent.io> Reviewers: Apurva Mehta <apurva.1618@gmail.com>, Vahid Hashemian <vahidhashemian@us.ibm.com>, Ismael Juma <ismael@juma.me.uk>, Guozhang Wang <wangguoz@gmail.com> Closes apache#2545 from hachikuji/KAFKA-4761
No description provided.