Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-5213; Mark a MemoryRecordsBuilder as full as soon as the append stream is closed #3015

Conversation

apurvam
Copy link
Contributor

@apurvam apurvam commented May 10, 2017

No description provided.

@apurvam
Copy link
Contributor Author

apurvam commented May 10, 2017

cc @hachikuji @ijuma

FutureRecordMetadata result0 = batch.tryAppend(now, null, new byte[10], Record.EMPTY_HEADERS, null, now);
assertNotNull(result0);
memoryRecordsBuilder.closeForRecordAppends();
assertFalse(memoryRecordsBuilder.hasRoomFor(now, null, new byte[10]));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd add this same test (but assertTrue) just before closeForRecordAppends to verify that the closeForRecordApends is what causes the change.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good suggestion, updated.

@@ -664,7 +664,7 @@ public boolean isClosed() {
public boolean isFull() {
// note that the write limit is respected only after the first record is added which ensures we can always
// create non-empty batches (this is used to disable batching when the producer's batch size is set to 0).
return isClosed() || (this.numRecords > 0 && this.writeLimit <= estimatedBytesWritten());
return appendStreamIsClosed || (this.numRecords > 0 && this.writeLimit <= estimatedBytesWritten());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it possible that isClosed() is true, but appendStreamIsClosed is false?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No. MemoryRecordsBuilder.close calls closeForRecordAppends after performing validations.

Copy link
Contributor

@ijuma ijuma left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM, thanks. @hachikuji, do you want to take a look?

Copy link

@hachikuji hachikuji left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM. Thanks for the quick fix!

@ijuma
Copy link
Contributor

ijuma commented May 10, 2017

Bonus points for including the test from the start. :)

@asfbot
Copy link

asfbot commented May 10, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/3725/
Test PASSed (JDK 8 and Scala 2.12).

@asfbot
Copy link

asfbot commented May 10, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.10/3729/
Test PASSed (JDK 7 and Scala 2.10).

@asfbot
Copy link

asfbot commented May 11, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/3735/
Test PASSed (JDK 8 and Scala 2.11).

@asfgit asfgit closed this in 970c00e May 11, 2017
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants