diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java index a17ffa1fdc4d..4ba7d08a4e60 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java @@ -614,6 +614,8 @@ class CoordinatorWriteEvent implements CoordinatorEvent, DeferredEvent { */ final Duration writeTimeout; + private TimerTask writeTimeoutTask = null; + /** * The result of the write operation. It could be null * if an exception is thrown before it is assigned. @@ -751,7 +753,7 @@ public void run() { // Add the response to the deferred queue. if (!future.isDone()) { context.deferredEventQueue.add(offset, this); - timer.add(new TimerTask(writeTimeout.toMillis()) { + writeTimeoutTask = new TimerTask(writeTimeout.toMillis()) { @Override public void run() { if (!future.isDone()) { @@ -762,7 +764,8 @@ public void run() { ); } } - }); + }; + timer.add(writeTimeoutTask); } else { complete(null); } @@ -794,6 +797,11 @@ public void complete(Throwable exception) { if (appendFuture != null) result.appendFuture().completeExceptionally(exception); future.completeExceptionally(exception); } + + if (writeTimeoutTask != null) { + writeTimeoutTask.cancel(); + writeTimeoutTask = null; + } } @Override @@ -980,6 +988,8 @@ class CoordinatorCompleteTransactionEvent implements CoordinatorEvent, DeferredE */ final Duration writeTimeout; + private TimerTask writeTimeoutTask = null; + /** * The future that will be completed with the response * generated by the write operation or an error. @@ -1047,7 +1057,7 @@ public void run() { if (!future.isDone()) { context.deferredEventQueue.add(offset, this); - timer.add(new TimerTask(writeTimeout.toMillis()) { + writeTimeoutTask = new TimerTask(writeTimeout.toMillis()) { @Override public void run() { if (!future.isDone()) { @@ -1059,7 +1069,8 @@ public void run() { ); } } - }); + }; + timer.add(writeTimeoutTask); } else { complete(null); } @@ -1086,6 +1097,11 @@ public void complete(Throwable exception) { } else { future.completeExceptionally(exception); } + + if (writeTimeoutTask != null) { + writeTimeoutTask.cancel(); + writeTimeoutTask = null; + } } @Override diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntimeTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntimeTest.java index c8a1dc337cba..41a2d2ec7e95 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntimeTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntimeTest.java @@ -2680,6 +2680,147 @@ public void testHighWatermarkUpdate() { assertTrue(write2.isDone()); } + @Test + public void testWriteEventWriteTimeoutTaskIsCancelledWhenHighWatermarkIsUpdated() { + MockTimer timer = new MockTimer(); + MockPartitionWriter writer = new MockPartitionWriter(); + ManualEventProcessor processor = new ManualEventProcessor(); + + CoordinatorRuntime runtime = + new CoordinatorRuntime.Builder() + .withTime(timer.time()) + .withTimer(timer) + .withDefaultWriteTimeOut(DEFAULT_WRITE_TIMEOUT) + .withLoader(new MockCoordinatorLoader()) + .withEventProcessor(processor) + .withPartitionWriter(writer) + .withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier()) + .withCoordinatorRuntimeMetrics(mock(GroupCoordinatorRuntimeMetrics.class)) + .withCoordinatorMetrics(mock(GroupCoordinatorMetrics.class)) + .build(); + + // Loads the coordinator. Poll once to execute the load operation and once + // to complete the load. + runtime.scheduleLoadOperation(TP, 10); + processor.poll(); + processor.poll(); + + // Write#1. + CompletableFuture write1 = runtime.scheduleWriteOperation("Write#1", TP, DEFAULT_WRITE_TIMEOUT, + state -> new CoordinatorResult<>(Collections.singletonList("record1"), "response1") + ); + processor.poll(); + + // Write#2. + CompletableFuture write2 = runtime.scheduleWriteOperation("Write#2", TP, DEFAULT_WRITE_TIMEOUT, + state -> new CoordinatorResult<>(Collections.singletonList("record2"), "response2") + ); + processor.poll(); + + // Records have been written to the log. + assertEquals(Arrays.asList( + InMemoryPartitionWriter.LogEntry.value("record1"), + InMemoryPartitionWriter.LogEntry.value("record2") + ), writer.entries(TP)); + + // The write timeout tasks exist. + assertEquals(2, timer.size()); + + // Commit the first record. + writer.commit(TP, 1); + + // Commit the second record. + writer.commit(TP, 2); + + // We should still have one pending event and the pending high watermark should be updated. + assertEquals(1, processor.size()); + assertEquals(2, runtime.contextOrThrow(TP).highWatermarklistener.lastHighWatermark()); + + // The write timeout tasks should have not yet been cancelled. + assertEquals(2, timer.size()); + timer.taskQueue().forEach(taskEntry -> assertFalse(taskEntry.cancelled())); + + // Poll once to process the high watermark update and complete the writes. + processor.poll(); + + assertEquals(-1, runtime.contextOrThrow(TP).highWatermarklistener.lastHighWatermark()); + assertEquals(2, runtime.contextOrThrow(TP).coordinator.lastCommittedOffset()); + assertTrue(write1.isDone()); + assertTrue(write2.isDone()); + + // All timer tasks have been cancelled. TimerTask entries are not removed in MockTimer. + assertEquals(2, timer.size()); + timer.taskQueue().forEach(taskEntry -> assertTrue(taskEntry.cancelled())); + } + + @Test + public void testCoordinatorCompleteTransactionEventWriteTimeoutTaskIsCancelledWhenHighWatermarkIsUpdated() { + MockTimer timer = new MockTimer(); + MockPartitionWriter writer = new MockPartitionWriter(); + ManualEventProcessor processor = new ManualEventProcessor(); + + CoordinatorRuntime runtime = + new CoordinatorRuntime.Builder() + .withTime(timer.time()) + .withTimer(timer) + .withDefaultWriteTimeOut(DEFAULT_WRITE_TIMEOUT) + .withLoader(new MockCoordinatorLoader()) + .withEventProcessor(processor) + .withPartitionWriter(writer) + .withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier()) + .withCoordinatorRuntimeMetrics(mock(GroupCoordinatorRuntimeMetrics.class)) + .withCoordinatorMetrics(mock(GroupCoordinatorMetrics.class)) + .build(); + + // Loads the coordinator. Poll once to execute the load operation and once + // to complete the load. + runtime.scheduleLoadOperation(TP, 10); + processor.poll(); + processor.poll(); + + // transaction completion. + CompletableFuture write1 = runtime.scheduleTransactionCompletion( + "transactional-write", + TP, + 100L, + (short) 50, + 1, + TransactionResult.COMMIT, + DEFAULT_WRITE_TIMEOUT + ); + processor.poll(); + + // Records have been written to the log. + assertEquals(Collections.singletonList( + InMemoryPartitionWriter.LogEntry.control(100, (short) 50, 1, TransactionResult.COMMIT) + ), writer.entries(TP)); + + // The write timeout tasks exist. + assertEquals(1, timer.size()); + + // Commit the first record. + writer.commit(TP, 1); + + // We should still have one pending event and the pending high watermark should be updated. + assertEquals(1, processor.size()); + assertEquals(1, runtime.contextOrThrow(TP).highWatermarklistener.lastHighWatermark()); + + // The write timeout tasks should have not yet been cancelled. + assertEquals(1, timer.size()); + timer.taskQueue().forEach(taskEntry -> assertFalse(taskEntry.cancelled())); + + // Poll once to process the high watermark update and complete the writes. + processor.poll(); + + assertEquals(-1, runtime.contextOrThrow(TP).highWatermarklistener.lastHighWatermark()); + assertEquals(1, runtime.contextOrThrow(TP).coordinator.lastCommittedOffset()); + assertTrue(write1.isDone()); + + // All timer tasks have been cancelled. TimerTask entries are not removed in MockTimer. + assertEquals(1, timer.size()); + timer.taskQueue().forEach(taskEntry -> assertTrue(taskEntry.cancelled())); + } + private static , U> ArgumentMatcher> coordinatorMatcher( CoordinatorRuntime runtime, TopicPartition tp diff --git a/server-common/src/test/java/org/apache/kafka/server/util/timer/MockTimer.java b/server-common/src/test/java/org/apache/kafka/server/util/timer/MockTimer.java index 460fd56690f6..8de1b91c32dd 100644 --- a/server-common/src/test/java/org/apache/kafka/server/util/timer/MockTimer.java +++ b/server-common/src/test/java/org/apache/kafka/server/util/timer/MockTimer.java @@ -84,6 +84,10 @@ public int size() { return taskQueue.size(); } + public PriorityQueue taskQueue() { + return taskQueue; + } + @Override public void close() throws Exception {} }