38 changes: 28 additions & 10 deletions core/src/main/java/kafka/server/share/SharePartition.java
@@ -373,7 +373,7 @@ enum SharePartitionState {
* or completes with an exception if the share partition is in non-initializable state.
*/
public CompletableFuture<Void> maybeInitialize() {
log.debug("Maybe initialize share partition: {}-{}", groupId, topicIdPartition);
log.trace("Maybe initialize share partition: {}-{}", groupId, topicIdPartition);
// Check if the share partition is already initialized.
try {
if (initializedOrThrowException()) return CompletableFuture.completedFuture(null);
@@ -487,6 +487,8 @@ public CompletableFuture<Void> maybeInitialize() {
}
// Set the partition state to Active and complete the future.
partitionState = SharePartitionState.ACTIVE;
log.debug("Initialized share partition: {}-{} with persister read state: {}, cached state: {}",
groupId, topicIdPartition, result.topicsData(), cachedState);
} catch (Exception e) {
throwable = e;
} finally {
@@ -738,16 +740,16 @@ public ShareAcquiredRecords acquire(
baseOffset = floorEntry.getKey();
}
// Validate if the fetch records are already part of existing batches and if available.
-        NavigableMap<Long, InFlightBatch> subMap = cachedState.subMap(baseOffset, true, lastBatch.lastOffset(), true);
+        NavigableMap<Long, InFlightBatch> subMap = cachedState.subMap(baseOffset, true, lastOffsetToAcquire, true);
// No overlap with request offsets in the cache for in-flight records. Acquire the complete
// batch.
if (subMap.isEmpty()) {
log.trace("No cached data exists for the share partition for requested fetch batch: {}-{}",
groupId, topicIdPartition);
-            // Do not send the lastOffsetToAcquire as when the subMap is empty, it means that
-            // there isn't any overlap itself.
+            // It's safe to use lastOffsetToAcquire instead of lastBatch.lastOffset() because there is
+            // no overlap; hence lastOffsetToAcquire is the same as lastBatch.lastOffset() or earlier.
             ShareAcquiredRecords shareAcquiredRecords = acquireNewBatchRecords(memberId, fetchPartitionData.records.batches(),
-                firstBatch.baseOffset(), lastBatch.lastOffset(), batchSize, maxRecordsToAcquire);
+                firstBatch.baseOffset(), lastOffsetToAcquire, batchSize, maxRecordsToAcquire);
return maybeFilterAbortedTransactionalAcquiredRecords(fetchPartitionData, isolationLevel, shareAcquiredRecords);
}

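Note: the safety argument in the new comment hinges on how NavigableMap.subMap detects overlap with in-flight batches. Below is a small self-contained demonstration with illustrative offsets (not from this PR); keep in mind the real code first lowers baseOffset via floorEntry, so a cached batch starting before the fetch offset is still caught.

    import java.util.NavigableMap;
    import java.util.TreeMap;

    public class SubMapOverlapDemo {
        public static void main(String[] args) {
            // In-flight batches keyed by base offset: [0-9] and [21-30].
            NavigableMap<Long, long[]> cachedState = new TreeMap<>();
            cachedState.put(0L, new long[]{0, 9});
            cachedState.put(21L, new long[]{21, 30});

            // A fetch covering 10-20 overlaps no cached batch: the sub map is
            // empty, so the whole range can be acquired as a new batch.
            System.out.println(cachedState.subMap(10L, true, 20L, true).isEmpty()); // true
            // A fetch covering 10-25 overlaps the batch at key 21: non-empty.
            System.out.println(cachedState.subMap(10L, true, 25L, true).isEmpty()); // false
        }
    }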
@@ -776,6 +778,15 @@ public ShareAcquiredRecords acquire(
// If nextBatchStartOffset is less than the key of the entry, this means the fetch happened for a gap in the cachedState.
// Thus, a new batch needs to be acquired for the gap.
if (maybeGapStartOffset < entry.getKey()) {
+                // It's safe to use entry.getKey() - 1 as the last offset to acquire for the
+                // gap, as the sub map either contains the next batch or this line would not
+                // have been executed. For example, say there is a gap from 10-20 and the
+                // cache contains [0-9, 21-30]. When the fetch returns one or more batches
+                // covering 0-15, the first sub map entry has no gap and the sub map holds
+                // only 1 entry; hence this code is not executed for the next batch, and
+                // records from 10-15 are acquired later in the code. When the fetch instead
+                // returns batches covering 0-25, the sub map holds 2 entries and the gap is
+                // computed correctly.
ShareAcquiredRecords shareAcquiredRecords = acquireNewBatchRecords(memberId, fetchPartitionData.records.batches(),
maybeGapStartOffset, entry.getKey() - 1, batchSize, maxRecordsToAcquire);
result.addAll(shareAcquiredRecords.acquiredRecords());
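The gap arithmetic the new comment describes can be seen in isolation below: a minimal sketch mirroring the [0-9, 21-30] example, where the value arrays stand in for InFlightBatch first/last offsets and all names are illustrative.

    import java.util.Map;
    import java.util.NavigableMap;
    import java.util.TreeMap;

    public class GapComputationDemo {
        public static void main(String[] args) {
            // Sub map for a fetch covering 0-25; cache holds [0-9] and [21-30].
            NavigableMap<Long, long[]> subMap = new TreeMap<>();
            subMap.put(0L, new long[]{0, 9});
            subMap.put(21L, new long[]{21, 30});

            long maybeGapStartOffset = 0;
            for (Map.Entry<Long, long[]> entry : subMap.entrySet()) {
                if (maybeGapStartOffset < entry.getKey()) {
                    // The gap runs up to entry.getKey() - 1, as in the diff.
                    System.out.println("acquire gap: " + maybeGapStartOffset
                        + "-" + (entry.getKey() - 1));
                }
                maybeGapStartOffset = entry.getValue()[1] + 1;
            }
            // Prints: acquire gap: 10-20
        }
    }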
@@ -791,13 +802,13 @@ public ShareAcquiredRecords acquire(
}

// Compute if the batch is a full match.
-            boolean fullMatch = checkForFullMatch(inFlightBatch, firstBatch.baseOffset(), lastBatch.lastOffset());
+            boolean fullMatch = checkForFullMatch(inFlightBatch, firstBatch.baseOffset(), lastOffsetToAcquire);

if (!fullMatch || inFlightBatch.offsetState() != null) {
log.trace("Subset or offset tracked batch record found for share partition,"
+ " batch: {} request offsets - first: {}, last: {} for the share"
+ " partition: {}-{}", inFlightBatch, firstBatch.baseOffset(),
-                    lastBatch.lastOffset(), groupId, topicIdPartition);
+                    lastOffsetToAcquire, groupId, topicIdPartition);
if (inFlightBatch.offsetState() == null) {
// Though the request is a subset of in-flight batch but the offset
// tracking has not been initialized yet which means that we could only
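The checkForFullMatch helper itself is outside this diff; a plausible reconstruction, offered here as an assumption rather than the actual implementation, is a plain containment test:

    // Hypothetical sketch of the helper referenced above: a batch is a full match
    // when it lies entirely inside the requested acquire range, so it can be
    // transitioned as one unit without per-offset state tracking.
    private boolean checkForFullMatch(InFlightBatch inFlightBatch, long firstOffsetToAcquire,
            long lastOffsetToAcquire) {
        return inFlightBatch.firstOffset() >= firstOffsetToAcquire
            && inFlightBatch.lastOffset() <= lastOffsetToAcquire;
    }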
@@ -884,7 +895,7 @@ public CompletableFuture<Void> acknowledge(

CompletableFuture<Void> future = new CompletableFuture<>();
Throwable throwable = null;
-       List<PersisterBatch> persisterBatches = new ArrayList<>();
+        List<PersisterBatch> persisterBatches = new ArrayList<>();
lock.writeLock().lock();
try {
// Avoided using enhanced for loop as need to check if the last batch have offsets
@@ -1518,8 +1529,15 @@ private LastOffsetAndMaxRecords lastOffsetAndMaxRecordsToAcquire(long fetchOffset
// in-flight records limit. This can happen when the fetch is happening in-between
// the in-flight batches and the share partition has reached the max in-flight records limit.
maxRecordsToAcquire = Math.min(maxFetchRecords, (int) (endOffset() - fetchOffset + 1));
-            // Adjust the last offset to acquire to the endOffset of the share partition.
-            lastOffsetToAcquire = endOffset();
+            // Adjust the last offset to acquire to the minimum of the fetched data's last
+            // offset and the endOffset of the share partition. This is required because
+            // partition fetch bytes are dynamic and subsequent fetches can return less
+            // data, i.e. lastOffset can be less than endOffset.
+            lastOffsetToAcquire = Math.min(lastOffset, endOffset());
+            log.debug("Share partition {}-{} is at max in-flight records limit: {}. "
+                + "However, fetch is happening in-between the in-flight batches, hence adjusting "
+                + "last offset to: {} and max records to: {}", groupId, topicIdPartition,
+                maxInFlightRecords, lastOffsetToAcquire, maxRecordsToAcquire);
} else {
// The share partition is already at max in-flight records, hence cannot acquire more records.
log.debug("Share partition {}-{} has reached max in-flight records limit: {}. Cannot acquire more records, inflight records count: {}",
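A worked instance of the adjusted branch, using the numbers from the second new test below (a sketch; the variable forms are simplified from the diff): endOffset() = 24, fetchOffset = 5, the fetched data ends at lastOffset = 9, and maxFetchRecords = 6.

    long endOffset = 24, fetchOffset = 5, lastOffset = 9;
    int maxFetchRecords = 6;
    int maxRecordsToAcquire = Math.min(maxFetchRecords, (int) (endOffset - fetchOffset + 1)); // 6
    // Before this PR: lastOffsetToAcquire = endOffset = 24, so with 6 records to
    // acquire the code could spill past the released batch [5-9] into the next
    // in-flight batch. After the fix, the acquire is capped at the fetched data:
    long lastOffsetToAcquire = Math.min(lastOffset, endOffset); // 9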
103 changes: 103 additions & 0 deletions core/src/test/java/kafka/server/share/SharePartitionTest.java
@@ -2323,6 +2323,109 @@ public void testAcquireWithMaxInFlightRecordsAndReleaseLastOffset() {
assertEquals(30, sharePartition.nextFetchOffset());
}

@Test
public void testAcquireWithMaxInFlightRecordsReleaseBatchAndAcquireSubsetRecords() {
SharePartition sharePartition = SharePartitionBuilder.builder()
.withState(SharePartitionState.ACTIVE)
.withSharePartitionMetrics(sharePartitionMetrics)
.withMaxInflightRecords(20)
.build();

// Acquire records; the limit is checked before each fetch, so acquisition runs to
// 25 records (offsets 0-24), past the max in-flight records limit of 20.
fetchAcquiredRecords(sharePartition, memoryRecords(5), 5);
fetchAcquiredRecords(sharePartition, memoryRecords(5, 10), 10);
fetchAcquiredRecords(sharePartition, memoryRecords(15, 10), 10);

// Validate that 3 batches were fetched and that any further fetch would be skipped,
// as the max in-flight records limit has been reached.
assertEquals(3, sharePartition.cachedState().size());
assertEquals(0, sharePartition.cachedState().get(0L).firstOffset());
assertEquals(4, sharePartition.cachedState().get(0L).lastOffset());
assertEquals(5, sharePartition.cachedState().get(5L).firstOffset());
assertEquals(14, sharePartition.cachedState().get(5L).lastOffset());
assertEquals(15, sharePartition.cachedState().get(15L).firstOffset());
assertEquals(24, sharePartition.cachedState().get(15L).lastOffset());
assertEquals(25, sharePartition.nextFetchOffset());

// Release middle batch.
CompletableFuture<Void> ackResult = sharePartition.acknowledge(
MEMBER_ID,
List.of(new ShareAcknowledgementBatch(5, 14, List.of((byte) 2))));
assertNull(ackResult.join());
assertFalse(ackResult.isCompletedExceptionally());
// Validate the nextFetchOffset is updated to 5.
assertEquals(5, sharePartition.nextFetchOffset());

// The complete released batch should be acquired, but no other batch, as the last
// offset to acquire is adjusted to the minimum of the fetched data's last offset and endOffset.
List<AcquiredRecords> acquiredRecordsList = fetchAcquiredRecords(sharePartition.acquire(
MEMBER_ID,
BATCH_SIZE,
500 /* Max fetch records */,
5 /* Fetch Offset */,
fetchPartitionData(memoryRecords(5, 10), 0),
FETCH_ISOLATION_HWM),
10);

// Validate that 1 batch is acquired, with 10 records up to the end of the batch.
assertArrayEquals(expectedAcquiredRecord(5, 14, 2).toArray(), acquiredRecordsList.toArray());
assertEquals(25, sharePartition.nextFetchOffset());
}

@Test
public void testAcquireWithMaxInFlightRecordsReleaseBatchAndAcquireSubsetRecordsOverlap() {
SharePartition sharePartition = SharePartitionBuilder.builder()
.withState(SharePartitionState.ACTIVE)
.withSharePartitionMetrics(sharePartitionMetrics)
.withMaxInflightRecords(20)
.build();

// Acquire records; the limit is checked before each fetch, so acquisition runs to
// 25 records (offsets 0-24), past the max in-flight records limit of 20.
fetchAcquiredRecords(sharePartition, memoryRecords(5), 5);
fetchAcquiredRecords(sharePartition, memoryRecords(5, 5), 5);
fetchAcquiredRecords(sharePartition, memoryRecords(10, 5), 5);
fetchAcquiredRecords(sharePartition, memoryRecords(15, 10), 10);

// Validate that 4 batches were fetched and that any further fetch would be skipped,
// as the max in-flight records limit has been reached.
assertEquals(4, sharePartition.cachedState().size());
assertEquals(0, sharePartition.cachedState().get(0L).firstOffset());
assertEquals(4, sharePartition.cachedState().get(0L).lastOffset());
assertEquals(5, sharePartition.cachedState().get(5L).firstOffset());
assertEquals(9, sharePartition.cachedState().get(5L).lastOffset());
assertEquals(10, sharePartition.cachedState().get(10L).firstOffset());
assertEquals(14, sharePartition.cachedState().get(10L).lastOffset());
assertEquals(15, sharePartition.cachedState().get(15L).firstOffset());
assertEquals(24, sharePartition.cachedState().get(15L).lastOffset());
assertEquals(25, sharePartition.nextFetchOffset());

// Release only 1 middle batch.
CompletableFuture<Void> ackResult = sharePartition.acknowledge(
MEMBER_ID,
List.of(new ShareAcknowledgementBatch(5, 9, List.of((byte) 2))));
assertNull(ackResult.join());
assertFalse(ackResult.isCompletedExceptionally());
// Validate the nextFetchOffset is updated to 5.
assertEquals(5, sharePartition.nextFetchOffset());

// Adjust the max fetch records to 6, just 1 record more than the released batch size.
// This should not affect the acquired records, as only the released batch should be
// acquired. However, this previously triggered a bug where a subset of records was
// acquired from the next batch due to an incorrect last-offset calculation.
List<AcquiredRecords> acquiredRecordsList = fetchAcquiredRecords(sharePartition.acquire(
MEMBER_ID,
BATCH_SIZE,
6 /* Max fetch records */,
5 /* Fetch Offset */,
fetchPartitionData(memoryRecords(5, 5), 0),
FETCH_ISOLATION_HWM),
5);

// Validate that 1 batch is acquired, with 5 records up to the end of the batch.
assertArrayEquals(expectedAcquiredRecord(5, 9, 2).toArray(), acquiredRecordsList.toArray());
assertEquals(25, sharePartition.nextFetchOffset());
}

@Test
public void testNextFetchOffsetInitialState() {
SharePartition sharePartition = SharePartitionBuilder.builder().withState(SharePartitionState.ACTIVE).build();