diff --git a/core/src/main/java/kafka/server/share/SharePartition.java b/core/src/main/java/kafka/server/share/SharePartition.java
index 2c33007673363..222769ac8751b 100644
--- a/core/src/main/java/kafka/server/share/SharePartition.java
+++ b/core/src/main/java/kafka/server/share/SharePartition.java
@@ -373,7 +373,7 @@ enum SharePartitionState {
      * or completes with an exception if the share partition is in non-initializable state.
      */
     public CompletableFuture<Void> maybeInitialize() {
-        log.debug("Maybe initialize share partition: {}-{}", groupId, topicIdPartition);
+        log.trace("Maybe initialize share partition: {}-{}", groupId, topicIdPartition);
         // Check if the share partition is already initialized.
         try {
             if (initializedOrThrowException()) return CompletableFuture.completedFuture(null);
@@ -487,6 +487,8 @@ public CompletableFuture<Void> maybeInitialize() {
             }
             // Set the partition state to Active and complete the future.
             partitionState = SharePartitionState.ACTIVE;
+            log.debug("Initialized share partition: {}-{} with persister read state: {}, cached state: {}",
+                groupId, topicIdPartition, result.topicsData(), cachedState);
         } catch (Exception e) {
             throwable = e;
         } finally {
@@ -738,16 +740,16 @@ public ShareAcquiredRecords acquire(
             baseOffset = floorEntry.getKey();
         }
         // Validate if the fetch records are already part of existing batches and if available.
-        NavigableMap<Long, InFlightBatch> subMap = cachedState.subMap(baseOffset, true, lastBatch.lastOffset(), true);
+        NavigableMap<Long, InFlightBatch> subMap = cachedState.subMap(baseOffset, true, lastOffsetToAcquire, true);
         // No overlap with request offsets in the cache for in-flight records. Acquire the complete
         // batch.
         if (subMap.isEmpty()) {
             log.trace("No cached data exists for the share partition for requested fetch batch: {}-{}",
                 groupId, topicIdPartition);
-            // Do not send the lastOffsetToAcquire as when the subMap is empty, it means that
-            // there isn't any overlap itself.
+            // It's safe to use lastOffsetToAcquire instead of lastBatch.lastOffset() because there is
+            // no overlap, hence lastOffsetToAcquire is either lastBatch.lastOffset() or an earlier offset.
             ShareAcquiredRecords shareAcquiredRecords = acquireNewBatchRecords(memberId, fetchPartitionData.records.batches(),
-                firstBatch.baseOffset(), lastBatch.lastOffset(), batchSize, maxRecordsToAcquire);
+                firstBatch.baseOffset(), lastOffsetToAcquire, batchSize, maxRecordsToAcquire);
             return maybeFilterAbortedTransactionalAcquiredRecords(fetchPartitionData, isolationLevel, shareAcquiredRecords);
         }
@@ -776,6 +778,15 @@ public ShareAcquiredRecords acquire(
                 // If nextBatchStartOffset is less than the key of the entry, this means the fetch happened for a gap in the cachedState.
                 // Thus, a new batch needs to be acquired for the gap.
                 if (maybeGapStartOffset < entry.getKey()) {
+                    // It's safe to use entry.getKey() - 1 as the last offset to acquire for the
+                    // gap because the sub map either contains the next batch or this line is not
+                    // executed at all. For example, say there is a gap from 10-20 and the cache
+                    // contains [0-9, 21-30]. When the fetch returns batches covering 0-15, the
+                    // first sub map entry has no gap and the sub map holds only one entry, so
+                    // this code is not executed for a next batch and records 10-15 are acquired
+                    // later in the method. When the fetch returns batches covering 0-25, the
+                    // sub map holds two entries and the gap is computed correctly from the
+                    // second entry's key.
                     ShareAcquiredRecords shareAcquiredRecords = acquireNewBatchRecords(memberId, fetchPartitionData.records.batches(),
                         maybeGapStartOffset, entry.getKey() - 1, batchSize, maxRecordsToAcquire);
                     result.addAll(shareAcquiredRecords.acquiredRecords());
@@ -791,13 +802,13 @@ public ShareAcquiredRecords acquire(
                 }
                 // Compute if the batch is a full match.
-                boolean fullMatch = checkForFullMatch(inFlightBatch, firstBatch.baseOffset(), lastBatch.lastOffset());
+                boolean fullMatch = checkForFullMatch(inFlightBatch, firstBatch.baseOffset(), lastOffsetToAcquire);
                 if (!fullMatch || inFlightBatch.offsetState() != null) {
                     log.trace("Subset or offset tracked batch record found for share partition," +
                         " batch: {} request offsets - first: {}, last: {} for the share" +
                         " partition: {}-{}", inFlightBatch, firstBatch.baseOffset(),
-                        lastBatch.lastOffset(), groupId, topicIdPartition);
+                        lastOffsetToAcquire, groupId, topicIdPartition);
                     if (inFlightBatch.offsetState() == null) {
                         // Though the request is a subset of in-flight batch but the offset
                         // tracking has not been initialized yet which means that we could only
@@ -884,7 +895,7 @@ public CompletableFuture<Void> acknowledge(
         CompletableFuture<Void> future = new CompletableFuture<>();
         Throwable throwable = null;
-        List persisterBatches = new ArrayList<>();
+        List persisterBatches = new ArrayList<>();
         lock.writeLock().lock();
         try {
             // Avoided using enhanced for loop as need to check if the last batch have offsets
@@ -1518,8 +1529,15 @@ private LastOffsetAndMaxRecords lastOffsetAndMaxRecordsToAcquire(long fetchOffse
                 // in-flight records limit. This can happen when the fetch is happening in-between
                 // the in-flight batches and the share partition has reached the max in-flight records limit.
                 maxRecordsToAcquire = Math.min(maxFetchRecords, (int) (endOffset() - fetchOffset + 1));
-                // Adjust the last offset to acquire to the endOffset of the share partition.
-                lastOffsetToAcquire = endOffset();
+                // Adjust the last offset to acquire to the minimum of the fetched data's last offset
+                // and the endOffset of the share partition. This is required as partition fetch bytes
+                // are dynamic and subsequent fetches can return less data, i.e. lastOffset can be
+                // less than endOffset.
+                lastOffsetToAcquire = Math.min(lastOffset, endOffset());
+                log.debug("Share partition {}-{} is at max in-flight records limit: {}. " +
+                    "However, fetch is happening in-between the in-flight batches, hence adjusting " +
+                    "last offset to: {} and max records to: {}", groupId, topicIdPartition,
+                    maxInFlightRecords, lastOffsetToAcquire, maxRecordsToAcquire);
             } else {
                 // The share partition is already at max in-flight records, hence cannot acquire more records.
                 log.debug("Share partition {}-{} has reached max in-flight records limit: {}. Cannot acquire more records, inflight records count: {}",
diff --git a/core/src/test/java/kafka/server/share/SharePartitionTest.java b/core/src/test/java/kafka/server/share/SharePartitionTest.java
index b3068abbedaa0..70f2099bd6abd 100644
--- a/core/src/test/java/kafka/server/share/SharePartitionTest.java
+++ b/core/src/test/java/kafka/server/share/SharePartitionTest.java
@@ -2323,6 +2323,109 @@ public void testAcquireWithMaxInFlightRecordsAndReleaseLastOffset() {
         assertEquals(30, sharePartition.nextFetchOffset());
     }

+    @Test
+    public void testAcquireWithMaxInFlightRecordsReleaseBatchAndAcquireSubsetRecords() {
+        SharePartition sharePartition = SharePartitionBuilder.builder()
+            .withState(SharePartitionState.ACTIVE)
+            .withSharePartitionMetrics(sharePartitionMetrics)
+            .withMaxInflightRecords(20)
+            .build();
+
+        // Acquire records, which should be acquired till the maxInFlightRecords limit is reached i.e. 25 records till offset 24.
+        fetchAcquiredRecords(sharePartition, memoryRecords(5), 5);
+        fetchAcquiredRecords(sharePartition, memoryRecords(5, 10), 10);
+        fetchAcquiredRecords(sharePartition, memoryRecords(15, 10), 10);
+
+        // Validate 3 batches are acquired and no further batch can be acquired as the max
+        // in-flight records limit is reached.
+        assertEquals(3, sharePartition.cachedState().size());
+        assertEquals(0, sharePartition.cachedState().get(0L).firstOffset());
+        assertEquals(4, sharePartition.cachedState().get(0L).lastOffset());
+        assertEquals(5, sharePartition.cachedState().get(5L).firstOffset());
+        assertEquals(14, sharePartition.cachedState().get(5L).lastOffset());
+        assertEquals(15, sharePartition.cachedState().get(15L).firstOffset());
+        assertEquals(24, sharePartition.cachedState().get(15L).lastOffset());
+        assertEquals(25, sharePartition.nextFetchOffset());
+
+        // Release the middle batch.
+        CompletableFuture<Void> ackResult = sharePartition.acknowledge(
+            MEMBER_ID,
+            List.of(new ShareAcknowledgementBatch(5, 14, List.of((byte) 2))));
+        assertNull(ackResult.join());
+        assertFalse(ackResult.isCompletedExceptionally());
+        // Validate the nextFetchOffset is updated to 5.
+        assertEquals(5, sharePartition.nextFetchOffset());
+
+        // The complete released batch should be acquired, but no other batch, as the last offset
+        // to acquire is capped at the minimum of the fetched data's last offset and the endOffset.
+        List<AcquiredRecords> acquiredRecordsList = fetchAcquiredRecords(sharePartition.acquire(
+            MEMBER_ID,
+            BATCH_SIZE,
+            500 /* Max fetch records */,
+            5 /* Fetch Offset */,
+            fetchPartitionData(memoryRecords(5, 10), 0),
+            FETCH_ISOLATION_HWM),
+            10);
+
+        // Validate 1 batch is acquired, with 10 records till the end of the batch.
+        assertArrayEquals(expectedAcquiredRecord(5, 14, 2).toArray(), acquiredRecordsList.toArray());
+        assertEquals(25, sharePartition.nextFetchOffset());
+    }
+
+    @Test
+    public void testAcquireWithMaxInFlightRecordsReleaseBatchAndAcquireSubsetRecordsOverlap() {
+        SharePartition sharePartition = SharePartitionBuilder.builder()
+            .withState(SharePartitionState.ACTIVE)
+            .withSharePartitionMetrics(sharePartitionMetrics)
+            .withMaxInflightRecords(20)
+            .build();
+
+        // Acquire records, which should be acquired till the maxInFlightRecords limit is reached i.e. 25 records till offset 24.
+        fetchAcquiredRecords(sharePartition, memoryRecords(5), 5);
+        fetchAcquiredRecords(sharePartition, memoryRecords(5, 5), 5);
+        fetchAcquiredRecords(sharePartition, memoryRecords(10, 5), 5);
+        fetchAcquiredRecords(sharePartition, memoryRecords(15, 10), 10);
+
+        // Validate 4 batches are acquired and no further batch can be acquired as the max
+        // in-flight records limit is reached.
+        assertEquals(4, sharePartition.cachedState().size());
+        assertEquals(0, sharePartition.cachedState().get(0L).firstOffset());
+        assertEquals(4, sharePartition.cachedState().get(0L).lastOffset());
+        assertEquals(5, sharePartition.cachedState().get(5L).firstOffset());
+        assertEquals(9, sharePartition.cachedState().get(5L).lastOffset());
+        assertEquals(10, sharePartition.cachedState().get(10L).firstOffset());
+        assertEquals(14, sharePartition.cachedState().get(10L).lastOffset());
+        assertEquals(15, sharePartition.cachedState().get(15L).firstOffset());
+        assertEquals(24, sharePartition.cachedState().get(15L).lastOffset());
+        assertEquals(25, sharePartition.nextFetchOffset());
+
+        // Release only 1 middle batch.
+        CompletableFuture<Void> ackResult = sharePartition.acknowledge(
+            MEMBER_ID,
+            List.of(new ShareAcknowledgementBatch(5, 9, List.of((byte) 2))));
+        assertNull(ackResult.join());
+        assertFalse(ackResult.isCompletedExceptionally());
+        // Validate the nextFetchOffset is updated to 5.
+        assertEquals(5, sharePartition.nextFetchOffset());
+
+        // Adjust the max fetch records to 6, just 1 record more than the released batch size.
+        // This should not impact the acquired records as only the released batch should be acquired.
+        // However, this previously triggered a bug where a subset of records was acquired from the
+        // next batch due to an incorrect last offset calculation.
+        List<AcquiredRecords> acquiredRecordsList = fetchAcquiredRecords(sharePartition.acquire(
+            MEMBER_ID,
+            BATCH_SIZE,
+            6 /* Max fetch records */,
+            5 /* Fetch Offset */,
+            fetchPartitionData(memoryRecords(5, 5), 0),
+            FETCH_ISOLATION_HWM),
+            5);
+
+        // Validate 1 batch is acquired, with 5 records till the end of the batch.
+        assertArrayEquals(expectedAcquiredRecord(5, 9, 2).toArray(), acquiredRecordsList.toArray());
+        assertEquals(25, sharePartition.nextFetchOffset());
+    }
+
     @Test
     public void testNextFetchOffsetInitialState() {
         SharePartition sharePartition = SharePartitionBuilder.builder().withState(SharePartitionState.ACTIVE).build();
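
The gap comment added in acquire() above is easier to follow as a small runnable sketch. The class below is not the real SharePartition code: cachedState is stood in for by a plain TreeMap keyed by each in-flight batch's first offset and mapping to its last offset, and only the uncached ranges (gaps and the trailing range) are reported, whereas the real loop also acquires from the cached in-flight batches. The class and method names are illustrative only.

import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

public class GapAcquireSketch {

    // Print the uncached ranges (gaps and the trailing range) that a fetch covering
    // [fetchFirstOffset, lastOffsetToAcquire] would need to acquire as new batches.
    static void printUncachedRanges(NavigableMap<Long, Long> cachedLastOffsets,
                                    long fetchFirstOffset,
                                    long lastOffsetToAcquire) {
        NavigableMap<Long, Long> subMap =
            cachedLastOffsets.subMap(fetchFirstOffset, true, lastOffsetToAcquire, true);
        long maybeGapStartOffset = fetchFirstOffset;
        for (Map.Entry<Long, Long> entry : subMap.entrySet()) {
            if (maybeGapStartOffset < entry.getKey()) {
                // The gap ends right before the next cached batch begins, hence entry.getKey() - 1.
                System.out.printf("gap to acquire: %d-%d%n", maybeGapStartOffset, entry.getKey() - 1);
            }
            // Continue scanning after the cached batch's last offset.
            maybeGapStartOffset = entry.getValue() + 1;
        }
        if (maybeGapStartOffset <= lastOffsetToAcquire) {
            System.out.printf("tail to acquire: %d-%d%n", maybeGapStartOffset, lastOffsetToAcquire);
        }
    }

    public static void main(String[] args) {
        // Cached in-flight batches [0-9] and [21-30]; offsets 10-20 are a gap.
        NavigableMap<Long, Long> cachedLastOffsets = new TreeMap<>();
        cachedLastOffsets.put(0L, 9L);
        cachedLastOffsets.put(21L, 30L);

        // Fetch covering 0-15: the sub map holds only the [0-9] entry, so no gap is
        // reported inside the loop and 10-15 surfaces as the trailing range.
        printUncachedRanges(cachedLastOffsets, 0L, 15L);   // tail to acquire: 10-15

        // Fetch covering 0-25: the sub map holds both entries, so the gap 10-20 is
        // reported using the second entry's key minus one.
        printUncachedRanges(cachedLastOffsets, 0L, 25L);   // gap to acquire: 10-20
    }
}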
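
The adjustment in lastOffsetAndMaxRecordsToAcquire() can likewise be reduced to a minimal sketch, assuming plain long parameters in place of the method's real state: when the partition is at the max in-flight records limit but the fetch lands in-between in-flight batches, the last offset to acquire is capped by both the fetched data's last offset and the partition's end offset, because partition fetch bytes are dynamic and a later fetch may return fewer records.

public class LastOffsetClampSketch {

    // Cap the last acquirable offset at both the fetched data's last offset and the
    // share partition's end offset; previously only endOffset was used, which could
    // pull in a subset of the next in-flight batch.
    static long lastOffsetToAcquire(long fetchedLastOffset, long endOffset) {
        return Math.min(fetchedLastOffset, endOffset);
    }

    public static void main(String[] args) {
        // Mirrors the overlap test above: the released batch 5-9 is re-fetched while
        // endOffset is 24; only offsets up to 9 may be acquired, not up to 24.
        System.out.println(lastOffsetToAcquire(9L, 24L)); // prints 9
    }
}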