
KAFKA-5490: Skip empty record batches in the consumer #3408

Closed

Conversation

@ijuma ijuma commented Jun 22, 2017

The actual fix for KAFKA-5490 is in
#3406.

This is just the consumer change that will allow the cleaner
to use empty record batches without breaking 0.11.0.0
consumers (assuming that KAFKA-5490 does not make the cut).
This is a safe change even if we decide to go with a different option
for KAFKA-5490 and I'd like to include it in RC2.
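For illustration, the consumer-side change amounts to advancing past batches that contain no records instead of stopping on them, while still moving the consumed position beyond the empty batch. A minimal sketch of that control flow, using hypothetical `Batch`/`drain` names rather than the actual `Fetcher` code:

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.Iterator;
import java.util.List;

public class SkipEmptyBatches {
    // Hypothetical stand-in for a record batch; after log cleaning a batch
    // may retain its offset range but contain no records.
    record Batch(long baseOffset, long lastOffset, List<String> records) {}

    /**
     * Drain all records from the given batches into {@code out}, skipping
     * empty batches but still advancing the position past them.
     */
    static long drain(Deque<Batch> batches, List<String> out) {
        long position = 0;
        while (!batches.isEmpty()) {
            Batch batch = batches.poll();
            Iterator<String> it = batch.records().iterator();
            if (!it.hasNext()) {
                // Empty batch: nothing to return, but the position must still
                // move to lastOffset + 1, or the consumer would keep
                // re-fetching the same empty batch.
                position = batch.lastOffset() + 1;
                continue;
            }
            while (it.hasNext())
                out.add(it.next());
            position = batch.lastOffset() + 1;
        }
        return position;
    }

    public static void main(String[] args) {
        Deque<Batch> batches = new ArrayDeque<>();
        batches.add(new Batch(0, 1, List.of("a", "b")));
        batches.add(new Batch(2, 4, List.of()));   // empty batch left by cleaning
        batches.add(new Batch(5, 5, List.of("c")));
        List<String> out = new ArrayList<>();
        long position = drain(batches, out);
        System.out.println(out + " position=" + position);
    }
}
```

The key point is the `continue` branch: without it, a consumer hitting an empty batch would make no progress on that partition.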

@ijuma force-pushed the kafka-5490-consumer-should-skip-empty-batches branch from b1121c4 to 991b51b on June 22, 2017 12:10
@ijuma commented Jun 22, 2017

@dguy, can you please review this?

@ijuma commented Jun 22, 2017

Btw, this is @hachikuji's code, I just split the consumer change from the log cleaner changes.

@asfgit commented Jun 22, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/5602/
Test PASSed (JDK 8 and Scala 2.12).

@asfgit commented Jun 22, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.11/5616/
Test PASSed (JDK 7 and Scala 2.11).

@dguy dguy left a comment


Thanks @ijuma, I've left a couple of minor comments/questions.

@@ -1061,6 +1061,7 @@ private Record nextFetchedRecord() {
}

records = currentBatch.streamingIterator(decompressionBufferSupplier);
continue;
@dguy:
could we use

    } else {
        Record record = records.next();
        ...
    }

instead of continue here?

@ijuma:
Sounds good.

@@ -393,6 +395,24 @@ private static byte computeAttributes(CompressionType type, TimestampType timest
return attributes;
}

public static void writeEmptyHeader(ByteBuffer buffer,
@dguy:
If this is only for testing should we put it in a Test class?

@ijuma:
#3406 uses this in non-test code. We will merge that soon as well, so it seems like this way will cause less churn.
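For context, a simplified sketch of what writing an empty batch header involves, following the field layout of the v2 record-batch format: a header with `recordCount = 0` whose `lastOffsetDelta` still preserves the batch's last offset. This is an illustration, not the actual Kafka implementation; in particular the CRC computation (which the real code performs over the bytes following the CRC field) is omitted here:

```java
import java.nio.ByteBuffer;

public class EmptyBatchHeader {
    // Sketch: write a v2 record-batch header describing an empty batch.
    // Field sizes follow the v2 batch format; crc is left as 0 in this sketch.
    static void writeEmptyHeader(ByteBuffer buffer, long baseOffset, long lastOffset,
                                 long producerId, short producerEpoch,
                                 int partitionLeaderEpoch) {
        int lastOffsetDelta = (int) (lastOffset - baseOffset);
        buffer.putLong(baseOffset);
        buffer.putInt(49);                 // batchLength: bytes after this field
        buffer.putInt(partitionLeaderEpoch);
        buffer.put((byte) 2);              // magic = 2 (v2 format)
        buffer.putInt(0);                  // crc (computation omitted in this sketch)
        buffer.putShort((short) 0);        // attributes
        buffer.putInt(lastOffsetDelta);    // preserves lastOffset despite no records
        buffer.putLong(-1L);               // firstTimestamp (none)
        buffer.putLong(-1L);               // maxTimestamp (none)
        buffer.putLong(producerId);
        buffer.putShort(producerEpoch);
        buffer.putInt(-1);                 // baseSequence (none)
        buffer.putInt(0);                  // recordCount = 0: the batch is empty
        buffer.flip();
    }
}
```

Retaining `lastOffsetDelta` is what lets the cleaner replace a fully-cleaned batch with an empty one without losing offset information.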

@@ -1669,6 +1669,35 @@ protected boolean shouldRetain(RecordBatch recordBatch, Record record) {
}

@Test
public void testUpdatePositionOnEmptyBatch() {
@dguy:
If writeEmptyHeader is only used by test code and is moved to a test utils class, then I guess we don't need this?

@ijuma:
As per my other comment, we hope to use it in non-test code soon.

@ijuma:
Hmm I think you meant to comment on testEmptyHeader. This test is used to validate that the fetcher change works correctly.
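The invariant that test validates can be sketched as: given a batch covering some offset range but holding zero records, the fetcher must still move the consumed position to one past the batch's last offset. A minimal illustration, with hypothetical names rather than the actual FetcherTest code:

```java
public class UpdatePositionOnEmptyBatchSketch {
    /**
     * Position update rule: even when recordCount == 0, the consumed position
     * must advance to lastOffset + 1, otherwise the consumer would re-fetch
     * the same empty batch indefinitely.
     */
    static long updatePosition(long currentPosition, long batchLastOffset, int recordCount) {
        return Math.max(currentPosition, batchLastOffset + 1);
    }

    public static void main(String[] args) {
        // An empty batch spanning offsets up to 106 must still move the position.
        long pos = updatePosition(0L, 106L, 0);
        if (pos != 107L)
            throw new AssertionError("position not advanced past empty batch");
        System.out.println("position advanced to " + pos);
    }
}
```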

@ijuma commented Jun 22, 2017

I updated the PR, @dguy.

@dguy dguy left a comment


Thanks @ijuma, LGTM.

@ijuma commented Jun 22, 2017

I will merge to trunk and 0.11.0 once the tests pass.

@asfgit commented Jun 22, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/5608/
Test FAILed (JDK 8 and Scala 2.12).

@asfgit commented Jun 22, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.11/5622/
Test PASSed (JDK 7 and Scala 2.11).

asfgit pushed a commit that referenced this pull request Jun 22, 2017
The actual fix for KAFKA-5490 is in
#3406.

This is just the consumer change that will allow the cleaner
to use empty record batches without breaking 0.11.0.0
consumers (assuming that KAFKA-5490 does not make the cut).
This is a safe change even if we decide to go with a different option
for KAFKA-5490 and I'd like to include it in RC2.

Author: Jason Gustafson <jason@confluent.io>
Author: Ismael Juma <ismael@juma.me.uk>

Reviewers: Damian Guy <damian.guy@gmail.com>, Ismael Juma <ismael@juma.me.uk>

Closes #3408 from ijuma/kafka-5490-consumer-should-skip-empty-batches

(cherry picked from commit fc58ac5)
Signed-off-by: Ismael Juma <ismael@juma.me.uk>
@asfgit asfgit closed this in fc58ac5 Jun 22, 2017
@ijuma ijuma deleted the kafka-5490-consumer-should-skip-empty-batches branch September 5, 2017 08:38