KAFKA-16207; KRaft's internal log listener to update voter set #15671

Merged 41 commits on May 4, 2024
Changes from 31 commits

Commits (41)
dd84479
KAFKA-16207; Add KRaftVersionRecord and VotersRecord
jsancio Jan 31, 2024
87a43c6
Some observations while reviewing the KIP and implementation
jsancio Feb 14, 2024
c9de8a1
Fix compilation errors
jsancio Mar 25, 2024
54b164e
KAFKA-16207; Match KIP schema
jsancio Mar 25, 2024
04fcf4c
KAFKA-16207; Implement VoterSet and VoterSetHistory
jsancio Mar 27, 2024
702e739
KAFKA-16207; Add impl for internal log listener
jsancio Mar 28, 2024
fae8b84
KAFKA-16207; Add historical kraft version
jsancio Mar 28, 2024
25d9dfd
KAFKA-16207; Import refactor to delay voter address evaluation
jsancio Apr 4, 2024
b3e65bd
Merge remote-tracking branch 'upstream/trunk' into kafka-16207-intern…
jsancio Apr 5, 2024
9a6b4c2
KAFKA-16207; Fix core and metadata after raft changes
jsancio Apr 5, 2024
24661b1
KAFKA-16207; More voter parsing clean ups
jsancio Apr 5, 2024
cd3277c
KAFKA-16207; Use the voter set from the config or log
jsancio Apr 5, 2024
f3a0fcd
KAFKA-16207; Return node id before calling initialize
jsancio Apr 7, 2024
08e1831
KAFKA-16207; Add a builder for record snapshot writer
jsancio Apr 7, 2024
89ac698
KAFKA-16207; Include kraft.version and voter set in the snapshots
jsancio Apr 7, 2024
2889b42
KAFKA-16207; Use SupportedVersionRange
jsancio Apr 8, 2024
1188b94
KAFKA-16207; Fix version range documentation
jsancio Apr 8, 2024
92909fb
KAFKA-16207; Snapshots should return control records
jsancio Apr 8, 2024
dd28ef4
KAFKA-16207; Document internal listener invariant
jsancio Apr 8, 2024
deb7920
KAFKA-16207; Update some of the documentation
jsancio Apr 8, 2024
0db1a06
KAFKA-16207; Fix cluster snapshot test
jsancio Apr 9, 2024
ab1886a
KAFKA-16207; Implement truncating and prefix trimming the internal li…
jsancio Apr 9, 2024
a962188
KAFKA-16207; Finish documenting all of the type and methods
jsancio Apr 10, 2024
1829e06
KAFKA-16207; Implement some of the unit tests
jsancio Apr 14, 2024
a108e9c
KAFKA-16207; Add 2 more test suites
jsancio Apr 15, 2024
795858f
KAFKA-16207; Make the control batch buffer configurable
jsancio Apr 16, 2024
a79660e
KAFKA-16207; Add tests for snapshot builder
jsancio Apr 16, 2024
625aabd
KAFKA-16207; Add test for internal partition listener
jsancio Apr 16, 2024
855a13e
Merge remote-tracking branch 'upstream/trunk' into kafka-16207-intern…
jsancio Apr 17, 2024
629c7da
Merge remote-tracking branch 'upstream/trunk' into kafka-16207-intern…
jsancio Apr 22, 2024
bc26f59
Merge remote-tracking branch 'upstream/trunk' into kafka-16207-intern…
jsancio Apr 24, 2024
b6fc7f6
KAFKA-16207; Use directory id instead of uuid
jsancio Apr 26, 2024
742c969
Merge remote-tracking branch 'upstream/trunk' into kafka-16207-intern…
jsancio Apr 26, 2024
e55564d
KAFKA-16207; Many fixes and improvements
jsancio Apr 26, 2024
de19062
KAFKA-16207; Rename PartitionListener
jsancio Apr 27, 2024
66c3793
Merge remote-tracking branch 'upstream/trunk' into kafka-16207-intern…
jsancio Apr 27, 2024
c821a83
KAFKA-16207; Review feedback changes
jsancio May 1, 2024
c2a7a9a
KAFKA-16207; Better validate the control records in the accumulator
jsancio May 2, 2024
d1c17e0
Merge remote-tracking branch 'upstream/trunk' into kafka-16207-intern…
jsancio May 2, 2024
bc80295
KAFKA-16207; Minor fix
jsancio May 2, 2024
c363bf1
KAFKA-16207; kraft version default of 1 for tests
jsancio May 3, 2024
3 changes: 2 additions & 1 deletion checkstyle/import-control.xml
@@ -456,12 +456,13 @@
<allow pkg="org.apache.kafka.snapshot" />
<allow pkg="org.apache.kafka.clients" />
<allow pkg="org.apache.kafka.common.config" />
<allow pkg="org.apache.kafka.common.feature" />
<allow pkg="org.apache.kafka.common.message" />
<allow pkg="org.apache.kafka.common.metadata" />
<allow pkg="org.apache.kafka.common.metrics" />
<allow pkg="org.apache.kafka.common.protocol" />
<allow pkg="org.apache.kafka.common.record" />
<allow pkg="org.apache.kafka.common.requests" />
<allow pkg="org.apache.kafka.common.protocol" />
<allow pkg="org.apache.kafka.server.common" />
<allow pkg="org.apache.kafka.server.common.serialization" />
<allow pkg="org.apache.kafka.server.fault"/>
SupportedVersionRange.java
@@ -28,7 +28,7 @@ public class SupportedVersionRange {

/**
* Raises an exception unless the following conditions are met:
* 1 <= minVersion <= maxVersion.
* 0 <= minVersion <= maxVersion.
*
* @param minVersion The minimum version value.
* @param maxVersion The maximum version value.
BaseVersionRange.java
@@ -26,7 +26,7 @@
/**
* Represents an immutable basic version range using 2 attributes: min and max, each of type short.
* The min and max attributes need to satisfy 2 rules:
* - they are each expected to be >= 1, as we only consider positive version values to be valid.
Contributor:

Was it a bug that we only allowed version 1 and above? I'm wondering if we really need to change it.

Member Author (jsancio):

Yes. I think this was missed during the original implementation. The default value for any feature version is 0 but that cannot be expressed in the range of supported versions since it doesn't allow 0 as the min or max value.
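As an editorial illustration of this point (not part of the PR's diff), a minimal sketch of what the relaxed check allows, using the two-argument SupportedVersionRange constructor shown later in this diff:

// With the relaxed check, a feature whose default/disabled level is 0 can now be
// described by a supported range that starts at 0.
SupportedVersionRange kraftVersionRange = new SupportedVersionRange((short) 0, (short) 1);
// Before this change the same call failed the minValue >= 1 check and threw
// IllegalArgumentException("Expected minValue >= 1, maxValue >= 1 and maxValue >= minValue ...").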

* - they are each expected to be >= 0, as we only consider positive version values to be valid.
* - max should be >= min.
*
* The class also provides API to convert the version range to a map.
@@ -48,22 +48,22 @@ class BaseVersionRange {

/**
* Raises an exception unless the following condition is met:
* minValue >= 1 and maxValue >= 1 and maxValue >= minValue.
* minValue >= 0 and maxValue >= 0 and maxValue >= minValue.
*
* @param minKeyLabel Label for the min version key, that's used only to convert to/from a map.
* @param minValue The minimum version value.
* @param maxKeyLabel Label for the max version key, that's used only to convert to/from a map.
* @param maxValue The maximum version value.
*
* @throws IllegalArgumentException If any of the following conditions are true:
* - (minValue < 1) OR (maxValue < 1) OR (maxValue < minValue).
* - (minValue < 0) OR (maxValue < 0) OR (maxValue < minValue).
* - minKeyLabel is empty, OR, minKeyLabel is empty.
*/
protected BaseVersionRange(String minKeyLabel, short minValue, String maxKeyLabel, short maxValue) {
if (minValue < 1 || maxValue < 1 || maxValue < minValue) {
if (minValue < 0 || maxValue < 0 || maxValue < minValue) {
throw new IllegalArgumentException(
String.format(
"Expected minValue >= 1, maxValue >= 1 and maxValue >= minValue, but received" +
"Expected minValue >= 0, maxValue >= 0 and maxValue >= minValue, but received" +
" minValue: %d, maxValue: %d", minValue, maxValue));
}
if (minKeyLabel.isEmpty()) {
@@ -86,6 +86,7 @@ public short max() {
return maxValue;
}

@Override
public String toString() {
return String.format(
"%s[%s]",
SupportedVersionRange.java
@@ -33,7 +33,7 @@ public SupportedVersionRange(short minVersion, short maxVersion) {
}

public SupportedVersionRange(short maxVersion) {
this((short) 1, maxVersion);
this((short) 0, maxVersion);
}

public static SupportedVersionRange fromMap(Map<String, Short> versionRangeMap) {
ControlRecordType.java
@@ -44,11 +44,15 @@ public enum ControlRecordType {
ABORT((short) 0),
COMMIT((short) 1),

// Raft quorum related control messages.
// KRaft quorum related control messages
LEADER_CHANGE((short) 2),
SNAPSHOT_HEADER((short) 3),
SNAPSHOT_FOOTER((short) 4),

// KRaft membership changes messages
KRAFT_VERSION((short) 5),
VOTERS((short) 6),
Contributor:

nit: maybe we can start using the prefix consistently. KRAFT_VOTERS?

Member Author (jsancio, Apr 25, 2024):

Sounds good. Fixed for KRAFT_VOTERS. I'll fix the rest in another PR.


// UNKNOWN is used to indicate a control type which the client is not aware of and should be ignored
UNKNOWN((short) -1);

@@ -108,6 +112,10 @@ public static ControlRecordType fromTypeId(short typeId) {
return SNAPSHOT_HEADER;
case 4:
return SNAPSHOT_FOOTER;
case 5:
return KRAFT_VERSION;
case 6:
return VOTERS;

default:
return UNKNOWN;
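For reference, a small sketch (not from the PR) of how the new type ids round-trip through the parsing path added above:

// The new control record type ids map back through fromTypeId.
assert ControlRecordType.fromTypeId((short) 5) == ControlRecordType.KRAFT_VERSION;
assert ControlRecordType.fromTypeId((short) 6) == ControlRecordType.VOTERS;
// Ids a client does not understand still fall through to UNKNOWN and can be skipped.
assert ControlRecordType.fromTypeId((short) 99) == ControlRecordType.UNKNOWN;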
ControlRecordUtils.java
@@ -16,9 +16,11 @@
*/
package org.apache.kafka.common.record;

import org.apache.kafka.common.message.KRaftVersionRecord;
import org.apache.kafka.common.message.LeaderChangeMessage;
import org.apache.kafka.common.message.SnapshotHeaderRecord;
import org.apache.kafka.common.message.SnapshotFooterRecord;
import org.apache.kafka.common.message.SnapshotHeaderRecord;
import org.apache.kafka.common.message.VotersRecord;
import org.apache.kafka.common.protocol.ByteBufferAccessor;

import java.nio.ByteBuffer;
@@ -27,49 +27,77 @@
* Utility class for easy interaction with control records.
*/
public class ControlRecordUtils {
public static final short KRAFT_VERSION_CURRENT_VERSION = 0;
public static final short LEADER_CHANGE_CURRENT_VERSION = 0;
public static final short SNAPSHOT_HEADER_CURRENT_VERSION = 0;
public static final short SNAPSHOT_FOOTER_CURRENT_VERSION = 0;
public static final short SNAPSHOT_HEADER_CURRENT_VERSION = 0;
public static final short VOTERS_CURRENT_VERSION = 0;

public static LeaderChangeMessage deserializeLeaderChangeMessage(Record record) {
ControlRecordType recordType = ControlRecordType.parse(record.key());
if (recordType != ControlRecordType.LEADER_CHANGE) {
throw new IllegalArgumentException(
"Expected LEADER_CHANGE control record type(2), but found " + recordType.toString());
}
validateControlRecordType(ControlRecordType.LEADER_CHANGE, recordType);

return deserializeLeaderChangeMessage(record.value());
}

public static LeaderChangeMessage deserializeLeaderChangeMessage(ByteBuffer data) {
ByteBufferAccessor byteBufferAccessor = new ByteBufferAccessor(data.slice());
return new LeaderChangeMessage(byteBufferAccessor, LEADER_CHANGE_CURRENT_VERSION);
return new LeaderChangeMessage(new ByteBufferAccessor(data.slice()), LEADER_CHANGE_CURRENT_VERSION);
}

public static SnapshotHeaderRecord deserializeSnapshotHeaderRecord(Record record) {
ControlRecordType recordType = ControlRecordType.parse(record.key());
if (recordType != ControlRecordType.SNAPSHOT_HEADER) {
throw new IllegalArgumentException(
"Expected SNAPSHOT_HEADER control record type(3), but found " + recordType.toString());
}
validateControlRecordType(ControlRecordType.SNAPSHOT_HEADER, recordType);

return deserializeSnapshotHeaderRecord(record.value());
}

public static SnapshotHeaderRecord deserializeSnapshotHeaderRecord(ByteBuffer data) {
ByteBufferAccessor byteBufferAccessor = new ByteBufferAccessor(data.slice());
return new SnapshotHeaderRecord(byteBufferAccessor, SNAPSHOT_HEADER_CURRENT_VERSION);
return new SnapshotHeaderRecord(new ByteBufferAccessor(data.slice()), SNAPSHOT_HEADER_CURRENT_VERSION);
}

public static SnapshotFooterRecord deserializeSnapshotFooterRecord(Record record) {
ControlRecordType recordType = ControlRecordType.parse(record.key());
if (recordType != ControlRecordType.SNAPSHOT_FOOTER) {
throw new IllegalArgumentException(
"Expected SNAPSHOT_FOOTER control record type(4), but found " + recordType.toString());
}
validateControlRecordType(ControlRecordType.SNAPSHOT_FOOTER, recordType);

return deserializeSnapshotFooterRecord(record.value());
}

public static SnapshotFooterRecord deserializeSnapshotFooterRecord(ByteBuffer data) {
ByteBufferAccessor byteBufferAccessor = new ByteBufferAccessor(data.slice());
return new SnapshotFooterRecord(byteBufferAccessor, SNAPSHOT_FOOTER_CURRENT_VERSION);
return new SnapshotFooterRecord(new ByteBufferAccessor(data.slice()), SNAPSHOT_FOOTER_CURRENT_VERSION);
}

public static KRaftVersionRecord deserializeKRaftVersionRecord(Record record) {
ControlRecordType recordType = ControlRecordType.parse(record.key());
validateControlRecordType(ControlRecordType.KRAFT_VERSION, recordType);

return deserializeKRaftVersionRecord(record.value());
}

public static KRaftVersionRecord deserializeKRaftVersionRecord(ByteBuffer data) {
return new KRaftVersionRecord(new ByteBufferAccessor(data.slice()), KRAFT_VERSION_CURRENT_VERSION);
}

public static VotersRecord deserializeVotersRecord(Record record) {
ControlRecordType recordType = ControlRecordType.parse(record.key());
validateControlRecordType(ControlRecordType.VOTERS, recordType);

return deserializeVotersRecord(record.value());
}

public static VotersRecord deserializeVotersRecord(ByteBuffer data) {
return new VotersRecord(new ByteBufferAccessor(data.slice()), VOTERS_CURRENT_VERSION);
}

private static void validateControlRecordType(ControlRecordType expected, ControlRecordType actual) {
if (actual != expected) {
throw new IllegalArgumentException(
String.format(
"Expected %s control record type(%d), but found %s",
expected,
expected.type(),
actual
)
);
}
}
}
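A hedged usage sketch of the new helpers; the input variables below are hypothetical and stand for control records read from the log or a snapshot:

// The helpers validate the key type via validateControlRecordType and then decode the
// value at the current schema version.
KRaftVersionRecord kraftVersion = ControlRecordUtils.deserializeKRaftVersionRecord(kraftVersionControlRecord);
VotersRecord voters = ControlRecordUtils.deserializeVotersRecord(votersControlRecord);
// Passing a record of the wrong control type throws IllegalArgumentException.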
MemoryRecords.java
@@ -18,9 +18,11 @@

import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.CorruptRecordException;
import org.apache.kafka.common.message.KRaftVersionRecord;
import org.apache.kafka.common.message.LeaderChangeMessage;
import org.apache.kafka.common.message.SnapshotHeaderRecord;
import org.apache.kafka.common.message.SnapshotFooterRecord;
import org.apache.kafka.common.message.SnapshotHeaderRecord;
import org.apache.kafka.common.message.VotersRecord;
import org.apache.kafka.common.network.TransferableChannel;
import org.apache.kafka.common.record.MemoryRecords.RecordFilter.BatchRetention;
import org.apache.kafka.common.record.MemoryRecords.RecordFilter.BatchRetentionResult;
@@ -154,7 +156,7 @@ public FilterResult filterTo(TopicPartition partition, RecordFilter filter, Byte

/**
* Note: This method is also used to convert the first timestamp of the batch (which is usually the timestamp of the first record)
* to the delete horizon of the tombstones or txn markers which are present in the batch.
* to the delete horizon of the tombstones or txn markers which are present in the batch.
*/
private static FilterResult filterTo(TopicPartition partition, Iterable<MutableRecordBatch> batches,
RecordFilter filter, ByteBuffer destinationBuffer, int maxRecordBatchSize,
@@ -807,4 +809,62 @@ private static void writeSnapshotFooterRecord(
builder.appendSnapshotFooterMessage(timestamp, snapshotFooterRecord);
}
}

public static MemoryRecords withKRaftVersionRecord(
long initialOffset,
long timestamp,
int leaderEpoch,
ByteBuffer buffer,
KRaftVersionRecord kraftVersionRecord
) {
writeKRaftVersionRecord(buffer, initialOffset, timestamp, leaderEpoch, kraftVersionRecord);
buffer.flip();
return MemoryRecords.readableRecords(buffer);
}

private static void writeKRaftVersionRecord(
ByteBuffer buffer,
long initialOffset,
long timestamp,
int leaderEpoch,
KRaftVersionRecord kraftVersionRecord
) {
try (MemoryRecordsBuilder builder = new MemoryRecordsBuilder(
buffer, RecordBatch.CURRENT_MAGIC_VALUE, CompressionType.NONE,
TimestampType.CREATE_TIME, initialOffset, timestamp,
RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, RecordBatch.NO_SEQUENCE,
false, true, leaderEpoch, buffer.capacity())
) {
builder.appendKRaftVersionMessage(timestamp, kraftVersionRecord);
}
}

public static MemoryRecords withVotersRecord(
long initialOffset,
long timestamp,
int leaderEpoch,
ByteBuffer buffer,
VotersRecord votersRecord
) {
writeVotersRecord(buffer, initialOffset, timestamp, leaderEpoch, votersRecord);
Contributor:

nit: This looks a little odd. We create two MemoryRecords instances. Why not just create one?

Member Author (jsancio):

What do you mean by "we create two MemoryRecords instances"?

Contributor:

We are discarding the MemoryRecords created by the builder and creating a new one.

Member Author (jsancio):

Yeah, I see that. Looks like this is an existing issue with the existing control record builders. Let me fix the ones that are specific to KRaft. We can fix the other ones in another PR.
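For context, an editorial sketch (not part of this diff) of the single-instance alternative the thread is describing; it uses MemoryRecordsBuilder.build() to return the builder's own MemoryRecords, and the method name below is hypothetical:

private static MemoryRecords buildVotersRecord(
    ByteBuffer buffer,
    long initialOffset,
    long timestamp,
    int leaderEpoch,
    VotersRecord votersRecord
) {
    try (MemoryRecordsBuilder builder = new MemoryRecordsBuilder(
        buffer, RecordBatch.CURRENT_MAGIC_VALUE, CompressionType.NONE,
        TimestampType.CREATE_TIME, initialOffset, timestamp,
        RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, RecordBatch.NO_SEQUENCE,
        false, true, leaderEpoch, buffer.capacity())
    ) {
        builder.appendVotersMessage(timestamp, votersRecord);
        // Reuse the MemoryRecords the builder already produced instead of re-reading the buffer.
        return builder.build();
    }
}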

buffer.flip();
return MemoryRecords.readableRecords(buffer);
}

private static void writeVotersRecord(
ByteBuffer buffer,
long initialOffset,
long timestamp,
int leaderEpoch,
VotersRecord votersRecord
) {
try (MemoryRecordsBuilder builder = new MemoryRecordsBuilder(
buffer, RecordBatch.CURRENT_MAGIC_VALUE, CompressionType.NONE,
TimestampType.CREATE_TIME, initialOffset, timestamp,
Contributor:

nit: as long as we're separating args into lines, how about one argument per line?

Member Author (jsancio):

Fixed.

RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, RecordBatch.NO_SEQUENCE,
false, true, leaderEpoch, buffer.capacity())
) {
builder.appendVotersMessage(timestamp, votersRecord);
}
}
}
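A short usage sketch for the new static helpers; the offset, epoch, timestamp, and buffer size below are illustrative assumptions, not values from the PR:

ByteBuffer buffer = ByteBuffer.allocate(256);
MemoryRecords records = MemoryRecords.withKRaftVersionRecord(
    0L,                          // initialOffset
    System.currentTimeMillis(),  // timestamp
    1,                           // leaderEpoch
    buffer,
    new KRaftVersionRecord()     // empty record, for illustration only
);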
MemoryRecordsBuilder.java
@@ -18,9 +18,11 @@

import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.message.KRaftVersionRecord;
import org.apache.kafka.common.message.LeaderChangeMessage;
import org.apache.kafka.common.message.SnapshotHeaderRecord;
import org.apache.kafka.common.message.SnapshotFooterRecord;
import org.apache.kafka.common.message.SnapshotHeaderRecord;
import org.apache.kafka.common.message.VotersRecord;
import org.apache.kafka.common.protocol.MessageUtil;
import org.apache.kafka.common.protocol.types.Struct;
import org.apache.kafka.common.utils.ByteBufferOutputStream;
@@ -602,11 +604,12 @@ public void append(SimpleRecord record) {

/**
* Append a control record at the next sequential offset.
*
* @param timestamp The record timestamp
* @param type The control record type (cannot be UNKNOWN)
* @param value The control record value
*/
private void appendControlRecord(long timestamp, ControlRecordType type, ByteBuffer value) {
public void appendControlRecord(long timestamp, ControlRecordType type, ByteBuffer value) {
Struct keyStruct = type.recordKey();
ByteBuffer key = ByteBuffer.allocate(keyStruct.sizeOf());
keyStruct.writeTo(key);
@@ -650,6 +653,22 @@ public void appendSnapshotFooterMessage(long timestamp, SnapshotFooterRecord sna
);
}

public void appendKRaftVersionMessage(long timestamp, KRaftVersionRecord kraftVersionRecord) {
appendControlRecord(
timestamp,
ControlRecordType.KRAFT_VERSION,
MessageUtil.toByteBuffer(kraftVersionRecord, ControlRecordUtils.KRAFT_VERSION_CURRENT_VERSION)
);
}

public void appendVotersMessage(long timestamp, VotersRecord votersRecord) {
appendControlRecord(
timestamp,
ControlRecordType.VOTERS,
MessageUtil.toByteBuffer(votersRecord, ControlRecordUtils.VOTERS_CURRENT_VERSION)
);
}

/**
* Add a legacy record without doing offset/magic validation (this should only be used in testing).
* @param offset The offset of the record
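A hedged sketch of appending the new control messages through a builder; the builder instance and timestamp here are assumptions, not code from the PR:

// builder is an open MemoryRecordsBuilder configured as a control batch.
long now = System.currentTimeMillis();
builder.appendKRaftVersionMessage(now, new KRaftVersionRecord());
builder.appendVotersMessage(now, new VotersRecord());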
27 changes: 27 additions & 0 deletions clients/src/main/resources/common/message/KRaftVersionRecord.json
@@ -0,0 +1,27 @@
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

{
"type": "data",
"name": "KRaftVersionRecord",
"validVersions": "0",
"flexibleVersions": "0+",
"fields": [
{ "name": "Version", "type": "int16", "versions": "0+",
"about": "The version of the kraft version record" },
{ "name": "KRaftVersion", "type": "int16", "versions": "0+",
"about": "The kraft protocol version" }
]
}
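Assuming the Java class generated from this schema follows Kafka's usual message-generation conventions (fluent setX setters), a record instance would be populated roughly like this sketch, which is not code from the PR:

KRaftVersionRecord record = new KRaftVersionRecord()
    .setVersion((short) 0)        // schema version of the kraft version record itself
    .setKRaftVersion((short) 1);  // the kraft protocol (feature) version being recorded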