Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Kafka Streams Threading: Exception handling #13957

Merged
merged 4 commits into from Jul 13, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -54,7 +54,10 @@ public void run() {
while (isRunning.get()) {
runOnce(time.milliseconds());
}
// TODO: add exception handling
} catch (final StreamsException e) {
handleException(e);
} catch (final Exception e) {
handleException(new StreamsException(e));
} finally {
if (currentTask != null) {
unassignCurrentTask();
Expand All @@ -65,6 +68,15 @@ public void run() {
}
}

/**
 * Routes an exception raised while this executor was running.
 *
 * When a task is currently assigned, the exception is handed to the task manager together
 * with the ID of that task. When no task is assigned, the exception is rethrown to the caller.
 *
 * @param e the exception to report or rethrow
 */
private void handleException(final StreamsException e) {
    if (currentTask == null) {
        // If we do not currently have a task assigned and still get an error, this is fatal for the executor thread
        throw e;
    }
    taskManager.setUncaughtException(e, currentTask.id());
}

private void runOnce(final long nowMs) {
final KafkaFutureImpl<StreamTask> pauseFuture;
if ((pauseFuture = pauseRequested.getAndSet(null)) != null) {
Expand Down
Expand Up @@ -21,6 +21,7 @@
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.ReadOnlyTask;
import org.apache.kafka.streams.processor.internals.StreamTask;
Expand Down Expand Up @@ -55,6 +56,7 @@ public class DefaultTaskManager implements TaskManager {

// Lock taken by executeWithTasksLocked around the action it is given.
private final Lock tasksLock = new ReentrantLock();
private final List<TaskId> lockedTasks = new ArrayList<>();
// Exceptions recorded per task ID by setUncaughtException; copied out and cleared by drainUncaughtExceptions.
private final Map<TaskId, StreamsException> uncaughtExceptions = new HashMap<>();
// Maps a task ID to a TaskExecutor; setUncaughtException rejects task IDs that are not keys of this map.
private final Map<TaskId, TaskExecutor> assignedTasks = new HashMap<>();

private final List<TaskExecutor> taskExecutors;
Expand Down Expand Up @@ -225,6 +227,37 @@ public Set<ReadOnlyTask> getTasks() {
return returnWithTasksLocked(() -> tasks.activeTasks().stream().map(ReadOnlyTask::new).collect(Collectors.toSet()));
}

@Override
public void setUncaughtException(final StreamsException exception, final TaskId taskId) {
executeWithTasksLocked(() -> {

if (!assignedTasks.containsKey(taskId)) {
throw new IllegalArgumentException("An uncaught exception can only be set as long as the task is still assigned");
}

if (uncaughtExceptions.containsKey(taskId)) {
throw new IllegalArgumentException("The uncaught exception must be cleared before restarting processing");
}

uncaughtExceptions.put(taskId, exception);
});

log.info("Set an uncaught exception for task {}", taskId);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we log some more information about the exception, like the type and the error message? Just to be able to better reason about the error case.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

}

/**
 * Returns all recorded uncaught exceptions and clears the internal record.
 *
 * The copy and the clear are both performed inside {@code returnWithTasksLocked}. The returned
 * map is a fresh {@link HashMap} that is independent of the internal record.
 *
 * @return a map from task ID to the exception recorded for that task; empty if none were recorded
 */
@Override
public Map<TaskId, StreamsException> drainUncaughtExceptions() {
    final Map<TaskId, StreamsException> drained = returnWithTasksLocked(() -> {
        final Map<TaskId, StreamsException> snapshot = new HashMap<>(uncaughtExceptions);
        uncaughtExceptions.clear();
        return snapshot;
    });

    // Only log when something was actually drained, so that repeated calls with nothing
    // to report do not emit an info-level line each time.
    if (!drained.isEmpty()) {
        log.info("Drained {} uncaught exceptions", drained.size());
    }

    return drained;
}


private void executeWithTasksLocked(final Runnable action) {
tasksLock.lock();
try {
Expand Down
Expand Up @@ -16,7 +16,9 @@
*/
package org.apache.kafka.streams.processor.internals.tasks;

import java.util.Map;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.ReadOnlyTask;
import org.apache.kafka.streams.processor.internals.StreamTask;
Expand Down Expand Up @@ -98,4 +100,25 @@ public interface TaskManager {
* @return set of all managed active tasks
*/
Set<ReadOnlyTask> getTasks();

/**
* Called whenever an existing task has thrown an uncaught exception.
*
* Setting an uncaught exception for a task is intended to prevent it from being reassigned
* until the corresponding exception has been handled in the polling thread. Note that this
* reassignment guard is not yet implemented.
Comment on lines +107 to +108
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That part is not yet implemented, right?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Correct

*
*/
void setUncaughtException(StreamsException exception, TaskId taskId);

/**
* Returns and clears all uncaught exceptions that fell through to the processing
* threads and need to be handled in the polling thread.
*
* Called by the polling thread to handle processing exceptions, e.g. to abort
* transactions or shut down the application.
*
* @return A map from task ID to the exception that occurred.
*/
Map<TaskId, StreamsException> drainUncaughtExceptions();

}
Expand Up @@ -19,6 +19,7 @@
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.processor.internals.StreamTask;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
Expand Down Expand Up @@ -104,4 +105,19 @@ public void shouldUnassignTaskWhenRequired() throws Exception {
assertTrue(future.isDone(), "Unassign is not completed");
assertEquals(task, future.get(), "Unexpected task was unassigned");
}

@Test
public void shouldSetUncaughtStreamsException() {
final StreamsException exception = mock(StreamsException.class);
when(task.process(anyLong())).thenThrow(exception);

taskExecutor.start();

verify(taskManager, timeout(VERIFICATION_TIMEOUT)).assignNextTask(taskExecutor);

lucasbru marked this conversation as resolved.
Show resolved Hide resolved
verify(taskManager, timeout(VERIFICATION_TIMEOUT)).setUncaughtException(exception, task.id());
verify(taskManager, timeout(VERIFICATION_TIMEOUT)).unassignTask(task, taskExecutor);
assertNull(taskExecutor.currentTask());
}

}
Expand Up @@ -21,6 +21,7 @@
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.StreamTask;
import org.apache.kafka.streams.processor.internals.TasksRegistry;
Expand Down Expand Up @@ -50,6 +51,7 @@ public class DefaultTaskManagerTest {
private final StreamTask task = mock(StreamTask.class);
private final TasksRegistry tasks = mock(TasksRegistry.class);
private final TaskExecutor taskExecutor = mock(TaskExecutor.class);
private final StreamsException exception = mock(StreamsException.class);

private final StreamsConfig config = new StreamsConfig(configProps());
private final TaskManager taskManager = new DefaultTaskManager(time, "TaskManager", tasks, config,
Expand Down Expand Up @@ -166,6 +168,44 @@ public void shouldNotAssignAnyLockedTask() {
assertNull(taskManager.assignNextTask(taskExecutor));
}

@Test
public void shouldNotSetUncaughtExceptionsForUnassignedTasks() {
taskManager.add(Collections.singleton(task));

assertThrows(IllegalArgumentException.class, () -> taskManager.setUncaughtException(exception, task.id()));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you please also verify the error message? Otherwise, we actually do not know which IllegalArgumentException was thrown.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

}

@Test
public void shouldNotSetUncaughtExceptionsTwice() {
taskManager.add(Collections.singleton(task));
when(tasks.activeTasks()).thenReturn(Collections.singleton(task));
taskManager.assignNextTask(taskExecutor);
taskManager.setUncaughtException(exception, task.id());

assertThrows(IllegalArgumentException.class, () -> taskManager.setUncaughtException(exception, task.id()));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you please also verify the error message? Otherwise, we actually do not know which IllegalArgumentException was thrown.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

}

/**
 * Verifies that an exception set for an assigned task is returned by
 * {@code drainUncaughtExceptions}, keyed by the task's ID.
 */
@Test
public void shouldReturnExceptionsOnDrainExceptions() {
    taskManager.add(Collections.singleton(task));
    when(tasks.activeTasks()).thenReturn(Collections.singleton(task));
    taskManager.assignNextTask(taskExecutor);
    taskManager.setUncaughtException(exception, task.id());

    // JUnit expects (expected, actual); keeping that order makes failure messages read correctly.
    assertEquals(Collections.singletonMap(task.id(), exception), taskManager.drainUncaughtExceptions());
}

/**
 * Verifies that a second call to {@code drainUncaughtExceptions} returns an empty map
 * once the recorded exception has been drained by the first call.
 */
@Test
public void shouldClearExceptionsOnDrainExceptions() {
    taskManager.add(Collections.singleton(task));
    when(tasks.activeTasks()).thenReturn(Collections.singleton(task));
    taskManager.assignNextTask(taskExecutor);
    taskManager.setUncaughtException(exception, task.id());
    taskManager.drainUncaughtExceptions();

    // JUnit expects (expected, actual); keeping that order makes failure messages read correctly.
    assertEquals(Collections.emptyMap(), taskManager.drainUncaughtExceptions());
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think, it would be OK to merge these two tests. Just add the assertEquals() of the second test to the end of the first test.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done


@Test
public void shouldUnassignLockingTask() {
final KafkaFutureImpl<StreamTask> future = new KafkaFutureImpl<>();
Expand Down