Skip to content

Commit

Permalink
MINOR; Validate at least one control record (#15912)
Browse files Browse the repository at this point in the history
Validate that a control batch in the batch accumulator has at least one control record.

Reviewers: Jun Rao <junrao@apache.org>, Chia-Ping Tsai <chia7712@apache.org>
  • Loading branch information
jsancio authored May 14, 2024
1 parent 8ac32d6 commit 440f5f6
Show file tree
Hide file tree
Showing 9 changed files with 61 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -730,7 +730,7 @@ public static MemoryRecords withLeaderChangeMessage(
ByteBuffer buffer,
LeaderChangeMessage leaderChangeMessage
) {
try (MemoryRecordsBuilder builder = createKraftControlReccordBuilder(
try (MemoryRecordsBuilder builder = createKraftControlRecordBuilder(
initialOffset,
timestamp,
leaderEpoch,
Expand All @@ -749,7 +749,7 @@ public static MemoryRecords withSnapshotHeaderRecord(
ByteBuffer buffer,
SnapshotHeaderRecord snapshotHeaderRecord
) {
try (MemoryRecordsBuilder builder = createKraftControlReccordBuilder(
try (MemoryRecordsBuilder builder = createKraftControlRecordBuilder(
initialOffset,
timestamp,
leaderEpoch,
Expand All @@ -768,7 +768,7 @@ public static MemoryRecords withSnapshotFooterRecord(
ByteBuffer buffer,
SnapshotFooterRecord snapshotFooterRecord
) {
try (MemoryRecordsBuilder builder = createKraftControlReccordBuilder(
try (MemoryRecordsBuilder builder = createKraftControlRecordBuilder(
initialOffset,
timestamp,
leaderEpoch,
Expand All @@ -787,7 +787,7 @@ public static MemoryRecords withKRaftVersionRecord(
ByteBuffer buffer,
KRaftVersionRecord kraftVersionRecord
) {
try (MemoryRecordsBuilder builder = createKraftControlReccordBuilder(
try (MemoryRecordsBuilder builder = createKraftControlRecordBuilder(
initialOffset,
timestamp,
leaderEpoch,
Expand All @@ -806,7 +806,7 @@ public static MemoryRecords withVotersRecord(
ByteBuffer buffer,
VotersRecord votersRecord
) {
try (MemoryRecordsBuilder builder = createKraftControlReccordBuilder(
try (MemoryRecordsBuilder builder = createKraftControlRecordBuilder(
initialOffset,
timestamp,
leaderEpoch,
Expand All @@ -818,7 +818,7 @@ public static MemoryRecords withVotersRecord(
}
}

private static MemoryRecordsBuilder createKraftControlReccordBuilder(
private static MemoryRecordsBuilder createKraftControlRecordBuilder(
long initialOffset,
long timestamp,
int leaderEpoch,
Expand Down
8 changes: 4 additions & 4 deletions raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -386,16 +386,17 @@ public void initialize(
logger.info("Reading KRaft snapshot and log as part of the initialization");
partitionState.updateState();

VoterSet lastVoterSet = partitionState.lastVoterSet();
requestManager = new RequestManager(
partitionState.lastVoterSet().voterIds(),
lastVoterSet.voterIds(),
quorumConfig.retryBackoffMs(),
quorumConfig.requestTimeoutMs(),
random
);

quorum = new QuorumState(
nodeId,
partitionState.lastVoterSet().voterIds(),
lastVoterSet.voterIds(),
quorumConfig.electionTimeoutMs(),
quorumConfig.fetchTimeoutMs(),
quorumStateStore,
Expand All @@ -409,7 +410,6 @@ public void initialize(
// so there are no unknown voter connections. Report this metric as 0.
kafkaRaftMetrics.updateNumUnknownVoterConnections(0);

VoterSet lastVoterSet = partitionState.lastVoterSet();
for (Integer voterId : lastVoterSet.voterIds()) {
channel.updateEndpoint(voterId, lastVoterSet.voterAddress(voterId, listenerName).get());
}
Expand Down Expand Up @@ -1524,7 +1524,7 @@ private boolean handleFetchSnapshotResponse(
quorum.leaderIdOrSentinel()
);

// This will aways reload the snapshot because the internal next offset
// This will always reload the snapshot because the internal next offset
// is always less than the snapshot id just downloaded.
partitionState.updateState();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,7 @@ default long truncateToEndOffset(OffsetAndEpoch endOffset) {
* {@link Optional#empty()}.
*
* The snapshot id will be validated against the existing snapshots and the log. The snapshot id
* must not alread exist, it must be greater than the log start offset, it must be less than
* must not already exist, it must be greater than the log start offset, it must be less than
* the high-watermark and it must exist in the log.
*
* @param snapshotId the end offset and epoch that identifies the snapshot
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,7 @@ public void appendControlMessages(MemoryRecordsCreator valueCreator) {
forceDrain();
MemoryRecords memoryRecords = valueCreator.create(nextOffset, epoch, buffer);

int numberOfRecords = validateMemoryRecordAndReturnCount(memoryRecords);
int numberOfRecords = validateMemoryRecordsAndReturnCount(memoryRecords);

completed.add(
new CompletedBatch<>(
Expand All @@ -255,17 +255,17 @@ public void appendControlMessages(MemoryRecordsCreator valueCreator) {
}
}

private int validateMemoryRecordAndReturnCount(MemoryRecords memoryRecord) {
// Confirm that it is at most one batch and it is a control record
Iterator<MutableRecordBatch> batches = memoryRecord.batches().iterator();
private int validateMemoryRecordsAndReturnCount(MemoryRecords memoryRecords) {
// Confirm that it is one control batch and it is at least one control record
Iterator<MutableRecordBatch> batches = memoryRecords.batches().iterator();
if (!batches.hasNext()) {
throw new IllegalArgumentException("valueCreator didn't create a batch");
}

MutableRecordBatch batch = batches.next();
Integer numberOfRecords = batch.countOrNull();
if (!batch.isControlBatch()) {
throw new IllegalArgumentException("valueCreator didn't creatte a control batch");
throw new IllegalArgumentException("valueCreator didn't create a control batch");
} else if (batch.baseOffset() != nextOffset) {
throw new IllegalArgumentException(
String.format(
Expand All @@ -284,6 +284,8 @@ private int validateMemoryRecordAndReturnCount(MemoryRecords memoryRecord) {
);
} else if (numberOfRecords == null) {
throw new IllegalArgumentException("valueCreator didn't create a batch with the count");
} else if (numberOfRecords < 1) {
throw new IllegalArgumentException("valueCreator didn't create at least one control record");
} else if (batches.hasNext()) {
throw new IllegalArgumentException("valueCreator created more than one batch");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,8 @@
* This type keeps track of changes to the finalized kraft.version and the sets of voters between
* the latest snasphot and the log end offset.
*
* The are two actors/threads for this type. One is the KRaft driver which indirectly call a lot of
* the public methods. The other are the callers of {@code RaftClient::createSnapshot} which
* There are two type of actors/threads accessing this type. One is the KRaft driver which indirectly call a lot of
* the public methods. The other actors/threads are the callers of {@code RaftClient.createSnapshot} which
* indirectly call {@code voterSetAtOffset} and {@code kraftVersionAtOffset} when freezing a snapshot.
*/
final public class KRaftControlRecordStateMachine {
Expand Down Expand Up @@ -115,7 +115,7 @@ public void truncateNewEntries(long endOffset) {
/**
* Remove the tail of the log until the given offset.
*
* @param @startOffset the start offset (inclusive)
* @param startOffset the start offset (inclusive)
*/
public void truncateOldEntries(long startOffset) {
synchronized (voterSetHistory) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@
*
* It encapsulates static information like a voter's endpoint and their supported kraft.version.
*
* It providees functionality for converting to and from {@code VotersRecord} and for converting
* It provides functionality for converting to and from {@code VotersRecord} and for converting
* from the static configuration.
*/
final public class VoterSet {
Expand Down Expand Up @@ -161,8 +161,8 @@ public VotersRecord toVotersRecord(short version) {
* An overlapping majority means that for all majorities in {@code this} set of voters and for
* all majority in {@code that} set of voters, they have at least one voter in common.
*
* If this function returns true is means that one of the voter set commits an offset, it means
* that the other voter set cannot commit a conflicting offset.
* If this function returns true, it means that if one of the set of voters commits an offset,
* the other set of voters cannot commit a conflicting offset.
*
* @param that the other voter set to compare
* @return true if they have an overlapping majority, false otherwise
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,9 @@
/**
* A type for storing the historical value of the set of voters.
*
* This type can be use to keep track in-memory the sets for voters stored in the latest snapshot
* and log. This is useful when generating a new snapshot at a given offset or when evaulating
* the latest set of voters.
* This type can be used to keep track, in-memory, of the sets for voters stored in the latest snapshot
* and the log segments. This is useful when generating a new snapshot at a given offset or when
* evaluating the latest set of voters.
*/
final public class VoterSetHistory {
private final Optional<VoterSet> staticVoterSet;
Expand All @@ -40,7 +40,7 @@ final public class VoterSetHistory {
* offset of all previous calls to this method.
*
* @param offset the offset
* @param value the value to store
* @param voters the voters to store
* @throws IllegalArgumentException if the offset is not greater than all previous offsets
*/
public void addAt(long offset, VoterSet voters) {
Expand Down Expand Up @@ -69,7 +69,7 @@ public void addAt(long offset, VoterSet voters) {
* Computes the value of the voter set at a given offset.
*
* This function will only return values provided through {@code addAt} and it would never
* include the {@code staticVoterSet} provided through the constructoer.
* include the {@code staticVoterSet} provided through the constructor.
*
* @param offset the offset (inclusive)
* @return the voter set if one exist, otherwise {@code Optional.empty()}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,10 +120,10 @@ public static <T> RecordsSnapshotReader<T> of(
}

/**
* Returns the next non-control Batch
* Returns the next batch
*/
private Optional<Batch<T>> nextBatch() {
while (iterator.hasNext()) {
if (iterator.hasNext()) {
Batch<T> batch = iterator.next();

if (!lastContainedLogTimestamp.isPresent()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -683,6 +683,36 @@ public void testInvalidControlRecordEpoch() {
}
}

@Test
public void testEmptyControlBatch() {
int leaderEpoch = 17;
long baseOffset = 157;
int lingerMs = 50;
int maxBatchSize = 512;

ByteBuffer buffer = ByteBuffer.allocate(maxBatchSize);
Mockito.when(memoryPool.tryAllocate(maxBatchSize))
.thenReturn(buffer);

BatchAccumulator.MemoryRecordsCreator creator = (offset, epoch, buf) -> {
long now = 1234;
try (MemoryRecordsBuilder builder = controlRecordsBuilder(offset, epoch, now, buf)) {
// Create a control batch without any records
return builder.build();
}
};

try (BatchAccumulator<String> acc = buildAccumulator(
leaderEpoch,
baseOffset,
lingerMs,
maxBatchSize
)
) {
assertThrows(IllegalArgumentException.class, () -> acc.appendControlMessages(creator));
}
}

private static MemoryRecordsBuilder controlRecordsBuilder(
long baseOffset,
int epoch,
Expand Down

0 comments on commit 440f5f6

Please sign in to comment.