Skip to content

Commit

Permalink
KAFKA-16663; Cancel write timeout TimerTask on successful event compl…
Browse files Browse the repository at this point in the history
…etion (#15902)

Write events create a TimerTask and add it to the timer to schedule the timeout operation. The issue is that these timer tasks pile up even though they are essentially no-ops once replication has succeeded. They stay in memory for 15 seconds (the default write timeout), so as the write rate increases, the impact on memory usage increases as well.

Instead, cancel the corresponding write timeout task when the write event is committed to the log. This also applies to complete transaction events.

Reviewers: David Jacot <djacot@confluent.io>
  • Loading branch information
jeffkbkim committed May 13, 2024
1 parent 334d5d5 commit 8a9dd2b
Show file tree
Hide file tree
Showing 3 changed files with 165 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -614,6 +614,8 @@ class CoordinatorWriteEvent<T> implements CoordinatorEvent, DeferredEvent {
*/
final Duration writeTimeout;

private TimerTask writeTimeoutTask = null;

/**
* The result of the write operation. It could be null
* if an exception is thrown before it is assigned.
Expand Down Expand Up @@ -751,7 +753,7 @@ public void run() {
// Add the response to the deferred queue.
if (!future.isDone()) {
context.deferredEventQueue.add(offset, this);
timer.add(new TimerTask(writeTimeout.toMillis()) {
writeTimeoutTask = new TimerTask(writeTimeout.toMillis()) {
@Override
public void run() {
if (!future.isDone()) {
Expand All @@ -762,7 +764,8 @@ public void run() {
);
}
}
});
};
timer.add(writeTimeoutTask);
} else {
complete(null);
}
Expand Down Expand Up @@ -794,6 +797,11 @@ public void complete(Throwable exception) {
if (appendFuture != null) result.appendFuture().completeExceptionally(exception);
future.completeExceptionally(exception);
}

if (writeTimeoutTask != null) {
writeTimeoutTask.cancel();
writeTimeoutTask = null;
}
}

@Override
Expand Down Expand Up @@ -980,6 +988,8 @@ class CoordinatorCompleteTransactionEvent implements CoordinatorEvent, DeferredE
*/
final Duration writeTimeout;

private TimerTask writeTimeoutTask = null;

/**
* The future that will be completed with the response
* generated by the write operation or an error.
Expand Down Expand Up @@ -1047,7 +1057,7 @@ public void run() {

if (!future.isDone()) {
context.deferredEventQueue.add(offset, this);
timer.add(new TimerTask(writeTimeout.toMillis()) {
writeTimeoutTask = new TimerTask(writeTimeout.toMillis()) {
@Override
public void run() {
if (!future.isDone()) {
Expand All @@ -1059,7 +1069,8 @@ public void run() {
);
}
}
});
};
timer.add(writeTimeoutTask);
} else {
complete(null);
}
Expand All @@ -1086,6 +1097,11 @@ public void complete(Throwable exception) {
} else {
future.completeExceptionally(exception);
}

if (writeTimeoutTask != null) {
writeTimeoutTask.cancel();
writeTimeoutTask = null;
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2680,6 +2680,147 @@ public void testHighWatermarkUpdate() {
assertTrue(write2.isDone());
}

/**
 * Schedules two write operations, commits both records, and checks that the
 * two write timeout tasks held by the timer are flagged as cancelled only
 * after the event processor has handled the high watermark update.
 */
@Test
public void testWriteEventWriteTimeoutTaskIsCancelledWhenHighWatermarkIsUpdated() {
    MockTimer mockTimer = new MockTimer();
    MockPartitionWriter partitionWriter = new MockPartitionWriter();
    ManualEventProcessor eventProcessor = new ManualEventProcessor();

    CoordinatorRuntime<MockCoordinatorShard, String> coordinatorRuntime =
        new CoordinatorRuntime.Builder<MockCoordinatorShard, String>()
            .withTime(mockTimer.time())
            .withTimer(mockTimer)
            .withDefaultWriteTimeOut(DEFAULT_WRITE_TIMEOUT)
            .withLoader(new MockCoordinatorLoader())
            .withEventProcessor(eventProcessor)
            .withPartitionWriter(partitionWriter)
            .withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier())
            .withCoordinatorRuntimeMetrics(mock(GroupCoordinatorRuntimeMetrics.class))
            .withCoordinatorMetrics(mock(GroupCoordinatorMetrics.class))
            .build();

    // Load the coordinator: the first poll runs the load operation,
    // the second one completes it.
    coordinatorRuntime.scheduleLoadOperation(TP, 10);
    eventProcessor.poll();
    eventProcessor.poll();

    // Schedule and process the first write.
    CompletableFuture<String> firstWrite = coordinatorRuntime.scheduleWriteOperation(
        "Write#1",
        TP,
        DEFAULT_WRITE_TIMEOUT,
        state -> new CoordinatorResult<>(Collections.singletonList("record1"), "response1")
    );
    eventProcessor.poll();

    // Schedule and process the second write.
    CompletableFuture<String> secondWrite = coordinatorRuntime.scheduleWriteOperation(
        "Write#2",
        TP,
        DEFAULT_WRITE_TIMEOUT,
        state -> new CoordinatorResult<>(Collections.singletonList("record2"), "response2")
    );
    eventProcessor.poll();

    // Both records must now be present in the log.
    assertEquals(
        Arrays.asList(
            InMemoryPartitionWriter.LogEntry.value("record1"),
            InMemoryPartitionWriter.LogEntry.value("record2")
        ),
        partitionWriter.entries(TP)
    );

    // One write timeout task was registered per write.
    assertEquals(2, mockTimer.size());

    // Commit the first record, then the second one.
    partitionWriter.commit(TP, 1);
    partitionWriter.commit(TP, 2);

    // A single event is still pending and the pending high watermark has been updated.
    assertEquals(1, eventProcessor.size());
    assertEquals(2, coordinatorRuntime.contextOrThrow(TP).highWatermarklistener.lastHighWatermark());

    // None of the write timeout tasks has been cancelled yet.
    assertEquals(2, mockTimer.size());
    assertTrue(mockTimer.taskQueue().stream().noneMatch(entry -> entry.cancelled()));

    // A single poll processes the high watermark update and completes both writes.
    eventProcessor.poll();

    assertEquals(-1, coordinatorRuntime.contextOrThrow(TP).highWatermarklistener.lastHighWatermark());
    assertEquals(2, coordinatorRuntime.contextOrThrow(TP).coordinator.lastCommittedOffset());
    assertTrue(firstWrite.isDone());
    assertTrue(secondWrite.isDone());

    // Every timer task is now cancelled. MockTimer does not remove cancelled
    // entries, so the size is unchanged.
    assertEquals(2, mockTimer.size());
    assertTrue(mockTimer.taskQueue().stream().allMatch(entry -> entry.cancelled()));
}

/**
 * Schedules a transaction completion, commits the resulting control record,
 * and checks that the write timeout task held by the timer is flagged as
 * cancelled only after the event processor has handled the high watermark update.
 */
@Test
public void testCoordinatorCompleteTransactionEventWriteTimeoutTaskIsCancelledWhenHighWatermarkIsUpdated() {
    MockTimer mockTimer = new MockTimer();
    MockPartitionWriter partitionWriter = new MockPartitionWriter();
    ManualEventProcessor eventProcessor = new ManualEventProcessor();

    CoordinatorRuntime<MockCoordinatorShard, String> coordinatorRuntime =
        new CoordinatorRuntime.Builder<MockCoordinatorShard, String>()
            .withTime(mockTimer.time())
            .withTimer(mockTimer)
            .withDefaultWriteTimeOut(DEFAULT_WRITE_TIMEOUT)
            .withLoader(new MockCoordinatorLoader())
            .withEventProcessor(eventProcessor)
            .withPartitionWriter(partitionWriter)
            .withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier())
            .withCoordinatorRuntimeMetrics(mock(GroupCoordinatorRuntimeMetrics.class))
            .withCoordinatorMetrics(mock(GroupCoordinatorMetrics.class))
            .build();

    // Load the coordinator: the first poll runs the load operation,
    // the second one completes it.
    coordinatorRuntime.scheduleLoadOperation(TP, 10);
    eventProcessor.poll();
    eventProcessor.poll();

    // Schedule and process the transaction completion.
    CompletableFuture<Void> completion = coordinatorRuntime.scheduleTransactionCompletion(
        "transactional-write",
        TP,
        100L,
        (short) 50,
        1,
        TransactionResult.COMMIT,
        DEFAULT_WRITE_TIMEOUT
    );
    eventProcessor.poll();

    // The control record must now be present in the log.
    assertEquals(
        Collections.singletonList(
            InMemoryPartitionWriter.LogEntry.control(100, (short) 50, 1, TransactionResult.COMMIT)
        ),
        partitionWriter.entries(TP)
    );

    // A write timeout task was registered for the event.
    assertEquals(1, mockTimer.size());

    // Commit the record.
    partitionWriter.commit(TP, 1);

    // A single event is still pending and the pending high watermark has been updated.
    assertEquals(1, eventProcessor.size());
    assertEquals(1, coordinatorRuntime.contextOrThrow(TP).highWatermarklistener.lastHighWatermark());

    // The write timeout task has not been cancelled yet.
    assertEquals(1, mockTimer.size());
    assertTrue(mockTimer.taskQueue().stream().noneMatch(entry -> entry.cancelled()));

    // A single poll processes the high watermark update and completes the write.
    eventProcessor.poll();

    assertEquals(-1, coordinatorRuntime.contextOrThrow(TP).highWatermarklistener.lastHighWatermark());
    assertEquals(1, coordinatorRuntime.contextOrThrow(TP).coordinator.lastCommittedOffset());
    assertTrue(completion.isDone());

    // The timer task is now cancelled. MockTimer does not remove cancelled
    // entries, so the size is unchanged.
    assertEquals(1, mockTimer.size());
    assertTrue(mockTimer.taskQueue().stream().allMatch(entry -> entry.cancelled()));
}

private static <S extends CoordinatorShard<U>, U> ArgumentMatcher<CoordinatorPlayback<U>> coordinatorMatcher(
CoordinatorRuntime<S, U> runtime,
TopicPartition tp
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,10 @@ public int size() {
return taskQueue.size();
}

/**
 * Exposes the queue of timer task entries held by this timer.
 * The queue itself is returned, not a copy.
 *
 * @return the timer task entry queue
 */
public PriorityQueue<TimerTaskEntry> taskQueue() {
    return this.taskQueue;
}

@Override
public void close() throws Exception {}
}

0 comments on commit 8a9dd2b

Please sign in to comment.