Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 43 additions & 0 deletions core/src/main/java/kafka/server/share/ShareFetchUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -274,4 +274,47 @@ public static int recordLockDurationMsOrDefault(GroupConfigManager groupConfigMa
}
return defaultValue;
}

/**
 * Merges contiguous AcquiredRecords with the same delivery count into single records.
 * <p>
 * The input typically holds single-offset records (firstOffset == lastOffset), though ranges are
 * also accepted. Any run of records whose offsets are contiguous and whose delivery counts match
 * is collapsed into one AcquiredRecords spanning the whole run.
 *
 * @param result the list to accumulate merged AcquiredRecords into
 * @param acquiredRecords the sorted list of AcquiredRecords to merge
 */
static void accumulateAcquiredRecords(List<AcquiredRecords> result, List<AcquiredRecords> acquiredRecords) {
    if (acquiredRecords.isEmpty()) {
        return;
    }

    // Seed the running batch from the first record.
    AcquiredRecords head = acquiredRecords.get(0);
    long batchFirstOffset = head.firstOffset();
    long batchLastOffset = head.lastOffset();
    short batchDeliveryCount = head.deliveryCount();

    for (AcquiredRecords next : acquiredRecords.subList(1, acquiredRecords.size())) {
        boolean contiguous = next.firstOffset() == batchLastOffset + 1;
        if (contiguous && next.deliveryCount() == batchDeliveryCount) {
            // Same delivery count and no offset gap: grow the running batch.
            batchLastOffset = next.lastOffset();
        } else {
            // Gap or delivery-count change: flush the running batch and restart from this record.
            result.add(new AcquiredRecords()
                .setFirstOffset(batchFirstOffset)
                .setLastOffset(batchLastOffset)
                .setDeliveryCount(batchDeliveryCount));
            batchFirstOffset = next.firstOffset();
            batchLastOffset = next.lastOffset();
            batchDeliveryCount = next.deliveryCount();
        }
    }
    // Flush the final running batch.
    result.add(new AcquiredRecords()
        .setFirstOffset(batchFirstOffset)
        .setLastOffset(batchLastOffset)
        .setDeliveryCount(batchDeliveryCount));
}
}
9 changes: 6 additions & 3 deletions core/src/main/java/kafka/server/share/SharePartition.java
Original file line number Diff line number Diff line change
Expand Up @@ -1921,10 +1921,11 @@ private int acquireSubsetBatchRecords(
InFlightBatch inFlightBatch,
List<AcquiredRecords> result
) {
lock.writeLock().lock();
int acquiredCount = 0;
long maxFetchRecordsWhileThrottledRecords = -1;
boolean hasThrottledRecord = false;
List<AcquiredRecords> offsetAcquiredRecords = new ArrayList<>();
lock.writeLock().lock();
try {
for (Map.Entry<Long, InFlightState> offsetState : inFlightBatch.offsetState().entrySet()) {
// For the first batch which might have offsets prior to the request base
Expand Down Expand Up @@ -1986,8 +1987,7 @@ private int acquireSubsetBatchRecords(
// Update acquisition lock timeout task for the offset.
offsetState.getValue().updateAcquisitionLockTimeoutTask(acquisitionLockTimeoutTask);

// TODO: Maybe we can club the continuous offsets here.
result.add(new AcquiredRecords()
offsetAcquiredRecords.add(new AcquiredRecords()
.setFirstOffset(offsetState.getKey())
.setLastOffset(offsetState.getKey())
.setDeliveryCount((short) offsetState.getValue().deliveryCount()));
Expand All @@ -2008,6 +2008,9 @@ private int acquireSubsetBatchRecords(
} finally {
lock.writeLock().unlock();
}

// Accumulate the acquired records for the offset acquired records in the result.
ShareFetchUtils.accumulateAcquiredRecords(result, offsetAcquiredRecords);
return acquiredCount;
}

Expand Down
99 changes: 99 additions & 0 deletions core/src/test/java/kafka/server/share/ShareFetchUtilsTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
Expand All @@ -68,6 +69,7 @@
import static org.apache.kafka.server.share.fetch.ShareFetchTestUtils.createFileRecords;
import static org.apache.kafka.server.share.fetch.ShareFetchTestUtils.createShareAcquiredRecords;
import static org.apache.kafka.server.share.fetch.ShareFetchTestUtils.memoryRecordsBuilder;
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
Expand Down Expand Up @@ -628,6 +630,103 @@ public void testMaybeSliceFetchRecordsException(String name, Records records) {
assertEquals(records, slicedRecords);
}

@Test
void testAccumulateAcquiredRecords() {
    // Mixed case: contiguous runs broken by both a delivery-count change and an offset gap.
    List<AcquiredRecords> singleOffsetRecords = List.of(
        new AcquiredRecords().setFirstOffset(0).setLastOffset(0).setDeliveryCount((short) 1),
        new AcquiredRecords().setFirstOffset(1).setLastOffset(1).setDeliveryCount((short) 1),
        new AcquiredRecords().setFirstOffset(2).setLastOffset(2).setDeliveryCount((short) 2),
        new AcquiredRecords().setFirstOffset(4).setLastOffset(4).setDeliveryCount((short) 2),
        new AcquiredRecords().setFirstOffset(5).setLastOffset(5).setDeliveryCount((short) 2));

    List<AcquiredRecords> merged = new ArrayList<>();
    ShareFetchUtils.accumulateAcquiredRecords(merged, singleOffsetRecords);

    // Offsets 0-1 share count 1; offset 2 switches to count 2; offsets 4-5 resume after the gap at 3.
    assertArrayEquals(
        new AcquiredRecords[]{
            new AcquiredRecords().setFirstOffset(0).setLastOffset(1).setDeliveryCount((short) 1),
            new AcquiredRecords().setFirstOffset(2).setLastOffset(2).setDeliveryCount((short) 2),
            new AcquiredRecords().setFirstOffset(4).setLastOffset(5).setDeliveryCount((short) 2)},
        merged.toArray());
}

@Test
void testAccumulateAcquiredRecordsAllBatches() {
    // Five contiguous offsets, all with delivery count 1, must collapse to a single range.
    List<AcquiredRecords> singleOffsetRecords = List.of(
        new AcquiredRecords().setFirstOffset(0).setLastOffset(0).setDeliveryCount((short) 1),
        new AcquiredRecords().setFirstOffset(1).setLastOffset(1).setDeliveryCount((short) 1),
        new AcquiredRecords().setFirstOffset(2).setLastOffset(2).setDeliveryCount((short) 1),
        new AcquiredRecords().setFirstOffset(3).setLastOffset(3).setDeliveryCount((short) 1),
        new AcquiredRecords().setFirstOffset(4).setLastOffset(4).setDeliveryCount((short) 1));

    List<AcquiredRecords> merged = new ArrayList<>();
    ShareFetchUtils.accumulateAcquiredRecords(merged, singleOffsetRecords);

    assertArrayEquals(
        new AcquiredRecords[]{
            new AcquiredRecords().setFirstOffset(0).setLastOffset(4).setDeliveryCount((short) 1)},
        merged.toArray());
}

@Test
void testAccumulateAcquiredRecordsWithRanges() {
    // Inputs that are already ranges merge too, as long as offsets are contiguous and counts match.
    List<AcquiredRecords> rangeRecords = List.of(
        new AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short) 1),
        new AcquiredRecords().setFirstOffset(4).setLastOffset(4).setDeliveryCount((short) 1),
        new AcquiredRecords().setFirstOffset(5).setLastOffset(8).setDeliveryCount((short) 2),
        new AcquiredRecords().setFirstOffset(10).setLastOffset(15).setDeliveryCount((short) 2),
        new AcquiredRecords().setFirstOffset(16).setLastOffset(20).setDeliveryCount((short) 2));

    List<AcquiredRecords> merged = new ArrayList<>();
    ShareFetchUtils.accumulateAcquiredRecords(merged, rangeRecords);

    // 0-3 and 4 join (count 1); 5-8 stays alone (count change then gap at 9); 10-15 and 16-20 join.
    assertArrayEquals(
        new AcquiredRecords[]{
            new AcquiredRecords().setFirstOffset(0).setLastOffset(4).setDeliveryCount((short) 1),
            new AcquiredRecords().setFirstOffset(5).setLastOffset(8).setDeliveryCount((short) 2),
            new AcquiredRecords().setFirstOffset(10).setLastOffset(20).setDeliveryCount((short) 2)},
        merged.toArray());
}

@Test
void testAccumulateAcquiredRecordsEmptyList() {
    // An empty input must leave the result list untouched.
    List<AcquiredRecords> merged = new ArrayList<>();
    ShareFetchUtils.accumulateAcquiredRecords(merged, List.of());
    assertTrue(merged.isEmpty());
}

@Test
void testAccumulateAcquiredRecordsSingleRecord() {
    // A lone record passes through unchanged.
    List<AcquiredRecords> singleRecord = List.of(
        new AcquiredRecords().setFirstOffset(5).setLastOffset(5).setDeliveryCount((short) 4));

    List<AcquiredRecords> merged = new ArrayList<>();
    ShareFetchUtils.accumulateAcquiredRecords(merged, singleRecord);

    assertArrayEquals(singleRecord.toArray(), merged.toArray());
}

@Test
void testAccumulateAcquiredRecordsNoMerging() {
    // Case 1: equal delivery counts but non-contiguous offsets — nothing merges.
    List<AcquiredRecords> gappedRecords = List.of(
        new AcquiredRecords().setFirstOffset(0).setLastOffset(0).setDeliveryCount((short) 1),
        new AcquiredRecords().setFirstOffset(2).setLastOffset(2).setDeliveryCount((short) 1),
        new AcquiredRecords().setFirstOffset(4).setLastOffset(4).setDeliveryCount((short) 1));

    List<AcquiredRecords> merged = new ArrayList<>();
    ShareFetchUtils.accumulateAcquiredRecords(merged, gappedRecords);
    assertArrayEquals(gappedRecords.toArray(), merged.toArray());

    // Case 2: contiguous offsets but differing delivery counts — nothing merges either.
    List<AcquiredRecords> distinctCountRecords = List.of(
        new AcquiredRecords().setFirstOffset(0).setLastOffset(0).setDeliveryCount((short) 1),
        new AcquiredRecords().setFirstOffset(1).setLastOffset(1).setDeliveryCount((short) 2),
        new AcquiredRecords().setFirstOffset(2).setLastOffset(2).setDeliveryCount((short) 3));

    merged = new ArrayList<>();
    ShareFetchUtils.accumulateAcquiredRecords(merged, distinctCountRecords);
    assertArrayEquals(distinctCountRecords.toArray(), merged.toArray());
}

private static class RecordsArgumentsProvider implements ArgumentsProvider {
@Override
public Stream<? extends Arguments> provideArguments(ExtensionContext context) throws Exception {
Expand Down
Loading
Loading