diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskExecutor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskExecutor.java index c03384f4daaf..b59d695d9b0e 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskExecutor.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskExecutor.java @@ -54,7 +54,10 @@ public void run() { while (isRunning.get()) { runOnce(time.milliseconds()); } - // TODO: add exception handling + } catch (final StreamsException e) { + handleException(e); + } catch (final Exception e) { + handleException(new StreamsException(e)); } finally { if (currentTask != null) { unassignCurrentTask(); @@ -65,6 +68,15 @@ public void run() { } } + private void handleException(final StreamsException e) { + if (currentTask != null) { + taskManager.setUncaughtException(e, currentTask.id()); + } else { + // If we do not currently have a task assigned and still get an error, this is fatal for the executor thread + throw e; + } + } + private void runOnce(final long nowMs) { final KafkaFutureImpl pauseFuture; if ((pauseFuture = pauseRequested.getAndSet(null)) != null) { diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskManager.java index 3f97de85cebc..41526356d61c 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskManager.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskManager.java @@ -21,6 +21,7 @@ import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.common.utils.Time; import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.errors.StreamsException; import org.apache.kafka.streams.processor.TaskId; import org.apache.kafka.streams.processor.internals.ReadOnlyTask; import org.apache.kafka.streams.processor.internals.StreamTask; @@ -55,6 +56,7 @@ public class DefaultTaskManager implements TaskManager { private final Lock tasksLock = new ReentrantLock(); private final List lockedTasks = new ArrayList<>(); + private final Map uncaughtExceptions = new HashMap<>(); private final Map assignedTasks = new HashMap<>(); private final List taskExecutors; @@ -225,6 +227,40 @@ public Set getTasks() { return returnWithTasksLocked(() -> tasks.activeTasks().stream().map(ReadOnlyTask::new).collect(Collectors.toSet())); } + @Override + public void setUncaughtException(final StreamsException exception, final TaskId taskId) { + executeWithTasksLocked(() -> { + + if (!assignedTasks.containsKey(taskId)) { + throw new IllegalArgumentException("An uncaught exception can only be set as long as the task is still assigned"); + } + + if (uncaughtExceptions.containsKey(taskId)) { + throw new IllegalArgumentException("The uncaught exception must be cleared before restarting processing"); + } + + uncaughtExceptions.put(taskId, exception); + }); + + log.info("Set an uncaught exception of type {} for task {}, with error message: {}", + exception.getClass().getName(), + taskId, + exception.getMessage()); + } + + public Map drainUncaughtExceptions() { + final Map returnValue = returnWithTasksLocked(() -> { + final Map result = new HashMap<>(uncaughtExceptions); + uncaughtExceptions.clear(); + return result; + }); + + log.info("Drained {} uncaught exceptions", returnValue.size()); + + return returnValue; + } + + private void executeWithTasksLocked(final Runnable action) { tasksLock.lock(); try { diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/tasks/TaskManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/tasks/TaskManager.java index cfca9f75fe82..707719a00a60 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/tasks/TaskManager.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/tasks/TaskManager.java @@ -16,7 +16,9 @@ */ package org.apache.kafka.streams.processor.internals.tasks; +import java.util.Map; import org.apache.kafka.common.KafkaFuture; +import org.apache.kafka.streams.errors.StreamsException; import org.apache.kafka.streams.processor.TaskId; import org.apache.kafka.streams.processor.internals.ReadOnlyTask; import org.apache.kafka.streams.processor.internals.StreamTask; @@ -98,4 +100,25 @@ public interface TaskManager { * @return set of all managed active tasks */ Set getTasks(); + + /** + * Called whenever an existing task has thrown an uncaught exception. + * + * Setting an uncaught exception for a task prevents it from being reassigned until the + * corresponding exception has been handled in the polling thread. + * + */ + void setUncaughtException(StreamsException exception, TaskId taskId); + + /** + * Returns and clears all uncaught exceptions that were fell through to the processing + * threads and need to be handled in the polling thread. + * + * Called by the polling thread to handle processing exceptions, e.g. to abort + * transactions or shut down the application. + * + * @return A map from task ID to the exception that occurred. + */ + Map drainUncaughtExceptions(); + } diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskExecutorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskExecutorTest.java index c2faeef880ba..db33a0d7c677 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskExecutorTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskExecutorTest.java @@ -19,6 +19,7 @@ import org.apache.kafka.common.KafkaFuture; import org.apache.kafka.common.utils.MockTime; import org.apache.kafka.common.utils.Time; +import org.apache.kafka.streams.errors.StreamsException; import org.apache.kafka.streams.processor.internals.StreamTask; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; @@ -104,4 +105,18 @@ public void shouldUnassignTaskWhenRequired() throws Exception { assertTrue(future.isDone(), "Unassign is not completed"); assertEquals(task, future.get(), "Unexpected task was unassigned"); } + + @Test + public void shouldSetUncaughtStreamsException() { + final StreamsException exception = mock(StreamsException.class); + when(task.process(anyLong())).thenThrow(exception); + + taskExecutor.start(); + + verify(taskManager, timeout(VERIFICATION_TIMEOUT)).assignNextTask(taskExecutor); + verify(taskManager, timeout(VERIFICATION_TIMEOUT)).setUncaughtException(exception, task.id()); + verify(taskManager, timeout(VERIFICATION_TIMEOUT)).unassignTask(task, taskExecutor); + assertNull(taskExecutor.currentTask()); + } + } diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskManagerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskManagerTest.java index e17a724f3652..16e944d97040 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskManagerTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskManagerTest.java @@ -21,6 +21,7 @@ import org.apache.kafka.common.utils.MockTime; import org.apache.kafka.common.utils.Time; import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.errors.StreamsException; import org.apache.kafka.streams.processor.TaskId; import org.apache.kafka.streams.processor.internals.StreamTask; import org.apache.kafka.streams.processor.internals.TasksRegistry; @@ -50,6 +51,7 @@ public class DefaultTaskManagerTest { private final StreamTask task = mock(StreamTask.class); private final TasksRegistry tasks = mock(TasksRegistry.class); private final TaskExecutor taskExecutor = mock(TaskExecutor.class); + private final StreamsException exception = mock(StreamsException.class); private final StreamsConfig config = new StreamsConfig(configProps()); private final TaskManager taskManager = new DefaultTaskManager(time, "TaskManager", tasks, config, @@ -166,6 +168,36 @@ public void shouldNotAssignAnyLockedTask() { assertNull(taskManager.assignNextTask(taskExecutor)); } + @Test + public void shouldNotSetUncaughtExceptionsForUnassignedTasks() { + taskManager.add(Collections.singleton(task)); + + final Exception e = assertThrows(IllegalArgumentException.class, () -> taskManager.setUncaughtException(exception, task.id())); + assertEquals("An uncaught exception can only be set as long as the task is still assigned", e.getMessage()); + } + + @Test + public void shouldNotSetUncaughtExceptionsTwice() { + taskManager.add(Collections.singleton(task)); + when(tasks.activeTasks()).thenReturn(Collections.singleton(task)); + taskManager.assignNextTask(taskExecutor); + taskManager.setUncaughtException(exception, task.id()); + + final Exception e = assertThrows(IllegalArgumentException.class, () -> taskManager.setUncaughtException(exception, task.id())); + assertEquals("The uncaught exception must be cleared before restarting processing", e.getMessage()); + } + + @Test + public void shouldReturnAndClearExceptionsOnDrainExceptions() { + taskManager.add(Collections.singleton(task)); + when(tasks.activeTasks()).thenReturn(Collections.singleton(task)); + taskManager.assignNextTask(taskExecutor); + taskManager.setUncaughtException(exception, task.id()); + + assertEquals(taskManager.drainUncaughtExceptions(), Collections.singletonMap(task.id(), exception)); + assertEquals(taskManager.drainUncaughtExceptions(), Collections.emptyMap()); + } + @Test public void shouldUnassignLockingTask() { final KafkaFutureImpl future = new KafkaFutureImpl<>();