From 834873961ea81873e79583a6f2bd23d56ff23572 Mon Sep 17 00:00:00 2001 From: Jason Gustafson Date: Tue, 20 Dec 2016 12:01:14 -0800 Subject: [PATCH 1/3] MINOR: Support auto-incrementing offsets in MemoryRecordsBuilder --- .../producer/internals/RecordBatch.java | 3 +- .../kafka/common/record/MemoryRecords.java | 7 ++++ .../common/record/MemoryRecordsBuilder.java | 21 ++++++++++++ .../clients/consumer/KafkaConsumerTest.java | 4 +-- .../consumer/internals/FetcherTest.java | 14 ++++---- .../record/ByteBufferLogInputStreamTest.java | 12 +++---- .../record/MemoryRecordsBuilderTest.java | 24 ++++++------- .../common/record/MemoryRecordsTest.java | 34 +++++++++---------- 8 files changed, 72 insertions(+), 47 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java index e9ef441e0e28e..68b27d3fa14a4 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java @@ -45,7 +45,6 @@ public final class RecordBatch { public final ProduceRequestResult produceFuture; public long lastAppendTime; private final List thunks; - private long offsetCounter = 0L; private boolean retry; private final MemoryRecordsBuilder recordsBuilder; @@ -69,7 +68,7 @@ public FutureRecordMetadata tryAppend(long timestamp, byte[] key, byte[] value, if (!recordsBuilder.hasRoomFor(key, value)) { return null; } else { - long checksum = this.recordsBuilder.append(offsetCounter++, timestamp, key, value); + long checksum = this.recordsBuilder.append(timestamp, key, value); this.maxRecordSize = Math.max(this.maxRecordSize, Record.recordSize(key, value)); this.lastAppendTime = now; FutureRecordMetadata future = new FutureRecordMetadata(this.produceFuture, this.recordCount, diff --git a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java index 1485486d6e482..43c45d369e69c 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java +++ b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java @@ -303,6 +303,13 @@ public static MemoryRecordsBuilder builder(ByteBuffer buffer, return builder(buffer, magic, compressionType, timestampType, 0L); } + public static MemoryRecordsBuilder builder(ByteBuffer buffer, + CompressionType compressionType, + TimestampType timestampType, + long baseOffset) { + return builder(buffer, Record.CURRENT_MAGIC_VALUE, compressionType, timestampType, baseOffset, System.currentTimeMillis()); + } + public static MemoryRecordsBuilder builder(ByteBuffer buffer, byte magic, CompressionType compressionType, diff --git a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java index b90a9e673aab5..f500a03f0420e 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java +++ b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java @@ -233,6 +233,18 @@ public long append(long offset, long timestamp, byte[] key, byte[] value) { } } + /** + * Append a new record at the next consecutive offset. If no records have been appended yet, use the base + * offset of this builder. + * @param timestamp The record timestamp + * @param key The record key + * @param value The record value + * @return crc of the record + */ + public long append(long timestamp, byte[] key, byte[] value) { + return append(lastOffset < 0 ? baseOffset : lastOffset + 1, timestamp, key, value); + } + /** * Add the record, converting to the desired magic value if necessary. * @param offset The offset of the record @@ -300,6 +312,15 @@ public void append(long offset, Record record) { appendUnchecked(offset, record); } + /** + * Append the record at the next consecutive offset. If no records have been appended yet, use the base + * offset of this builder. + * @param record The record to add + */ + public void append(Record record) { + append(lastOffset < 0 ? baseOffset : lastOffset + 1, record); + } + private long toInnerOffset(long offset) { // use relative offsets for compressed messages with magic v1 if (magic > 0 && compressionType != CompressionType.NONE) diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java index a4386f86e01ed..8240d057ec8c4 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java @@ -1325,9 +1325,9 @@ private FetchResponse fetchResponse(Map fetches) { TopicPartition partition = fetchEntry.getKey(); long fetchOffset = fetchEntry.getValue().offset; int fetchCount = fetchEntry.getValue().count; - MemoryRecordsBuilder records = MemoryRecords.builder(ByteBuffer.allocate(1024), CompressionType.NONE, TimestampType.CREATE_TIME); + MemoryRecordsBuilder records = MemoryRecords.builder(ByteBuffer.allocate(1024), CompressionType.NONE, TimestampType.CREATE_TIME, fetchOffset); for (int i = 0; i < fetchCount; i++) - records.append(fetchOffset + i, 0L, ("key-" + i).getBytes(), ("value-" + i).getBytes()); + records.append(0L, ("key-" + i).getBytes(), ("value-" + i).getBytes()); tpResponses.put(partition, new FetchResponse.PartitionData(Errors.NONE.code(), 0, records.build())); } return new FetchResponse(tpResponses, 0); diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java index 15075cb31dacf..34071218a80fc 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java @@ -106,15 +106,15 @@ public void setup() throws Exception { metadata.update(cluster, time.milliseconds()); client.setNode(node); - MemoryRecordsBuilder builder = MemoryRecords.builder(ByteBuffer.allocate(1024), CompressionType.NONE, TimestampType.CREATE_TIME); - builder.append(1L, 0L, "key".getBytes(), "value-1".getBytes()); - builder.append(2L, 0L, "key".getBytes(), "value-2".getBytes()); - builder.append(3L, 0L, "key".getBytes(), "value-3".getBytes()); + MemoryRecordsBuilder builder = MemoryRecords.builder(ByteBuffer.allocate(1024), CompressionType.NONE, TimestampType.CREATE_TIME, 1L); + builder.append(0L, "key".getBytes(), "value-1".getBytes()); + builder.append(0L, "key".getBytes(), "value-2".getBytes()); + builder.append(0L, "key".getBytes(), "value-3".getBytes()); records = builder.build(); - builder = MemoryRecords.builder(ByteBuffer.allocate(1024), CompressionType.NONE, TimestampType.CREATE_TIME); - builder.append(4L, 0L, "key".getBytes(), "value-4".getBytes()); - builder.append(5L, 0L, "key".getBytes(), "value-5".getBytes()); + builder = MemoryRecords.builder(ByteBuffer.allocate(1024), CompressionType.NONE, TimestampType.CREATE_TIME, 4L); + builder.append(0L, "key".getBytes(), "value-4".getBytes()); + builder.append(0L, "key".getBytes(), "value-5".getBytes()); nextRecords = builder.build(); } diff --git a/clients/src/test/java/org/apache/kafka/common/record/ByteBufferLogInputStreamTest.java b/clients/src/test/java/org/apache/kafka/common/record/ByteBufferLogInputStreamTest.java index c8621cd56e754..0fad9a4e008dc 100644 --- a/clients/src/test/java/org/apache/kafka/common/record/ByteBufferLogInputStreamTest.java +++ b/clients/src/test/java/org/apache/kafka/common/record/ByteBufferLogInputStreamTest.java @@ -31,8 +31,8 @@ public class ByteBufferLogInputStreamTest { public void iteratorIgnoresIncompleteEntries() { ByteBuffer buffer = ByteBuffer.allocate(2048); MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, Record.MAGIC_VALUE_V1, CompressionType.NONE, TimestampType.CREATE_TIME, 0L); - builder.append(0L, 15L, "a".getBytes(), "1".getBytes()); - builder.append(1L, 20L, "b".getBytes(), "2".getBytes()); + builder.append(15L, "a".getBytes(), "1".getBytes()); + builder.append(20L, "b".getBytes(), "2".getBytes()); ByteBuffer recordsBuffer = builder.build().buffer(); recordsBuffer.limit(recordsBuffer.limit() - 5); @@ -49,7 +49,7 @@ public void iteratorIgnoresIncompleteEntries() { public void testSetCreateTimeV1() { ByteBuffer buffer = ByteBuffer.allocate(2048); MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, Record.MAGIC_VALUE_V1, CompressionType.NONE, TimestampType.CREATE_TIME, 0L); - builder.append(0L, 15L, "a".getBytes(), "1".getBytes()); + builder.append(15L, "a".getBytes(), "1".getBytes()); Iterator iterator = builder.build().shallowEntries().iterator(); assertTrue(iterator.hasNext()); @@ -66,7 +66,7 @@ public void testSetCreateTimeV1() { public void testSetCreateTimeNotAllowedV0() { ByteBuffer buffer = ByteBuffer.allocate(2048); MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, Record.MAGIC_VALUE_V0, CompressionType.NONE, TimestampType.CREATE_TIME, 0L); - builder.append(0L, 15L, "a".getBytes(), "1".getBytes()); + builder.append(15L, "a".getBytes(), "1".getBytes()); Iterator iterator = builder.build().shallowEntries().iterator(); assertTrue(iterator.hasNext()); @@ -80,7 +80,7 @@ public void testSetCreateTimeNotAllowedV0() { public void testSetLogAppendTimeV1() { ByteBuffer buffer = ByteBuffer.allocate(2048); MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, Record.MAGIC_VALUE_V1, CompressionType.NONE, TimestampType.CREATE_TIME, 0L); - builder.append(0L, 15L, "a".getBytes(), "1".getBytes()); + builder.append(15L, "a".getBytes(), "1".getBytes()); Iterator iterator = builder.build().shallowEntries().iterator(); assertTrue(iterator.hasNext()); @@ -97,7 +97,7 @@ public void testSetLogAppendTimeV1() { public void testSetLogAppendTimeNotAllowedV0() { ByteBuffer buffer = ByteBuffer.allocate(2048); MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, Record.MAGIC_VALUE_V0, CompressionType.NONE, TimestampType.CREATE_TIME, 0L); - builder.append(0L, 15L, "a".getBytes(), "1".getBytes()); + builder.append(15L, "a".getBytes(), "1".getBytes()); Iterator iterator = builder.build().shallowEntries().iterator(); assertTrue(iterator.hasNext()); diff --git a/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java index a52976b7485de..c9dc97fb08129 100644 --- a/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java +++ b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java @@ -55,10 +55,9 @@ public void testCompressionRateV0() { TimestampType.CREATE_TIME, 0L, 0L, buffer.capacity()); int uncompressedSize = 0; - long offset = 0L; for (Record record : records) { uncompressedSize += record.sizeInBytes() + Records.LOG_OVERHEAD; - builder.append(offset++, record); + builder.append(record); } MemoryRecords built = builder.build(); @@ -86,10 +85,9 @@ public void testCompressionRateV1() { TimestampType.CREATE_TIME, 0L, 0L, buffer.capacity()); int uncompressedSize = 0; - long offset = 0L; for (Record record : records) { uncompressedSize += record.sizeInBytes() + Records.LOG_OVERHEAD; - builder.append(offset++, record); + builder.append(record); } MemoryRecords built = builder.build(); @@ -110,9 +108,9 @@ public void buildUsingLogAppendTime() { long logAppendTime = System.currentTimeMillis(); MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buffer, Record.MAGIC_VALUE_V1, compressionType, TimestampType.LOG_APPEND_TIME, 0L, logAppendTime, buffer.capacity()); - builder.append(0L, 0L, "a".getBytes(), "1".getBytes()); - builder.append(1L, 0L, "b".getBytes(), "2".getBytes()); - builder.append(2L, 0L, "c".getBytes(), "3".getBytes()); + builder.append(0L, "a".getBytes(), "1".getBytes()); + builder.append(0L, "b".getBytes(), "2".getBytes()); + builder.append(0L, "c".getBytes(), "3".getBytes()); MemoryRecords records = builder.build(); MemoryRecordsBuilder.RecordsInfo info = builder.info(); @@ -159,9 +157,9 @@ public void buildUsingCreateTime() { long logAppendTime = System.currentTimeMillis(); MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buffer, Record.MAGIC_VALUE_V1, compressionType, TimestampType.CREATE_TIME, 0L, logAppendTime, buffer.capacity()); - builder.append(0L, 0L, "a".getBytes(), "1".getBytes()); - builder.append(1L, 2L, "b".getBytes(), "2".getBytes()); - builder.append(2L, 1L, "c".getBytes(), "3".getBytes()); + builder.append(0L, "a".getBytes(), "1".getBytes()); + builder.append(2L, "b".getBytes(), "2".getBytes()); + builder.append(1L, "c".getBytes(), "3".getBytes()); MemoryRecords records = builder.build(); MemoryRecordsBuilder.RecordsInfo info = builder.info(); @@ -188,11 +186,11 @@ public void writePastLimit() { long logAppendTime = System.currentTimeMillis(); MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buffer, Record.MAGIC_VALUE_V1, compressionType, TimestampType.CREATE_TIME, 0L, logAppendTime, buffer.capacity()); - builder.append(0L, 0L, "a".getBytes(), "1".getBytes()); - builder.append(1L, 1L, "b".getBytes(), "2".getBytes()); + builder.append(0L, "a".getBytes(), "1".getBytes()); + builder.append(1L, "b".getBytes(), "2".getBytes()); assertFalse(builder.hasRoomFor("c".getBytes(), "3".getBytes())); - builder.append(2L, 2L, "c".getBytes(), "3".getBytes()); + builder.append(2L, "c".getBytes(), "3".getBytes()); MemoryRecords records = builder.build(); MemoryRecordsBuilder.RecordsInfo info = builder.info(); diff --git a/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java index f2741ee344f04..9c8ca7f290e8c 100644 --- a/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java +++ b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java @@ -60,8 +60,8 @@ public void testIterator() { for (int i = 0; i < list.size(); i++) { Record r = list.get(i); - builder1.append(firstOffset + i, r); - builder2.append(firstOffset + i, i + 1, toNullableArray(r.key()), toNullableArray(r.value())); + builder1.append(r); + builder2.append(i + 1, toNullableArray(r.key()), toNullableArray(r.value())); } MemoryRecords recs1 = builder1.build(); @@ -85,7 +85,7 @@ public void testIterator() { @Test public void testHasRoomForMethod() { MemoryRecordsBuilder builder = MemoryRecords.builder(ByteBuffer.allocate(1024), magic, compression, TimestampType.CREATE_TIME); - builder.append(0, Record.create(magic, 0L, "a".getBytes(), "1".getBytes())); + builder.append(Record.create(magic, 0L, "a".getBytes(), "1".getBytes())); assertTrue(builder.hasRoomFor("b".getBytes(), "2".getBytes())); builder.close(); @@ -95,23 +95,23 @@ public void testHasRoomForMethod() { @Test public void testFilterTo() { ByteBuffer buffer = ByteBuffer.allocate(2048); - MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, magic, compression, TimestampType.CREATE_TIME); - builder.append(0L, 10L, null, "a".getBytes()); + MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, magic, compression, TimestampType.CREATE_TIME, 0L); + builder.append(10L, null, "a".getBytes()); builder.close(); builder = MemoryRecords.builder(buffer, magic, compression, TimestampType.CREATE_TIME, 1L); - builder.append(1L, 11L, "1".getBytes(), "b".getBytes()); - builder.append(2L, 12L, null, "c".getBytes()); + builder.append(11L, "1".getBytes(), "b".getBytes()); + builder.append(12L, null, "c".getBytes()); builder.close(); builder = MemoryRecords.builder(buffer, magic, compression, TimestampType.CREATE_TIME, 3L); - builder.append(3L, 13L, null, "d".getBytes()); - builder.append(4L, 20L, "4".getBytes(), "e".getBytes()); - builder.append(5L, 15L, "5".getBytes(), "f".getBytes()); + builder.append(13L, null, "d".getBytes()); + builder.append(20L, "4".getBytes(), "e".getBytes()); + builder.append(15L, "5".getBytes(), "f".getBytes()); builder.close(); builder = MemoryRecords.builder(buffer, magic, compression, TimestampType.CREATE_TIME, 6L); - builder.append(6L, 16L, "6".getBytes(), "g".getBytes()); + builder.append(16L, "6".getBytes(), "g".getBytes()); builder.close(); buffer.flip(); @@ -175,18 +175,18 @@ public void testFilterToPreservesLogAppendTime() { ByteBuffer buffer = ByteBuffer.allocate(2048); MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, magic, compression, TimestampType.LOG_APPEND_TIME, 0L, logAppendTime); - builder.append(0L, 10L, null, "a".getBytes()); + builder.append(10L, null, "a".getBytes()); builder.close(); builder = MemoryRecords.builder(buffer, magic, compression, TimestampType.LOG_APPEND_TIME, 1L, logAppendTime); - builder.append(1L, 11L, "1".getBytes(), "b".getBytes()); - builder.append(2L, 12L, null, "c".getBytes()); + builder.append(11L, "1".getBytes(), "b".getBytes()); + builder.append(12L, null, "c".getBytes()); builder.close(); builder = MemoryRecords.builder(buffer, magic, compression, TimestampType.LOG_APPEND_TIME, 3L, logAppendTime); - builder.append(3L, 13L, null, "d".getBytes()); - builder.append(4L, 14L, "4".getBytes(), "e".getBytes()); - builder.append(5L, 15L, "5".getBytes(), "f".getBytes()); + builder.append(13L, null, "d".getBytes()); + builder.append(14L, "4".getBytes(), "e".getBytes()); + builder.append(15L, "5".getBytes(), "f".getBytes()); builder.close(); buffer.flip(); From b3b6e44a6004381a17b4ca0adbc71e9294902bd4 Mon Sep 17 00:00:00 2001 From: Jason Gustafson Date: Tue, 20 Dec 2016 14:43:47 -0800 Subject: [PATCH 2/3] Use appendWithOffset when adding a record with an offset --- .../common/record/MemoryRecordsBuilder.java | 28 +++++++++++++------ .../consumer/internals/FetcherTest.java | 8 +++--- .../record/MemoryRecordsBuilderTest.java | 15 +++++----- .../main/scala/kafka/log/LogValidator.scala | 2 +- 4 files changed, 31 insertions(+), 22 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java index f500a03f0420e..3775ec6cbb786 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java +++ b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java @@ -164,6 +164,8 @@ public MemoryRecords build() { public RecordsInfo info() { if (timestampType == TimestampType.LOG_APPEND_TIME) return new RecordsInfo(logAppendTime, lastOffset); + else if (maxTimestamp == Record.NO_TIMESTAMP) + return new RecordsInfo(Record.NO_TIMESTAMP, lastOffset); else return new RecordsInfo(maxTimestamp, compressionType == CompressionType.NONE ? offsetOfMaxTimestamp : lastOffset); } @@ -208,14 +210,14 @@ private void writerCompressedWrapperHeader() { } /** - * Append a new record and offset to the buffer + * Append a new record at the given offset. * @param offset The absolute offset of the record in the log buffer * @param timestamp The record timestamp * @param key The record key * @param value The record value * @return crc of the record */ - public long append(long offset, long timestamp, byte[] key, byte[] value) { + public long appendWithOffset(long offset, long timestamp, byte[] key, byte[] value) { try { if (lastOffset > 0 && offset <= lastOffset) throw new IllegalArgumentException(String.format("Illegal offset %s following previous offset %s (Offsets must increase monotonically).", offset, lastOffset)); @@ -242,17 +244,25 @@ public long append(long offset, long timestamp, byte[] key, byte[] value) { * @return crc of the record */ public long append(long timestamp, byte[] key, byte[] value) { - return append(lastOffset < 0 ? baseOffset : lastOffset + 1, timestamp, key, value); + return appendWithOffset(lastOffset < 0 ? baseOffset : lastOffset + 1, timestamp, key, value); } /** - * Add the record, converting to the desired magic value if necessary. + * Add the record at the next consecutive offset, converting to the desired magic value if necessary. + * @param record The record to add + */ + public void convertAndAppend(Record record) { + convertAndAppendWithOffset(lastOffset < 0 ? baseOffset : lastOffset + 1, record); + } + + /** + * Add the record at the given offset, converting to the desired magic value if necessary. * @param offset The offset of the record * @param record The record to add */ - public void convertAndAppend(long offset, Record record) { + public void convertAndAppendWithOffset(long offset, Record record) { if (magic == record.magic()) { - append(offset, record); + appendWithOffset(offset, record); return; } @@ -295,7 +305,7 @@ public void appendUnchecked(long offset, Record record) { * @param entry The entry to append */ public void append(LogEntry entry) { - append(entry.offset(), entry.record()); + appendWithOffset(entry.offset(), entry.record()); } /** @@ -304,7 +314,7 @@ public void append(LogEntry entry) { * @param offset The offset of the record * @param record The record to add */ - public void append(long offset, Record record) { + public void appendWithOffset(long offset, Record record) { if (record.magic() != magic) throw new IllegalArgumentException("Inner log entries must have matching magic values as the wrapper"); if (lastOffset > 0 && offset <= lastOffset) @@ -318,7 +328,7 @@ public void append(long offset, Record record) { * @param record The record to add */ public void append(Record record) { - append(lastOffset < 0 ? baseOffset : lastOffset + 1, record); + appendWithOffset(lastOffset < 0 ? baseOffset : lastOffset + 1, record); } private long toInnerOffset(long offset) { diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java index 34071218a80fc..272a5ee406bd2 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java @@ -293,9 +293,9 @@ public void testFetchNonContinuousRecords() { // this test verifies the fetcher updates the current fetched/consumed positions correctly for this case MemoryRecordsBuilder builder = MemoryRecords.builder(ByteBuffer.allocate(1024), CompressionType.NONE, TimestampType.CREATE_TIME); - builder.append(15L, 0L, "key".getBytes(), "value-1".getBytes()); - builder.append(20L, 0L, "key".getBytes(), "value-2".getBytes()); - builder.append(30L, 0L, "key".getBytes(), "value-3".getBytes()); + builder.appendWithOffset(15L, 0L, "key".getBytes(), "value-1".getBytes()); + builder.appendWithOffset(20L, 0L, "key".getBytes(), "value-2".getBytes()); + builder.appendWithOffset(30L, 0L, "key".getBytes(), "value-3".getBytes()); MemoryRecords records = builder.build(); List> consumerRecords; @@ -618,7 +618,7 @@ public void testQuotaMetrics() throws Exception { if (i > 1) { MemoryRecordsBuilder builder = MemoryRecords.builder(ByteBuffer.allocate(1024), CompressionType.NONE, TimestampType.CREATE_TIME); for (int v = 0; v < 3; v++) { - builder.append((long) i * 3 + v, Record.NO_TIMESTAMP, "key".getBytes(), String.format("value-%d", v).getBytes()); + builder.appendWithOffset((long) i * 3 + v, Record.NO_TIMESTAMP, "key".getBytes(), String.format("value-%d", v).getBytes()); } this.records = builder.build(); } diff --git a/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java index c9dc97fb08129..bea6730a74aae 100644 --- a/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java +++ b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java @@ -133,14 +133,13 @@ public void convertUsingLogAppendTime() { MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buffer, Record.MAGIC_VALUE_V1, compressionType, TimestampType.LOG_APPEND_TIME, 0L, logAppendTime, buffer.capacity()); - builder.convertAndAppend(0L, Record.create(Record.MAGIC_VALUE_V0, 0L, "a".getBytes(), "1".getBytes())); - builder.convertAndAppend(1L, Record.create(Record.MAGIC_VALUE_V0, 0L, "b".getBytes(), "2".getBytes())); - builder.convertAndAppend(2L, Record.create(Record.MAGIC_VALUE_V0, 0L, "c".getBytes(), "3".getBytes())); + builder.convertAndAppend(Record.create(Record.MAGIC_VALUE_V0, 0L, "a".getBytes(), "1".getBytes())); + builder.convertAndAppend(Record.create(Record.MAGIC_VALUE_V0, 0L, "b".getBytes(), "2".getBytes())); + builder.convertAndAppend(Record.create(Record.MAGIC_VALUE_V0, 0L, "c".getBytes(), "3".getBytes())); MemoryRecords records = builder.build(); MemoryRecordsBuilder.RecordsInfo info = builder.info(); assertEquals(logAppendTime, info.maxTimestamp); - assertEquals(2L, info.shallowOffsetOfMaxTimestamp); for (Record record : records.records()) { @@ -213,14 +212,14 @@ public void convertUsingCreateTime() { MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buffer, Record.MAGIC_VALUE_V1, compressionType, TimestampType.CREATE_TIME, 0L, logAppendTime, buffer.capacity()); - builder.convertAndAppend(0L, Record.create(Record.MAGIC_VALUE_V0, 0L, "a".getBytes(), "1".getBytes())); - builder.convertAndAppend(0L, Record.create(Record.MAGIC_VALUE_V0, 0L, "b".getBytes(), "2".getBytes())); - builder.convertAndAppend(0L, Record.create(Record.MAGIC_VALUE_V0, 0L, "c".getBytes(), "3".getBytes())); + builder.convertAndAppend(Record.create(Record.MAGIC_VALUE_V0, 0L, "a".getBytes(), "1".getBytes())); + builder.convertAndAppend(Record.create(Record.MAGIC_VALUE_V0, 0L, "b".getBytes(), "2".getBytes())); + builder.convertAndAppend(Record.create(Record.MAGIC_VALUE_V0, 0L, "c".getBytes(), "3".getBytes())); MemoryRecords records = builder.build(); MemoryRecordsBuilder.RecordsInfo info = builder.info(); assertEquals(Record.NO_TIMESTAMP, info.maxTimestamp); - assertEquals(0L, info.shallowOffsetOfMaxTimestamp); + assertEquals(2L, info.shallowOffsetOfMaxTimestamp); for (Record record : records.records()) { assertEquals(TimestampType.CREATE_TIME, record.timestampType()); diff --git a/core/src/main/scala/kafka/log/LogValidator.scala b/core/src/main/scala/kafka/log/LogValidator.scala index 1713942407522..d99c2ad26f6d3 100644 --- a/core/src/main/scala/kafka/log/LogValidator.scala +++ b/core/src/main/scala/kafka/log/LogValidator.scala @@ -156,7 +156,7 @@ private[kafka] object LogValidator { val record = logEntry.record validateKey(record, compactedTopic) validateTimestamp(record, now, timestampType, messageTimestampDiffMaxMs) - builder.convertAndAppend(offsetCounter.getAndIncrement(), record) + builder.convertAndAppendWithOffset(offsetCounter.getAndIncrement(), record) } builder.close() From 73952fb49411a14dfc35c1e554c80c91f78f23d1 Mon Sep 17 00:00:00 2001 From: Jason Gustafson Date: Tue, 20 Dec 2016 15:14:48 -0800 Subject: [PATCH 3/3] Fix offset check for the case when last offset == 1 --- .../kafka/common/record/MemoryRecords.java | 2 +- .../common/record/MemoryRecordsBuilder.java | 15 +++-------- .../record/MemoryRecordsBuilderTest.java | 27 +++++++++++++++++++ 3 files changed, 31 insertions(+), 13 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java index 43c45d369e69c..65d91c68b4c58 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java +++ b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java @@ -397,7 +397,7 @@ private static MemoryRecordsBuilder builderWithEntries(ByteBuffer buffer, MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, magic, compressionType, timestampType, firstOffset, logAppendTime); for (LogEntry entry : entries) - builder.append(entry); + builder.appendWithOffset(entry.offset(), entry.record()); return builder; } diff --git a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java index 3775ec6cbb786..d60861bb7f7fb 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java +++ b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java @@ -219,7 +219,7 @@ private void writerCompressedWrapperHeader() { */ public long appendWithOffset(long offset, long timestamp, byte[] key, byte[] value) { try { - if (lastOffset > 0 && offset <= lastOffset) + if (lastOffset >= 0 && offset <= lastOffset) throw new IllegalArgumentException(String.format("Illegal offset %s following previous offset %s (Offsets must increase monotonically).", offset, lastOffset)); int size = Record.recordSize(magic, key, value); @@ -266,7 +266,7 @@ public void convertAndAppendWithOffset(long offset, Record record) { return; } - if (lastOffset > 0 && offset <= lastOffset) + if (lastOffset >= 0 && offset <= lastOffset) throw new IllegalArgumentException(String.format("Illegal offset %s following previous offset %s (Offsets must increase monotonically).", offset, lastOffset)); try { @@ -299,15 +299,6 @@ public void appendUnchecked(long offset, Record record) { } } - /** - * Append the given log entry. The entry's record must have a magic which matches the magic use to - * construct this builder and the offset must be greater than the last appended entry. - * @param entry The entry to append - */ - public void append(LogEntry entry) { - appendWithOffset(entry.offset(), entry.record()); - } - /** * Add a record with a given offset. The record must have a magic which matches the magic use to * construct this builder and the offset must be greater than the last appended entry. @@ -317,7 +308,7 @@ public void append(LogEntry entry) { public void appendWithOffset(long offset, Record record) { if (record.magic() != magic) throw new IllegalArgumentException("Inner log entries must have matching magic values as the wrapper"); - if (lastOffset > 0 && offset <= lastOffset) + if (lastOffset >= 0 && offset <= lastOffset) throw new IllegalArgumentException(String.format("Illegal offset %s following previous offset %s (Offsets must increase monotonically).", offset, lastOffset)); appendUnchecked(offset, record); } diff --git a/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java index bea6730a74aae..034faf631dfd1 100644 --- a/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java +++ b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java @@ -203,6 +203,33 @@ public void writePastLimit() { } } + @Test(expected = IllegalArgumentException.class) + public void testAppendAtInvalidOffset() { + ByteBuffer buffer = ByteBuffer.allocate(1024); + buffer.position(bufferOffset); + + long logAppendTime = System.currentTimeMillis(); + MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buffer, Record.MAGIC_VALUE_V1, compressionType, + TimestampType.CREATE_TIME, 0L, logAppendTime, buffer.capacity()); + + builder.appendWithOffset(0L, System.currentTimeMillis(), "a".getBytes(), null); + + // offsets must increase monotonically + builder.appendWithOffset(0L, System.currentTimeMillis(), "b".getBytes(), null); + } + + @Test(expected = IllegalArgumentException.class) + public void testAppendWithInvalidMagic() { + ByteBuffer buffer = ByteBuffer.allocate(1024); + buffer.position(bufferOffset); + + long logAppendTime = System.currentTimeMillis(); + MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buffer, Record.MAGIC_VALUE_V1, compressionType, + TimestampType.CREATE_TIME, 0L, logAppendTime, buffer.capacity()); + + builder.append(Record.create(Record.MAGIC_VALUE_V0, 0L, "a".getBytes(), null)); + } + @Test public void convertUsingCreateTime() { ByteBuffer buffer = ByteBuffer.allocate(1024);