Skip to content

Commit 96ef1c5

Browse files
KAFKA-19436: Restrict cache update for ongoing batch/offset state (#20041)
During stress testing it was noticed that, on acquisition lock timeout, some offsets were not found in the cache. Cache updates can be attempted from different acknowledgement calls; hence, if one state transition is still ongoing and a parallel acknowledgement triggers a cache update, the cache can be updated incorrectly while the first transition has not yet finished. Because the cache update only happens for Archived and Acknowledged records, this issue in the existing implementation should not hamper the queues functionality. However, it might update the cache early even though the persister call may subsequently fail, and in some scenarios it triggers error logs reporting that an offset was not found in the cache when the acquisition lock times out. Reviewers: Abhinav Dixit <adixit@confluent.io>, Andrew Schofield <aschofield@confluent.io>
1 parent 293043d commit 96ef1c5

File tree

2 files changed

+115
-6
lines changed

2 files changed

+115
-6
lines changed

core/src/main/java/kafka/server/share/SharePartition.java

Lines changed: 18 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2178,7 +2178,8 @@ be removed once all the messages (0-99) are acknowledged (ACCEPT or REJECT).
21782178
}
21792179
}
21802180

2181-
private boolean canMoveStartOffset() {
2181+
// Visible for testing.
2182+
boolean canMoveStartOffset() {
21822183
// The Share Partition Start Offset may be moved after acknowledge request is complete.
21832184
// The following conditions need to be met to move the startOffset:
21842185
// 1. When the cachedState is not empty.
@@ -2203,7 +2204,15 @@ private boolean canMoveStartOffset() {
22032204
"as there is an acquirable gap at the beginning. Cannot move the start offset.", startOffset, groupId, topicIdPartition);
22042205
return false;
22052206
}
2206-
RecordState startOffsetState = entry.getValue().offsetState == null ?
2207+
boolean isBatchState = entry.getValue().offsetState() == null;
2208+
boolean isOngoingTransition = isBatchState ?
2209+
entry.getValue().batchHasOngoingStateTransition() :
2210+
entry.getValue().offsetState().get(startOffset).hasOngoingStateTransition();
2211+
if (isOngoingTransition) {
2212+
return false;
2213+
}
2214+
2215+
RecordState startOffsetState = isBatchState ?
22072216
entry.getValue().batchState() :
22082217
entry.getValue().offsetState().get(startOffset).state();
22092218
return isRecordStateAcknowledged(startOffsetState);
@@ -2238,13 +2247,13 @@ long findLastOffsetAcknowledged() {
22382247
}
22392248

22402249
if (inFlightBatch.offsetState() == null) {
2241-
if (!isRecordStateAcknowledged(inFlightBatch.batchState())) {
2250+
if (inFlightBatch.batchHasOngoingStateTransition() || !isRecordStateAcknowledged(inFlightBatch.batchState())) {
22422251
return lastOffsetAcknowledged;
22432252
}
22442253
lastOffsetAcknowledged = inFlightBatch.lastOffset();
22452254
} else {
22462255
for (Map.Entry<Long, InFlightState> offsetState : inFlightBatch.offsetState.entrySet()) {
2247-
if (!isRecordStateAcknowledged(offsetState.getValue().state())) {
2256+
if (offsetState.getValue().hasOngoingStateTransition() || !isRecordStateAcknowledged(offsetState.getValue().state())) {
22482257
return lastOffsetAcknowledged;
22492258
}
22502259
lastOffsetAcknowledged = offsetState.getKey();
@@ -2921,7 +2930,8 @@ private InFlightState inFlightState() {
29212930
return batchState;
29222931
}
29232932

2924-
private boolean batchHasOngoingStateTransition() {
2933+
// Visible for testing.
2934+
boolean batchHasOngoingStateTransition() {
29252935
return inFlightState().hasOngoingStateTransition();
29262936
}
29272937

@@ -3042,7 +3052,8 @@ void cancelAndClearAcquisitionLockTimeoutTask() {
30423052
acquisitionLockTimeoutTask = null;
30433053
}
30443054

3045-
private boolean hasOngoingStateTransition() {
3055+
// Visible for testing.
3056+
boolean hasOngoingStateTransition() {
30463057
if (rollbackState == null) {
30473058
// This case could occur when the batch/offset hasn't transitioned even once or the state transitions have
30483059
// been committed.
@@ -3075,6 +3086,7 @@ private InFlightState tryUpdateState(RecordState newState, DeliveryCountOps ops,
30753086
return this;
30763087
} catch (IllegalStateException e) {
30773088
log.error("Failed to update state of the records", e);
3089+
rollbackState = null;
30783090
return null;
30793091
}
30803092
}

core/src/test/java/kafka/server/share/SharePartitionTest.java

Lines changed: 97 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6479,6 +6479,103 @@ public void testFindLastOffsetAcknowledgedWhenGapAtBeginning() {
64796479
assertEquals(-1, lastOffsetAcknowledged);
64806480
}
64816481

6482+
@Test
6483+
public void testCacheUpdateWhenBatchHasOngoingTransition() {
6484+
Persister persister = Mockito.mock(Persister.class);
6485+
6486+
SharePartition sharePartition = SharePartitionBuilder.builder()
6487+
.withState(SharePartitionState.ACTIVE)
6488+
.withPersister(persister)
6489+
.build();
6490+
// Acquire a single batch.
6491+
fetchAcquiredRecords(
6492+
sharePartition.acquire(MEMBER_ID, BATCH_SIZE, MAX_FETCH_RECORDS, 21,
6493+
fetchPartitionData(memoryRecords(10, 21)), FETCH_ISOLATION_HWM
6494+
), 10
6495+
);
6496+
6497+
// Validate that there is no ongoing transition.
6498+
assertFalse(sharePartition.cachedState().get(21L).batchHasOngoingStateTransition());
6499+
// Return a future which will be completed later, so the batch state has ongoing transition.
6500+
CompletableFuture<WriteShareGroupStateResult> future = new CompletableFuture<>();
6501+
Mockito.when(persister.writeState(Mockito.any())).thenReturn(future);
6502+
// Acknowledge batch to create ongoing transition.
6503+
sharePartition.acknowledge(MEMBER_ID, List.of(new ShareAcknowledgementBatch(21, 30, List.of(AcknowledgeType.ACCEPT.id))));
6504+
6505+
// Assert the start offset has not moved and batch has ongoing transition.
6506+
assertEquals(21L, sharePartition.startOffset());
6507+
assertEquals(1, sharePartition.cachedState().size());
6508+
assertTrue(sharePartition.cachedState().get(21L).batchHasOngoingStateTransition());
6509+
6510+
// Validate that offset can't be moved because batch has ongoing transition.
6511+
assertFalse(sharePartition.canMoveStartOffset());
6512+
assertEquals(-1, sharePartition.findLastOffsetAcknowledged());
6513+
6514+
// Complete the future so acknowledge API can be completed, which updates the cache.
6515+
WriteShareGroupStateResult writeShareGroupStateResult = Mockito.mock(WriteShareGroupStateResult.class);
6516+
Mockito.when(writeShareGroupStateResult.topicsData()).thenReturn(List.of(
6517+
new TopicData<>(TOPIC_ID_PARTITION.topicId(), List.of(
6518+
PartitionFactory.newPartitionErrorData(0, Errors.NONE.code(), Errors.NONE.message())))));
6519+
future.complete(writeShareGroupStateResult);
6520+
6521+
// Validate the cache has been updated.
6522+
assertEquals(31L, sharePartition.startOffset());
6523+
assertTrue(sharePartition.cachedState().isEmpty());
6524+
}
6525+
6526+
@Test
6527+
public void testCacheUpdateWhenOffsetStateHasOngoingTransition() {
6528+
Persister persister = Mockito.mock(Persister.class);
6529+
6530+
SharePartition sharePartition = SharePartitionBuilder.builder()
6531+
.withState(SharePartitionState.ACTIVE)
6532+
.withPersister(persister)
6533+
.build();
6534+
// Acquire a single batch.
6535+
fetchAcquiredRecords(
6536+
sharePartition.acquire(MEMBER_ID, BATCH_SIZE, MAX_FETCH_RECORDS, 21,
6537+
fetchPartitionData(memoryRecords(10, 21)), FETCH_ISOLATION_HWM
6538+
), 10
6539+
);
6540+
6541+
// Validate that there is no ongoing transition.
6542+
assertFalse(sharePartition.cachedState().get(21L).batchHasOngoingStateTransition());
6543+
assertNull(sharePartition.cachedState().get(21L).offsetState());
6544+
// Return a future which will be completed later, so the batch state has ongoing transition.
6545+
CompletableFuture<WriteShareGroupStateResult> future = new CompletableFuture<>();
6546+
Mockito.when(persister.writeState(Mockito.any())).thenReturn(future);
6547+
// Acknowledge offsets to create ongoing transition.
6548+
sharePartition.acknowledge(MEMBER_ID, List.of(new ShareAcknowledgementBatch(21, 23, List.of(AcknowledgeType.ACCEPT.id))));
6549+
6550+
// Assert the start offset has not moved and offset state is now maintained. Offset state should
6551+
// have ongoing transition.
6552+
assertEquals(21L, sharePartition.startOffset());
6553+
assertEquals(1, sharePartition.cachedState().size());
6554+
assertNotNull(sharePartition.cachedState().get(21L).offsetState());
6555+
assertTrue(sharePartition.cachedState().get(21L).offsetState().get(21L).hasOngoingStateTransition());
6556+
assertTrue(sharePartition.cachedState().get(21L).offsetState().get(22L).hasOngoingStateTransition());
6557+
assertTrue(sharePartition.cachedState().get(21L).offsetState().get(23L).hasOngoingStateTransition());
6558+
// Only 21, 22 and 23 offsets should have ongoing state transition as the acknowledge request
6559+
// contains 21-23 offsets.
6560+
assertFalse(sharePartition.cachedState().get(21L).offsetState().get(24L).hasOngoingStateTransition());
6561+
6562+
// Validate that the start offset can't be moved because offsets 21-23 have an ongoing transition.
6563+
assertFalse(sharePartition.canMoveStartOffset());
6564+
assertEquals(-1, sharePartition.findLastOffsetAcknowledged());
6565+
6566+
// Complete the future so acknowledge API can be completed, which updates the cache.
6567+
WriteShareGroupStateResult writeShareGroupStateResult = Mockito.mock(WriteShareGroupStateResult.class);
6568+
Mockito.when(writeShareGroupStateResult.topicsData()).thenReturn(List.of(
6569+
new TopicData<>(TOPIC_ID_PARTITION.topicId(), List.of(
6570+
PartitionFactory.newPartitionErrorData(0, Errors.NONE.code(), Errors.NONE.message())))));
6571+
future.complete(writeShareGroupStateResult);
6572+
6573+
// Validate the cache has been updated.
6574+
assertEquals(24L, sharePartition.startOffset());
6575+
assertEquals(1, sharePartition.cachedState().size());
6576+
assertNotNull(sharePartition.cachedState().get(21L));
6577+
}
6578+
64826579
/**
64836580
* Test the case where the fetch batch has first record offset greater than the record batch start offset.
64846581
* Such batches can exist for compacted topics.

0 commit comments

Comments
 (0)