KAFKA-7385: Fix log cleaner behavior when empty batches are retained #5623

Merged · 6 commits · Sep 9, 2018
@@ -160,20 +160,14 @@ public FilterResult filterTo(TopicPartition partition, RecordFilter filter, Byte
     private static FilterResult filterTo(TopicPartition partition, Iterable<MutableRecordBatch> batches,
                                          RecordFilter filter, ByteBuffer destinationBuffer, int maxRecordBatchSize,
                                          BufferSupplier decompressionBufferSupplier) {
-        long maxTimestamp = RecordBatch.NO_TIMESTAMP;
-        long maxOffset = -1L;
-        long shallowOffsetOfMaxTimestamp = -1L;
-        int messagesRead = 0;
-        int bytesRead = 0; // bytes processed from `batches`
-        int messagesRetained = 0;
-        int bytesRetained = 0;
-
+        FilterResult filterResult = new FilterResult(destinationBuffer);
         ByteBufferOutputStream bufferOutputStream = new ByteBufferOutputStream(destinationBuffer);
 
         for (MutableRecordBatch batch : batches) {
-            bytesRead += batch.sizeInBytes();
-
+            long maxOffset = -1L;
             BatchRetention batchRetention = filter.checkBatchRetention(batch);
+            filterResult.bytesRead += batch.sizeInBytes();
 
             if (batchRetention == BatchRetention.DELETE)
                 continue;
 
@@ -189,7 +183,7 @@ private static FilterResult filterTo(TopicPartition partition, Iterable<MutableR
             try (final CloseableIterator<Record> iterator = batch.streamingIterator(decompressionBufferSupplier)) {
                 while (iterator.hasNext()) {
                     Record record = iterator.next();
-                    messagesRead += 1;
+                    filterResult.messagesRead += 1;
 
                     if (filter.shouldRetainRecord(batch, record)) {
                         // Check for log corruption due to KAFKA-4298. If we find it, make sure that we overwrite
@@ -210,31 +204,20 @@ private static FilterResult filterTo(TopicPartition partition, Iterable<MutableR
             if (!retainedRecords.isEmpty()) {
                 if (writeOriginalBatch) {
                     batch.writeTo(bufferOutputStream);
-                    messagesRetained += retainedRecords.size();
-                    bytesRetained += batch.sizeInBytes();
-                    if (batch.maxTimestamp() > maxTimestamp) {
-                        maxTimestamp = batch.maxTimestamp();
-                        shallowOffsetOfMaxTimestamp = batch.lastOffset();
-                    }
+                    filterResult.updateRetainedBatchMetadata(batch, retainedRecords.size(), false);
                 } else {
                     MemoryRecordsBuilder builder = buildRetainedRecordsInto(batch, retainedRecords, bufferOutputStream);
                     MemoryRecords records = builder.build();
                     int filteredBatchSize = records.sizeInBytes();
-
-                    messagesRetained += retainedRecords.size();
-                    bytesRetained += filteredBatchSize;
-
                     if (filteredBatchSize > batch.sizeInBytes() && filteredBatchSize > maxRecordBatchSize)
                         log.warn("Record batch from {} with last offset {} exceeded max record batch size {} after cleaning " +
                                 "(new size is {}). Consumers with version earlier than 0.10.1.0 may need to " +
                                 "increase their fetch sizes.",
                                 partition, batch.lastOffset(), maxRecordBatchSize, filteredBatchSize);
 
                     MemoryRecordsBuilder.RecordsInfo info = builder.info();
-                    if (info.maxTimestamp > maxTimestamp) {
-                        maxTimestamp = info.maxTimestamp;
-                        shallowOffsetOfMaxTimestamp = info.shallowOffsetOfMaxTimestamp;
-                    }
+                    filterResult.updateRetainedBatchMetadata(info.maxTimestamp, info.shallowOffsetOfMaxTimestamp,
+                        maxOffset, retainedRecords.size(), filteredBatchSize);
                 }
             } else if (batchRetention == BatchRetention.RETAIN_EMPTY) {
                 if (batchMagic < RecordBatch.MAGIC_VALUE_V2)
@@ -245,18 +228,19 @@ private static FilterResult filterTo(TopicPartition partition, Iterable<MutableR
                     batch.producerEpoch(), batch.baseSequence(), batch.baseOffset(), batch.lastOffset(),
                     batch.partitionLeaderEpoch(), batch.timestampType(), batch.maxTimestamp(),
                     batch.isTransactional(), batch.isControlBatch());
+                filterResult.updateRetainedBatchMetadata(batch, 0, true);
             }
 
-            // If we had to allocate a new buffer to fit the filtered output (see KAFKA-5316), return early to
+            // If we had to allocate a new buffer to fit the filtered buffer (see KAFKA-5316), return early to
             // avoid the need for additional allocations.
             ByteBuffer outputBuffer = bufferOutputStream.buffer();
-            if (outputBuffer != destinationBuffer)
-                return new FilterResult(outputBuffer, messagesRead, bytesRead, messagesRetained, bytesRetained,
-                    maxOffset, maxTimestamp, shallowOffsetOfMaxTimestamp);
+            if (outputBuffer != destinationBuffer) {
+                filterResult.outputBuffer = outputBuffer;
+                return filterResult;
+            }
         }
 
-        return new FilterResult(destinationBuffer, messagesRead, bytesRead, messagesRetained, bytesRetained,
-            maxOffset, maxTimestamp, shallowOffsetOfMaxTimestamp);
+        return filterResult;
     }
 
     private static MemoryRecordsBuilder buildRetainedRecordsInto(RecordBatch originalBatch,
@@ -369,33 +353,76 @@ public enum BatchRetention {
     }
 
     public static class FilterResult {
-        public final ByteBuffer output;
-        public final int messagesRead;
-        public final int bytesRead;
-        public final int messagesRetained;
-        public final int bytesRetained;
-        public final long maxOffset;
-        public final long maxTimestamp;
-        public final long shallowOffsetOfMaxTimestamp;
-
-        // Note that `bytesRead` should contain only bytes from batches that have been processed,
-        // i.e. bytes from `messagesRead` and any discarded batches.
-        public FilterResult(ByteBuffer output,
-                            int messagesRead,
-                            int bytesRead,
-                            int messagesRetained,
-                            int bytesRetained,
-                            long maxOffset,
-                            long maxTimestamp,
-                            long shallowOffsetOfMaxTimestamp) {
-            this.output = output;
-            this.messagesRead = messagesRead;
-            this.bytesRead = bytesRead;
-            this.messagesRetained = messagesRetained;
-            this.bytesRetained = bytesRetained;
-            this.maxOffset = maxOffset;
-            this.maxTimestamp = maxTimestamp;
-            this.shallowOffsetOfMaxTimestamp = shallowOffsetOfMaxTimestamp;
+        private ByteBuffer outputBuffer;
+        private int messagesRead = 0;
+        // Note that `bytesRead` should contain only bytes from batches that have been processed, i.e. bytes from
+        // `messagesRead` and any discarded batches.
+        private int bytesRead = 0;
+        private int messagesRetained = 0;
+        private int bytesRetained = 0;
+        private long maxOffset = -1L;
+        private long maxTimestamp = RecordBatch.NO_TIMESTAMP;
+        private long shallowOffsetOfMaxTimestamp = -1L;
+
+        private FilterResult(ByteBuffer outputBuffer) {
+            this.outputBuffer = outputBuffer;
         }
+
+        private void updateRetainedBatchMetadata(MutableRecordBatch retainedBatch, int numMessagesInBatch, boolean headerOnly) {
+            int bytesRetained = headerOnly ? DefaultRecordBatch.RECORD_BATCH_OVERHEAD : retainedBatch.sizeInBytes();
+            updateRetainedBatchMetadata(retainedBatch.maxTimestamp(), retainedBatch.lastOffset(),
+                retainedBatch.lastOffset(), numMessagesInBatch, bytesRetained);
+        }
+
+        private void updateRetainedBatchMetadata(long maxTimestamp, long shallowOffsetOfMaxTimestamp, long maxOffset,
+                                                 int messagesRetained, int bytesRetained) {
+            validateBatchMetadata(maxTimestamp, shallowOffsetOfMaxTimestamp, maxOffset);
+            if (maxTimestamp > this.maxTimestamp) {
+                this.maxTimestamp = maxTimestamp;
+                this.shallowOffsetOfMaxTimestamp = shallowOffsetOfMaxTimestamp;
+            }
+            this.maxOffset = Math.max(maxOffset, this.maxOffset);
+            this.messagesRetained += messagesRetained;
+            this.bytesRetained += bytesRetained;
+        }
+
+        private void validateBatchMetadata(long maxTimestamp, long shallowOffsetOfMaxTimestamp, long maxOffset) {
+            if (maxTimestamp != RecordBatch.NO_TIMESTAMP && shallowOffsetOfMaxTimestamp < 0)
+                throw new IllegalArgumentException("shallowOffset undefined for maximum timestamp " + maxTimestamp);
+            if (maxOffset < 0)
+                throw new IllegalArgumentException("maxOffset undefined");
+        }
+
+        public ByteBuffer outputBuffer() {
+            return outputBuffer;
+        }
+
+        public int messagesRead() {
+            return messagesRead;
+        }
+
+        public int bytesRead() {
+            return bytesRead;
+        }
+
+        public int messagesRetained() {
+            return messagesRetained;
+        }
+
+        public int bytesRetained() {
+            return bytesRetained;
+        }
+
+        public long maxOffset() {
+            return maxOffset;
+        }
+
+        public long maxTimestamp() {
+            return maxTimestamp;
+        }
+
+        public long shallowOffsetOfMaxTimestamp() {
+            return shallowOffsetOfMaxTimestamp;
+        }
     }
 
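For orientation, here is a minimal sketch, not part of this patch, of a RecordFilter that forces the RETAIN_EMPTY path this PR fixes; the variable name is hypothetical, and the method modifiers are assumed to match the anonymous subclass in the test hunk below. Classes come from org.apache.kafka.common.record.

    // Sketch only: drop every record but keep the (now empty) batch headers, so the
    // cleaner preserves producer state (producerId/epoch/sequence) across cleaning.
    // filterTo(...) now accounts for such batches via updateRetainedBatchMetadata(batch, 0, true).
    MemoryRecords.RecordFilter keepEmptyBatches = new MemoryRecords.RecordFilter() {
        @Override
        protected BatchRetention checkBatchRetention(RecordBatch batch) {
            return BatchRetention.RETAIN_EMPTY; // retain the header even with zero records
        }

        @Override
        protected boolean shouldRetainRecord(RecordBatch recordBatch, Record record) {
            return false; // discard every record in the batch
        }
    };

The hunk below comes from the consumer test suite (note the subscriptions/fetch setup); it previously read the public output field and now goes through the outputBuffer() accessor.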
@@ -2113,8 +2113,8 @@ protected boolean shouldRetainRecord(RecordBatch recordBatch, Record record) {
                 return record.key() != null;
             }
         }, ByteBuffer.allocate(1024), Integer.MAX_VALUE, BufferSupplier.NO_CACHING);
-        result.output.flip();
-        MemoryRecords compactedRecords = MemoryRecords.readableRecords(result.output);
+        result.outputBuffer().flip();
+        MemoryRecords compactedRecords = MemoryRecords.readableRecords(result.outputBuffer());
 
         subscriptions.assignFromUser(singleton(tp0));
         subscriptions.seek(tp0, 0);
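For completeness, a hedged end-to-end sketch of the reworked result surface; `records` and `filter` are hypothetical stand-ins, and the filterTo signature follows the call in the test above (TopicPartition from org.apache.kafka.common, BufferSupplier from org.apache.kafka.common.utils).

    // Run the filter, then read the accumulated metadata through the new accessors
    // (the old public final fields such as result.output no longer exist).
    MemoryRecords.FilterResult result = records.filterTo(
            new TopicPartition("topic", 0), filter, ByteBuffer.allocate(1024),
            Integer.MAX_VALUE, BufferSupplier.NO_CACHING);

    result.outputBuffer().flip(); // may be a larger, reallocated buffer (see KAFKA-5316)
    MemoryRecords cleaned = MemoryRecords.readableRecords(result.outputBuffer());

    long maxTimestamp = result.maxTimestamp();               // max timestamp over retained batches
    long offsetOfMax = result.shallowOffsetOfMaxTimestamp(); // last offset of the batch holding it
    long maxOffset = result.maxOffset();                     // now advances even for retained empty batches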