diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml
index 609c9c99792e..46ffe0f1864f 100644
--- a/checkstyle/import-control.xml
+++ b/checkstyle/import-control.xml
@@ -456,12 +456,13 @@
+
+
-
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/SupportedVersionRange.java b/clients/src/main/java/org/apache/kafka/clients/admin/SupportedVersionRange.java
index b85a392a658c..3ba6303eacd5 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/SupportedVersionRange.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/SupportedVersionRange.java
@@ -28,7 +28,7 @@ public class SupportedVersionRange {
/**
* Raises an exception unless the following conditions are met:
- * 1 <= minVersion <= maxVersion.
+ * 0 <= minVersion <= maxVersion.
*
* @param minVersion The minimum version value.
* @param maxVersion The maximum version value.
diff --git a/clients/src/main/java/org/apache/kafka/common/feature/BaseVersionRange.java b/clients/src/main/java/org/apache/kafka/common/feature/BaseVersionRange.java
index 2d6ce702e253..573e2c47a082 100644
--- a/clients/src/main/java/org/apache/kafka/common/feature/BaseVersionRange.java
+++ b/clients/src/main/java/org/apache/kafka/common/feature/BaseVersionRange.java
@@ -26,7 +26,7 @@
/**
* Represents an immutable basic version range using 2 attributes: min and max, each of type short.
* The min and max attributes need to satisfy 2 rules:
- * - they are each expected to be >= 1, as we only consider positive version values to be valid.
+ * - they are each expected to be >= 0, as we only consider non-negative version values to be valid.
* - max should be >= min.
*
* The class also provides API to convert the version range to a map.
@@ -48,7 +48,7 @@ class BaseVersionRange {
/**
* Raises an exception unless the following condition is met:
- * minValue >= 1 and maxValue >= 1 and maxValue >= minValue.
+ * minValue >= 0 and maxValue >= 0 and maxValue >= minValue.
*
* @param minKeyLabel Label for the min version key, that's used only to convert to/from a map.
* @param minValue The minimum version value.
@@ -56,14 +56,14 @@ class BaseVersionRange {
* @param maxValue The maximum version value.
*
* @throws IllegalArgumentException If any of the following conditions are true:
- * - (minValue < 1) OR (maxValue < 1) OR (maxValue < minValue).
+ * - (minValue < 0) OR (maxValue < 0) OR (maxValue < minValue).
* - minKeyLabel is empty, OR, minKeyLabel is empty.
*/
protected BaseVersionRange(String minKeyLabel, short minValue, String maxKeyLabel, short maxValue) {
- if (minValue < 1 || maxValue < 1 || maxValue < minValue) {
+ if (minValue < 0 || maxValue < 0 || maxValue < minValue) {
throw new IllegalArgumentException(
String.format(
- "Expected minValue >= 1, maxValue >= 1 and maxValue >= minValue, but received" +
+ "Expected minValue >= 0, maxValue >= 0 and maxValue >= minValue, but received" +
" minValue: %d, maxValue: %d", minValue, maxValue));
}
if (minKeyLabel.isEmpty()) {
@@ -86,6 +86,7 @@ public short max() {
return maxValue;
}
+ @Override
public String toString() {
return String.format(
"%s[%s]",
diff --git a/clients/src/main/java/org/apache/kafka/common/feature/SupportedVersionRange.java b/clients/src/main/java/org/apache/kafka/common/feature/SupportedVersionRange.java
index a864a9176263..b06263549032 100644
--- a/clients/src/main/java/org/apache/kafka/common/feature/SupportedVersionRange.java
+++ b/clients/src/main/java/org/apache/kafka/common/feature/SupportedVersionRange.java
@@ -33,7 +33,7 @@ public SupportedVersionRange(short minVersion, short maxVersion) {
}
public SupportedVersionRange(short maxVersion) {
- this((short) 1, maxVersion);
+ this((short) 0, maxVersion);
}
public static SupportedVersionRange fromMap(Map versionRangeMap) {
diff --git a/clients/src/main/java/org/apache/kafka/common/record/ControlRecordType.java b/clients/src/main/java/org/apache/kafka/common/record/ControlRecordType.java
index 39268044db34..8d0bbdba9eaa 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/ControlRecordType.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/ControlRecordType.java
@@ -44,11 +44,15 @@ public enum ControlRecordType {
ABORT((short) 0),
COMMIT((short) 1),
- // Raft quorum related control messages.
+ // KRaft quorum related control messages
LEADER_CHANGE((short) 2),
SNAPSHOT_HEADER((short) 3),
SNAPSHOT_FOOTER((short) 4),
+ // KRaft membership change messages
+ KRAFT_VERSION((short) 5),
+ KRAFT_VOTERS((short) 6),
+
// UNKNOWN is used to indicate a control type which the client is not aware of and should be ignored
UNKNOWN((short) -1);
@@ -108,6 +112,10 @@ public static ControlRecordType fromTypeId(short typeId) {
return SNAPSHOT_HEADER;
case 4:
return SNAPSHOT_FOOTER;
+ case 5:
+ return KRAFT_VERSION;
+ case 6:
+ return KRAFT_VOTERS;
default:
return UNKNOWN;
diff --git a/clients/src/main/java/org/apache/kafka/common/record/ControlRecordUtils.java b/clients/src/main/java/org/apache/kafka/common/record/ControlRecordUtils.java
index 3b1fd21f7875..1e78448643e1 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/ControlRecordUtils.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/ControlRecordUtils.java
@@ -16,9 +16,11 @@
*/
package org.apache.kafka.common.record;
+import org.apache.kafka.common.message.KRaftVersionRecord;
import org.apache.kafka.common.message.LeaderChangeMessage;
-import org.apache.kafka.common.message.SnapshotHeaderRecord;
import org.apache.kafka.common.message.SnapshotFooterRecord;
+import org.apache.kafka.common.message.SnapshotHeaderRecord;
+import org.apache.kafka.common.message.VotersRecord;
import org.apache.kafka.common.protocol.ByteBufferAccessor;
import java.nio.ByteBuffer;
@@ -27,49 +29,77 @@
* Utility class for easy interaction with control records.
*/
public class ControlRecordUtils {
+ public static final short KRAFT_VERSION_CURRENT_VERSION = 0;
public static final short LEADER_CHANGE_CURRENT_VERSION = 0;
- public static final short SNAPSHOT_HEADER_CURRENT_VERSION = 0;
public static final short SNAPSHOT_FOOTER_CURRENT_VERSION = 0;
+ public static final short SNAPSHOT_HEADER_CURRENT_VERSION = 0;
+ public static final short KRAFT_VOTERS_CURRENT_VERSION = 0;
public static LeaderChangeMessage deserializeLeaderChangeMessage(Record record) {
ControlRecordType recordType = ControlRecordType.parse(record.key());
- if (recordType != ControlRecordType.LEADER_CHANGE) {
- throw new IllegalArgumentException(
- "Expected LEADER_CHANGE control record type(2), but found " + recordType.toString());
- }
+ validateControlRecordType(ControlRecordType.LEADER_CHANGE, recordType);
+
return deserializeLeaderChangeMessage(record.value());
}
public static LeaderChangeMessage deserializeLeaderChangeMessage(ByteBuffer data) {
- ByteBufferAccessor byteBufferAccessor = new ByteBufferAccessor(data.slice());
- return new LeaderChangeMessage(byteBufferAccessor, LEADER_CHANGE_CURRENT_VERSION);
+ return new LeaderChangeMessage(new ByteBufferAccessor(data.slice()), LEADER_CHANGE_CURRENT_VERSION);
}
public static SnapshotHeaderRecord deserializeSnapshotHeaderRecord(Record record) {
ControlRecordType recordType = ControlRecordType.parse(record.key());
- if (recordType != ControlRecordType.SNAPSHOT_HEADER) {
- throw new IllegalArgumentException(
- "Expected SNAPSHOT_HEADER control record type(3), but found " + recordType.toString());
- }
+ validateControlRecordType(ControlRecordType.SNAPSHOT_HEADER, recordType);
+
return deserializeSnapshotHeaderRecord(record.value());
}
public static SnapshotHeaderRecord deserializeSnapshotHeaderRecord(ByteBuffer data) {
- ByteBufferAccessor byteBufferAccessor = new ByteBufferAccessor(data.slice());
- return new SnapshotHeaderRecord(byteBufferAccessor, SNAPSHOT_HEADER_CURRENT_VERSION);
+ return new SnapshotHeaderRecord(new ByteBufferAccessor(data.slice()), SNAPSHOT_HEADER_CURRENT_VERSION);
}
public static SnapshotFooterRecord deserializeSnapshotFooterRecord(Record record) {
ControlRecordType recordType = ControlRecordType.parse(record.key());
- if (recordType != ControlRecordType.SNAPSHOT_FOOTER) {
- throw new IllegalArgumentException(
- "Expected SNAPSHOT_FOOTER control record type(4), but found " + recordType.toString());
- }
+ validateControlRecordType(ControlRecordType.SNAPSHOT_FOOTER, recordType);
+
return deserializeSnapshotFooterRecord(record.value());
}
public static SnapshotFooterRecord deserializeSnapshotFooterRecord(ByteBuffer data) {
- ByteBufferAccessor byteBufferAccessor = new ByteBufferAccessor(data.slice());
- return new SnapshotFooterRecord(byteBufferAccessor, SNAPSHOT_FOOTER_CURRENT_VERSION);
+ return new SnapshotFooterRecord(new ByteBufferAccessor(data.slice()), SNAPSHOT_FOOTER_CURRENT_VERSION);
+ }
+
+ public static KRaftVersionRecord deserializeKRaftVersionRecord(Record record) {
+ ControlRecordType recordType = ControlRecordType.parse(record.key());
+ validateControlRecordType(ControlRecordType.KRAFT_VERSION, recordType);
+
+ return deserializeKRaftVersionRecord(record.value());
+ }
+
+ public static KRaftVersionRecord deserializeKRaftVersionRecord(ByteBuffer data) {
+ return new KRaftVersionRecord(new ByteBufferAccessor(data.slice()), KRAFT_VERSION_CURRENT_VERSION);
+ }
+
+ public static VotersRecord deserializeVotersRecord(Record record) {
+ ControlRecordType recordType = ControlRecordType.parse(record.key());
+ validateControlRecordType(ControlRecordType.KRAFT_VOTERS, recordType);
+
+ return deserializeVotersRecord(record.value());
+ }
+
+ public static VotersRecord deserializeVotersRecord(ByteBuffer data) {
+ return new VotersRecord(new ByteBufferAccessor(data.slice()), KRAFT_VOTERS_CURRENT_VERSION);
+ }
+
+ private static void validateControlRecordType(ControlRecordType expected, ControlRecordType actual) {
+ if (actual != expected) {
+ throw new IllegalArgumentException(
+ String.format(
+ "Expected %s control record type(%d), but found %s",
+ expected,
+ expected.type(),
+ actual
+ )
+ );
+ }
}
}
diff --git a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
index c5b43d4f8856..4eab7f7b658f 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
@@ -18,9 +18,11 @@
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.CorruptRecordException;
+import org.apache.kafka.common.message.KRaftVersionRecord;
import org.apache.kafka.common.message.LeaderChangeMessage;
-import org.apache.kafka.common.message.SnapshotHeaderRecord;
import org.apache.kafka.common.message.SnapshotFooterRecord;
+import org.apache.kafka.common.message.SnapshotHeaderRecord;
+import org.apache.kafka.common.message.VotersRecord;
import org.apache.kafka.common.network.TransferableChannel;
import org.apache.kafka.common.record.MemoryRecords.RecordFilter.BatchRetention;
import org.apache.kafka.common.record.MemoryRecords.RecordFilter.BatchRetentionResult;
@@ -154,7 +156,7 @@ public FilterResult filterTo(TopicPartition partition, RecordFilter filter, Byte
/**
* Note: This method is also used to convert the first timestamp of the batch (which is usually the timestamp of the first record)
- * to the delete horizon of the tombstones or txn markers which are present in the batch.
+ * to the delete horizon of the tombstones or txn markers which are present in the batch.
*/
private static FilterResult filterTo(TopicPartition partition, Iterable batches,
RecordFilter filter, ByteBuffer destinationBuffer, int maxRecordBatchSize,
@@ -728,83 +730,114 @@ public static MemoryRecords withLeaderChangeMessage(
ByteBuffer buffer,
LeaderChangeMessage leaderChangeMessage
) {
- writeLeaderChangeMessage(buffer, initialOffset, timestamp, leaderEpoch, leaderChangeMessage);
- buffer.flip();
- return MemoryRecords.readableRecords(buffer);
+ try (MemoryRecordsBuilder builder = createKraftControlReccordBuilder(
+ initialOffset,
+ timestamp,
+ leaderEpoch,
+ buffer
+ )
+ ) {
+ builder.appendLeaderChangeMessage(timestamp, leaderChangeMessage);
+ return builder.build();
+ }
}
- private static void writeLeaderChangeMessage(
- ByteBuffer buffer,
+ public static MemoryRecords withSnapshotHeaderRecord(
long initialOffset,
long timestamp,
int leaderEpoch,
- LeaderChangeMessage leaderChangeMessage
+ ByteBuffer buffer,
+ SnapshotHeaderRecord snapshotHeaderRecord
) {
- try (MemoryRecordsBuilder builder = new MemoryRecordsBuilder(
- buffer, RecordBatch.CURRENT_MAGIC_VALUE, CompressionType.NONE,
- TimestampType.CREATE_TIME, initialOffset, timestamp,
- RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, RecordBatch.NO_SEQUENCE,
- false, true, leaderEpoch, buffer.capacity())
+ try (MemoryRecordsBuilder builder = createKraftControlReccordBuilder(
+ initialOffset,
+ timestamp,
+ leaderEpoch,
+ buffer
+ )
) {
- builder.appendLeaderChangeMessage(timestamp, leaderChangeMessage);
+ builder.appendSnapshotHeaderMessage(timestamp, snapshotHeaderRecord);
+ return builder.build();
}
}
- public static MemoryRecords withSnapshotHeaderRecord(
+ public static MemoryRecords withSnapshotFooterRecord(
long initialOffset,
long timestamp,
int leaderEpoch,
ByteBuffer buffer,
- SnapshotHeaderRecord snapshotHeaderRecord
+ SnapshotFooterRecord snapshotFooterRecord
) {
- writeSnapshotHeaderRecord(buffer, initialOffset, timestamp, leaderEpoch, snapshotHeaderRecord);
- buffer.flip();
- return MemoryRecords.readableRecords(buffer);
+ try (MemoryRecordsBuilder builder = createKraftControlReccordBuilder(
+ initialOffset,
+ timestamp,
+ leaderEpoch,
+ buffer
+ )
+ ) {
+ builder.appendSnapshotFooterMessage(timestamp, snapshotFooterRecord);
+ return builder.build();
+ }
}
- private static void writeSnapshotHeaderRecord(
- ByteBuffer buffer,
+ public static MemoryRecords withKRaftVersionRecord(
long initialOffset,
long timestamp,
int leaderEpoch,
- SnapshotHeaderRecord snapshotHeaderRecord
+ ByteBuffer buffer,
+ KRaftVersionRecord kraftVersionRecord
) {
- try (MemoryRecordsBuilder builder = new MemoryRecordsBuilder(
- buffer, RecordBatch.CURRENT_MAGIC_VALUE, CompressionType.NONE,
- TimestampType.CREATE_TIME, initialOffset, timestamp,
- RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, RecordBatch.NO_SEQUENCE,
- false, true, leaderEpoch, buffer.capacity())
+ try (MemoryRecordsBuilder builder = createKraftControlReccordBuilder(
+ initialOffset,
+ timestamp,
+ leaderEpoch,
+ buffer
+ )
) {
- builder.appendSnapshotHeaderMessage(timestamp, snapshotHeaderRecord);
+ builder.appendKRaftVersionMessage(timestamp, kraftVersionRecord);
+ return builder.build();
}
}
- public static MemoryRecords withSnapshotFooterRecord(
+ public static MemoryRecords withVotersRecord(
long initialOffset,
long timestamp,
int leaderEpoch,
ByteBuffer buffer,
- SnapshotFooterRecord snapshotFooterRecord
+ VotersRecord votersRecord
) {
- writeSnapshotFooterRecord(buffer, initialOffset, timestamp, leaderEpoch, snapshotFooterRecord);
- buffer.flip();
- return MemoryRecords.readableRecords(buffer);
+ try (MemoryRecordsBuilder builder = createKraftControlReccordBuilder(
+ initialOffset,
+ timestamp,
+ leaderEpoch,
+ buffer
+ )
+ ) {
+ builder.appendVotersMessage(timestamp, votersRecord);
+ return builder.build();
+ }
}
- private static void writeSnapshotFooterRecord(
- ByteBuffer buffer,
+ private static MemoryRecordsBuilder createKraftControlReccordBuilder(
long initialOffset,
long timestamp,
int leaderEpoch,
- SnapshotFooterRecord snapshotFooterRecord
+ ByteBuffer buffer
) {
- try (MemoryRecordsBuilder builder = new MemoryRecordsBuilder(
- buffer, RecordBatch.CURRENT_MAGIC_VALUE, CompressionType.NONE,
- TimestampType.CREATE_TIME, initialOffset, timestamp,
- RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, RecordBatch.NO_SEQUENCE,
- false, true, leaderEpoch, buffer.capacity())
- ) {
- builder.appendSnapshotFooterMessage(timestamp, snapshotFooterRecord);
- }
+ return new MemoryRecordsBuilder(
+ buffer,
+ RecordBatch.CURRENT_MAGIC_VALUE,
+ CompressionType.NONE,
+ TimestampType.CREATE_TIME,
+ initialOffset,
+ timestamp,
+ RecordBatch.NO_PRODUCER_ID,
+ RecordBatch.NO_PRODUCER_EPOCH,
+ RecordBatch.NO_SEQUENCE,
+ false,
+ true,
+ leaderEpoch,
+ buffer.capacity()
+ );
}
}
diff --git a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
index b8c3fd53d212..1afa91b5487f 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
@@ -18,9 +18,11 @@
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.header.Header;
+import org.apache.kafka.common.message.KRaftVersionRecord;
import org.apache.kafka.common.message.LeaderChangeMessage;
-import org.apache.kafka.common.message.SnapshotHeaderRecord;
import org.apache.kafka.common.message.SnapshotFooterRecord;
+import org.apache.kafka.common.message.SnapshotHeaderRecord;
+import org.apache.kafka.common.message.VotersRecord;
import org.apache.kafka.common.protocol.MessageUtil;
import org.apache.kafka.common.protocol.types.Struct;
import org.apache.kafka.common.utils.ByteBufferOutputStream;
@@ -602,11 +604,12 @@ public void append(SimpleRecord record) {
/**
* Append a control record at the next sequential offset.
+ *
* @param timestamp The record timestamp
* @param type The control record type (cannot be UNKNOWN)
* @param value The control record value
*/
- private void appendControlRecord(long timestamp, ControlRecordType type, ByteBuffer value) {
+ public void appendControlRecord(long timestamp, ControlRecordType type, ByteBuffer value) {
Struct keyStruct = type.recordKey();
ByteBuffer key = ByteBuffer.allocate(keyStruct.sizeOf());
keyStruct.writeTo(key);
@@ -650,6 +653,22 @@ public void appendSnapshotFooterMessage(long timestamp, SnapshotFooterRecord sna
);
}
+ public void appendKRaftVersionMessage(long timestamp, KRaftVersionRecord kraftVersionRecord) {
+ appendControlRecord(
+ timestamp,
+ ControlRecordType.KRAFT_VERSION,
+ MessageUtil.toByteBuffer(kraftVersionRecord, ControlRecordUtils.KRAFT_VERSION_CURRENT_VERSION)
+ );
+ }
+
+ public void appendVotersMessage(long timestamp, VotersRecord votersRecord) {
+ appendControlRecord(
+ timestamp,
+ ControlRecordType.KRAFT_VOTERS,
+ MessageUtil.toByteBuffer(votersRecord, ControlRecordUtils.KRAFT_VOTERS_CURRENT_VERSION)
+ );
+ }
+
/**
* Add a legacy record without doing offset/magic validation (this should only be used in testing).
* @param offset The offset of the record
diff --git a/clients/src/main/resources/common/message/KRaftVersionRecord.json b/clients/src/main/resources/common/message/KRaftVersionRecord.json
new file mode 100644
index 000000000000..8610f75fe5d7
--- /dev/null
+++ b/clients/src/main/resources/common/message/KRaftVersionRecord.json
@@ -0,0 +1,27 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+{
+ "type": "data",
+ "name": "KRaftVersionRecord",
+ "validVersions": "0",
+ "flexibleVersions": "0+",
+ "fields": [
+ { "name": "Version", "type": "int16", "versions": "0+",
+ "about": "The version of the kraft version record" },
+ { "name": "KRaftVersion", "type": "int16", "versions": "0+",
+ "about": "The kraft protocol version" }
+ ]
+}
diff --git a/clients/src/main/resources/common/message/VotersRecord.json b/clients/src/main/resources/common/message/VotersRecord.json
new file mode 100644
index 000000000000..e9df56ad4f14
--- /dev/null
+++ b/clients/src/main/resources/common/message/VotersRecord.json
@@ -0,0 +1,47 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+{
+ "type": "data",
+ "name": "VotersRecord",
+ "validVersions": "0",
+ "flexibleVersions": "0+",
+ "fields": [
+ { "name": "Version", "type": "int16", "versions": "0+",
+ "about": "The version of the voters record" },
+ { "name": "Voters", "type": "[]Voter", "versions": "0+", "fields": [
+ { "name": "VoterId", "type": "int32", "versions": "0+", "entityType": "brokerId",
+ "about": "The replica id of the voter in the topic partition" },
+ { "name": "VoterDirectoryId", "type": "uuid", "versions": "0+",
+ "about": "The directory id of the voter in the topic partition" },
+ { "name": "Endpoints", "type": "[]Endpoint", "versions": "0+",
+ "about": "The endpoint that can be used to communicate with the voter", "fields": [
+ { "name": "Name", "type": "string", "versions": "0+", "mapKey": true,
+ "about": "The name of the endpoint" },
+ { "name": "Host", "type": "string", "versions": "0+",
+ "about": "The hostname" },
+ { "name": "Port", "type": "uint16", "versions": "0+",
+ "about": "The port" }
+ ]},
+ { "name": "KRaftVersionFeature", "type": "KRaftVersionFeature", "versions": "0+",
+ "about": "The range of versions of the protocol that the replica supports", "fields": [
+ { "name": "MinSupportedVersion", "type": "int16", "versions": "0+",
+ "about": "The minimum supported KRaft protocol version" },
+ { "name": "MaxSupportedVersion", "type": "int16", "versions": "0+",
+ "about": "The maximum supported KRaft protocol version" }
+ ]}
+ ]}
+ ]
+}
diff --git a/clients/src/test/java/org/apache/kafka/common/feature/SupportedVersionRangeTest.java b/clients/src/test/java/org/apache/kafka/common/feature/SupportedVersionRangeTest.java
index a1d2af419fd2..47f312d9d902 100644
--- a/clients/src/test/java/org/apache/kafka/common/feature/SupportedVersionRangeTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/feature/SupportedVersionRangeTest.java
@@ -36,18 +36,18 @@
public class SupportedVersionRangeTest {
@Test
public void testFailDueToInvalidParams() {
- // min and max can't be < 1.
+ // min and max can't be < 0.
assertThrows(
IllegalArgumentException.class,
- () -> new SupportedVersionRange((short) 0, (short) 0));
- // min can't be < 1.
+ () -> new SupportedVersionRange((short) -1, (short) -1));
+ // min can't be < 0.
assertThrows(
IllegalArgumentException.class,
- () -> new SupportedVersionRange((short) 0, (short) 1));
- // max can't be < 1.
+ () -> new SupportedVersionRange((short) -1, (short) 0));
+ // max can't be < 0.
assertThrows(
IllegalArgumentException.class,
- () -> new SupportedVersionRange((short) 1, (short) 0));
+ () -> new SupportedVersionRange((short) 0, (short) -1));
// min can't be > max.
assertThrows(
IllegalArgumentException.class,
@@ -73,23 +73,23 @@ public void testFromToMap() {
@Test
public void testFromMapFailure() {
- // min_version can't be < 1.
+ // min_version can't be < 0.
Map invalidWithBadMinVersion =
- mkMap(mkEntry("min_version", (short) 0), mkEntry("max_version", (short) 1));
+ mkMap(mkEntry("min_version", (short) -1), mkEntry("max_version", (short) 0));
assertThrows(
IllegalArgumentException.class,
() -> SupportedVersionRange.fromMap(invalidWithBadMinVersion));
- // max_version can't be < 1.
+ // max_version can't be < 0.
Map invalidWithBadMaxVersion =
- mkMap(mkEntry("min_version", (short) 1), mkEntry("max_version", (short) 0));
+ mkMap(mkEntry("min_version", (short) 0), mkEntry("max_version", (short) -1));
assertThrows(
IllegalArgumentException.class,
() -> SupportedVersionRange.fromMap(invalidWithBadMaxVersion));
- // min_version and max_version can't be < 1.
+ // min_version and max_version can't be < 0.
Map invalidWithBadMinMaxVersion =
- mkMap(mkEntry("min_version", (short) 0), mkEntry("max_version", (short) 0));
+ mkMap(mkEntry("min_version", (short) -1), mkEntry("max_version", (short) -1));
assertThrows(
IllegalArgumentException.class,
() -> SupportedVersionRange.fromMap(invalidWithBadMinMaxVersion));
diff --git a/clients/src/test/java/org/apache/kafka/common/record/ControlRecordTypeTest.java b/clients/src/test/java/org/apache/kafka/common/record/ControlRecordTypeTest.java
index fcf6ffd3ab81..d23fe45932f6 100644
--- a/clients/src/test/java/org/apache/kafka/common/record/ControlRecordTypeTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/record/ControlRecordTypeTest.java
@@ -16,10 +16,10 @@
*/
package org.apache.kafka.common.record;
-import org.junit.jupiter.api.Test;
-
import java.nio.ByteBuffer;
-
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
import static org.junit.jupiter.api.Assertions.assertEquals;
public class ControlRecordTypeTest {
@@ -45,4 +45,14 @@ public void testParseUnknownVersion() {
assertEquals(ControlRecordType.ABORT, type);
}
+ @ParameterizedTest
+ @EnumSource(value = ControlRecordType.class)
+ public void testRoundTrip(ControlRecordType expected) {
+ ByteBuffer buffer = ByteBuffer.allocate(32);
+ buffer.putShort(ControlRecordType.CURRENT_CONTROL_RECORD_KEY_VERSION);
+ buffer.putShort(expected.type());
+ buffer.flip();
+
+ assertEquals(expected, ControlRecordType.parse(buffer));
+ }
}
diff --git a/clients/src/test/java/org/apache/kafka/common/record/ControlRecordUtilsTest.java b/clients/src/test/java/org/apache/kafka/common/record/ControlRecordUtilsTest.java
index fc95f531512a..462660bdc58c 100644
--- a/clients/src/test/java/org/apache/kafka/common/record/ControlRecordUtilsTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/record/ControlRecordUtilsTest.java
@@ -19,9 +19,11 @@
import java.nio.ByteBuffer;
import java.util.Collections;
import org.apache.kafka.common.message.LeaderChangeMessage.Voter;
+import org.apache.kafka.common.message.KRaftVersionRecord;
import org.apache.kafka.common.message.LeaderChangeMessage;
import org.apache.kafka.common.message.SnapshotFooterRecord;
import org.apache.kafka.common.message.SnapshotHeaderRecord;
+import org.apache.kafka.common.message.VotersRecord;
import org.apache.kafka.common.protocol.ByteBufferAccessor;
import org.apache.kafka.common.protocol.ObjectSerializationCache;
import org.junit.jupiter.api.Test;
@@ -46,6 +48,14 @@ public void testCurrentVersions() {
SnapshotFooterRecord.HIGHEST_SUPPORTED_VERSION,
ControlRecordUtils.SNAPSHOT_FOOTER_CURRENT_VERSION
);
+ assertEquals(
+ KRaftVersionRecord.HIGHEST_SUPPORTED_VERSION,
+ ControlRecordUtils.KRAFT_VERSION_CURRENT_VERSION
+ );
+ assertEquals(
+ VotersRecord.HIGHEST_SUPPORTED_VERSION,
+ ControlRecordUtils.KRAFT_VOTERS_CURRENT_VERSION
+ );
}
@Test
diff --git a/core/src/main/scala/kafka/raft/KafkaMetadataLog.scala b/core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
index 0ea3301a5b88..15f87b9c247e 100644
--- a/core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
+++ b/core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
@@ -32,7 +32,13 @@ import org.apache.kafka.common.{KafkaException, TopicPartition, Uuid}
import org.apache.kafka.raft.{Isolation, KafkaRaftClient, LogAppendInfo, LogFetchInfo, LogOffsetMetadata, OffsetAndEpoch, OffsetMetadata, ReplicatedLog, ValidOffsetAndEpoch}
import org.apache.kafka.server.config.{KRaftConfigs, ServerLogConfigs}
import org.apache.kafka.server.util.Scheduler
-import org.apache.kafka.snapshot.{FileRawSnapshotReader, FileRawSnapshotWriter, RawSnapshotReader, RawSnapshotWriter, SnapshotPath, Snapshots}
+import org.apache.kafka.snapshot.FileRawSnapshotReader
+import org.apache.kafka.snapshot.FileRawSnapshotWriter
+import org.apache.kafka.snapshot.NotifyingRawSnapshotWriter
+import org.apache.kafka.snapshot.RawSnapshotReader
+import org.apache.kafka.snapshot.RawSnapshotWriter
+import org.apache.kafka.snapshot.SnapshotPath
+import org.apache.kafka.snapshot.Snapshots
import org.apache.kafka.storage.internals
import org.apache.kafka.storage.internals.log.{AppendOrigin, FetchIsolation, LogConfig, LogDirFailureChannel, LogStartOffsetIncrementReason, ProducerStateManagerConfig}
@@ -264,10 +270,10 @@ final class KafkaMetadataLog private (
)
}
- storeSnapshot(snapshotId)
+ createNewSnapshotUnchecked(snapshotId)
}
- override def storeSnapshot(snapshotId: OffsetAndEpoch): Optional[RawSnapshotWriter] = {
+ override def createNewSnapshotUnchecked(snapshotId: OffsetAndEpoch): Optional[RawSnapshotWriter] = {
val containsSnapshotId = snapshots synchronized {
snapshots.contains(snapshotId)
}
@@ -275,7 +281,12 @@ final class KafkaMetadataLog private (
if (containsSnapshotId) {
Optional.empty()
} else {
- Optional.of(FileRawSnapshotWriter.create(log.dir.toPath, snapshotId, Optional.of(this)))
+ Optional.of(
+ new NotifyingRawSnapshotWriter(
+ FileRawSnapshotWriter.create(log.dir.toPath, snapshotId),
+ onSnapshotFrozen
+ )
+ )
}
}
diff --git a/core/src/main/scala/kafka/raft/RaftManager.scala b/core/src/main/scala/kafka/raft/RaftManager.scala
index f5cd6ba5aedf..0430952e17cb 100644
--- a/core/src/main/scala/kafka/raft/RaftManager.scala
+++ b/core/src/main/scala/kafka/raft/RaftManager.scala
@@ -17,11 +17,12 @@
package kafka.raft
import java.io.File
+import java.net.InetSocketAddress
import java.nio.file.Files
import java.nio.file.Paths
-import java.util
import java.util.OptionalInt
import java.util.concurrent.CompletableFuture
+import java.util.{Map => JMap}
import kafka.log.LogManager
import kafka.log.UnifiedLog
import kafka.server.KafkaConfig
@@ -40,7 +41,6 @@ import org.apache.kafka.common.requests.RequestHeader
import org.apache.kafka.common.security.JaasContext
import org.apache.kafka.common.security.auth.SecurityProtocol
import org.apache.kafka.common.utils.{LogContext, Time, Utils}
-import org.apache.kafka.raft.QuorumConfig.{AddressSpec, InetAddressSpec, NON_ROUTABLE_ADDRESS, UnknownAddressSpec}
import org.apache.kafka.raft.{FileBasedStateStore, KafkaNetworkChannel, KafkaRaftClient, KafkaRaftClientDriver, LeaderAndEpoch, RaftClient, QuorumConfig, ReplicatedLog}
import org.apache.kafka.server.ProcessRole
import org.apache.kafka.server.common.serialization.RecordSerde
@@ -144,7 +144,7 @@ class KafkaRaftManager[T](
time: Time,
metrics: Metrics,
threadNamePrefixOpt: Option[String],
- val controllerQuorumVotersFuture: CompletableFuture[util.Map[Integer, AddressSpec]],
+ val controllerQuorumVotersFuture: CompletableFuture[JMap[Integer, InetSocketAddress]],
fatalFaultHandler: FaultHandler
) extends RaftManager[T] with Logging {
@@ -181,20 +181,12 @@ class KafkaRaftManager[T](
private val clientDriver = new KafkaRaftClientDriver[T](client, threadNamePrefix, fatalFaultHandler, logContext)
def startup(): Unit = {
- // Update the voter endpoints (if valid) with what's in RaftConfig
- val voterAddresses: util.Map[Integer, AddressSpec] = controllerQuorumVotersFuture.get()
- for (voterAddressEntry <- voterAddresses.entrySet.asScala) {
- voterAddressEntry.getValue match {
- case spec: InetAddressSpec =>
- netChannel.updateEndpoint(voterAddressEntry.getKey, spec)
- case _: UnknownAddressSpec =>
- info(s"Skipping channel update for destination ID: ${voterAddressEntry.getKey} " +
- s"because of non-routable endpoint: ${NON_ROUTABLE_ADDRESS.toString}")
- case invalid: AddressSpec =>
- warn(s"Unexpected address spec (type: ${invalid.getClass}) for channel update for " +
- s"destination ID: ${voterAddressEntry.getKey}")
- }
- }
+ client.initialize(
+ controllerQuorumVotersFuture.get(),
+ config.controllerListenerNames.head,
+ new FileBasedStateStore(new File(dataDir, FileBasedStateStore.DEFAULT_FILE_NAME)),
+ metrics
+ )
netChannel.start()
clientDriver.start()
}
@@ -224,23 +216,17 @@ class KafkaRaftManager[T](
}
private def buildRaftClient(): KafkaRaftClient[T] = {
- val quorumStateStore = new FileBasedStateStore(new File(dataDir, "quorum-state"))
- val nodeId = OptionalInt.of(config.nodeId)
-
val client = new KafkaRaftClient(
+ OptionalInt.of(config.nodeId),
recordSerde,
netChannel,
replicatedLog,
- quorumStateStore,
time,
- metrics,
expirationService,
logContext,
clusterId,
- nodeId,
raftConfig
)
- client.initialize()
client
}
@@ -267,7 +253,10 @@ class KafkaRaftManager[T](
private def buildNetworkClient(): NetworkClient = {
val controllerListenerName = new ListenerName(config.controllerListenerNames.head)
- val controllerSecurityProtocol = config.effectiveListenerSecurityProtocolMap.getOrElse(controllerListenerName, SecurityProtocol.forName(controllerListenerName.value()))
+ val controllerSecurityProtocol = config.effectiveListenerSecurityProtocolMap.getOrElse(
+ controllerListenerName,
+ SecurityProtocol.forName(controllerListenerName.value())
+ )
val channelBuilder = ChannelBuilders.clientChannelBuilder(
controllerSecurityProtocol,
JaasContext.Type.SERVER,
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 76d0d9d3c7e2..e433741463f0 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -1346,14 +1346,14 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami
val advertisedListenerNames = effectiveAdvertisedListeners.map(_.listenerName).toSet
// validate KRaft-related configs
- val voterAddressSpecsByNodeId = QuorumConfig.parseVoterConnections(quorumVoters)
+ val voterIds = QuorumConfig.parseVoterIds(quorumVoters)
def validateNonEmptyQuorumVotersForKRaft(): Unit = {
- if (voterAddressSpecsByNodeId.isEmpty) {
+ if (voterIds.isEmpty) {
throw new ConfigException(s"If using ${KRaftConfigs.PROCESS_ROLES_CONFIG}, ${QuorumConfig.QUORUM_VOTERS_CONFIG} must contain a parseable set of voters.")
}
}
def validateNonEmptyQuorumVotersForMigration(): Unit = {
- if (voterAddressSpecsByNodeId.isEmpty) {
+ if (voterIds.isEmpty) {
throw new ConfigException(s"If using ${KRaftConfigs.MIGRATION_ENABLED_CONFIG}, ${QuorumConfig.QUORUM_VOTERS_CONFIG} must contain a parseable set of voters.")
}
}
@@ -1366,8 +1366,8 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami
s"The advertised.listeners config must not contain KRaft controller listeners from ${KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG} when ${KRaftConfigs.PROCESS_ROLES_CONFIG} contains the broker role because Kafka clients that send requests via advertised listeners do not send requests to KRaft controllers -- they only send requests to KRaft brokers.")
}
def validateControllerQuorumVotersMustContainNodeIdForKRaftController(): Unit = {
- require(voterAddressSpecsByNodeId.containsKey(nodeId),
- s"If ${KRaftConfigs.PROCESS_ROLES_CONFIG} contains the 'controller' role, the node id $nodeId must be included in the set of voters ${QuorumConfig.QUORUM_VOTERS_CONFIG}=${voterAddressSpecsByNodeId.asScala.keySet.toSet}")
+ require(voterIds.contains(nodeId),
+ s"If ${KRaftConfigs.PROCESS_ROLES_CONFIG} contains the 'controller' role, the node id $nodeId must be included in the set of voters ${QuorumConfig.QUORUM_VOTERS_CONFIG}=${voterIds.asScala.toSet}")
}
def validateControllerListenerExistsForKRaftController(): Unit = {
require(controllerListeners.nonEmpty,
@@ -1389,8 +1389,8 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami
validateControlPlaneListenerEmptyForKRaft()
validateAdvertisedListenersDoesNotContainControllerListenersForKRaftBroker()
// nodeId must not appear in controller.quorum.voters
- require(!voterAddressSpecsByNodeId.containsKey(nodeId),
- s"If ${KRaftConfigs.PROCESS_ROLES_CONFIG} contains just the 'broker' role, the node id $nodeId must not be included in the set of voters ${QuorumConfig.QUORUM_VOTERS_CONFIG}=${voterAddressSpecsByNodeId.asScala.keySet.toSet}")
+ require(!voterIds.contains(nodeId),
+ s"If ${KRaftConfigs.PROCESS_ROLES_CONFIG} contains just the 'broker' role, the node id $nodeId must not be included in the set of voters ${QuorumConfig.QUORUM_VOTERS_CONFIG}=${voterIds.asScala.toSet}")
// controller.listener.names must be non-empty...
require(controllerListenerNames.nonEmpty,
s"${KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG} must contain at least one value when running KRaft with just the broker role")
diff --git a/core/src/main/scala/kafka/server/KafkaRaftServer.scala b/core/src/main/scala/kafka/server/KafkaRaftServer.scala
index 69ec5284f4d3..d3200149f7a3 100644
--- a/core/src/main/scala/kafka/server/KafkaRaftServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaRaftServer.scala
@@ -65,15 +65,12 @@ class KafkaRaftServer(
metaPropsEnsemble.clusterId().get()
)
- private val controllerQuorumVotersFuture = CompletableFuture.completedFuture(
- QuorumConfig.parseVoterConnections(config.quorumVoters))
-
private val sharedServer = new SharedServer(
config,
metaPropsEnsemble,
time,
metrics,
- controllerQuorumVotersFuture,
+ CompletableFuture.completedFuture(QuorumConfig.parseVoterConnections(config.quorumVoters)),
new StandardFaultHandlerFactory(),
)
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index 1a88c79d8987..4a490d7bbdb0 100755
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -323,7 +323,8 @@ class KafkaServer(
if (config.migrationEnabled) {
kraftControllerNodes = QuorumConfig.voterConnectionsToNodes(
- QuorumConfig.parseVoterConnections(config.quorumVoters)).asScala
+ QuorumConfig.parseVoterConnections(config.quorumVoters)
+ ).asScala
} else {
kraftControllerNodes = Seq.empty
}
@@ -427,8 +428,7 @@ class KafkaServer(
logger.info("Successfully deleted local metadata log. It will be re-created.")
// If the ZK broker is in migration mode, start up a RaftManager to learn about the new KRaft controller
- val controllerQuorumVotersFuture = CompletableFuture.completedFuture(
- QuorumConfig.parseVoterConnections(config.quorumVoters))
+ val quorumVoters = QuorumConfig.parseVoterConnections(config.quorumVoters)
raftManager = new KafkaRaftManager[ApiMessageAndVersion](
metaPropsEnsemble.clusterId().get(),
config,
@@ -438,10 +438,10 @@ class KafkaServer(
time,
metrics,
threadNamePrefix,
- controllerQuorumVotersFuture,
+ CompletableFuture.completedFuture(quorumVoters),
fatalFaultHandler = new LoggingFaultHandler("raftManager", () => shutdown())
)
- val controllerNodes = QuorumConfig.voterConnectionsToNodes(controllerQuorumVotersFuture.get()).asScala
+ val controllerNodes = QuorumConfig.voterConnectionsToNodes(quorumVoters).asScala
val quorumControllerNodeProvider = RaftControllerNodeProvider(raftManager, config, controllerNodes)
val brokerToQuorumChannelManager = new NodeToControllerChannelManagerImpl(
controllerNodeProvider = quorumControllerNodeProvider,
diff --git a/core/src/main/scala/kafka/server/SharedServer.scala b/core/src/main/scala/kafka/server/SharedServer.scala
index 4cc05e7e9ebd..e2a5e338f7a6 100644
--- a/core/src/main/scala/kafka/server/SharedServer.scala
+++ b/core/src/main/scala/kafka/server/SharedServer.scala
@@ -31,16 +31,17 @@ import org.apache.kafka.image.publisher.{SnapshotEmitter, SnapshotGenerator}
import org.apache.kafka.image.publisher.metrics.SnapshotEmitterMetrics
import org.apache.kafka.metadata.MetadataRecordSerde
import org.apache.kafka.metadata.properties.MetaPropertiesEnsemble
-import org.apache.kafka.raft.QuorumConfig.AddressSpec
import org.apache.kafka.server.ProcessRole
import org.apache.kafka.server.common.ApiMessageAndVersion
import org.apache.kafka.server.fault.{FaultHandler, LoggingFaultHandler, ProcessTerminatingFaultHandler}
import org.apache.kafka.server.metrics.KafkaYammerMetrics
-import java.util
+import java.net.InetSocketAddress
+import java.util.Arrays
import java.util.Optional
-import java.util.concurrent.{CompletableFuture, TimeUnit}
import java.util.concurrent.atomic.AtomicReference
+import java.util.concurrent.{CompletableFuture, TimeUnit}
+import java.util.{Map => JMap}
/**
@@ -92,7 +93,7 @@ class SharedServer(
val metaPropsEnsemble: MetaPropertiesEnsemble,
val time: Time,
private val _metrics: Metrics,
- val controllerQuorumVotersFuture: CompletableFuture[util.Map[Integer, AddressSpec]],
+ val controllerQuorumVotersFuture: CompletableFuture[JMap[Integer, InetSocketAddress]],
val faultHandlerFactory: FaultHandlerFactory
) extends Logging {
private val logContext: LogContext = new LogContext(s"[SharedServer id=${sharedServerConfig.nodeId}] ")
@@ -303,7 +304,7 @@ class SharedServer(
setThreadNamePrefix(s"kafka-${sharedServerConfig.nodeId}-").
build()
try {
- loader.installPublishers(util.Arrays.asList(snapshotGenerator)).get()
+ loader.installPublishers(Arrays.asList(snapshotGenerator)).get()
} catch {
case t: Throwable => {
error("Unable to install metadata publishers", t)
diff --git a/core/src/main/scala/kafka/tools/DumpLogSegments.scala b/core/src/main/scala/kafka/tools/DumpLogSegments.scala
index 8aa5995612cc..613bd0d95e13 100755
--- a/core/src/main/scala/kafka/tools/DumpLogSegments.scala
+++ b/core/src/main/scala/kafka/tools/DumpLogSegments.scala
@@ -25,7 +25,10 @@ import kafka.log._
import kafka.serializer.Decoder
import kafka.utils._
import kafka.utils.Implicits._
-import org.apache.kafka.common.message.{SnapshotFooterRecordJsonConverter, SnapshotHeaderRecordJsonConverter}
+import org.apache.kafka.common.message.KRaftVersionRecordJsonConverter
+import org.apache.kafka.common.message.SnapshotFooterRecordJsonConverter
+import org.apache.kafka.common.message.SnapshotHeaderRecordJsonConverter
+import org.apache.kafka.common.message.VotersRecordJsonConverter
import org.apache.kafka.common.metadata.{MetadataJsonConverters, MetadataRecordType}
import org.apache.kafka.common.protocol.ByteBufferAccessor
import org.apache.kafka.common.record._
@@ -307,6 +310,12 @@ object DumpLogSegments {
case ControlRecordType.SNAPSHOT_FOOTER =>
val footer = ControlRecordUtils.deserializeSnapshotFooterRecord(record)
print(s" SnapshotFooter ${SnapshotFooterRecordJsonConverter.write(footer, footer.version())}")
+ case ControlRecordType.KRAFT_VERSION =>
+ val kraftVersion = ControlRecordUtils.deserializeKRaftVersionRecord(record)
+ print(s" KRaftVersion ${KRaftVersionRecordJsonConverter.write(kraftVersion, kraftVersion.version())}")
+ case ControlRecordType.KRAFT_VOTERS =>
+ val voters = ControlRecordUtils.deserializeVotersRecord(record)
+ print(s" KRaftVoters ${VotersRecordJsonConverter.write(voters, voters.version())}")
case controlType =>
print(s" controlType: $controlType($controlTypeId)")
}
diff --git a/core/src/test/java/kafka/testkit/KafkaClusterTestKit.java b/core/src/test/java/kafka/testkit/KafkaClusterTestKit.java
index 0c4cae2d7ea7..621b69094a2d 100644
--- a/core/src/test/java/kafka/testkit/KafkaClusterTestKit.java
+++ b/core/src/test/java/kafka/testkit/KafkaClusterTestKit.java
@@ -90,7 +90,7 @@ public class KafkaClusterTestKit implements AutoCloseable {
*/
private static class ControllerQuorumVotersFutureManager implements AutoCloseable {
private final int expectedControllers;
- private final CompletableFuture