Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-16663: cancel write timeout TimerTask on successful event completion #15902

Merged
merged 2 commits into from
May 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -614,6 +614,8 @@ class CoordinatorWriteEvent<T> implements CoordinatorEvent, DeferredEvent {
*/
final Duration writeTimeout;

private TimerTask writeTimeoutTask = null;

/**
* The result of the write operation. It could be null
* if an exception is thrown before it is assigned.
Expand Down Expand Up @@ -751,7 +753,7 @@ public void run() {
// Add the response to the deferred queue.
if (!future.isDone()) {
context.deferredEventQueue.add(offset, this);
timer.add(new TimerTask(writeTimeout.toMillis()) {
writeTimeoutTask = new TimerTask(writeTimeout.toMillis()) {
@Override
public void run() {
if (!future.isDone()) {
Expand All @@ -762,7 +764,8 @@ public void run() {
);
}
}
});
};
timer.add(writeTimeoutTask);
} else {
complete(null);
}
Expand Down Expand Up @@ -794,6 +797,11 @@ public void complete(Throwable exception) {
if (appendFuture != null) result.appendFuture().completeExceptionally(exception);
future.completeExceptionally(exception);
}

if (writeTimeoutTask != null) {
writeTimeoutTask.cancel();
writeTimeoutTask = null;
}
}

@Override
Expand Down Expand Up @@ -980,6 +988,8 @@ class CoordinatorCompleteTransactionEvent implements CoordinatorEvent, DeferredE
*/
final Duration writeTimeout;

private TimerTask writeTimeoutTask = null;

/**
* The future that will be completed with the response
* generated by the write operation or an error.
Expand Down Expand Up @@ -1047,7 +1057,7 @@ public void run() {

if (!future.isDone()) {
context.deferredEventQueue.add(offset, this);
timer.add(new TimerTask(writeTimeout.toMillis()) {
writeTimeoutTask = new TimerTask(writeTimeout.toMillis()) {
@Override
public void run() {
if (!future.isDone()) {
Expand All @@ -1059,7 +1069,8 @@ public void run() {
);
}
}
});
};
timer.add(writeTimeoutTask);
} else {
complete(null);
}
Expand All @@ -1086,6 +1097,11 @@ public void complete(Throwable exception) {
} else {
future.completeExceptionally(exception);
}

if (writeTimeoutTask != null) {
writeTimeoutTask.cancel();
writeTimeoutTask = null;
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2680,6 +2680,147 @@ public void testHighWatermarkUpdate() {
assertTrue(write2.isDone());
}

@Test
public void testWriteEventWriteTimeoutTaskIsCancelledWhenHighWatermarkIsUpdated() {
MockTimer timer = new MockTimer();
MockPartitionWriter writer = new MockPartitionWriter();
ManualEventProcessor processor = new ManualEventProcessor();

CoordinatorRuntime<MockCoordinatorShard, String> runtime =
new CoordinatorRuntime.Builder<MockCoordinatorShard, String>()
.withTime(timer.time())
.withTimer(timer)
.withDefaultWriteTimeOut(DEFAULT_WRITE_TIMEOUT)
.withLoader(new MockCoordinatorLoader())
.withEventProcessor(processor)
.withPartitionWriter(writer)
.withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier())
.withCoordinatorRuntimeMetrics(mock(GroupCoordinatorRuntimeMetrics.class))
.withCoordinatorMetrics(mock(GroupCoordinatorMetrics.class))
.build();

// Loads the coordinator. Poll once to execute the load operation and once
// to complete the load.
runtime.scheduleLoadOperation(TP, 10);
processor.poll();
processor.poll();

// Write#1.
CompletableFuture<String> write1 = runtime.scheduleWriteOperation("Write#1", TP, DEFAULT_WRITE_TIMEOUT,
state -> new CoordinatorResult<>(Collections.singletonList("record1"), "response1")
);
processor.poll();

// Write#2.
CompletableFuture<String> write2 = runtime.scheduleWriteOperation("Write#2", TP, DEFAULT_WRITE_TIMEOUT,
state -> new CoordinatorResult<>(Collections.singletonList("record2"), "response2")
);
processor.poll();

// Records have been written to the log.
assertEquals(Arrays.asList(
InMemoryPartitionWriter.LogEntry.value("record1"),
InMemoryPartitionWriter.LogEntry.value("record2")
), writer.entries(TP));

// The write timeout tasks exist.
assertEquals(2, timer.size());

// Commit the first record.
writer.commit(TP, 1);

// Commit the second record.
writer.commit(TP, 2);

// We should still have one pending event and the pending high watermark should be updated.
assertEquals(1, processor.size());
assertEquals(2, runtime.contextOrThrow(TP).highWatermarklistener.lastHighWatermark());

// The write timeout tasks should have not yet been cancelled.
assertEquals(2, timer.size());
timer.taskQueue().forEach(taskEntry -> assertFalse(taskEntry.cancelled()));

// Poll once to process the high watermark update and complete the writes.
processor.poll();

assertEquals(-1, runtime.contextOrThrow(TP).highWatermarklistener.lastHighWatermark());
assertEquals(2, runtime.contextOrThrow(TP).coordinator.lastCommittedOffset());
assertTrue(write1.isDone());
assertTrue(write2.isDone());

// All timer tasks have been cancelled. TimerTask entries are not removed in MockTimer.
assertEquals(2, timer.size());
timer.taskQueue().forEach(taskEntry -> assertTrue(taskEntry.cancelled()));
}

@Test
public void testCoordinatorCompleteTransactionEventWriteTimeoutTaskIsCancelledWhenHighWatermarkIsUpdated() {
MockTimer timer = new MockTimer();
MockPartitionWriter writer = new MockPartitionWriter();
ManualEventProcessor processor = new ManualEventProcessor();

CoordinatorRuntime<MockCoordinatorShard, String> runtime =
new CoordinatorRuntime.Builder<MockCoordinatorShard, String>()
.withTime(timer.time())
.withTimer(timer)
.withDefaultWriteTimeOut(DEFAULT_WRITE_TIMEOUT)
.withLoader(new MockCoordinatorLoader())
.withEventProcessor(processor)
.withPartitionWriter(writer)
.withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier())
.withCoordinatorRuntimeMetrics(mock(GroupCoordinatorRuntimeMetrics.class))
.withCoordinatorMetrics(mock(GroupCoordinatorMetrics.class))
.build();

// Loads the coordinator. Poll once to execute the load operation and once
// to complete the load.
runtime.scheduleLoadOperation(TP, 10);
processor.poll();
processor.poll();

// transaction completion.
CompletableFuture<Void> write1 = runtime.scheduleTransactionCompletion(
"transactional-write",
TP,
100L,
(short) 50,
1,
TransactionResult.COMMIT,
DEFAULT_WRITE_TIMEOUT
);
processor.poll();

// Records have been written to the log.
assertEquals(Collections.singletonList(
InMemoryPartitionWriter.LogEntry.control(100, (short) 50, 1, TransactionResult.COMMIT)
), writer.entries(TP));

// The write timeout tasks exist.
assertEquals(1, timer.size());

// Commit the first record.
writer.commit(TP, 1);

// We should still have one pending event and the pending high watermark should be updated.
assertEquals(1, processor.size());
assertEquals(1, runtime.contextOrThrow(TP).highWatermarklistener.lastHighWatermark());

// The write timeout tasks should have not yet been cancelled.
assertEquals(1, timer.size());
timer.taskQueue().forEach(taskEntry -> assertFalse(taskEntry.cancelled()));

// Poll once to process the high watermark update and complete the writes.
processor.poll();

assertEquals(-1, runtime.contextOrThrow(TP).highWatermarklistener.lastHighWatermark());
assertEquals(1, runtime.contextOrThrow(TP).coordinator.lastCommittedOffset());
assertTrue(write1.isDone());

// All timer tasks have been cancelled. TimerTask entries are not removed in MockTimer.
assertEquals(1, timer.size());
timer.taskQueue().forEach(taskEntry -> assertTrue(taskEntry.cancelled()));
}

private static <S extends CoordinatorShard<U>, U> ArgumentMatcher<CoordinatorPlayback<U>> coordinatorMatcher(
CoordinatorRuntime<S, U> runtime,
TopicPartition tp
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,10 @@ public int size() {
return taskQueue.size();
}

public PriorityQueue<TimerTaskEntry> taskQueue() {
return taskQueue;
}

@Override
public void close() throws Exception {}
}