Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

MINOR; Validate at least one control record #15912

Merged
merged 5 commits into from
May 14, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -730,7 +730,7 @@ public static MemoryRecords withLeaderChangeMessage(
ByteBuffer buffer,
LeaderChangeMessage leaderChangeMessage
) {
try (MemoryRecordsBuilder builder = createKraftControlReccordBuilder(
try (MemoryRecordsBuilder builder = createKraftControlRecordBuilder(
initialOffset,
timestamp,
leaderEpoch,
Expand All @@ -749,7 +749,7 @@ public static MemoryRecords withSnapshotHeaderRecord(
ByteBuffer buffer,
SnapshotHeaderRecord snapshotHeaderRecord
) {
try (MemoryRecordsBuilder builder = createKraftControlReccordBuilder(
try (MemoryRecordsBuilder builder = createKraftControlRecordBuilder(
initialOffset,
timestamp,
leaderEpoch,
Expand All @@ -768,7 +768,7 @@ public static MemoryRecords withSnapshotFooterRecord(
ByteBuffer buffer,
SnapshotFooterRecord snapshotFooterRecord
) {
try (MemoryRecordsBuilder builder = createKraftControlReccordBuilder(
try (MemoryRecordsBuilder builder = createKraftControlRecordBuilder(
initialOffset,
timestamp,
leaderEpoch,
Expand All @@ -787,7 +787,7 @@ public static MemoryRecords withKRaftVersionRecord(
ByteBuffer buffer,
KRaftVersionRecord kraftVersionRecord
) {
try (MemoryRecordsBuilder builder = createKraftControlReccordBuilder(
try (MemoryRecordsBuilder builder = createKraftControlRecordBuilder(
initialOffset,
timestamp,
leaderEpoch,
Expand All @@ -806,7 +806,7 @@ public static MemoryRecords withVotersRecord(
ByteBuffer buffer,
VotersRecord votersRecord
) {
try (MemoryRecordsBuilder builder = createKraftControlReccordBuilder(
try (MemoryRecordsBuilder builder = createKraftControlRecordBuilder(
initialOffset,
timestamp,
leaderEpoch,
Expand All @@ -818,7 +818,7 @@ public static MemoryRecords withVotersRecord(
}
}

private static MemoryRecordsBuilder createKraftControlReccordBuilder(
private static MemoryRecordsBuilder createKraftControlRecordBuilder(
long initialOffset,
long timestamp,
int leaderEpoch,
Expand Down
8 changes: 4 additions & 4 deletions raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -386,16 +386,17 @@ public void initialize(
logger.info("Reading KRaft snapshot and log as part of the initialization");
partitionState.updateState();

VoterSet lastVoterSet = partitionState.lastVoterSet();
requestManager = new RequestManager(
partitionState.lastVoterSet().voterIds(),
lastVoterSet.voterIds(),
quorumConfig.retryBackoffMs(),
quorumConfig.requestTimeoutMs(),
random
);

quorum = new QuorumState(
nodeId,
partitionState.lastVoterSet().voterIds(),
lastVoterSet.voterIds(),
quorumConfig.electionTimeoutMs(),
quorumConfig.fetchTimeoutMs(),
quorumStateStore,
Expand All @@ -409,7 +410,6 @@ public void initialize(
// so there are no unknown voter connections. Report this metric as 0.
kafkaRaftMetrics.updateNumUnknownVoterConnections(0);

VoterSet lastVoterSet = partitionState.lastVoterSet();
for (Integer voterId : lastVoterSet.voterIds()) {
channel.updateEndpoint(voterId, lastVoterSet.voterAddress(voterId, listenerName).get());
}
Expand Down Expand Up @@ -1524,7 +1524,7 @@ private boolean handleFetchSnapshotResponse(
quorum.leaderIdOrSentinel()
);

// This will aways reload the snapshot because the internal next offset
// This will always reload the snapshot because the internal next offset
// is always less than the snapshot id just downloaded.
partitionState.updateState();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,7 @@ default long truncateToEndOffset(OffsetAndEpoch endOffset) {
* {@link Optional#empty()}.
*
* The snapshot id will be validated against the existing snapshots and the log. The snapshot id
* must not alread exist, it must be greater than the log start offset, it must be less than
* must not already exist, it must be greater than the log start offset, it must be less than
* the high-watermark and it must exist in the log.
*
* @param snapshotId the end offset and epoch that identifies the snapshot
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -256,7 +256,7 @@ public void appendControlMessages(MemoryRecordsCreator valueCreator) {
}

private int validateMemoryRecordAndReturnCount(MemoryRecords memoryRecord) {
// Confirm that it is at most one batch and it is a control record
// Confirm that it is one control batch and it is at least one control record
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

validateMemoryRecordAndReturnCount => validateMemoryRecordsAndReturnCount
memoryRecord => memoryRecords
Also, there is an existing typo creatte on line 268.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed.

Iterator<MutableRecordBatch> batches = memoryRecord.batches().iterator();
if (!batches.hasNext()) {
throw new IllegalArgumentException("valueCreator didn't create a batch");
Expand Down Expand Up @@ -284,6 +284,8 @@ private int validateMemoryRecordAndReturnCount(MemoryRecords memoryRecord) {
);
} else if (numberOfRecords == null) {
throw new IllegalArgumentException("valueCreator didn't create a batch with the count");
} else if (numberOfRecords < 1) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it possible to add new UT to BatchAccumulatorTest for this case?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added the test.

throw new IllegalArgumentException("valueCreator didn't create at least one control record");
} else if (batches.hasNext()) {
throw new IllegalArgumentException("valueCreator created more than one batch");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,8 @@
* This type keeps track of changes to the finalized kraft.version and the sets of voters between
* the latest snasphot and the log end offset.
*
* The are two actors/threads for this type. One is the KRaft driver which indirectly call a lot of
* the public methods. The other are the callers of {@code RaftClient::createSnapshot} which
* There are two type of actors/threads accessing this type. One is the KRaft driver which indirectly call a lot of
* the public methods. The other actors/threads are the callers of {@code RaftClient.createSnapshot} which
* indirectly call {@code voterSetAtOffset} and {@code kraftVersionAtOffset} when freezing a snapshot.
*/
final public class KRaftControlRecordStateMachine {
Expand Down Expand Up @@ -115,7 +115,7 @@ public void truncateNewEntries(long endOffset) {
/**
* Remove the tail of the log until the given offset.
*
* @param @startOffset the start offset (inclusive)
* @param startOffset the start offset (inclusive)
*/
public void truncateOldEntries(long startOffset) {
synchronized (voterSetHistory) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@
*
* It encapsulates static information like a voter's endpoint and their supported kraft.version.
*
* It providees functionality for converting to and from {@code VotersRecord} and for converting
* It provides functionality for converting to and from {@code VotersRecord} and for converting
* from the static configuration.
*/
final public class VoterSet {
Expand Down Expand Up @@ -161,8 +161,8 @@ public VotersRecord toVotersRecord(short version) {
* An overlapping majority means that for all majorities in {@code this} set of voters and for
* all majority in {@code that} set of voters, they have at least one voter in common.
*
* If this function returns true is means that one of the voter set commits an offset, it means
* that the other voter set cannot commit a conflicting offset.
* If this function returns true, it means that if one of the set of voters commits an offset, the
* the other set of voters cannot commit a conflicting offset.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the the other => the other

*
* @param that the other voter set to compare
* @return true if they have an overlapping majority, false otherwise
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,9 @@
/**
* A type for storing the historical value of the set of voters.
*
* This type can be use to keep track in-memory the sets for voters stored in the latest snapshot
* and log. This is useful when generating a new snapshot at a given offset or when evaulating
* the latest set of voters.
* This type can be used to keep track, in-memory, of the sets for voters stored in the latest snapshot
* and the log segments. This is useful when generating a new snapshot at a given offset or when
* evaluating the latest set of voters.
*/
final public class VoterSetHistory {
private final Optional<VoterSet> staticVoterSet;
Expand All @@ -40,7 +40,7 @@ final public class VoterSetHistory {
* offset of all previous calls to this method.
*
* @param offset the offset
* @param value the value to store
* @param voters the voters to store
* @throws IllegalArgumentException if the offset is not greater than all previous offsets
*/
public void addAt(long offset, VoterSet voters) {
Expand Down Expand Up @@ -69,7 +69,7 @@ public void addAt(long offset, VoterSet voters) {
* Computes the value of the voter set at a given offset.
*
* This function will only return values provided through {@code addAt} and it would never
* include the {@code staticVoterSet} provided through the constructoer.
* include the {@code staticVoterSet} provided through the constructor.
*
* @param offset the offset (inclusive)
* @return the voter set if one exist, otherwise {@code Optional.empty()}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,10 +120,10 @@ public static <T> RecordsSnapshotReader<T> of(
}

/**
* Returns the next non-control Batch
* Returns the next batch
*/
private Optional<Batch<T>> nextBatch() {
while (iterator.hasNext()) {
if (iterator.hasNext()) {
Batch<T> batch = iterator.next();

if (!lastContainedLogTimestamp.isPresent()) {
Expand Down