diff --git a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java index 4eab7f7b658f..5d6f9227875f 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java +++ b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java @@ -730,7 +730,7 @@ public static MemoryRecords withLeaderChangeMessage( ByteBuffer buffer, LeaderChangeMessage leaderChangeMessage ) { - try (MemoryRecordsBuilder builder = createKraftControlReccordBuilder( + try (MemoryRecordsBuilder builder = createKraftControlRecordBuilder( initialOffset, timestamp, leaderEpoch, @@ -749,7 +749,7 @@ public static MemoryRecords withSnapshotHeaderRecord( ByteBuffer buffer, SnapshotHeaderRecord snapshotHeaderRecord ) { - try (MemoryRecordsBuilder builder = createKraftControlReccordBuilder( + try (MemoryRecordsBuilder builder = createKraftControlRecordBuilder( initialOffset, timestamp, leaderEpoch, @@ -768,7 +768,7 @@ public static MemoryRecords withSnapshotFooterRecord( ByteBuffer buffer, SnapshotFooterRecord snapshotFooterRecord ) { - try (MemoryRecordsBuilder builder = createKraftControlReccordBuilder( + try (MemoryRecordsBuilder builder = createKraftControlRecordBuilder( initialOffset, timestamp, leaderEpoch, @@ -787,7 +787,7 @@ public static MemoryRecords withKRaftVersionRecord( ByteBuffer buffer, KRaftVersionRecord kraftVersionRecord ) { - try (MemoryRecordsBuilder builder = createKraftControlReccordBuilder( + try (MemoryRecordsBuilder builder = createKraftControlRecordBuilder( initialOffset, timestamp, leaderEpoch, @@ -806,7 +806,7 @@ public static MemoryRecords withVotersRecord( ByteBuffer buffer, VotersRecord votersRecord ) { - try (MemoryRecordsBuilder builder = createKraftControlReccordBuilder( + try (MemoryRecordsBuilder builder = createKraftControlRecordBuilder( initialOffset, timestamp, leaderEpoch, @@ -818,7 +818,7 @@ public static MemoryRecords withVotersRecord( } } - private static MemoryRecordsBuilder createKraftControlReccordBuilder( + private static MemoryRecordsBuilder createKraftControlRecordBuilder( long initialOffset, long timestamp, int leaderEpoch, diff --git a/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java b/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java index 0662af6875c4..70408c73beaf 100644 --- a/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java +++ b/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java @@ -386,8 +386,9 @@ public void initialize( logger.info("Reading KRaft snapshot and log as part of the initialization"); partitionState.updateState(); + VoterSet lastVoterSet = partitionState.lastVoterSet(); requestManager = new RequestManager( - partitionState.lastVoterSet().voterIds(), + lastVoterSet.voterIds(), quorumConfig.retryBackoffMs(), quorumConfig.requestTimeoutMs(), random @@ -395,7 +396,7 @@ public void initialize( quorum = new QuorumState( nodeId, - partitionState.lastVoterSet().voterIds(), + lastVoterSet.voterIds(), quorumConfig.electionTimeoutMs(), quorumConfig.fetchTimeoutMs(), quorumStateStore, @@ -409,7 +410,6 @@ public void initialize( // so there are no unknown voter connections. Report this metric as 0. kafkaRaftMetrics.updateNumUnknownVoterConnections(0); - VoterSet lastVoterSet = partitionState.lastVoterSet(); for (Integer voterId : lastVoterSet.voterIds()) { channel.updateEndpoint(voterId, lastVoterSet.voterAddress(voterId, listenerName).get()); } @@ -1524,7 +1524,7 @@ private boolean handleFetchSnapshotResponse( quorum.leaderIdOrSentinel() ); - // This will aways reload the snapshot because the internal next offset + // This will always reload the snapshot because the internal next offset // is always less than the snapshot id just downloaded. partitionState.updateState(); diff --git a/raft/src/main/java/org/apache/kafka/raft/ReplicatedLog.java b/raft/src/main/java/org/apache/kafka/raft/ReplicatedLog.java index a8bd7f13d9a2..fdc5c5fb0df3 100644 --- a/raft/src/main/java/org/apache/kafka/raft/ReplicatedLog.java +++ b/raft/src/main/java/org/apache/kafka/raft/ReplicatedLog.java @@ -236,7 +236,7 @@ default long truncateToEndOffset(OffsetAndEpoch endOffset) { * {@link Optional#empty()}. * * The snapshot id will be validated against the existing snapshots and the log. The snapshot id - * must not alread exist, it must be greater than the log start offset, it must be less than + * must not already exist, it must be greater than the log start offset, it must be less than * the high-watermark and it must exist in the log. * * @param snapshotId the end offset and epoch that identifies the snapshot diff --git a/raft/src/main/java/org/apache/kafka/raft/internals/BatchAccumulator.java b/raft/src/main/java/org/apache/kafka/raft/internals/BatchAccumulator.java index 1163d68b47f2..f16ec2f2d0ea 100644 --- a/raft/src/main/java/org/apache/kafka/raft/internals/BatchAccumulator.java +++ b/raft/src/main/java/org/apache/kafka/raft/internals/BatchAccumulator.java @@ -230,7 +230,7 @@ public void appendControlMessages(MemoryRecordsCreator valueCreator) { forceDrain(); MemoryRecords memoryRecords = valueCreator.create(nextOffset, epoch, buffer); - int numberOfRecords = validateMemoryRecordAndReturnCount(memoryRecords); + int numberOfRecords = validateMemoryRecordsAndReturnCount(memoryRecords); completed.add( new CompletedBatch<>( @@ -255,9 +255,9 @@ public void appendControlMessages(MemoryRecordsCreator valueCreator) { } } - private int validateMemoryRecordAndReturnCount(MemoryRecords memoryRecord) { - // Confirm that it is at most one batch and it is a control record - Iterator batches = memoryRecord.batches().iterator(); + private int validateMemoryRecordsAndReturnCount(MemoryRecords memoryRecords) { + // Confirm that it is one control batch and it is at least one control record + Iterator batches = memoryRecords.batches().iterator(); if (!batches.hasNext()) { throw new IllegalArgumentException("valueCreator didn't create a batch"); } @@ -265,7 +265,7 @@ private int validateMemoryRecordAndReturnCount(MemoryRecords memoryRecord) { MutableRecordBatch batch = batches.next(); Integer numberOfRecords = batch.countOrNull(); if (!batch.isControlBatch()) { - throw new IllegalArgumentException("valueCreator didn't creatte a control batch"); + throw new IllegalArgumentException("valueCreator didn't create a control batch"); } else if (batch.baseOffset() != nextOffset) { throw new IllegalArgumentException( String.format( @@ -284,6 +284,8 @@ private int validateMemoryRecordAndReturnCount(MemoryRecords memoryRecord) { ); } else if (numberOfRecords == null) { throw new IllegalArgumentException("valueCreator didn't create a batch with the count"); + } else if (numberOfRecords < 1) { + throw new IllegalArgumentException("valueCreator didn't create at least one control record"); } else if (batches.hasNext()) { throw new IllegalArgumentException("valueCreator created more than one batch"); } diff --git a/raft/src/main/java/org/apache/kafka/raft/internals/KRaftControlRecordStateMachine.java b/raft/src/main/java/org/apache/kafka/raft/internals/KRaftControlRecordStateMachine.java index 4dab03bf7980..dd6e6a0cd39a 100644 --- a/raft/src/main/java/org/apache/kafka/raft/internals/KRaftControlRecordStateMachine.java +++ b/raft/src/main/java/org/apache/kafka/raft/internals/KRaftControlRecordStateMachine.java @@ -39,8 +39,8 @@ * This type keeps track of changes to the finalized kraft.version and the sets of voters between * the latest snasphot and the log end offset. * - * The are two actors/threads for this type. One is the KRaft driver which indirectly call a lot of - * the public methods. The other are the callers of {@code RaftClient::createSnapshot} which + * There are two type of actors/threads accessing this type. One is the KRaft driver which indirectly call a lot of + * the public methods. The other actors/threads are the callers of {@code RaftClient.createSnapshot} which * indirectly call {@code voterSetAtOffset} and {@code kraftVersionAtOffset} when freezing a snapshot. */ final public class KRaftControlRecordStateMachine { @@ -115,7 +115,7 @@ public void truncateNewEntries(long endOffset) { /** * Remove the tail of the log until the given offset. * - * @param @startOffset the start offset (inclusive) + * @param startOffset the start offset (inclusive) */ public void truncateOldEntries(long startOffset) { synchronized (voterSetHistory) { diff --git a/raft/src/main/java/org/apache/kafka/raft/internals/VoterSet.java b/raft/src/main/java/org/apache/kafka/raft/internals/VoterSet.java index ae0ec4d4b669..9ca38368c031 100644 --- a/raft/src/main/java/org/apache/kafka/raft/internals/VoterSet.java +++ b/raft/src/main/java/org/apache/kafka/raft/internals/VoterSet.java @@ -38,7 +38,7 @@ * * It encapsulates static information like a voter's endpoint and their supported kraft.version. * - * It providees functionality for converting to and from {@code VotersRecord} and for converting + * It provides functionality for converting to and from {@code VotersRecord} and for converting * from the static configuration. */ final public class VoterSet { @@ -161,8 +161,8 @@ public VotersRecord toVotersRecord(short version) { * An overlapping majority means that for all majorities in {@code this} set of voters and for * all majority in {@code that} set of voters, they have at least one voter in common. * - * If this function returns true is means that one of the voter set commits an offset, it means - * that the other voter set cannot commit a conflicting offset. + * If this function returns true, it means that if one of the set of voters commits an offset, + * the other set of voters cannot commit a conflicting offset. * * @param that the other voter set to compare * @return true if they have an overlapping majority, false otherwise diff --git a/raft/src/main/java/org/apache/kafka/raft/internals/VoterSetHistory.java b/raft/src/main/java/org/apache/kafka/raft/internals/VoterSetHistory.java index 3a15d62c5a91..fa44660af671 100644 --- a/raft/src/main/java/org/apache/kafka/raft/internals/VoterSetHistory.java +++ b/raft/src/main/java/org/apache/kafka/raft/internals/VoterSetHistory.java @@ -21,9 +21,9 @@ /** * A type for storing the historical value of the set of voters. * - * This type can be use to keep track in-memory the sets for voters stored in the latest snapshot - * and log. This is useful when generating a new snapshot at a given offset or when evaulating - * the latest set of voters. + * This type can be used to keep track, in-memory, of the sets for voters stored in the latest snapshot + * and the log segments. This is useful when generating a new snapshot at a given offset or when + * evaluating the latest set of voters. */ final public class VoterSetHistory { private final Optional staticVoterSet; @@ -40,7 +40,7 @@ final public class VoterSetHistory { * offset of all previous calls to this method. * * @param offset the offset - * @param value the value to store + * @param voters the voters to store * @throws IllegalArgumentException if the offset is not greater than all previous offsets */ public void addAt(long offset, VoterSet voters) { @@ -69,7 +69,7 @@ public void addAt(long offset, VoterSet voters) { * Computes the value of the voter set at a given offset. * * This function will only return values provided through {@code addAt} and it would never - * include the {@code staticVoterSet} provided through the constructoer. + * include the {@code staticVoterSet} provided through the constructor. * * @param offset the offset (inclusive) * @return the voter set if one exist, otherwise {@code Optional.empty()} diff --git a/raft/src/main/java/org/apache/kafka/snapshot/RecordsSnapshotReader.java b/raft/src/main/java/org/apache/kafka/snapshot/RecordsSnapshotReader.java index 3e6a9a5732db..20b6c0eaab85 100644 --- a/raft/src/main/java/org/apache/kafka/snapshot/RecordsSnapshotReader.java +++ b/raft/src/main/java/org/apache/kafka/snapshot/RecordsSnapshotReader.java @@ -120,10 +120,10 @@ public static RecordsSnapshotReader of( } /** - * Returns the next non-control Batch + * Returns the next batch */ private Optional> nextBatch() { - while (iterator.hasNext()) { + if (iterator.hasNext()) { Batch batch = iterator.next(); if (!lastContainedLogTimestamp.isPresent()) { diff --git a/raft/src/test/java/org/apache/kafka/raft/internals/BatchAccumulatorTest.java b/raft/src/test/java/org/apache/kafka/raft/internals/BatchAccumulatorTest.java index cf63f4384f9e..30f5691e2e24 100644 --- a/raft/src/test/java/org/apache/kafka/raft/internals/BatchAccumulatorTest.java +++ b/raft/src/test/java/org/apache/kafka/raft/internals/BatchAccumulatorTest.java @@ -683,6 +683,36 @@ public void testInvalidControlRecordEpoch() { } } + @Test + public void testEmptyControlBatch() { + int leaderEpoch = 17; + long baseOffset = 157; + int lingerMs = 50; + int maxBatchSize = 512; + + ByteBuffer buffer = ByteBuffer.allocate(maxBatchSize); + Mockito.when(memoryPool.tryAllocate(maxBatchSize)) + .thenReturn(buffer); + + BatchAccumulator.MemoryRecordsCreator creator = (offset, epoch, buf) -> { + long now = 1234; + try (MemoryRecordsBuilder builder = controlRecordsBuilder(offset, epoch, now, buf)) { + // Create a control batch without any records + return builder.build(); + } + }; + + try (BatchAccumulator acc = buildAccumulator( + leaderEpoch, + baseOffset, + lingerMs, + maxBatchSize + ) + ) { + assertThrows(IllegalArgumentException.class, () -> acc.appendControlMessages(creator)); + } + } + private static MemoryRecordsBuilder controlRecordsBuilder( long baseOffset, int epoch,