Skip to content

Commit

Permalink
Kafka Streams Threading: Exception handling (#13957)
Browse files Browse the repository at this point in the history
Catch any exceptions that escape the processing logic
inside TaskExecutors and record them in the TaskManager.
Make sure the TaskExecutor survives, but the task is
unassigned. Add a method to TaskManager to drain the
exceptions. The aim here is that the polling thread will
drain the exceptions to be able to execute the
uncaught exception handler, abort transactions, etc.

Reviewer: Bruno Cadonna <cadonna@apache.org>
  • Loading branch information
lucasbru committed Jul 13, 2023
1 parent 8d24716 commit 5f20750
Show file tree
Hide file tree
Showing 5 changed files with 119 additions and 1 deletion.
Expand Up @@ -54,7 +54,10 @@ public void run() {
while (isRunning.get()) {
runOnce(time.milliseconds());
}
// TODO: add exception handling
} catch (final StreamsException e) {
handleException(e);
} catch (final Exception e) {
handleException(new StreamsException(e));
} finally {
if (currentTask != null) {
unassignCurrentTask();
Expand All @@ -65,6 +68,15 @@ public void run() {
}
}

/**
 * Routes an exception that escaped the processing loop of this executor.
 *
 * When a task is currently assigned, the exception is recorded in the task manager
 * under that task's ID. When no task is assigned, the exception is rethrown to the caller.
 *
 * @param e the exception to record or rethrow
 */
private void handleException(final StreamsException e) {
    // If we do not currently have a task assigned and still get an error, this is fatal for the executor thread
    if (currentTask == null) {
        throw e;
    }
    taskManager.setUncaughtException(e, currentTask.id());
}

private void runOnce(final long nowMs) {
final KafkaFutureImpl<StreamTask> pauseFuture;
if ((pauseFuture = pauseRequested.getAndSet(null)) != null) {
Expand Down
Expand Up @@ -21,6 +21,7 @@
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.ReadOnlyTask;
import org.apache.kafka.streams.processor.internals.StreamTask;
Expand Down Expand Up @@ -55,6 +56,7 @@ public class DefaultTaskManager implements TaskManager {

private final Lock tasksLock = new ReentrantLock();
private final List<TaskId> lockedTasks = new ArrayList<>();
private final Map<TaskId, StreamsException> uncaughtExceptions = new HashMap<>();
private final Map<TaskId, TaskExecutor> assignedTasks = new HashMap<>();

private final List<TaskExecutor> taskExecutors;
Expand Down Expand Up @@ -225,6 +227,40 @@ public Set<ReadOnlyTask> getTasks() {
return returnWithTasksLocked(() -> tasks.activeTasks().stream().map(ReadOnlyTask::new).collect(Collectors.toSet()));
}

/**
 * Records an uncaught exception for the given task while holding the tasks lock,
 * then logs the exception type, the task ID and the exception message.
 *
 * @param exception the exception to store for the task
 * @param taskId    the ID of the task the exception belongs to
 * @throws IllegalArgumentException if the task is not currently assigned, or if an
 *                                  exception is already stored for the task
 */
@Override
public void setUncaughtException(final StreamsException exception, final TaskId taskId) {
    executeWithTasksLocked(() -> {
        // The assignment check comes first so that it decides the error when both conditions hold.
        final boolean stillAssigned = assignedTasks.containsKey(taskId);
        if (!stillAssigned) {
            throw new IllegalArgumentException("An uncaught exception can only be set as long as the task is still assigned");
        }

        final boolean alreadyRecorded = uncaughtExceptions.containsKey(taskId);
        if (alreadyRecorded) {
            throw new IllegalArgumentException("The uncaught exception must be cleared before restarting processing");
        }

        uncaughtExceptions.put(taskId, exception);
    });

    // Logging happens after the locked section has completed.
    final String exceptionType = exception.getClass().getName();
    final String errorMessage = exception.getMessage();
    log.info("Set an uncaught exception of type {} for task {}, with error message: {}",
        exceptionType,
        taskId,
        errorMessage);
}

/**
 * Returns a snapshot of all recorded uncaught exceptions and clears the internal map.
 *
 * The copy and the clear both happen inside one locked section, so an exception is
 * returned by exactly one drain call.
 *
 * @return a new map from task ID to the exception recorded for that task; empty if
 *         nothing was recorded since the last drain
 */
@Override
public Map<TaskId, StreamsException> drainUncaughtExceptions() {
    final Map<TaskId, StreamsException> returnValue = returnWithTasksLocked(() -> {
        final Map<TaskId, StreamsException> result = new HashMap<>(uncaughtExceptions);
        uncaughtExceptions.clear();
        return result;
    });

    // Only log when something was actually drained, so repeated polling does not flood the log.
    if (!returnValue.isEmpty()) {
        log.info("Drained {} uncaught exceptions", returnValue.size());
    }

    return returnValue;
}


private void executeWithTasksLocked(final Runnable action) {
tasksLock.lock();
try {
Expand Down
Expand Up @@ -16,7 +16,9 @@
*/
package org.apache.kafka.streams.processor.internals.tasks;

import java.util.Map;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.ReadOnlyTask;
import org.apache.kafka.streams.processor.internals.StreamTask;
Expand Down Expand Up @@ -98,4 +100,25 @@ public interface TaskManager {
* @return set of all managed active tasks
*/
Set<ReadOnlyTask> getTasks();

/**
* Called whenever an existing task has thrown an uncaught exception.
*
* Setting an uncaught exception for a task prevents it from being reassigned until the
* corresponding exception has been handled in the polling thread.
*
* @param exception the uncaught exception thrown by the task
* @param taskId the ID of the task that threw the exception
*/
void setUncaughtException(StreamsException exception, TaskId taskId);

/**
* Returns and clears all uncaught exceptions that fell through to the processing
* threads and need to be handled in the polling thread.
*
* Called by the polling thread to handle processing exceptions, e.g. to abort
* transactions or shut down the application.
*
* @return A map from task ID to the exception that occurred.
*/
Map<TaskId, StreamsException> drainUncaughtExceptions();

}
Expand Up @@ -19,6 +19,7 @@
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.processor.internals.StreamTask;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
Expand Down Expand Up @@ -104,4 +105,18 @@ public void shouldUnassignTaskWhenRequired() throws Exception {
assertTrue(future.isDone(), "Unassign is not completed");
assertEquals(task, future.get(), "Unexpected task was unassigned");
}

// Checks that a StreamsException thrown by task.process() is reported to the task manager
// and that the executor ends up without a current task.
@Test
public void shouldSetUncaughtStreamsException() {
// Make every call to process() on the mocked task throw the mocked exception.
final StreamsException exception = mock(StreamsException.class);
when(task.process(anyLong())).thenThrow(exception);

taskExecutor.start();

// Each verification waits up to VERIFICATION_TIMEOUT for the expected interaction,
// since the executor was just started and is expected to act on its own.
verify(taskManager, timeout(VERIFICATION_TIMEOUT)).assignNextTask(taskExecutor);
verify(taskManager, timeout(VERIFICATION_TIMEOUT)).setUncaughtException(exception, task.id());
verify(taskManager, timeout(VERIFICATION_TIMEOUT)).unassignTask(task, taskExecutor);
// After the failure has been reported, the executor must no longer hold the task.
assertNull(taskExecutor.currentTask());
}

}
Expand Up @@ -21,6 +21,7 @@
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.StreamTask;
import org.apache.kafka.streams.processor.internals.TasksRegistry;
Expand Down Expand Up @@ -50,6 +51,7 @@ public class DefaultTaskManagerTest {
private final StreamTask task = mock(StreamTask.class);
private final TasksRegistry tasks = mock(TasksRegistry.class);
private final TaskExecutor taskExecutor = mock(TaskExecutor.class);
private final StreamsException exception = mock(StreamsException.class);

private final StreamsConfig config = new StreamsConfig(configProps());
private final TaskManager taskManager = new DefaultTaskManager(time, "TaskManager", tasks, config,
Expand Down Expand Up @@ -166,6 +168,36 @@ public void shouldNotAssignAnyLockedTask() {
assertNull(taskManager.assignNextTask(taskExecutor));
}

// Setting an exception for a task that was added but never assigned must be rejected.
@Test
public void shouldNotSetUncaughtExceptionsForUnassignedTasks() {
    taskManager.add(Collections.singleton(task));

    final IllegalArgumentException thrown = assertThrows(
        IllegalArgumentException.class,
        () -> taskManager.setUncaughtException(exception, task.id())
    );

    assertEquals("An uncaught exception can only be set as long as the task is still assigned", thrown.getMessage());
}

// A second exception for the same task must be rejected while the first one has not been drained.
@Test
public void shouldNotSetUncaughtExceptionsTwice() {
    // Register the task, assign it to the executor and record a first exception.
    taskManager.add(Collections.singleton(task));
    when(tasks.activeTasks()).thenReturn(Collections.singleton(task));
    taskManager.assignNextTask(taskExecutor);
    taskManager.setUncaughtException(exception, task.id());

    final IllegalArgumentException thrown = assertThrows(
        IllegalArgumentException.class,
        () -> taskManager.setUncaughtException(exception, task.id())
    );

    assertEquals("The uncaught exception must be cleared before restarting processing", thrown.getMessage());
}

// Draining must return the recorded exception once and leave nothing behind for the next drain.
@Test
public void shouldReturnAndClearExceptionsOnDrainExceptions() {
    // Register the task, assign it to the executor and record an exception for it.
    taskManager.add(Collections.singleton(task));
    when(tasks.activeTasks()).thenReturn(Collections.singleton(task));
    taskManager.assignNextTask(taskExecutor);
    taskManager.setUncaughtException(exception, task.id());

    // assertEquals takes (expected, actual); this order keeps failure messages accurate.
    assertEquals(Collections.singletonMap(task.id(), exception), taskManager.drainUncaughtExceptions());
    assertEquals(Collections.emptyMap(), taskManager.drainUncaughtExceptions());
}

@Test
public void shouldUnassignLockingTask() {
final KafkaFutureImpl<StreamTask> future = new KafkaFutureImpl<>();
Expand Down

0 comments on commit 5f20750

Please sign in to comment.