KAFKA-7385: Fix log cleaner behavior when empty batches are retained #5623
Conversation
@@ -371,31 +388,24 @@ public int hashCode() {
    public static class FilterResult {
        public final ByteBuffer output;
        public final int messagesRead;
        public final int bytesRead;
        public final int totalBytesRead;
Why was only this one renamed?
for (MutableRecordBatch batch : batches) {
    bytesRead += batch.sizeInBytes();
    long maxOffset = -1L;
    int messagesRead = 0;
If there are no messages retained, then don't we lose this value?
@dhruvilshah3 can you please add a description to the PR explaining the fix?
private static FilterResult filterTo(TopicPartition partition, Iterable<MutableRecordBatch> batches,
                                     RecordFilter filter, ByteBuffer destinationBuffer, int maxRecordBatchSize,
                                     BufferSupplier decompressionBufferSupplier) {
private static class FilteredBatchesMetadata {
Can you please explain why we added this new class and only have a subset of fields in it?
This class tracks state for all filtered batches. Earlier, all branches in `filterTo` were cluttered with logic to track per-batch state and aggregate it over all the batches seen so far. With this class, we only track per-batch state, and `FilteredBatchesMetadata` holds the aggregate. Cleaner separation and less code repetition, which also makes it easier to reason about which fields have or have not been initialized in a particular branch.
Two fields have been left out: `messagesRead` and `bytesRead`. Both of these are aggregated over all the data we have seen so far, regardless of whether we end up filtering it or not.
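A minimal sketch of the accumulator idea described above (field and method names are illustrative guesses at the shape being discussed, not the actual Kafka class):

```java
// Illustrative accumulator for per-batch retained stats. All names here are
// hypothetical; the real class lives inside MemoryRecords in Kafka.
public class FilteredBatchesMetadata {
    long maxOffset = -1L;          // sentinel until a batch is retained
    long maxTimestamp = -1L;       // RecordBatch.NO_TIMESTAMP in Kafka proper
    int messagesRetained = 0;
    int bytesRetained = 0;

    // Called once per retained batch; aggregates stats across all batches,
    // including batches retained with zero records.
    void addRetainedBatchMetadata(long batchLastOffset, long batchMaxTimestamp,
                                  int recordsRetained, int sizeInBytes) {
        maxOffset = Math.max(maxOffset, batchLastOffset);
        maxTimestamp = Math.max(maxTimestamp, batchMaxTimestamp);
        messagesRetained += recordsRetained;
        bytesRetained += sizeInBytes;
    }
}
```

Keeping `messagesRead`/`bytesRead` out of the accumulator matches the split described above: those totals cover everything read, not just what was retained.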
Thanks for the explanation. Maybe worth documenting the intent (briefly).
I think it might be nicer to make the fields in `FilterResult` mutable and move the functionality there. It's a little annoying to have an additional internal class just for accumulating a subset of the filter stats.
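A sketch of this alternative, assuming a simplified `FilterResult` (the constructor and method names here are illustrative, not Kafka's actual API):

```java
import java.nio.ByteBuffer;

// Sketch of the suggestion: make the stats fields on FilterResult mutable
// and update them in place while filtering, instead of using a separate
// accumulator class. Names are illustrative.
public class FilterResult {
    public final ByteBuffer output;
    // Mutable running totals, updated as each batch is processed.
    public int messagesRead = 0;
    public int bytesRead = 0;
    public int messagesRetained = 0;
    public int bytesRetained = 0;
    public long maxOffset = -1L;

    public FilterResult(ByteBuffer output) {
        this.output = output;
    }

    // Invoked for every batch read, whether or not its records are retained.
    public void readBatch(int records, int sizeInBytes) {
        messagesRead += records;
        bytesRead += sizeInBytes;
    }

    // Invoked for every retained batch, including empty ones, so maxOffset
    // is always initialized from the batch's last offset.
    public void retainBatch(long lastOffset, int recordsRetained, int sizeInBytes) {
        messagesRetained += recordsRetained;
        bytesRetained += sizeInBytes;
        maxOffset = Math.max(maxOffset, lastOffset);
    }
}
```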
Thanks for the patch, left a few small comments.
@@ -245,18 +263,17 @@ private static FilterResult filterTo(TopicPartition partition, Iterable<MutableR
    batch.producerEpoch(), batch.baseSequence(), batch.baseOffset(), batch.lastOffset(),
    batch.partitionLeaderEpoch(), batch.timestampType(), batch.maxTimestamp(),
    batch.isTransactional(), batch.isControlBatch());
filteredBatchesMetadata.addRetainedBatchMetadata(batch, retainedRecords.size(), true);
Isn't `retainedRecords.size()` just 0 down this path?
clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java
ByteBufferOutputStream bufferOutputStream = new ByteBufferOutputStream(destinationBuffer);
int totalBytesRead = 0;
Since `FilterResult` is mutable, could we just increment the `bytesRead` and `messagesRead` fields directly?
LGTM. Thanks for the patch.
KAFKA-7385: Fix log cleaner behavior when empty batches are retained (#5623)

With idempotent/transactional producers, we may leave empty batches in the log during log compaction. When filtering the data, we keep track of state like `maxOffset` and `maxTimestamp` of filtered data. This patch ensures we maintain this state correctly for the case when only empty batches are left in `MemoryRecords#filterTo`. Without this patch, we did not initialize `maxOffset` in this edge case, which led us to append data to the log with `maxOffset` = -1L, causing the append to fail and the log cleaner to crash.

Reviewers: Jason Gustafson <jason@confluent.io>
With idempotent producers, we may leave empty batches in the log during log compaction. When filtering the data, we keep track of state like `maxOffset` and `maxTimestamp` of filtered data. This patch ensures we maintain this state correctly for the case when only empty batches are left in `MemoryRecords#filterTo`. Without this patch, we did not initialize `maxOffset` in this edge case, which led us to append data to the log with `maxOffset` = -1L, causing the append to fail and the log cleaner to crash.