Skip to content

Commit

Permalink
KAFKA-8570; Grow buffer to hold down converted records if it was insu…
Browse files Browse the repository at this point in the history
…fficiently sized (#6974)

When the log contains out of order message formats (for example v2 message followed by v1 message) and consists of compressed batches typically greater than 1kB in size, it is possible for down-conversion to fail. With compressed batches, we estimate the size of down-converted batches using:

```
    private static int estimateCompressedSizeInBytes(int size, CompressionType compressionType) {
        return compressionType == CompressionType.NONE ? size : Math.min(Math.max(size / 2, 1024), 1 << 16);
    }
```

This almost always underestimates size of down-converted records if the batch is between 1kB-64kB in size. In general, this means we may under estimate the total size required for compressed batches.

Because of an implicit assumption in the code that messages with a lower message format appear before any with a higher message format, we do not grow the buffer we copy the down converted records into when we see a message <= the target message format. This assumption becomes incorrect when the log contains out of order message formats, for example because of leaders flapping while upgrading the message format.

Reviewers: Jason Gustafson <jason@confluent.io>
  • Loading branch information
dhruvilshah3 authored and hachikuji committed Jun 21, 2019
1 parent 96d8bb4 commit e6e6a58
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 0 deletions.
Expand Up @@ -18,6 +18,7 @@

import org.apache.kafka.common.errors.UnsupportedCompressionTypeException;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;

import java.nio.ByteBuffer;
import java.util.ArrayList;
Expand Down Expand Up @@ -80,9 +81,11 @@ protected static ConvertedRecords<MemoryRecords> downConvert(Iterable<? extends
ByteBuffer buffer = ByteBuffer.allocate(totalSizeEstimate);
long temporaryMemoryBytes = 0;
int numRecordsConverted = 0;

for (RecordBatchAndRecords recordBatchAndRecords : recordBatchAndRecordsList) {
temporaryMemoryBytes += recordBatchAndRecords.batch.sizeInBytes();
if (recordBatchAndRecords.batch.magic() <= toMagic) {
buffer = Utils.ensureCapacity(buffer, buffer.position() + recordBatchAndRecords.batch.sizeInBytes());
recordBatchAndRecords.batch.writeTo(buffer);
} else {
MemoryRecordsBuilder builder = convertRecordBatch(toMagic, buffer, recordBatchAndRecords);
Expand Down
Expand Up @@ -34,6 +34,7 @@
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Random;

import static java.util.Arrays.asList;
import static org.apache.kafka.common.utils.Utils.utf8;
Expand Down Expand Up @@ -382,6 +383,39 @@ public void testFormatConversionWithPartialMessage() throws IOException {
assertTrue("No messages should be returned", !it.hasNext());
}

@Test
public void testDownconversionAfterMessageFormatDowngrade() throws IOException {
// random bytes
Random random = new Random();
byte[] bytes = new byte[3000];
random.nextBytes(bytes);

// records
CompressionType compressionType = CompressionType.GZIP;
List<Long> offsets = asList(0L, 1L);
List<Byte> magic = asList(RecordBatch.MAGIC_VALUE_V2, RecordBatch.MAGIC_VALUE_V1); // downgrade message format from v2 to v1
List<SimpleRecord> records = asList(
new SimpleRecord(1L, "k1".getBytes(), bytes),
new SimpleRecord(2L, "k2".getBytes(), bytes));
byte toMagic = 1;

// create MemoryRecords
ByteBuffer buffer = ByteBuffer.allocate(8000);
for (int i = 0; i < records.size(); i++) {
MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, magic.get(i), compressionType, TimestampType.CREATE_TIME, 0L);
builder.appendWithOffset(offsets.get(i), records.get(i));
builder.close();
}
buffer.flip();

// create FileRecords, down-convert and verify
try (FileRecords fileRecords = FileRecords.open(tempFile())) {
fileRecords.append(MemoryRecords.readableRecords(buffer));
fileRecords.flush();
downConvertAndVerifyRecords(records, offsets, fileRecords, compressionType, toMagic, 0L, time);
}
}

@Test
public void testConversion() throws IOException {
doTestConversion(CompressionType.NONE, RecordBatch.MAGIC_VALUE_V0);
Expand Down

0 comments on commit e6e6a58

Please sign in to comment.