From 5f20750dc14c56538e904c363d27264f02e74d82 Mon Sep 17 00:00:00 2001 From: Lucas Brutschy Date: Thu, 13 Jul 2023 14:33:39 +0200 Subject: [PATCH] Kafka Streams Threading: Exception handling (#13957) Catch any exceptions that escape the processing logic inside TaskExecutors and record them in the TaskManager. Make sure the TaskExecutor survives, but the task is unassigned. Add a method to TaskManager to drain the exceptions. The aim here is that the polling thread will drain the exceptions to be able to execute the uncaught exception handler, abort transactions, etc. Reviewer: Bruno Cadonna --- .../internals/tasks/DefaultTaskExecutor.java | 14 +++++++- .../internals/tasks/DefaultTaskManager.java | 36 +++++++++++++++++++ .../internals/tasks/TaskManager.java | 23 ++++++++++++ .../tasks/DefaultTaskExecutorTest.java | 15 ++++++++ .../tasks/DefaultTaskManagerTest.java | 32 +++++++++++++++++ 5 files changed, 119 insertions(+), 1 deletion(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskExecutor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskExecutor.java index c03384f4daaf..b59d695d9b0e 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskExecutor.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskExecutor.java @@ -54,7 +54,10 @@ public void run() { while (isRunning.get()) { runOnce(time.milliseconds()); } - // TODO: add exception handling + } catch (final StreamsException e) { + handleException(e); + } catch (final Exception e) { + handleException(new StreamsException(e)); } finally { if (currentTask != null) { unassignCurrentTask(); @@ -65,6 +68,15 @@ public void run() { } } + private void handleException(final StreamsException e) { + if (currentTask != null) { + taskManager.setUncaughtException(e, currentTask.id()); + } else { + // If we do not currently have a task assigned and still get 
an error, this is fatal for the executor thread + throw e; + } + } + private void runOnce(final long nowMs) { final KafkaFutureImpl pauseFuture; if ((pauseFuture = pauseRequested.getAndSet(null)) != null) { diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskManager.java index 3f97de85cebc..41526356d61c 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskManager.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskManager.java @@ -21,6 +21,7 @@ import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.common.utils.Time; import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.errors.StreamsException; import org.apache.kafka.streams.processor.TaskId; import org.apache.kafka.streams.processor.internals.ReadOnlyTask; import org.apache.kafka.streams.processor.internals.StreamTask; @@ -55,6 +56,7 @@ public class DefaultTaskManager implements TaskManager { private final Lock tasksLock = new ReentrantLock(); private final List lockedTasks = new ArrayList<>(); + private final Map uncaughtExceptions = new HashMap<>(); private final Map assignedTasks = new HashMap<>(); private final List taskExecutors; @@ -225,6 +227,40 @@ public Set getTasks() { return returnWithTasksLocked(() -> tasks.activeTasks().stream().map(ReadOnlyTask::new).collect(Collectors.toSet())); } + @Override + public void setUncaughtException(final StreamsException exception, final TaskId taskId) { + executeWithTasksLocked(() -> { + + if (!assignedTasks.containsKey(taskId)) { + throw new IllegalArgumentException("An uncaught exception can only be set as long as the task is still assigned"); + } + + if (uncaughtExceptions.containsKey(taskId)) { + throw new IllegalArgumentException("The uncaught exception must be cleared before restarting 
processing"); + } + + uncaughtExceptions.put(taskId, exception); + }); + + log.info("Set an uncaught exception of type {} for task {}, with error message: {}", + exception.getClass().getName(), + taskId, + exception.getMessage()); + } + + public Map drainUncaughtExceptions() { + final Map returnValue = returnWithTasksLocked(() -> { + final Map result = new HashMap<>(uncaughtExceptions); + uncaughtExceptions.clear(); + return result; + }); + + log.info("Drained {} uncaught exceptions", returnValue.size()); + + return returnValue; + } + + private void executeWithTasksLocked(final Runnable action) { tasksLock.lock(); try { diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/tasks/TaskManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/tasks/TaskManager.java index cfca9f75fe82..707719a00a60 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/tasks/TaskManager.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/tasks/TaskManager.java @@ -16,7 +16,9 @@ */ package org.apache.kafka.streams.processor.internals.tasks; +import java.util.Map; import org.apache.kafka.common.KafkaFuture; +import org.apache.kafka.streams.errors.StreamsException; import org.apache.kafka.streams.processor.TaskId; import org.apache.kafka.streams.processor.internals.ReadOnlyTask; import org.apache.kafka.streams.processor.internals.StreamTask; @@ -98,4 +100,25 @@ public interface TaskManager { * @return set of all managed active tasks */ Set getTasks(); + + /** + * Called whenever an existing task has thrown an uncaught exception. + * + * Setting an uncaught exception for a task prevents it from being reassigned until the + * corresponding exception has been handled in the polling thread. 
+ * + */ + void setUncaughtException(StreamsException exception, TaskId taskId); + + /** + * Returns and clears all uncaught exceptions that fell through to the processing + * threads and need to be handled in the polling thread. + * + * Called by the polling thread to handle processing exceptions, e.g. to abort + * transactions or shut down the application. + * + * @return A map from task ID to the exception that occurred. + */ + Map drainUncaughtExceptions(); + } diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskExecutorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskExecutorTest.java index c2faeef880ba..db33a0d7c677 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskExecutorTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskExecutorTest.java @@ -19,6 +19,7 @@ import org.apache.kafka.common.KafkaFuture; import org.apache.kafka.common.utils.MockTime; import org.apache.kafka.common.utils.Time; +import org.apache.kafka.streams.errors.StreamsException; import org.apache.kafka.streams.processor.internals.StreamTask; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; @@ -104,4 +105,18 @@ public void shouldUnassignTaskWhenRequired() throws Exception { assertTrue(future.isDone(), "Unassign is not completed"); assertEquals(task, future.get(), "Unexpected task was unassigned"); } + + @Test + public void shouldSetUncaughtStreamsException() { + final StreamsException exception = mock(StreamsException.class); + when(task.process(anyLong())).thenThrow(exception); + + taskExecutor.start(); + + verify(taskManager, timeout(VERIFICATION_TIMEOUT)).assignNextTask(taskExecutor); + verify(taskManager, timeout(VERIFICATION_TIMEOUT)).setUncaughtException(exception, task.id()); + verify(taskManager, timeout(VERIFICATION_TIMEOUT)).unassignTask(task, taskExecutor); + 
assertNull(taskExecutor.currentTask()); + } + } diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskManagerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskManagerTest.java index e17a724f3652..16e944d97040 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskManagerTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskManagerTest.java @@ -21,6 +21,7 @@ import org.apache.kafka.common.utils.MockTime; import org.apache.kafka.common.utils.Time; import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.errors.StreamsException; import org.apache.kafka.streams.processor.TaskId; import org.apache.kafka.streams.processor.internals.StreamTask; import org.apache.kafka.streams.processor.internals.TasksRegistry; @@ -50,6 +51,7 @@ public class DefaultTaskManagerTest { private final StreamTask task = mock(StreamTask.class); private final TasksRegistry tasks = mock(TasksRegistry.class); private final TaskExecutor taskExecutor = mock(TaskExecutor.class); + private final StreamsException exception = mock(StreamsException.class); private final StreamsConfig config = new StreamsConfig(configProps()); private final TaskManager taskManager = new DefaultTaskManager(time, "TaskManager", tasks, config, @@ -166,6 +168,36 @@ public void shouldNotAssignAnyLockedTask() { assertNull(taskManager.assignNextTask(taskExecutor)); } + @Test + public void shouldNotSetUncaughtExceptionsForUnassignedTasks() { + taskManager.add(Collections.singleton(task)); + + final Exception e = assertThrows(IllegalArgumentException.class, () -> taskManager.setUncaughtException(exception, task.id())); + assertEquals("An uncaught exception can only be set as long as the task is still assigned", e.getMessage()); + } + + @Test + public void shouldNotSetUncaughtExceptionsTwice() { + 
taskManager.add(Collections.singleton(task)); + when(tasks.activeTasks()).thenReturn(Collections.singleton(task)); + taskManager.assignNextTask(taskExecutor); + taskManager.setUncaughtException(exception, task.id()); + + final Exception e = assertThrows(IllegalArgumentException.class, () -> taskManager.setUncaughtException(exception, task.id())); + assertEquals("The uncaught exception must be cleared before restarting processing", e.getMessage()); + } + + @Test + public void shouldReturnAndClearExceptionsOnDrainExceptions() { + taskManager.add(Collections.singleton(task)); + when(tasks.activeTasks()).thenReturn(Collections.singleton(task)); + taskManager.assignNextTask(taskExecutor); + taskManager.setUncaughtException(exception, task.id()); + + assertEquals(taskManager.drainUncaughtExceptions(), Collections.singletonMap(task.id(), exception)); + assertEquals(taskManager.drainUncaughtExceptions(), Collections.emptyMap()); + } + @Test public void shouldUnassignLockingTask() { final KafkaFutureImpl future = new KafkaFutureImpl<>();