From 5f85e52becb5e04e1b8b1496aca723aa622f579a Mon Sep 17 00:00:00 2001 From: Apoorv Mittal Date: Mon, 24 Nov 2025 21:52:17 +0000 Subject: [PATCH] MINOR: Adding check for invalid batch from persister in Share Partition (#20979) While writing unit tests, encounterd incorrect further batches when persister provides corrupt batches. Added a check to fail early rather sending incorrect batches to clients later. Reviewers: Andrew Schofield --- .../kafka/server/share/SharePartition.java | 9 +++++++++ .../server/share/SharePartitionTest.java | 20 +++++++++++++++++++ 2 files changed, 29 insertions(+) diff --git a/core/src/main/java/kafka/server/share/SharePartition.java b/core/src/main/java/kafka/server/share/SharePartition.java index 6f7041b92c0be..78ab7c29897ce 100644 --- a/core/src/main/java/kafka/server/share/SharePartition.java +++ b/core/src/main/java/kafka/server/share/SharePartition.java @@ -490,6 +490,15 @@ public CompletableFuture maybeInitialize() { throwable = new IllegalStateException(String.format("Failed to initialize the share partition %s-%s", groupId, topicIdPartition)); return; } + + if (stateBatch.lastOffset() < stateBatch.firstOffset()) { + log.error("Invalid state batch found for the share partition: {}-{}. The first offset: {}" + + " is less than the last offset of the batch: {}.", groupId, topicIdPartition, + stateBatch.firstOffset(), stateBatch.lastOffset()); + throwable = new IllegalStateException(String.format("Failed to initialize the share partition %s-%s", groupId, topicIdPartition)); + return; + } + if (gapStartOffset == -1 && stateBatch.firstOffset() > previousBatchLastOffset + 1) { gapStartOffset = previousBatchLastOffset + 1; } diff --git a/core/src/test/java/kafka/server/share/SharePartitionTest.java b/core/src/test/java/kafka/server/share/SharePartitionTest.java index c8be18c027ead..73c0242b81f66 100644 --- a/core/src/test/java/kafka/server/share/SharePartitionTest.java +++ b/core/src/test/java/kafka/server/share/SharePartitionTest.java @@ -1804,6 +1804,26 @@ public void testMaybeInitializeAndAcquireWithMultipleBatchesPriorStartOffset() { assertEquals(3, sharePartition.deliveryCompleteCount()); } + @Test + public void testMaybeInitializeWithInvalidOffsetInBatch() { + Persister persister = Mockito.mock(Persister.class); + ReadShareGroupStateResult readShareGroupStateResult = Mockito.mock(ReadShareGroupStateResult.class); + Mockito.when(readShareGroupStateResult.topicsData()).thenReturn(List.of( + new TopicData<>(TOPIC_ID_PARTITION.topicId(), List.of( + PartitionFactory.newPartitionAllData(0, 3, 5L, Errors.NONE.code(), Errors.NONE.message(), + List.of( + new PersisterStateBatch(5L, 10L, RecordState.AVAILABLE.id, (short) 2), + new PersisterStateBatch(11L, 10L, RecordState.ARCHIVED.id, (short) 3))))))); + Mockito.when(persister.readState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(readShareGroupStateResult)); + SharePartition sharePartition = SharePartitionBuilder.builder().withPersister(persister).build(); + + CompletableFuture result = sharePartition.maybeInitialize(); + assertTrue(result.isDone()); + assertTrue(result.isCompletedExceptionally()); + assertFutureThrows(IllegalStateException.class, result); + assertEquals(SharePartitionState.FAILED, sharePartition.partitionState()); + } + @Test public void testAcquireSingleRecord() throws InterruptedException { SharePartition sharePartition = SharePartitionBuilder.builder()