From 0a67684c9d895be1dc7a883e753b5c42c662e819 Mon Sep 17 00:00:00 2001 From: Jason Gustafson Date: Thu, 1 Jun 2017 17:52:52 -0700 Subject: [PATCH 1/5] KAFKA-5365 [WIP]: Fix regression in compressed message iteration affecting magic v0 and v1 --- .../record/AbstractLegacyRecordBatch.java | 77 ++++++++++++------- .../common/record/DefaultRecordBatch.java | 4 + .../common/record/FileLogInputStream.java | 5 ++ .../kafka/common/record/RecordBatch.java | 13 ++++ .../record/AbstractLegacyRecordBatchTest.java | 43 +++++++++++ .../main/scala/kafka/log/LogValidator.scala | 6 +- 6 files changed, 116 insertions(+), 32 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/common/record/AbstractLegacyRecordBatch.java b/clients/src/main/java/org/apache/kafka/common/record/AbstractLegacyRecordBatch.java index 9b74d06844e77..83eebb2d660b1 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/AbstractLegacyRecordBatch.java +++ b/clients/src/main/java/org/apache/kafka/common/record/AbstractLegacyRecordBatch.java @@ -31,6 +31,7 @@ import java.nio.ByteBuffer; import java.nio.channels.FileChannel; import java.util.ArrayDeque; +import java.util.Iterator; import java.util.NoSuchElementException; import static org.apache.kafka.common.record.Records.LOG_OVERHEAD; @@ -222,13 +223,18 @@ public boolean isControlBatch() { * @return An iterator over the records contained within this batch */ @Override - public CloseableIterator iterator() { - return iterator(BufferSupplier.NO_CACHING); + public Iterator iterator() { + return iterator(BufferSupplier.NO_CACHING, true); } - private CloseableIterator iterator(BufferSupplier bufferSupplier) { + @Override + public Iterator unassignedOffsetsIterator() { + return iterator(BufferSupplier.NO_CACHING, false); + } + + private CloseableIterator iterator(BufferSupplier bufferSupplier, boolean hasAssignedOffsets) { if (isCompressed()) - return new DeepRecordsIterator(this, false, Integer.MAX_VALUE, bufferSupplier); + return new DeepRecordsIterator(this, false, Integer.MAX_VALUE, bufferSupplier, hasAssignedOffsets); return new CloseableIterator() { private boolean hasNext = true; @@ -259,7 +265,7 @@ public void remove() { @Override public CloseableIterator streamingIterator(BufferSupplier bufferSupplier) { // the older message format versions do not support streaming, so we return the normal iterator - return iterator(bufferSupplier); + return iterator(bufferSupplier, true); } static void writeHeader(ByteBuffer buffer, long offset, int size) { @@ -307,14 +313,19 @@ public AbstractLegacyRecordBatch nextBatch() throws IOException { } private static class DeepRecordsIterator extends AbstractIterator implements CloseableIterator { - private final ArrayDeque batches; + private final ArrayDeque innerEntries; private final long absoluteBaseOffset; private final byte wrapperMagic; - private DeepRecordsIterator(AbstractLegacyRecordBatch wrapperEntry, boolean ensureMatchingMagic, - int maxMessageSize, BufferSupplier bufferSupplier) { + private DeepRecordsIterator(AbstractLegacyRecordBatch wrapperEntry, + boolean ensureMatchingMagic, + int maxMessageSize, + BufferSupplier bufferSupplier, + boolean hasOffsetsAssigned) { LegacyRecord wrapperRecord = wrapperEntry.outerRecord(); this.wrapperMagic = wrapperRecord.magic(); + if (wrapperMagic != RecordBatch.MAGIC_VALUE_V0 && wrapperMagic != RecordBatch.MAGIC_VALUE_V1) + throw new InvalidRecordException("Invalid wrapper magic found in legacy deep record iterator " + wrapperMagic); CompressionType compressionType = wrapperRecord.compressionType(); ByteBuffer wrapperValue = wrapperRecord.value(); @@ -325,47 +336,55 @@ private DeepRecordsIterator(AbstractLegacyRecordBatch wrapperEntry, boolean ensu InputStream stream = compressionType.wrapForInput(wrapperValue, wrapperRecord.magic(), bufferSupplier); LogInputStream logStream = new DataLogInputStream(stream, maxMessageSize); - long wrapperRecordOffset = wrapperEntry.lastOffset(); - long wrapperRecordTimestamp = wrapperRecord.timestamp(); - this.batches = new ArrayDeque<>(); + long lastOffsetFromWrapper = wrapperEntry.lastOffset(); + long timestampFromWrapper = wrapperRecord.timestamp(); + this.innerEntries = new ArrayDeque<>(); // If relative offset is used, we need to decompress the entire message first to compute // the absolute offset. For simplicity and because it's a format that is on its way out, we // do the same for message format version 0 try { while (true) { - AbstractLegacyRecordBatch batch = logStream.nextBatch(); - if (batch == null) + AbstractLegacyRecordBatch innerEntry = logStream.nextBatch(); + if (innerEntry == null) break; - LegacyRecord record = batch.outerRecord(); + LegacyRecord record = innerEntry.outerRecord(); byte magic = record.magic(); if (ensureMatchingMagic && magic != wrapperMagic) throw new InvalidRecordException("Compressed message magic " + magic + " does not match wrapper magic " + wrapperMagic); - if (magic > RecordBatch.MAGIC_VALUE_V0) { + if (magic == RecordBatch.MAGIC_VALUE_V1) { LegacyRecord recordWithTimestamp = new LegacyRecord( record.buffer(), - wrapperRecordTimestamp, + timestampFromWrapper, wrapperRecord.timestampType()); - batch = new BasicLegacyRecordBatch(batch.lastOffset(), recordWithTimestamp); + innerEntry = new BasicLegacyRecordBatch(innerEntry.lastOffset(), recordWithTimestamp); } - batches.addLast(batch); - // break early if we reach the last offset in the batch - if (batch.offset() == wrapperRecordOffset) - break; + innerEntries.addLast(innerEntry); } - if (batches.isEmpty()) + if (innerEntries.isEmpty()) throw new InvalidRecordException("Found invalid compressed record set with no inner records"); - if (wrapperMagic > RecordBatch.MAGIC_VALUE_V0) - this.absoluteBaseOffset = wrapperRecordOffset - batches.getLast().lastOffset(); - else + if (!hasOffsetsAssigned) { + // If offsets are not assigned, then we do not use the offset from the wrapper record because + // the value could be set differently depending on the client. For example, older clients + // always used offset 0 in the wrapper record, while post-0.10.1 clients used the last offset + // from the inner record's relative offsets. + this.absoluteBaseOffset = 0; + } else if (wrapperMagic == RecordBatch.MAGIC_VALUE_V1) { + long lastInnerOffset = innerEntries.getLast().offset(); + if (lastOffsetFromWrapper < lastInnerOffset) + throw new InvalidRecordException("Found invalid wrapper offset in compressed v1 message set: " + + lastOffsetFromWrapper); + this.absoluteBaseOffset = lastOffsetFromWrapper - lastInnerOffset; + } else { this.absoluteBaseOffset = -1; + } } catch (IOException e) { throw new KafkaException(e); } finally { @@ -375,14 +394,14 @@ private DeepRecordsIterator(AbstractLegacyRecordBatch wrapperEntry, boolean ensu @Override protected Record makeNext() { - if (batches.isEmpty()) + if (innerEntries.isEmpty()) return allDone(); - AbstractLegacyRecordBatch entry = batches.remove(); + AbstractLegacyRecordBatch entry = innerEntries.remove(); // Convert offset to absolute offset if needed. - if (absoluteBaseOffset >= 0) { - long absoluteOffset = absoluteBaseOffset + entry.lastOffset(); + if (wrapperMagic == RecordBatch.MAGIC_VALUE_V1) { + long absoluteOffset = absoluteBaseOffset + entry.offset(); entry = new BasicLegacyRecordBatch(absoluteOffset, entry.outerRecord()); } diff --git a/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java b/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java index 7a0e53063a0b8..31075477e8590 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java +++ b/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java @@ -288,6 +288,10 @@ public Iterator iterator() { } } + @Override + public Iterator unassignedOffsetsIterator() { + return iterator(); + } @Override public CloseableIterator streamingIterator(BufferSupplier bufferSupplier) { diff --git a/clients/src/main/java/org/apache/kafka/common/record/FileLogInputStream.java b/clients/src/main/java/org/apache/kafka/common/record/FileLogInputStream.java index 75eb1b3e35438..7bc9541b976fd 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/FileLogInputStream.java +++ b/clients/src/main/java/org/apache/kafka/common/record/FileLogInputStream.java @@ -154,6 +154,11 @@ public CloseableIterator streamingIterator(BufferSupplier bufferSupplier return loadFullBatch().streamingIterator(bufferSupplier); } + @Override + public Iterator unassignedOffsetsIterator() { + return loadFullBatch().unassignedOffsetsIterator(); + } + @Override public boolean isValid() { return loadFullBatch().isValid(); diff --git a/clients/src/main/java/org/apache/kafka/common/record/RecordBatch.java b/clients/src/main/java/org/apache/kafka/common/record/RecordBatch.java index 65a6a95fbe41f..2094c500e4341 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/RecordBatch.java +++ b/clients/src/main/java/org/apache/kafka/common/record/RecordBatch.java @@ -230,6 +230,19 @@ public interface RecordBatch extends Iterable { */ CloseableIterator streamingIterator(BufferSupplier decompressionBufferSupplier); + /** + * Get an iterator over the records assuming that offsets have not yet been assigned. This is used + * by the broker for record validation prior to assigning offsets. + * + * @param decompressionBufferSupplier The supplier of ByteBuffer(s) used for decompression if supported. + * For small record batches, allocating a potentially large buffer (64 KB for LZ4) + * will dominate the cost of decompressing and iterating over the records in the + * batch. As such, a supplier that reuses buffers will have a significant + * performance impact. + * @return The iterator + */ + Iterator unassignedOffsetsIterator(); + /** * Check whether this is a control batch (i.e. whether the control bit is set in the batch attributes). * For magic versions prior to 2, this is always false. diff --git a/clients/src/test/java/org/apache/kafka/common/record/AbstractLegacyRecordBatchTest.java b/clients/src/test/java/org/apache/kafka/common/record/AbstractLegacyRecordBatchTest.java index 0f01f2a664915..ad2d1d20286e9 100644 --- a/clients/src/test/java/org/apache/kafka/common/record/AbstractLegacyRecordBatchTest.java +++ b/clients/src/test/java/org/apache/kafka/common/record/AbstractLegacyRecordBatchTest.java @@ -20,6 +20,8 @@ import org.apache.kafka.common.utils.Utils; import org.junit.Test; +import java.util.Arrays; +import java.util.Iterator; import java.util.List; import static org.junit.Assert.assertEquals; @@ -56,6 +58,47 @@ public void testSetLastOffsetCompressed() { assertEquals(offset++, record.offset()); } + @Test + public void testIterateCompressedRecordWithWrapperOffsetZeroV0() { + for (byte magic : Arrays.asList(RecordBatch.MAGIC_VALUE_V0, RecordBatch.MAGIC_VALUE_V1)) { + SimpleRecord[] simpleRecords = new SimpleRecord[] { + new SimpleRecord(1L, "a".getBytes(), "1".getBytes()), + new SimpleRecord(2L, "b".getBytes(), "2".getBytes()), + new SimpleRecord(3L, "c".getBytes(), "3".getBytes()) + }; + + MemoryRecords records = MemoryRecords.withRecords(magic, 0L, + CompressionType.GZIP, TimestampType.CREATE_TIME, simpleRecords); + + ByteBufferLegacyRecordBatch batch = new ByteBufferLegacyRecordBatch(records.buffer()); + batch.setLastOffset(0L); + + long offset = 0L; + Iterator iterator = batch.unassignedOffsetsIterator(); + while (iterator.hasNext()) { + Record record = iterator.next(); + assertEquals(offset++, record.offset()); + } + } + } + + @Test(expected = InvalidRecordException.class) + public void testInvalidWrapperOffsetV1() { + SimpleRecord[] simpleRecords = new SimpleRecord[] { + new SimpleRecord(1L, "a".getBytes(), "1".getBytes()), + new SimpleRecord(2L, "b".getBytes(), "2".getBytes()), + new SimpleRecord(3L, "c".getBytes(), "3".getBytes()) + }; + + MemoryRecords records = MemoryRecords.withRecords(RecordBatch.MAGIC_VALUE_V1, 0L, + CompressionType.GZIP, TimestampType.CREATE_TIME, simpleRecords); + + ByteBufferLegacyRecordBatch batch = new ByteBufferLegacyRecordBatch(records.buffer()); + batch.setLastOffset(0L); + + batch.iterator(); + } + @Test(expected = IllegalArgumentException.class) public void testSetNoTimestampTypeNotAllowed() { MemoryRecords records = MemoryRecords.withRecords(RecordBatch.MAGIC_VALUE_V1, 0L, diff --git a/core/src/main/scala/kafka/log/LogValidator.scala b/core/src/main/scala/kafka/log/LogValidator.scala index ee5cb586f18e3..b05e3475d8f53 100644 --- a/core/src/main/scala/kafka/log/LogValidator.scala +++ b/core/src/main/scala/kafka/log/LogValidator.scala @@ -126,7 +126,7 @@ private[kafka] object LogValidator extends Logging { for (batch <- records.batches.asScala) { validateBatch(batch, isFromClient, toMagicValue) - for (record <- batch.asScala) { + for (record <- batch.unassignedOffsetsIterator.asScala) { validateRecord(batch, record, now, timestampType, timestampDiffMaxMs, compactedTopic) builder.appendWithOffset(offsetCounter.getAndIncrement(), record) } @@ -160,7 +160,7 @@ private[kafka] object LogValidator extends Logging { var maxBatchTimestamp = RecordBatch.NO_TIMESTAMP var offsetOfMaxBatchTimestamp = -1L - for (record <- batch.asScala) { + for (record <- batch.unassignedOffsetsIterator.asScala) { validateRecord(batch, record, now, timestampType, timestampDiffMaxMs, compactedTopic) val offset = offsetCounter.getAndIncrement() if (batch.magic > RecordBatch.MAGIC_VALUE_V0 && record.timestamp > maxBatchTimestamp) { @@ -235,7 +235,7 @@ private[kafka] object LogValidator extends Logging { if (sourceCodec == NoCompressionCodec && batch.isControlBatch) inPlaceAssignment = true - for (record <- batch.asScala) { + for (record <- batch.unassignedOffsetsIterator.asScala) { validateRecord(batch, record, now, timestampType, timestampDiffMaxMs, compactedTopic) if (sourceCodec != NoCompressionCodec && record.isCompressed) throw new InvalidRecordException("Compressed outer record should not have an inner record with a " + From b41e9ce0e4a1c1e12998a5a350e08d51b130e74e Mon Sep 17 00:00:00 2001 From: Jason Gustafson Date: Thu, 1 Jun 2017 18:47:39 -0700 Subject: [PATCH 2/5] Drop unassigned iterator and check locally for wrapper offset of 0 --- .../record/AbstractLegacyRecordBatch.java | 18 ++++++------------ .../common/record/DefaultRecordBatch.java | 5 ----- .../common/record/FileLogInputStream.java | 5 ----- .../kafka/common/record/RecordBatch.java | 13 ------------- .../record/AbstractLegacyRecordBatchTest.java | 9 +++------ .../main/scala/kafka/log/LogValidator.scala | 6 +++--- 6 files changed, 12 insertions(+), 44 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/common/record/AbstractLegacyRecordBatch.java b/clients/src/main/java/org/apache/kafka/common/record/AbstractLegacyRecordBatch.java index 83eebb2d660b1..b0a081792d3fd 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/AbstractLegacyRecordBatch.java +++ b/clients/src/main/java/org/apache/kafka/common/record/AbstractLegacyRecordBatch.java @@ -224,17 +224,12 @@ public boolean isControlBatch() { */ @Override public Iterator iterator() { - return iterator(BufferSupplier.NO_CACHING, true); + return iterator(BufferSupplier.NO_CACHING); } - @Override - public Iterator unassignedOffsetsIterator() { - return iterator(BufferSupplier.NO_CACHING, false); - } - - private CloseableIterator iterator(BufferSupplier bufferSupplier, boolean hasAssignedOffsets) { + private CloseableIterator iterator(BufferSupplier bufferSupplier) { if (isCompressed()) - return new DeepRecordsIterator(this, false, Integer.MAX_VALUE, bufferSupplier, hasAssignedOffsets); + return new DeepRecordsIterator(this, false, Integer.MAX_VALUE, bufferSupplier); return new CloseableIterator() { private boolean hasNext = true; @@ -265,7 +260,7 @@ public void remove() { @Override public CloseableIterator streamingIterator(BufferSupplier bufferSupplier) { // the older message format versions do not support streaming, so we return the normal iterator - return iterator(bufferSupplier, true); + return iterator(bufferSupplier); } static void writeHeader(ByteBuffer buffer, long offset, int size) { @@ -320,8 +315,7 @@ private static class DeepRecordsIterator extends AbstractIterator implem private DeepRecordsIterator(AbstractLegacyRecordBatch wrapperEntry, boolean ensureMatchingMagic, int maxMessageSize, - BufferSupplier bufferSupplier, - boolean hasOffsetsAssigned) { + BufferSupplier bufferSupplier) { LegacyRecord wrapperRecord = wrapperEntry.outerRecord(); this.wrapperMagic = wrapperRecord.magic(); if (wrapperMagic != RecordBatch.MAGIC_VALUE_V0 && wrapperMagic != RecordBatch.MAGIC_VALUE_V1) @@ -370,7 +364,7 @@ private DeepRecordsIterator(AbstractLegacyRecordBatch wrapperEntry, if (innerEntries.isEmpty()) throw new InvalidRecordException("Found invalid compressed record set with no inner records"); - if (!hasOffsetsAssigned) { + if (lastOffsetFromWrapper == 0) { // If offsets are not assigned, then we do not use the offset from the wrapper record because // the value could be set differently depending on the client. For example, older clients // always used offset 0 in the wrapper record, while post-0.10.1 clients used the last offset diff --git a/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java b/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java index 31075477e8590..59d9c419724dc 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java +++ b/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java @@ -288,11 +288,6 @@ public Iterator iterator() { } } - @Override - public Iterator unassignedOffsetsIterator() { - return iterator(); - } - @Override public CloseableIterator streamingIterator(BufferSupplier bufferSupplier) { if (isCompressed()) diff --git a/clients/src/main/java/org/apache/kafka/common/record/FileLogInputStream.java b/clients/src/main/java/org/apache/kafka/common/record/FileLogInputStream.java index 7bc9541b976fd..75eb1b3e35438 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/FileLogInputStream.java +++ b/clients/src/main/java/org/apache/kafka/common/record/FileLogInputStream.java @@ -154,11 +154,6 @@ public CloseableIterator streamingIterator(BufferSupplier bufferSupplier return loadFullBatch().streamingIterator(bufferSupplier); } - @Override - public Iterator unassignedOffsetsIterator() { - return loadFullBatch().unassignedOffsetsIterator(); - } - @Override public boolean isValid() { return loadFullBatch().isValid(); diff --git a/clients/src/main/java/org/apache/kafka/common/record/RecordBatch.java b/clients/src/main/java/org/apache/kafka/common/record/RecordBatch.java index 2094c500e4341..65a6a95fbe41f 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/RecordBatch.java +++ b/clients/src/main/java/org/apache/kafka/common/record/RecordBatch.java @@ -230,19 +230,6 @@ public interface RecordBatch extends Iterable { */ CloseableIterator streamingIterator(BufferSupplier decompressionBufferSupplier); - /** - * Get an iterator over the records assuming that offsets have not yet been assigned. This is used - * by the broker for record validation prior to assigning offsets. - * - * @param decompressionBufferSupplier The supplier of ByteBuffer(s) used for decompression if supported. - * For small record batches, allocating a potentially large buffer (64 KB for LZ4) - * will dominate the cost of decompressing and iterating over the records in the - * batch. As such, a supplier that reuses buffers will have a significant - * performance impact. - * @return The iterator - */ - Iterator unassignedOffsetsIterator(); - /** * Check whether this is a control batch (i.e. whether the control bit is set in the batch attributes). * For magic versions prior to 2, this is always false. diff --git a/clients/src/test/java/org/apache/kafka/common/record/AbstractLegacyRecordBatchTest.java b/clients/src/test/java/org/apache/kafka/common/record/AbstractLegacyRecordBatchTest.java index ad2d1d20286e9..b99328d695cb3 100644 --- a/clients/src/test/java/org/apache/kafka/common/record/AbstractLegacyRecordBatchTest.java +++ b/clients/src/test/java/org/apache/kafka/common/record/AbstractLegacyRecordBatchTest.java @@ -59,7 +59,7 @@ public void testSetLastOffsetCompressed() { } @Test - public void testIterateCompressedRecordWithWrapperOffsetZeroV0() { + public void testIterateCompressedRecordWithWrapperOffsetZero() { for (byte magic : Arrays.asList(RecordBatch.MAGIC_VALUE_V0, RecordBatch.MAGIC_VALUE_V1)) { SimpleRecord[] simpleRecords = new SimpleRecord[] { new SimpleRecord(1L, "a".getBytes(), "1".getBytes()), @@ -74,11 +74,8 @@ public void testIterateCompressedRecordWithWrapperOffsetZeroV0() { batch.setLastOffset(0L); long offset = 0L; - Iterator iterator = batch.unassignedOffsetsIterator(); - while (iterator.hasNext()) { - Record record = iterator.next(); + for (Record record : batch) assertEquals(offset++, record.offset()); - } } } @@ -94,7 +91,7 @@ public void testInvalidWrapperOffsetV1() { CompressionType.GZIP, TimestampType.CREATE_TIME, simpleRecords); ByteBufferLegacyRecordBatch batch = new ByteBufferLegacyRecordBatch(records.buffer()); - batch.setLastOffset(0L); + batch.setLastOffset(1L); batch.iterator(); } diff --git a/core/src/main/scala/kafka/log/LogValidator.scala b/core/src/main/scala/kafka/log/LogValidator.scala index b05e3475d8f53..ee5cb586f18e3 100644 --- a/core/src/main/scala/kafka/log/LogValidator.scala +++ b/core/src/main/scala/kafka/log/LogValidator.scala @@ -126,7 +126,7 @@ private[kafka] object LogValidator extends Logging { for (batch <- records.batches.asScala) { validateBatch(batch, isFromClient, toMagicValue) - for (record <- batch.unassignedOffsetsIterator.asScala) { + for (record <- batch.asScala) { validateRecord(batch, record, now, timestampType, timestampDiffMaxMs, compactedTopic) builder.appendWithOffset(offsetCounter.getAndIncrement(), record) } @@ -160,7 +160,7 @@ private[kafka] object LogValidator extends Logging { var maxBatchTimestamp = RecordBatch.NO_TIMESTAMP var offsetOfMaxBatchTimestamp = -1L - for (record <- batch.unassignedOffsetsIterator.asScala) { + for (record <- batch.asScala) { validateRecord(batch, record, now, timestampType, timestampDiffMaxMs, compactedTopic) val offset = offsetCounter.getAndIncrement() if (batch.magic > RecordBatch.MAGIC_VALUE_V0 && record.timestamp > maxBatchTimestamp) { @@ -235,7 +235,7 @@ private[kafka] object LogValidator extends Logging { if (sourceCodec == NoCompressionCodec && batch.isControlBatch) inPlaceAssignment = true - for (record <- batch.unassignedOffsetsIterator.asScala) { + for (record <- batch.asScala) { validateRecord(batch, record, now, timestampType, timestampDiffMaxMs, compactedTopic) if (sourceCodec != NoCompressionCodec && record.isCompressed) throw new InvalidRecordException("Compressed outer record should not have an inner record with a " + From 83411706f7bf1f7073cfbf5ee350ce8303d5b311 Mon Sep 17 00:00:00 2001 From: Jason Gustafson Date: Thu, 1 Jun 2017 18:49:58 -0700 Subject: [PATCH 3/5] Abbreviate comment on wrapper offset check --- .../kafka/common/record/AbstractLegacyRecordBatch.java | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/common/record/AbstractLegacyRecordBatch.java b/clients/src/main/java/org/apache/kafka/common/record/AbstractLegacyRecordBatch.java index b0a081792d3fd..d301358fd0530 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/AbstractLegacyRecordBatch.java +++ b/clients/src/main/java/org/apache/kafka/common/record/AbstractLegacyRecordBatch.java @@ -365,10 +365,7 @@ private DeepRecordsIterator(AbstractLegacyRecordBatch wrapperEntry, throw new InvalidRecordException("Found invalid compressed record set with no inner records"); if (lastOffsetFromWrapper == 0) { - // If offsets are not assigned, then we do not use the offset from the wrapper record because - // the value could be set differently depending on the client. For example, older clients - // always used offset 0 in the wrapper record, while post-0.10.1 clients used the last offset - // from the inner record's relative offsets. + // The outer offset may be 0 if this is produce data from an older client. this.absoluteBaseOffset = 0; } else if (wrapperMagic == RecordBatch.MAGIC_VALUE_V1) { long lastInnerOffset = innerEntries.getLast().offset(); From 5eb17d14e1b15d39326829670c3494fd7c7f3890 Mon Sep 17 00:00:00 2001 From: Jason Gustafson Date: Thu, 1 Jun 2017 18:58:11 -0700 Subject: [PATCH 4/5] Fix checkstyle errors --- .../record/AbstractLegacyRecordBatchTest.java | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/clients/src/test/java/org/apache/kafka/common/record/AbstractLegacyRecordBatchTest.java b/clients/src/test/java/org/apache/kafka/common/record/AbstractLegacyRecordBatchTest.java index b99328d695cb3..df6a16baf7584 100644 --- a/clients/src/test/java/org/apache/kafka/common/record/AbstractLegacyRecordBatchTest.java +++ b/clients/src/test/java/org/apache/kafka/common/record/AbstractLegacyRecordBatchTest.java @@ -21,7 +21,6 @@ import org.junit.Test; import java.util.Arrays; -import java.util.Iterator; import java.util.List; import static org.junit.Assert.assertEquals; @@ -62,9 +61,9 @@ public void testSetLastOffsetCompressed() { public void testIterateCompressedRecordWithWrapperOffsetZero() { for (byte magic : Arrays.asList(RecordBatch.MAGIC_VALUE_V0, RecordBatch.MAGIC_VALUE_V1)) { SimpleRecord[] simpleRecords = new SimpleRecord[] { - new SimpleRecord(1L, "a".getBytes(), "1".getBytes()), - new SimpleRecord(2L, "b".getBytes(), "2".getBytes()), - new SimpleRecord(3L, "c".getBytes(), "3".getBytes()) + new SimpleRecord(1L, "a".getBytes(), "1".getBytes()), + new SimpleRecord(2L, "b".getBytes(), "2".getBytes()), + new SimpleRecord(3L, "c".getBytes(), "3".getBytes()) }; MemoryRecords records = MemoryRecords.withRecords(magic, 0L, @@ -82,9 +81,9 @@ public void testIterateCompressedRecordWithWrapperOffsetZero() { @Test(expected = InvalidRecordException.class) public void testInvalidWrapperOffsetV1() { SimpleRecord[] simpleRecords = new SimpleRecord[] { - new SimpleRecord(1L, "a".getBytes(), "1".getBytes()), - new SimpleRecord(2L, "b".getBytes(), "2".getBytes()), - new SimpleRecord(3L, "c".getBytes(), "3".getBytes()) + new SimpleRecord(1L, "a".getBytes(), "1".getBytes()), + new SimpleRecord(2L, "b".getBytes(), "2".getBytes()), + new SimpleRecord(3L, "c".getBytes(), "3".getBytes()) }; MemoryRecords records = MemoryRecords.withRecords(RecordBatch.MAGIC_VALUE_V1, 0L, From eabca8eecc8c760260cfd4560717bbd147d00fe2 Mon Sep 17 00:00:00 2001 From: Jason Gustafson Date: Thu, 1 Jun 2017 20:05:52 -0700 Subject: [PATCH 5/5] Only check wrapper offset for v1 records --- .../record/AbstractLegacyRecordBatch.java | 20 ++++++++++--------- 1 file changed, 11 insertions(+), 9 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/common/record/AbstractLegacyRecordBatch.java b/clients/src/main/java/org/apache/kafka/common/record/AbstractLegacyRecordBatch.java index d301358fd0530..eaf691eca95b4 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/AbstractLegacyRecordBatch.java +++ b/clients/src/main/java/org/apache/kafka/common/record/AbstractLegacyRecordBatch.java @@ -364,15 +364,17 @@ private DeepRecordsIterator(AbstractLegacyRecordBatch wrapperEntry, if (innerEntries.isEmpty()) throw new InvalidRecordException("Found invalid compressed record set with no inner records"); - if (lastOffsetFromWrapper == 0) { - // The outer offset may be 0 if this is produce data from an older client. - this.absoluteBaseOffset = 0; - } else if (wrapperMagic == RecordBatch.MAGIC_VALUE_V1) { - long lastInnerOffset = innerEntries.getLast().offset(); - if (lastOffsetFromWrapper < lastInnerOffset) - throw new InvalidRecordException("Found invalid wrapper offset in compressed v1 message set: " - + lastOffsetFromWrapper); - this.absoluteBaseOffset = lastOffsetFromWrapper - lastInnerOffset; + if (wrapperMagic == RecordBatch.MAGIC_VALUE_V1) { + if (lastOffsetFromWrapper == 0) { + // The outer offset may be 0 if this is produce data from an older client. + this.absoluteBaseOffset = 0; + } else { + long lastInnerOffset = innerEntries.getLast().offset(); + if (lastOffsetFromWrapper < lastInnerOffset) + throw new InvalidRecordException("Found invalid wrapper offset in compressed v1 message set: " + + lastOffsetFromWrapper); + this.absoluteBaseOffset = lastOffsetFromWrapper - lastInnerOffset; + } } else { this.absoluteBaseOffset = -1; }