From f630f3f63a8ef58b0abcb80013109256ea34e3f3 Mon Sep 17 00:00:00 2001 From: Jason Gustafson Date: Wed, 1 Feb 2017 20:40:26 -0800 Subject: [PATCH 1/4] MINOR: Ensure timestamp type is provided when upconverting messages --- .../kafka/common/record/AbstractRecords.java | 4 +-- .../common/record/MemoryRecordsBuilder.java | 3 ++ .../apache/kafka/common/record/Record.java | 24 +++++++++----- .../apache/kafka/common/record/Records.java | 3 +- .../kafka/common/record/TimestampType.java | 3 ++ .../kafka/common/record/FileRecordsTest.java | 10 +++--- .../kafka/common/record/SimpleRecordTest.java | 18 +++++++++-- .../common/record/TimestampTypeTest.java | 5 +++ .../src/main/scala/kafka/log/LogSegment.scala | 10 ++++-- .../main/scala/kafka/log/LogValidator.scala | 20 ++++++------ .../main/scala/kafka/server/KafkaApis.scala | 14 ++++----- .../test/scala/unit/kafka/log/LogTest.scala | 2 +- .../unit/kafka/log/LogValidatorTest.scala | 31 +++++++++++++++++-- 13 files changed, 106 insertions(+), 41 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/common/record/AbstractRecords.java b/clients/src/main/java/org/apache/kafka/common/record/AbstractRecords.java index 3a96d88e4b0d1..47b96e2f12eb7 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/AbstractRecords.java +++ b/clients/src/main/java/org/apache/kafka/common/record/AbstractRecords.java @@ -55,10 +55,10 @@ public boolean hasMatchingShallowMagic(byte magic) { * Convert this message set to use the specified message format. */ @Override - public Records toMessageFormat(byte toMagic) { + public Records toMessageFormat(byte toMagic, TimestampType upconvertTimestampType) { List converted = new ArrayList<>(); for (LogEntry entry : deepEntries()) - converted.add(LogEntry.create(entry.offset(), entry.record().convert(toMagic))); + converted.add(LogEntry.create(entry.offset(), entry.record().convert(toMagic, upconvertTimestampType))); if (converted.isEmpty()) { // This indicates that the message is too large, which indicates that the buffer is not large diff --git a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java index 69e9003ff7f6e..0260f174a8f21 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java +++ b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java @@ -112,6 +112,9 @@ public MemoryRecordsBuilder(ByteBuffer buffer, long baseOffset, long logAppendTime, int writeLimit) { + if (magic > Record.MAGIC_VALUE_V0 && timestampType == TimestampType.NO_TIMESTAMP_TYPE) + throw new IllegalArgumentException("TimestampType must be set for magic >= 0"); + this.magic = magic; this.timestampType = timestampType; this.compressionType = compressionType; diff --git a/clients/src/main/java/org/apache/kafka/common/record/Record.java b/clients/src/main/java/org/apache/kafka/common/record/Record.java index 0c0fa3c49c4d6..fddd189b75fe7 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/Record.java +++ b/clients/src/main/java/org/apache/kafka/common/record/Record.java @@ -242,9 +242,9 @@ public long timestamp() { } /** - * The timestamp of the message. - * @return the timstamp type or {@link TimestampType#NO_TIMESTAMP_TYPE} if the magic is 0 or the message has - * been up-converted. + * Get the timestamp type of the record. + * + * @return The timestamp type or {@link TimestampType#NO_TIMESTAMP_TYPE} if the magic is 0. */ public TimestampType timestampType() { if (magic() == 0) @@ -338,15 +338,25 @@ public int convertedSize(byte toMagic) { * Convert this record to another message format. * * @param toMagic The target magic version to convert to + * @param upconvertTimestampType The timestamp type to use if up-converting from magic 0, ignored if + * down-converting or if no conversion is needed * @return A new record instance with a freshly allocated ByteBuffer. */ - public Record convert(byte toMagic) { - if (toMagic == magic()) + public Record convert(byte toMagic, TimestampType upconvertTimestampType) { + byte magic = magic(); + if (toMagic == magic) return this; + final TimestampType timestampType; + if (magic == Record.MAGIC_VALUE_V0) { + if (upconvertTimestampType == null || upconvertTimestampType == TimestampType.NO_TIMESTAMP_TYPE) + throw new IllegalArgumentException("Cannot upconvert using timestamp type " + upconvertTimestampType); + timestampType = upconvertTimestampType; + } else { + timestampType = TimestampType.forAttributes(attributes()); + } + ByteBuffer buffer = ByteBuffer.allocate(convertedSize(toMagic)); - TimestampType timestampType = wrapperRecordTimestampType != null ? - wrapperRecordTimestampType : TimestampType.forAttributes(attributes()); convertTo(buffer, toMagic, timestamp(), timestampType); buffer.rewind(); return new Record(buffer); diff --git a/clients/src/main/java/org/apache/kafka/common/record/Records.java b/clients/src/main/java/org/apache/kafka/common/record/Records.java index 9235f928ec293..bdc96558b7930 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/Records.java +++ b/clients/src/main/java/org/apache/kafka/common/record/Records.java @@ -83,8 +83,9 @@ public interface Records { * Convert all entries in this buffer to the format passed as a parameter. Note that this requires * deep iteration since all of the deep records must also be converted to the desired format. * @param toMagic The magic value to convert to + * @param upconvertTimestampType The timestamp type to use if up-converting from magic 0 * @return A Records (which may or may not be the same instance) */ - Records toMessageFormat(byte toMagic); + Records toMessageFormat(byte toMagic, TimestampType upconvertTimestampType); } diff --git a/clients/src/main/java/org/apache/kafka/common/record/TimestampType.java b/clients/src/main/java/org/apache/kafka/common/record/TimestampType.java index 55c966ac731fb..182cbd1dcfd01 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/TimestampType.java +++ b/clients/src/main/java/org/apache/kafka/common/record/TimestampType.java @@ -34,6 +34,9 @@ public enum TimestampType { } public byte updateAttributes(byte attributes) { + if (this == NO_TIMESTAMP_TYPE) + throw new IllegalArgumentException("Cannot use NO_TIMESTAMP_TYPE in attributes"); + return this == CREATE_TIME ? (byte) (attributes & ~Record.TIMESTAMP_TYPE_MASK) : (byte) (attributes | Record.TIMESTAMP_TYPE_MASK); } diff --git a/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java b/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java index dcd3befbd1eea..d8d086dcfdbc3 100644 --- a/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java +++ b/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java @@ -299,7 +299,7 @@ public void testFormatConversionWithPartialMessage() throws IOException { int start = fileRecords.searchForOffsetWithSize(1, 0).position; int size = entry.sizeInBytes(); FileRecords slice = fileRecords.read(start, size - 1); - Records messageV0 = slice.toMessageFormat(Record.MAGIC_VALUE_V0); + Records messageV0 = slice.toMessageFormat(Record.MAGIC_VALUE_V0, null); assertTrue("No message should be there", shallowEntries(messageV0).isEmpty()); assertEquals("There should be " + (size - 1) + " bytes", size - 1, messageV0.sizeInBytes()); } @@ -316,7 +316,7 @@ public void testConvertNonCompressedToMagic1() throws IOException { try (FileRecords fileRecords = FileRecords.open(tempFile())) { fileRecords.append(records); fileRecords.flush(); - Records convertedRecords = fileRecords.toMessageFormat(Record.MAGIC_VALUE_V1); + Records convertedRecords = fileRecords.toMessageFormat(Record.MAGIC_VALUE_V1, TimestampType.CREATE_TIME); verifyConvertedMessageSet(entries, convertedRecords, Record.MAGIC_VALUE_V1); } } @@ -332,7 +332,7 @@ public void testConvertCompressedToMagic1() throws IOException { try (FileRecords fileRecords = FileRecords.open(tempFile())) { fileRecords.append(records); fileRecords.flush(); - Records convertedRecords = fileRecords.toMessageFormat(Record.MAGIC_VALUE_V1); + Records convertedRecords = fileRecords.toMessageFormat(Record.MAGIC_VALUE_V1, TimestampType.CREATE_TIME); verifyConvertedMessageSet(entries, convertedRecords, Record.MAGIC_VALUE_V1); } } @@ -348,7 +348,7 @@ public void testConvertNonCompressedToMagic0() throws IOException { try (FileRecords fileRecords = FileRecords.open(tempFile())) { fileRecords.append(records); fileRecords.flush(); - Records convertedRecords = fileRecords.toMessageFormat(Record.MAGIC_VALUE_V0); + Records convertedRecords = fileRecords.toMessageFormat(Record.MAGIC_VALUE_V0, null); verifyConvertedMessageSet(entries, convertedRecords, Record.MAGIC_VALUE_V0); } } @@ -364,7 +364,7 @@ public void testConvertCompressedToMagic0() throws IOException { try (FileRecords fileRecords = FileRecords.open(tempFile())) { fileRecords.append(records); fileRecords.flush(); - Records convertedRecords = fileRecords.toMessageFormat(Record.MAGIC_VALUE_V0); + Records convertedRecords = fileRecords.toMessageFormat(Record.MAGIC_VALUE_V0, null); verifyConvertedMessageSet(entries, convertedRecords, Record.MAGIC_VALUE_V0); } } diff --git a/clients/src/test/java/org/apache/kafka/common/record/SimpleRecordTest.java b/clients/src/test/java/org/apache/kafka/common/record/SimpleRecordTest.java index 427c743dd55d4..ecfbc25cf8512 100644 --- a/clients/src/test/java/org/apache/kafka/common/record/SimpleRecordTest.java +++ b/clients/src/test/java/org/apache/kafka/common/record/SimpleRecordTest.java @@ -57,6 +57,18 @@ public void testIsValidWithFourBytesBuffer() { record.ensureValid(); } + @Test(expected = IllegalArgumentException.class) + public void cannotUpconvertWithNullTimestampType() { + Record record = Record.create(Record.MAGIC_VALUE_V0, Record.NO_TIMESTAMP, "foo".getBytes(), "bar".getBytes()); + record.convert(Record.MAGIC_VALUE_V1, null); + } + + @Test(expected = IllegalArgumentException.class) + public void cannotUpconvertWithNoTimestampType() { + Record record = Record.create(Record.MAGIC_VALUE_V0, Record.NO_TIMESTAMP, "foo".getBytes(), "bar".getBytes()); + record.convert(Record.MAGIC_VALUE_V1, TimestampType.NO_TIMESTAMP_TYPE); + } + @Test public void testConvertFromV0ToV1() { byte[][] keys = new byte[][] {"a".getBytes(), "".getBytes(), null, "b".getBytes()}; @@ -64,10 +76,11 @@ public void testConvertFromV0ToV1() { for (int i = 0; i < keys.length; i++) { Record record = Record.create(Record.MAGIC_VALUE_V0, Record.NO_TIMESTAMP, keys[i], values[i]); - Record converted = record.convert(Record.MAGIC_VALUE_V1); + Record converted = record.convert(Record.MAGIC_VALUE_V1, TimestampType.CREATE_TIME); assertEquals(Record.MAGIC_VALUE_V1, converted.magic()); assertEquals(Record.NO_TIMESTAMP, converted.timestamp()); + assertEquals(TimestampType.CREATE_TIME, converted.timestampType()); assertEquals(record.key(), converted.key()); assertEquals(record.value(), converted.value()); assertTrue(record.isValid()); @@ -82,10 +95,11 @@ public void testConvertFromV1ToV0() { for (int i = 0; i < keys.length; i++) { Record record = Record.create(Record.MAGIC_VALUE_V1, System.currentTimeMillis(), keys[i], values[i]); - Record converted = record.convert(Record.MAGIC_VALUE_V0); + Record converted = record.convert(Record.MAGIC_VALUE_V0, null); assertEquals(Record.MAGIC_VALUE_V0, converted.magic()); assertEquals(Record.NO_TIMESTAMP, converted.timestamp()); + assertEquals(TimestampType.NO_TIMESTAMP_TYPE, converted.timestampType()); assertEquals(record.key(), converted.key()); assertEquals(record.value(), converted.value()); assertTrue(record.isValid()); diff --git a/clients/src/test/java/org/apache/kafka/common/record/TimestampTypeTest.java b/clients/src/test/java/org/apache/kafka/common/record/TimestampTypeTest.java index 47597152a2918..e7e2a3b52ffc4 100644 --- a/clients/src/test/java/org/apache/kafka/common/record/TimestampTypeTest.java +++ b/clients/src/test/java/org/apache/kafka/common/record/TimestampTypeTest.java @@ -34,4 +34,9 @@ public void toAndFromAttributesLogAppendTime() { assertEquals(TimestampType.LOG_APPEND_TIME, TimestampType.forAttributes(attributes)); } + @Test(expected = IllegalArgumentException.class) + public void updateAttributesNotAllowedForNoTimestampType() { + TimestampType.NO_TIMESTAMP_TYPE.updateAttributes((byte) 0); + } + } diff --git a/core/src/main/scala/kafka/log/LogSegment.scala b/core/src/main/scala/kafka/log/LogSegment.scala index 8854c3a115cd4..15fa29a75e697 100755 --- a/core/src/main/scala/kafka/log/LogSegment.scala +++ b/core/src/main/scala/kafka/log/LogSegment.scala @@ -98,7 +98,11 @@ class LogSegment(val log: FileRecords, * @param records The log entries to append. */ @nonthreadsafe - def append(firstOffset: Long, largestOffset: Long, largestTimestamp: Long, shallowOffsetOfMaxTimestamp: Long, records: MemoryRecords) { + def append(firstOffset: Long, + largestOffset: Long, + largestTimestamp: Long, + shallowOffsetOfMaxTimestamp: Long, + records: MemoryRecords) { if (records.sizeInBytes > 0) { trace("Inserting %d bytes at offset %d at position %d with largest timestamp %d at shallow offset %d" .format(records.sizeInBytes, firstOffset, log.sizeInBytes(), largestTimestamp, shallowOffsetOfMaxTimestamp)) @@ -419,9 +423,9 @@ class LogSegment(val log: FileRecords, */ def close() { CoreUtils.swallow(timeIndex.maybeAppend(maxTimestampSoFar, offsetOfMaxTimestamp, skipFullCheck = true)) - CoreUtils.swallow(index.close) + CoreUtils.swallow(index.close()) CoreUtils.swallow(timeIndex.close()) - CoreUtils.swallow(log.close) + CoreUtils.swallow(log.close()) } /** diff --git a/core/src/main/scala/kafka/log/LogValidator.scala b/core/src/main/scala/kafka/log/LogValidator.scala index 224a792778319..45e364cde9051 100644 --- a/core/src/main/scala/kafka/log/LogValidator.scala +++ b/core/src/main/scala/kafka/log/LogValidator.scala @@ -148,15 +148,15 @@ private[kafka] object LogValidator { * 3. When magic value to use is above 0, but some fields of inner messages need to be overwritten. * 4. Message format conversion is needed. */ - def validateMessagesAndAssignOffsetsCompressed(records: MemoryRecords, - offsetCounter: LongRef, - now: Long, - sourceCodec: CompressionCodec, - targetCodec: CompressionCodec, - compactedTopic: Boolean = false, - messageFormatVersion: Byte = Record.CURRENT_MAGIC_VALUE, - messageTimestampType: TimestampType, - messageTimestampDiffMaxMs: Long): ValidationAndOffsetAssignResult = { + private def validateMessagesAndAssignOffsetsCompressed(records: MemoryRecords, + offsetCounter: LongRef, + now: Long, + sourceCodec: CompressionCodec, + targetCodec: CompressionCodec, + compactedTopic: Boolean = false, + messageFormatVersion: Byte = Record.CURRENT_MAGIC_VALUE, + messageTimestampType: TimestampType, + messageTimestampDiffMaxMs: Long): ValidationAndOffsetAssignResult = { // No in place assignment situation 1 and 2 var inPlaceAssignment = sourceCodec == targetCodec && messageFormatVersion > Record.MAGIC_VALUE_V0 @@ -186,7 +186,7 @@ private[kafka] object LogValidator { if (record.magic != messageFormatVersion) inPlaceAssignment = false - validatedRecords += record.convert(messageFormatVersion) + validatedRecords += record.convert(messageFormatVersion, messageTimestampType) } if (!inPlaceAssignment) { diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index 7bb3ed54de2ba..e6dd2c962d6df 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -79,7 +79,7 @@ class KafkaApis(val requestChannel: RequestChannel, ApiKeys.forId(request.requestId) match { case ApiKeys.PRODUCE => handleProducerRequest(request) case ApiKeys.FETCH => handleFetchRequest(request) - case ApiKeys.LIST_OFFSETS => handleOffsetRequest(request) + case ApiKeys.LIST_OFFSETS => handleListOffsetRequest(request) case ApiKeys.METADATA => handleTopicMetadataRequest(request) case ApiKeys.LEADER_AND_ISR => handleLeaderAndIsrRequest(request) case ApiKeys.STOP_REPLICA => handleStopReplicaRequest(request) @@ -478,7 +478,7 @@ class KafkaApis(val requestChannel: RequestChannel, val convertedData = if (versionId <= 1 && replicaManager.getMagicAndTimestampType(tp).exists(_._1 > Record.MAGIC_VALUE_V0) && !data.records.hasMatchingShallowMagic(Record.MAGIC_VALUE_V0)) { trace(s"Down converting message to V0 for fetch request from $clientId") - FetchPartitionData(data.error, data.hw, data.records.toMessageFormat(Record.MAGIC_VALUE_V0)) + FetchPartitionData(data.error, data.hw, data.records.toMessageFormat(Record.MAGIC_VALUE_V0, null)) } else data tp -> new FetchResponse.PartitionData(convertedData.error.code, convertedData.hw, convertedData.records) @@ -557,20 +557,20 @@ class KafkaApis(val requestChannel: RequestChannel, /** * Handle an offset request */ - def handleOffsetRequest(request: RequestChannel.Request) { + def handleListOffsetRequest(request: RequestChannel.Request) { val version = request.header.apiVersion() val mergedResponseMap = if (version == 0) - handleOffsetRequestV0(request) + handleListOffsetRequestV0(request) else - handleOffsetRequestV1(request) + handleListOffsetRequestV1(request) val response = new ListOffsetResponse(mergedResponseMap.asJava, version) requestChannel.sendResponse(new RequestChannel.Response(request, response)) } - private def handleOffsetRequestV0(request : RequestChannel.Request) : Map[TopicPartition, ListOffsetResponse.PartitionData] = { + private def handleListOffsetRequestV0(request : RequestChannel.Request) : Map[TopicPartition, ListOffsetResponse.PartitionData] = { val correlationId = request.header.correlationId val clientId = request.header.clientId val offsetRequest = request.body.asInstanceOf[ListOffsetRequest] @@ -621,7 +621,7 @@ class KafkaApis(val requestChannel: RequestChannel, responseMap ++ unauthorizedResponseStatus } - private def handleOffsetRequestV1(request : RequestChannel.Request): Map[TopicPartition, ListOffsetResponse.PartitionData] = { + private def handleListOffsetRequestV1(request : RequestChannel.Request): Map[TopicPartition, ListOffsetResponse.PartitionData] = { val correlationId = request.header.correlationId val clientId = request.header.clientId val offsetRequest = request.body.asInstanceOf[ListOffsetRequest] diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala index 08cdac5807176..c42233ecbd4d3 100755 --- a/core/src/test/scala/unit/kafka/log/LogTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogTest.scala @@ -362,7 +362,7 @@ class LogTest extends JUnitSuite { val head = messages.iterator.next() assertEquals("Offsets not equal", offset, head.offset) assertEquals("Messages not equal at offset " + offset, messageSets(i).shallowEntries.iterator.next().record, - head.record.convert(messageSets(i).shallowEntries.iterator.next().record.magic)) + head.record.convert(messageSets(i).shallowEntries.iterator.next().record.magic, null)) offset = head.offset + 1 } val lastRead = log.read(startOffset = numMessages, maxLength = 1024*1024, maxOffset = Some(numMessages + 1)).records diff --git a/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala b/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala index b201febd74936..7e34ce0dcc8e3 100644 --- a/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala @@ -125,9 +125,9 @@ class LogValidatorTest extends JUnitSuite { var i = 0 for (logEntry <- validatedRecords.deepEntries.asScala) { - logEntry.record.ensureValid() - assertEquals(logEntry.record.timestamp, timestampSeq(i)) - assertEquals(logEntry.record.timestampType, TimestampType.CREATE_TIME) + assertTrue(logEntry.record.isValid) + assertEquals(timestampSeq(i), logEntry.record.timestamp) + assertEquals(TimestampType.CREATE_TIME, logEntry.record.timestampType) i += 1 } assertEquals(s"Max timestamp should be ${now + 1}", now + 1, validatingResults.maxTimestamp) @@ -135,6 +135,31 @@ class LogValidatorTest extends JUnitSuite { assertFalse("Message size should not have been changed", validatingResults.messageSizeMaybeChanged) } + @Test + def testCreateTimeUpConversion() { + val records = createRecords(magicValue = Record.MAGIC_VALUE_V0, codec = CompressionType.GZIP) + val validatedResults = + LogValidator.validateMessagesAndAssignOffsets(records, + offsetCounter = new LongRef(0), + now = System.currentTimeMillis(), + sourceCodec = DefaultCompressionCodec, + targetCodec = DefaultCompressionCodec, + messageFormatVersion = Record.MAGIC_VALUE_V1, + messageTimestampType = TimestampType.CREATE_TIME, + messageTimestampDiffMaxMs = 1000L) + val validatedRecords = validatedResults.validatedRecords + + for (logEntry <- validatedRecords.deepEntries.asScala) { + assertTrue(logEntry.record.isValid) + assertEquals(Record.NO_TIMESTAMP, logEntry.record.timestamp) + assertEquals(TimestampType.CREATE_TIME, logEntry.record.timestampType) + } + assertEquals(s"Max timestamp should be ${Record.NO_TIMESTAMP}", Record.NO_TIMESTAMP, validatedResults.maxTimestamp) + assertEquals(s"Offset of max timestamp should be ${validatedRecords.deepEntries.asScala.size - 1}", + validatedRecords.deepEntries.asScala.size - 1, validatedResults.shallowOffsetOfMaxTimestamp) + assertTrue("Message size should have been changed", validatedResults.messageSizeMaybeChanged) + } + @Test def testCreateTimeCompressed() { val now = System.currentTimeMillis() From e270b62fe20a2ad668432d33a04e68a6ad5ed2f6 Mon Sep 17 00:00:00 2001 From: Jason Gustafson Date: Wed, 1 Feb 2017 20:52:10 -0800 Subject: [PATCH 2/4] Use NO_TIMESTAMP_TYPE instead of null when no up-conversion needed --- .../main/java/org/apache/kafka/common/record/Record.java | 4 ++-- .../org/apache/kafka/common/record/FileRecordsTest.java | 6 +++--- .../org/apache/kafka/common/record/SimpleRecordTest.java | 8 +------- core/src/main/scala/kafka/server/KafkaApis.scala | 5 +++-- core/src/test/scala/unit/kafka/log/LogTest.scala | 4 ++-- 5 files changed, 11 insertions(+), 16 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/common/record/Record.java b/clients/src/main/java/org/apache/kafka/common/record/Record.java index fddd189b75fe7..a424ed0615364 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/Record.java +++ b/clients/src/main/java/org/apache/kafka/common/record/Record.java @@ -349,8 +349,8 @@ public Record convert(byte toMagic, TimestampType upconvertTimestampType) { final TimestampType timestampType; if (magic == Record.MAGIC_VALUE_V0) { - if (upconvertTimestampType == null || upconvertTimestampType == TimestampType.NO_TIMESTAMP_TYPE) - throw new IllegalArgumentException("Cannot upconvert using timestamp type " + upconvertTimestampType); + if (upconvertTimestampType == TimestampType.NO_TIMESTAMP_TYPE) + throw new IllegalArgumentException("Cannot up-convert using timestamp type " + upconvertTimestampType); timestampType = upconvertTimestampType; } else { timestampType = TimestampType.forAttributes(attributes()); diff --git a/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java b/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java index d8d086dcfdbc3..d0ab42740e79a 100644 --- a/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java +++ b/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java @@ -299,7 +299,7 @@ public void testFormatConversionWithPartialMessage() throws IOException { int start = fileRecords.searchForOffsetWithSize(1, 0).position; int size = entry.sizeInBytes(); FileRecords slice = fileRecords.read(start, size - 1); - Records messageV0 = slice.toMessageFormat(Record.MAGIC_VALUE_V0, null); + Records messageV0 = slice.toMessageFormat(Record.MAGIC_VALUE_V0, TimestampType.NO_TIMESTAMP_TYPE); assertTrue("No message should be there", shallowEntries(messageV0).isEmpty()); assertEquals("There should be " + (size - 1) + " bytes", size - 1, messageV0.sizeInBytes()); } @@ -348,7 +348,7 @@ public void testConvertNonCompressedToMagic0() throws IOException { try (FileRecords fileRecords = FileRecords.open(tempFile())) { fileRecords.append(records); fileRecords.flush(); - Records convertedRecords = fileRecords.toMessageFormat(Record.MAGIC_VALUE_V0, null); + Records convertedRecords = fileRecords.toMessageFormat(Record.MAGIC_VALUE_V0, TimestampType.NO_TIMESTAMP_TYPE); verifyConvertedMessageSet(entries, convertedRecords, Record.MAGIC_VALUE_V0); } } @@ -364,7 +364,7 @@ public void testConvertCompressedToMagic0() throws IOException { try (FileRecords fileRecords = FileRecords.open(tempFile())) { fileRecords.append(records); fileRecords.flush(); - Records convertedRecords = fileRecords.toMessageFormat(Record.MAGIC_VALUE_V0, null); + Records convertedRecords = fileRecords.toMessageFormat(Record.MAGIC_VALUE_V0, TimestampType.NO_TIMESTAMP_TYPE); verifyConvertedMessageSet(entries, convertedRecords, Record.MAGIC_VALUE_V0); } } diff --git a/clients/src/test/java/org/apache/kafka/common/record/SimpleRecordTest.java b/clients/src/test/java/org/apache/kafka/common/record/SimpleRecordTest.java index ecfbc25cf8512..e4c4a6781b9d4 100644 --- a/clients/src/test/java/org/apache/kafka/common/record/SimpleRecordTest.java +++ b/clients/src/test/java/org/apache/kafka/common/record/SimpleRecordTest.java @@ -57,12 +57,6 @@ public void testIsValidWithFourBytesBuffer() { record.ensureValid(); } - @Test(expected = IllegalArgumentException.class) - public void cannotUpconvertWithNullTimestampType() { - Record record = Record.create(Record.MAGIC_VALUE_V0, Record.NO_TIMESTAMP, "foo".getBytes(), "bar".getBytes()); - record.convert(Record.MAGIC_VALUE_V1, null); - } - @Test(expected = IllegalArgumentException.class) public void cannotUpconvertWithNoTimestampType() { Record record = Record.create(Record.MAGIC_VALUE_V0, Record.NO_TIMESTAMP, "foo".getBytes(), "bar".getBytes()); @@ -95,7 +89,7 @@ public void testConvertFromV1ToV0() { for (int i = 0; i < keys.length; i++) { Record record = Record.create(Record.MAGIC_VALUE_V1, System.currentTimeMillis(), keys[i], values[i]); - Record converted = record.convert(Record.MAGIC_VALUE_V0, null); + Record converted = record.convert(Record.MAGIC_VALUE_V0, TimestampType.NO_TIMESTAMP_TYPE); assertEquals(Record.MAGIC_VALUE_V0, converted.magic()); assertEquals(Record.NO_TIMESTAMP, converted.timestamp()); diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index e6dd2c962d6df..c406681455ee8 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -39,7 +39,7 @@ import org.apache.kafka.common.errors.{ClusterAuthorizationException, NotLeaderF import org.apache.kafka.common.metrics.Metrics import org.apache.kafka.common.network.ListenerName import org.apache.kafka.common.protocol.{ApiKeys, Errors, Protocol} -import org.apache.kafka.common.record.{MemoryRecords, Record} +import org.apache.kafka.common.record.{MemoryRecords, Record, TimestampType} import org.apache.kafka.common.requests._ import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse import org.apache.kafka.common.utils.{Time, Utils} @@ -478,7 +478,8 @@ class KafkaApis(val requestChannel: RequestChannel, val convertedData = if (versionId <= 1 && replicaManager.getMagicAndTimestampType(tp).exists(_._1 > Record.MAGIC_VALUE_V0) && !data.records.hasMatchingShallowMagic(Record.MAGIC_VALUE_V0)) { trace(s"Down converting message to V0 for fetch request from $clientId") - FetchPartitionData(data.error, data.hw, data.records.toMessageFormat(Record.MAGIC_VALUE_V0, null)) + val downConvertedRecords = data.records.toMessageFormat(Record.MAGIC_VALUE_V0, TimestampType.NO_TIMESTAMP_TYPE) + FetchPartitionData(data.error, data.hw, downConvertedRecords) } else data tp -> new FetchResponse.PartitionData(convertedData.error.code, convertedData.hw, convertedData.records) diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala index c42233ecbd4d3..8fbeced440974 100755 --- a/core/src/test/scala/unit/kafka/log/LogTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogTest.scala @@ -353,7 +353,7 @@ class LogTest extends JUnitSuite { val numMessages = 100 val messageSets = (0 until numMessages).map(i => TestUtils.singletonRecords(i.toString.getBytes)) messageSets.foreach(log.append(_)) - log.flush + log.flush() /* do successive reads to ensure all our messages are there */ var offset = 0L @@ -362,7 +362,7 @@ class LogTest extends JUnitSuite { val head = messages.iterator.next() assertEquals("Offsets not equal", offset, head.offset) assertEquals("Messages not equal at offset " + offset, messageSets(i).shallowEntries.iterator.next().record, - head.record.convert(messageSets(i).shallowEntries.iterator.next().record.magic, null)) + head.record.convert(messageSets(i).shallowEntries.iterator.next().record.magic, TimestampType.NO_TIMESTAMP_TYPE)) offset = head.offset + 1 } val lastRead = log.read(startOffset = numMessages, maxLength = 1024*1024, maxOffset = Some(numMessages + 1)).records From 75c55be8968034e7c9934a0ccbe780c9bdba5cc3 Mon Sep 17 00:00:00 2001 From: Jason Gustafson Date: Thu, 2 Feb 2017 13:42:04 -0800 Subject: [PATCH 3/4] Record conversion should take wrapper timestamp type into account --- .../src/main/java/org/apache/kafka/common/record/Record.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/clients/src/main/java/org/apache/kafka/common/record/Record.java b/clients/src/main/java/org/apache/kafka/common/record/Record.java index a424ed0615364..9dca5448a9811 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/Record.java +++ b/clients/src/main/java/org/apache/kafka/common/record/Record.java @@ -353,7 +353,7 @@ public Record convert(byte toMagic, TimestampType upconvertTimestampType) { throw new IllegalArgumentException("Cannot up-convert using timestamp type " + upconvertTimestampType); timestampType = upconvertTimestampType; } else { - timestampType = TimestampType.forAttributes(attributes()); + timestampType = timestampType(); } ByteBuffer buffer = ByteBuffer.allocate(convertedSize(toMagic)); From b42804b53bcc4bff90e30dacf770fd7d90435273 Mon Sep 17 00:00:00 2001 From: Jason Gustafson Date: Thu, 2 Feb 2017 13:42:15 -0800 Subject: [PATCH 4/4] Minor style tweaks in tests --- .../scala/unit/kafka/log/LogConfigTest.scala | 2 +- .../unit/kafka/log/LogValidatorTest.scala | 21 +++++++------------ 2 files changed, 9 insertions(+), 14 deletions(-) diff --git a/core/src/test/scala/unit/kafka/log/LogConfigTest.scala b/core/src/test/scala/unit/kafka/log/LogConfigTest.scala index 862083daff50a..5df76bce157c5 100644 --- a/core/src/test/scala/unit/kafka/log/LogConfigTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogConfigTest.scala @@ -117,7 +117,7 @@ class LogConfigTest { p.setProperty(LogConfig.RetentionBytesProp, "100") LogConfig.validate(p) p.setProperty(LogConfig.RetentionBytesProp, "90") - val except = intercept[IllegalArgumentException] { + intercept[IllegalArgumentException] { LogConfig.validate(p) } } diff --git a/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala b/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala index 7e34ce0dcc8e3..bb50497d780cb 100644 --- a/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala @@ -80,8 +80,7 @@ class LogValidatorTest extends JUnitSuite { def testLogAppendTimeWithoutRecompression() { val now = System.currentTimeMillis() // The timestamps should be overwritten - val records = createRecords(magicValue = Record.MAGIC_VALUE_V1, - timestamp = 0L, codec = CompressionType.GZIP) + val records = createRecords(magicValue = Record.MAGIC_VALUE_V1, timestamp = 0L, codec = CompressionType.GZIP) val validatedResults = LogValidator.validateMessagesAndAssignOffsets( records, offsetCounter = new LongRef(0), @@ -107,8 +106,7 @@ class LogValidatorTest extends JUnitSuite { def testCreateTimeNonCompressed() { val now = System.currentTimeMillis() val timestampSeq = Seq(now - 1, now + 1, now) - val records = - MemoryRecords.withRecords(CompressionType.NONE, + val records = MemoryRecords.withRecords(CompressionType.NONE, Record.create(Record.MAGIC_VALUE_V1, timestampSeq(0), "hello".getBytes), Record.create(Record.MAGIC_VALUE_V1, timestampSeq(1), "there".getBytes), Record.create(Record.MAGIC_VALUE_V1, timestampSeq(2), "beautiful".getBytes)) @@ -138,8 +136,7 @@ class LogValidatorTest extends JUnitSuite { @Test def testCreateTimeUpConversion() { val records = createRecords(magicValue = Record.MAGIC_VALUE_V0, codec = CompressionType.GZIP) - val validatedResults = - LogValidator.validateMessagesAndAssignOffsets(records, + val validatedResults = LogValidator.validateMessagesAndAssignOffsets(records, offsetCounter = new LongRef(0), now = System.currentTimeMillis(), sourceCodec = DefaultCompressionCodec, @@ -164,14 +161,12 @@ class LogValidatorTest extends JUnitSuite { def testCreateTimeCompressed() { val now = System.currentTimeMillis() val timestampSeq = Seq(now - 1, now + 1, now) - val records = - MemoryRecords.withRecords(CompressionType.GZIP, + val records = MemoryRecords.withRecords(CompressionType.GZIP, Record.create(Record.MAGIC_VALUE_V1, timestampSeq(0), "hello".getBytes), Record.create(Record.MAGIC_VALUE_V1, timestampSeq(1), "there".getBytes), Record.create(Record.MAGIC_VALUE_V1, timestampSeq(2), "beautiful".getBytes)) - val validatedResults = - LogValidator.validateMessagesAndAssignOffsets(records, + val validatedResults = LogValidator.validateMessagesAndAssignOffsets(records, offsetCounter = new LongRef(0), now = System.currentTimeMillis(), sourceCodec = DefaultCompressionCodec, @@ -183,9 +178,9 @@ class LogValidatorTest extends JUnitSuite { var i = 0 for (logEntry <- validatedRecords.deepEntries.asScala) { - logEntry.record.ensureValid() - assertEquals(logEntry.record.timestamp, timestampSeq(i)) - assertEquals(logEntry.record.timestampType, TimestampType.CREATE_TIME) + assertTrue(logEntry.record.isValid) + assertEquals(timestampSeq(i), logEntry.record.timestamp) + assertEquals(TimestampType.CREATE_TIME, logEntry.record.timestampType) i += 1 } assertEquals(s"Max timestamp should be ${now + 1}", now + 1, validatedResults.maxTimestamp)