KAFKA-16207; KRaft's internal log listener to update voter set #15671
Conversation
Not all the way through, but thought I'd post the comments I have.
@@ -26,7 +26,7 @@
/**
 * Represents an immutable basic version range using 2 attributes: min and max, each of type short.
 * The min and max attributes need to satisfy 2 rules:
 * - they are each expected to be >= 1, as we only consider positive version values to be valid.
Was it a bug that we only allowed version 1 and above? I'm wondering if we really need to change it.
Yes. I think this was missed during the original implementation. The default value for any feature version is 0 but that cannot be expressed in the range of supported versions since it doesn't allow 0 as the min or max value.
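The fix being described can be illustrated with a small sketch. This is a hypothetical stand-in, not Kafka's actual version range class; it only shows the relaxed bound that lets 0 be a valid min/max value.

```java
// Hypothetical sketch of a version range whose minimum legal value is 0,
// so that the default feature level (0) can be expressed.
final class VersionRange {
    private final short min;
    private final short max;

    VersionRange(short min, short max) {
        // The old check required min >= 1, which could not represent the
        // default feature level 0; allowing min >= 0 fixes that.
        if (min < 0 || max < min) {
            throw new IllegalArgumentException(
                "Expected 0 <= min <= max, got min=" + min + " and max=" + max);
        }
        this.min = min;
        this.max = max;
    }

    boolean contains(short version) {
        return min <= version && version <= max;
    }
}
```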
LEADER_CHANGE((short) 2),
SNAPSHOT_HEADER((short) 3),
SNAPSHOT_FOOTER((short) 4),

// KRaft membership changes messages
KRAFT_VERSION((short) 5),
VOTERS((short) 6),
nit: maybe we can start using the prefix consistently. KRAFT_VOTERS?
Sounds good. Fixed for KRAFT_VOTERS. I'll fix the rest in another PR.
) {
    try (MemoryRecordsBuilder builder = new MemoryRecordsBuilder(
        buffer, RecordBatch.CURRENT_MAGIC_VALUE, CompressionType.NONE,
        TimestampType.CREATE_TIME, initialOffset, timestamp,

nit: as long as we're separating args into lines, how about one argument per line?
Fixed.
    ByteBuffer buffer,
    VotersRecord votersRecord
) {
    writeVotersRecord(buffer, initialOffset, timestamp, leaderEpoch, votersRecord);

nit: This looks a little odd. We create two MemoryRecords instances. Why not just create one?

What do you mean by "we create two MemoryRecords instances"?
We are discarding the MemoryRecords created by the builder and creating a new one.
Yeah. I see that. Looks like this is an existing issue with existing control record builders. Let me fix the ones that are specific for KRaft. We can fix the other ones in another PR.
raft/src/main/java/org/apache/kafka/raft/internals/PartitionListener.java
}

private void handleBatch(Batch<?> batch, OptionalLong overrideOffset) {
    int index = 0;

I wonder if it is better to get the record offsets from the Batch object. This works because we know the partition is not compacted, but it still feels a bit suspicious.
The issue is that the offsets in the batch are only meaningful when handling batches that come from the log. The offsets of batches that come from the snapshot don't have any relation to the log offset. This is why we pass OptionalLong overrideOffset.

I changed this name to offsetDelta.
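The distinction can be sketched with a hypothetical helper (the class and method here are illustrative, not the PR's actual code): batches read from the log carry meaningful base offsets, while batches read from a snapshot need a caller-supplied base.

```java
import java.util.OptionalLong;

// Hypothetical sketch: compute the absolute offset of the i-th record in a
// batch. Log batches use their own base offset; snapshot batches have no
// relation to log offsets, so the caller supplies one instead.
final class BatchOffsets {
    static long recordOffset(long batchBaseOffset, OptionalLong overrideOffset, int index) {
        // For snapshot batches, overrideOffset replaces the batch's base offset.
        long base = overrideOffset.orElse(batchBaseOffset);
        return base + index;
    }
}
```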
raft/src/main/java/org/apache/kafka/raft/internals/History.java
raft/src/main/java/org/apache/kafka/raft/internals/TreeMapHistory.java
Optional<History.Entry<VoterSet>> result = votersHistory.lastEntry();
if (result.isPresent()) return result;

return staticVoterSet.map(value -> new History.Entry<>(-1, value));

This seems a bit dangerous. Maybe we need a better type option. I think I had a comment about making History generic in the comparable type as well.
Hmm. I only implemented this so that VoterSetHistory can implement History. This method is not used by KRaftControlRecordStateMachine and I don't think we'll need it in the future. I'll just remove this method and remove implements History.
client.initialize(
    controllerQuorumVotersFuture.get(),
    config.controllerListenerNames.head,
    new FileBasedStateStore(new File(dataDir, "quorum-state")),

Maybe we can have a constant for the file name. Not sure if it is used elsewhere, but it would be nice to have a nice descriptive name.
Done.
return parseVoterConnections(voterEntries, false).keySet();
}

private static Map<Integer, InetSocketAddress> parseVoterConnections(List<String> voterEntries, boolean routableOnly) {

nit: I think requireRoutableAddresses might convey the expectation more clearly.
Fixed.
@@ -261,7 +260,7 @@ default long truncateToEndOffset(OffsetAndEpoch endOffset) {
 * @param snapshotId the end offset and epoch that identifies the snapshot
 * @return a writable snapshot if it doesn't already exist
 */
Optional<RawSnapshotWriter> storeSnapshot(OffsetAndEpoch snapshotId);
Optional<RawSnapshotWriter> createNewSnapshotUnchecked(OffsetAndEpoch snapshotId);
Do we need the method? What would happen if the leader did violate the expected invariant? Would this data structure update itself to remain consistent?
 */
private void appendControlMessage(Function<ByteBuffer, MemoryRecords> valueCreator) {
public void appendControlMessages(Function<ByteBuffer, CreatedRecords> valueCreator) {
Since we have made this public, I wonder if it would be useful to validate that the created batch is in fact a control batch.
Okay, I was trying to avoid decoding the MemoryRecords. If we are going to read the first batch, we don't even need CreatedRecords.
/**
 * An object that tracks values of {@code T} at different offsets.
 */
public interface History<T> {

Ok. In that case, perhaps we can qualify the general History name. Perhaps it should be RecordHistory or LogHistory or something like that?
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;

final public class VoterSetHistoryTest {
Should we have some cases where we try to add a non-overlapping voter set?
Yes. Good catch. Added two more tests.
private final RaftMetadataLogCleanerManager snapshotCleaner;

private final Map<Listener<T>, ListenerContext> listenerContexts = new IdentityHashMap<>();
private final ConcurrentLinkedQueue<Registration<T>> pendingRegistrations = new ConcurrentLinkedQueue<>();

// These components need to be initialized by the method initialize() because they depend on the voter set
/*
 * The key invariant for the kraft control record state machine is that it has always read to the LEO. This is achived by:
nit: achieved
Fixed.
Map<Integer, InetSocketAddress> voterAddresses,
String listenerName,
QuorumStateStore quorumStateStore,
Metrics metrics

Why do we need to delay initialization of metrics?
I can see that the parameters for this method might grow over time. Perhaps we should package it up into some kind of initialization object.
Yeah. I am also not happy with this move. We only do this delayed initialization because of the integration tests (QuorumTestHarness, KRaftClusterTestKit). It is not needed by **/src/main.
Once we have KIP-853 fully implemented, I should be able to fix the integration tests to not use the static voter set and the delayed initialization.
I created "Remove delayed initialization because of static voter set" to track this work.
Metrics metrics
) {
    partitionState = new KRaftControlRecordStateMachine(
        Optional.of(VoterSet.fromAddressSpecs(listenerName, voterAddresses)),

nit: maybe we should rename the method fromInetSocketAddresses or something like that?
Fixed.
}

return log.createNewSnapshot(snapshotId).map(writer -> {
    long lastContainedLogOffset = snapshotId.offset() - 1;

nit: maybe we could have a helper for this logic
Yes. I have this issue KAFKA-14620 to introduce the SnapshotId type. I can fix this in that PR.
LGTM
@jsancio : Thanks for the PR and sorry for the late review. Made a pass of non-testing files and left a few comments.
}

private static void writeSnapshotFooterRecord(
    ByteBuffer buffer,
private static MemoryRecordsBuilder createKraftControlReccordBuilder(

typo Reccord
I submitted this #15912 as a followup to these comments.
@@ -1470,6 +1524,10 @@ private boolean handleFetchSnapshotResponse(
    quorum.leaderIdOrSentinel()
);

// This will aways reload the snapshot because the internal next offset

typo aways
Fixed.
 * This type keeps track of changes to the finalized kraft.version and the sets of voters between
 * the latest snasphot and the log end offset.
 *
 * The are two actors/threads for this type. One is the KRaft driver which indirectly call a lot of

The are => There are
Fixed.
// so there are no unknown voter connections. Report this metric as 0.
kafkaRaftMetrics.updateNumUnknownVoterConnections(0);

VoterSet lastVoterSet = partitionState.lastVoterSet();

Would it be better to move this to before line 389 to avoid calling partitionState.lastVoterSet() multiple times?
Fixed.
@@ -245,6 +255,42 @@ private void appendControlMessage(Function<ByteBuffer, MemoryRecords> valueCreat
    }
}

private int validateMemoryRecordAndReturnCount(MemoryRecords memoryRecord) {
    // Confirm that it is at most one batch and it is a control record

I guess we support a batch with more than one control record?
Yeah. I fixed the comment and the implementation.
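The validation being discussed could look roughly like this. The types below are simplified stand-ins (Kafka's actual MemoryRecords API differs): accept exactly one batch, require it to be a control batch, and allow that batch to hold several control records.

```java
import java.util.List;

// Simplified stand-in for a record batch; the real Kafka class differs.
record Batch(boolean isControlBatch, int recordCount) {}

final class ControlBatchValidator {
    // Returns the record count if the input consists of exactly one
    // control batch, which may contain more than one control record.
    static int validateAndCount(List<Batch> batches) {
        if (batches.size() != 1) {
            throw new IllegalArgumentException("Expected exactly one batch, got " + batches.size());
        }
        Batch batch = batches.get(0);
        if (!batch.isControlBatch()) {
            throw new IllegalArgumentException("Expected a control batch");
        }
        return batch.recordCount();
    }
}
```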
 * A type for storing the historical value of the set of voters.
 *
 * This type can be use to keep track in-memory the sets for voters stored in the latest snapshot
 * and log. This is useful when generating a new snapshot at a given offset or when evaulating
typo evaulating
 * offset of all previous calls to this method.
 *
 * @param offset the offset
 * @param value the value to store

value => voters
Fixed.
 * Computes the value of the voter set at a given offset.
 *
 * This function will only return values provided through {@code addAt} and it would never
 * include the {@code staticVoterSet} provided through the constructoer.

typo constructoer
Fixed.
 * Snapshots created using this method will be validated against the existing snapshots
 * and the replicated log.
 * The snapshot id will be validated against the existing snapshots and the log. The snapshot id
 * must not alread exist, it must be greater than the log start offset, it must be less than

typo alread
Fixed.
if (!batch.records().isEmpty()) {
    return Optional.of(batch);
}
return Optional.of(batch);

With this change, the while statement doesn't loop. Should we remove while?
Yes. Good catch. We need an if statement. I updated the javadoc too.
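The control-flow point can be reduced to this shape (a simplified sketch, not the actual method): when the loop body returns on every path of its first iteration, the while never takes a second pass, so an if states the intent directly.

```java
import java.util.Iterator;
import java.util.Optional;

final class NextBatch {
    // Before: `while (iterator.hasNext()) { ...; return ...; }` -- every
    // path through the body returns, so the loop never actually loops.
    // After: an `if` makes the single-pass behavior explicit.
    static <T> Optional<T> nextBatch(Iterator<T> iterator) {
        if (iterator.hasNext()) {
            return Optional.of(iterator.next());
        }
        return Optional.empty();
    }
}
```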
Adds support for the KafkaRaftClient to read the control records KRaftVersionRecord and VotersRecord in the snapshot and log. As the control records in the KRaft partition are read, the replica's known set of voters is updated. This change also contains the necessary changes to include the control records when a snapshot is generated by the KRaft state machine.
It is important to note that this commit changes the code and the in-memory state to track the sets of voters but it doesn't change any data that is externally exposed. It doesn't change the RPCs, data stored on disk or configuration.
When the KRaft replica starts the PartitionListener reads the latest snapshot and then log segments up to the LEO, updating the in-memory state as it reads KRaftVersionRecord and VotersRecord. When the replica (leader and follower) appends to the log, the PartitionListener catches up to the new LEO. When the replica truncates the log because of a diverging epoch, the PartitionListener also truncates the in-memory state to the new LEO. When the state machine generates a new snapshot the PartitionListener trims any prefix entries that are not needed. This is all done to minimize the amount of data tracked in memory and to make sure that it matches the state on disk.
To implement the functionality described above this commit also makes the following changes:
Adds control records for KRaftVersionRecord and VotersRecord. KRaftVersionRecord describes the finalized kraft.version supported by all of the replicas. VotersRecord describes the set of voters at a specific offset.
Changes Kafka's feature version to support 0 as the smallest valid value. This is needed because the default value for kraft.version is 0.
Refactors FileRawSnapshotWriter so that it doesn't directly call the onSnapshotFrozen callback. It adds NotifyingRawSnapshotWriter for calling such callbacks. This reorganization is needed because in this change both the KafkaMetadataLog and the KafkaRaftClient need to react to snapshots getting frozen.
Cleans up KafkaRaftClient's initialization. Removes initialize from RaftClient - this is an implementation detail that doesn't need to be exposed in the interface. Removes RaftConfig.AddressSpec and simplifies the bootstrapping of the static voter's address. The bootstrapping of the address is delayed because of tests. We should be able to simplify this further in future commits.
Update the DumpLogSegment CLI to support the new control records KRaftVersionRecord and VotersRecord.
Fix the RecordsSnapshotReader implementations so that the iterator includes control records. RecordsIterator is extended to support reading the new control records.
Improve the BatchAccumulator implementation to allow multiple control records in one control batch. This is needed so that KRaft can make sure that VotersRecord is included in the same batch as the control record (KRaftVersionRecord) that upgrades the kraft.version to 1.
Add a History interface and default implementation TreeMapHistory. This is used to track all of the sets of voters between the latest snapshot and the LEO. This is needed so that KafkaRaftClient can query for the latest set of voters and so that KafkaRaftClient can include the correct set of voters when the state machine generates a new snapshot at a given offset.
Add a builder pattern for RecordsSnapshotWriter. The new builder pattern also implements including the KRaftVersionRecord and VotersRecord control records in the snapshot as necessary. A KRaftVersionRecord should be appended if the kraft.version is greater than 0 at the snapshot's offset. Similarly, a VotersRecord should be appended to the snapshot with the latest value up to the snapshot's offset.
Reviewers: Jason Gustafson <jason@confluent.io>
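The History/TreeMapHistory idea described above can be sketched with a TreeMap keyed by offset. This is a simplified illustration; the method names follow the description in this PR but the real interface may differ.

```java
import java.util.Map;
import java.util.Optional;
import java.util.TreeMap;

// Sketch of an offset-indexed history: values (e.g. voter sets) are recorded
// at increasing offsets, queried at-or-before an offset, and trimmed when a
// snapshot makes older entries unnecessary.
final class TreeMapHistory<T> {
    private final TreeMap<Long, T> history = new TreeMap<>();

    // Record a value at an offset; offsets must be strictly increasing.
    void addAt(long offset, T value) {
        if (!history.isEmpty() && offset <= history.lastKey()) {
            throw new IllegalArgumentException(
                "Offset " + offset + " must be greater than " + history.lastKey());
        }
        history.put(offset, value);
    }

    // Latest value at or before the given offset, e.g. the voter set to
    // embed when generating a snapshot at that offset.
    Optional<T> valueAtOrBefore(long offset) {
        return Optional.ofNullable(history.floorEntry(offset)).map(Map.Entry::getValue);
    }

    // Drop entries before the given offset but keep the last one at or
    // before it: it is still the effective value at that offset.
    void truncateOldEntries(long offset) {
        Map.Entry<Long, T> keep = history.floorEntry(offset);
        if (keep != null) {
            history.headMap(keep.getKey(), false).clear();
        }
    }

    // Drop entries at or after the given offset (log truncation on a
    // diverging epoch).
    void truncateNewEntries(long offset) {
        history.tailMap(offset, true).clear();
    }
}
```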