diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskExecutor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskExecutor.java index 5ad0950fc517..c03384f4daaf 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskExecutor.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskExecutor.java @@ -21,7 +21,6 @@ import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.common.utils.Time; import org.apache.kafka.streams.errors.StreamsException; -import org.apache.kafka.streams.processor.internals.DefaultStateUpdater; import org.apache.kafka.streams.processor.internals.ReadOnlyTask; import org.apache.kafka.streams.processor.internals.StreamTask; import org.slf4j.Logger; @@ -45,7 +44,7 @@ public TaskExecutorThread(final String name) { super(name); final String logPrefix = String.format("%s ", name); final LogContext logContext = new LogContext(logPrefix); - log = logContext.logger(DefaultStateUpdater.class); + log = logContext.logger(DefaultTaskExecutor.class); } @Override @@ -102,6 +101,7 @@ private StreamTask unassignCurrentTask() { } private final Time time; + private final String name; private final TaskManager taskManager; private StreamTask currentTask = null; @@ -109,15 +109,22 @@ private StreamTask unassignCurrentTask() { private CountDownLatch shutdownGate; public DefaultTaskExecutor(final TaskManager taskManager, + final String name, final Time time) { this.time = time; + this.name = name; this.taskManager = taskManager; } + @Override + public String name() { + return name; + } + @Override public void start() { if (taskExecutorThread == null) { - taskExecutorThread = new TaskExecutorThread("task-executor"); + taskExecutorThread = new TaskExecutorThread(name); taskExecutorThread.start(); shutdownGate = new CountDownLatch(1); } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskManager.java new file mode 100644 index 000000000000..3f97de85cebc --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskManager.java @@ -0,0 +1,246 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.processor.internals.tasks; + +import org.apache.kafka.common.KafkaFuture; +import org.apache.kafka.common.internals.KafkaFutureImpl; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.processor.TaskId; +import org.apache.kafka.streams.processor.internals.ReadOnlyTask; +import org.apache.kafka.streams.processor.internals.StreamTask; +import org.apache.kafka.streams.processor.internals.Task; +import org.apache.kafka.streams.processor.internals.TasksRegistry; +import org.slf4j.Logger; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentSkipListSet; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; +import java.util.function.Supplier; +import java.util.stream.Collectors; + +/** + * An active task could only be in one of the following status: + * + * 1. It's assigned to one of the executors for processing. + * 2. It's locked for committing, removal, other manipulations etc. + * 3. Neither 1 or 2, i.e. it stays idle. This is possible if we do not have enough executors or because those tasks + * are not processable (e.g. because no records fetched) yet. + */ +public class DefaultTaskManager implements TaskManager { + + private final Time time; + private final Logger log; + private final TasksRegistry tasks; + + private final Lock tasksLock = new ReentrantLock(); + private final List lockedTasks = new ArrayList<>(); + private final Map assignedTasks = new HashMap<>(); + + private final List taskExecutors; + + static class DefaultTaskExecutorCreator implements TaskExecutorCreator { + @Override + public TaskExecutor create(final TaskManager taskManager, final String name, final Time time) { + return new DefaultTaskExecutor(taskManager, name, time); + } + } + + public DefaultTaskManager(final Time time, + final String clientId, + final TasksRegistry tasks, + final StreamsConfig config, + final TaskExecutorCreator executorCreator) { + final String logPrefix = String.format("%s ", clientId); + final LogContext logContext = new LogContext(logPrefix); + this.log = logContext.logger(DefaultTaskManager.class); + this.time = time; + this.tasks = tasks; + + final int numExecutors = config.getInt(StreamsConfig.NUM_STREAM_THREADS_CONFIG); + this.taskExecutors = new ArrayList<>(numExecutors); + for (int i = 1; i <= numExecutors; i++) { + final String name = clientId + "-TaskExecutor-" + i; + this.taskExecutors.add(executorCreator.create(this, name, time)); + } + } + + @Override + public StreamTask assignNextTask(final TaskExecutor executor) { + return returnWithTasksLocked(() -> { + if (!taskExecutors.contains(executor)) { + throw new IllegalArgumentException("The requested executor for getting next task to assign is unrecognized"); + } + + // the most naive scheduling algorithm for now: give the next unlocked, unassigned, and processable task + for (final Task task : tasks.activeTasks()) { + if (!assignedTasks.containsKey(task.id()) && + !lockedTasks.contains(task.id()) && + ((StreamTask) task).isProcessable(time.milliseconds())) { + + assignedTasks.put(task.id(), executor); + + log.info("Assigned {} to executor {}", task.id(), executor.name()); + + return (StreamTask) task; + } + } + + return null; + }); + } + + @Override + public void unassignTask(final StreamTask task, final TaskExecutor executor) { + executeWithTasksLocked(() -> { + if (!taskExecutors.contains(executor)) { + throw new IllegalArgumentException("The requested executor for unassign task is unrecognized"); + } + + final TaskExecutor lockedExecutor = assignedTasks.get(task.id()); + if (lockedExecutor == null || lockedExecutor != executor) { + throw new IllegalArgumentException("Task " + task.id() + " is not locked by the executor"); + } + + assignedTasks.remove(task.id()); + + log.info("Unassigned {} from executor {}", task.id(), executor.name()); + }); + } + + @Override + public KafkaFuture lockTasks(final Set taskIds) { + return returnWithTasksLocked(() -> { + lockedTasks.addAll(taskIds); + + final KafkaFutureImpl result = new KafkaFutureImpl<>(); + final Set remainingTaskIds = new ConcurrentSkipListSet<>(taskIds); + + for (final TaskId taskId : taskIds) { + final Task task = tasks.task(taskId); + + if (task == null) { + throw new IllegalArgumentException("Trying to lock task " + taskId + " but it's not owned"); + } + + if (!task.isActive()) { + throw new IllegalArgumentException("The locking task " + taskId + " is not an active task"); + } + + if (assignedTasks.containsKey(taskId)) { + final KafkaFuture future = assignedTasks.get(taskId).unassign(); + future.whenComplete((streamTask, throwable) -> { + if (throwable != null) { + result.completeExceptionally(throwable); + } else { + remainingTaskIds.remove(streamTask.id()); + if (remainingTaskIds.isEmpty()) { + result.complete(null); + } + } + }); + } else { + remainingTaskIds.remove(taskId); + if (remainingTaskIds.isEmpty()) { + result.complete(null); + } + } + } + + return result; + }); + } + + @Override + public KafkaFuture lockAllTasks() { + return returnWithTasksLocked(() -> + lockTasks(tasks.activeTasks().stream().map(Task::id).collect(Collectors.toSet())) + ); + } + + @Override + public void unlockTasks(final Set taskIds) { + executeWithTasksLocked(() -> lockedTasks.removeAll(taskIds)); + } + + @Override + public void unlockAllTasks() { + executeWithTasksLocked(() -> unlockTasks(tasks.activeTasks().stream().map(Task::id).collect(Collectors.toSet()))); + } + + @Override + public void add(final Set tasksToAdd) { + executeWithTasksLocked(() -> { + for (final StreamTask task : tasksToAdd) { + tasks.addTask(task); + } + }); + + log.info("Added tasks {} to the task manager to process", tasksToAdd); + } + + @Override + public void remove(final TaskId taskId) { + executeWithTasksLocked(() -> { + if (assignedTasks.containsKey(taskId)) { + throw new IllegalArgumentException("The task to remove is still assigned to executors"); + } + + if (!lockedTasks.contains(taskId)) { + throw new IllegalArgumentException("The task to remove is not locked yet by the task manager"); + } + + if (!tasks.contains(taskId)) { + throw new IllegalArgumentException("The task to remove is not owned by the task manager"); + } + + tasks.removeTask(tasks.task(taskId)); + }); + + log.info("Removed task {} from the task manager", taskId); + } + + @Override + public Set getTasks() { + return returnWithTasksLocked(() -> tasks.activeTasks().stream().map(ReadOnlyTask::new).collect(Collectors.toSet())); + } + + private void executeWithTasksLocked(final Runnable action) { + tasksLock.lock(); + try { + action.run(); + } finally { + tasksLock.unlock(); + } + } + + private T returnWithTasksLocked(final Supplier action) { + tasksLock.lock(); + try { + return action.get(); + } finally { + tasksLock.unlock(); + } + } +} + diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/tasks/TaskExecutor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/tasks/TaskExecutor.java index 9e660986a783..04538744a292 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/tasks/TaskExecutor.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/tasks/TaskExecutor.java @@ -25,7 +25,12 @@ public interface TaskExecutor { /** - * Starts the task processor. + * @return ID name string of the task executor. + */ + String name(); + + /** + * Starts the task executor. */ void start(); diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/tasks/TaskExecutorCreator.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/tasks/TaskExecutorCreator.java new file mode 100644 index 000000000000..c18eb973792a --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/tasks/TaskExecutorCreator.java @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.processor.internals.tasks; + +import org.apache.kafka.common.utils.Time; + +public interface TaskExecutorCreator { + + TaskExecutor create(final TaskManager taskManager, String name, Time time); +} diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskExecutorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskExecutorTest.java index 513609366db4..c2faeef880ba 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskExecutorTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskExecutorTest.java @@ -45,7 +45,7 @@ public class DefaultTaskExecutorTest { private final StreamTask task = mock(StreamTask.class); private final TaskManager taskManager = mock(TaskManager.class); - private final DefaultTaskExecutor taskExecutor = new DefaultTaskExecutor(taskManager, time); + private final DefaultTaskExecutor taskExecutor = new DefaultTaskExecutor(taskManager, "TaskExecutor", time); @BeforeEach public void setUp() { diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskManagerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskManagerTest.java new file mode 100644 index 000000000000..e17a724f3652 --- /dev/null +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskManagerTest.java @@ -0,0 +1,189 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.processor.internals.tasks; + +import org.apache.kafka.common.KafkaFuture; +import org.apache.kafka.common.internals.KafkaFutureImpl; +import org.apache.kafka.common.utils.MockTime; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.processor.TaskId; +import org.apache.kafka.streams.processor.internals.StreamTask; +import org.apache.kafka.streams.processor.internals.TasksRegistry; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.util.Collections; +import java.util.Properties; + +import static org.apache.kafka.common.utils.Utils.mkEntry; +import static org.apache.kafka.common.utils.Utils.mkMap; +import static org.apache.kafka.common.utils.Utils.mkObjectProperties; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.reset; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +public class DefaultTaskManagerTest { + + private final Time time = new MockTime(1L); + private final StreamTask task = mock(StreamTask.class); + private final TasksRegistry tasks = mock(TasksRegistry.class); + private final TaskExecutor taskExecutor = mock(TaskExecutor.class); + + private final StreamsConfig config = new StreamsConfig(configProps()); + private final TaskManager taskManager = new DefaultTaskManager(time, "TaskManager", tasks, config, + (taskManager, name, time) -> taskExecutor); + + private Properties configProps() { + return mkObjectProperties(mkMap( + mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, "appId"), + mkEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:2171"), + mkEntry(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_V2) + )); + } + + @BeforeEach + public void setUp() { + when(task.id()).thenReturn(new TaskId(0, 0, "A")); + when(task.isProcessable(anyLong())).thenReturn(true); + when(task.isActive()).thenReturn(true); + } + + @Test + public void shouldAddTask() { + taskManager.add(Collections.singleton(task)); + + verify(tasks).addTask(task); + when(tasks.activeTasks()).thenReturn(Collections.singleton(task)); + assertEquals(1, taskManager.getTasks().size()); + } + + @Test + public void shouldAssignTask() { + taskManager.add(Collections.singleton(task)); + when(tasks.activeTasks()).thenReturn(Collections.singleton(task)); + + assertEquals(task, taskManager.assignNextTask(taskExecutor)); + assertNull(taskManager.assignNextTask(taskExecutor)); + } + + @Test + public void shouldUnassignTask() { + taskManager.add(Collections.singleton(task)); + when(tasks.activeTasks()).thenReturn(Collections.singleton(task)); + + assertEquals(task, taskManager.assignNextTask(taskExecutor)); + + taskManager.unassignTask(task, taskExecutor); + assertEquals(task, taskManager.assignNextTask(taskExecutor)); + } + + @Test + public void shouldNotUnassignNotOwnedTask() { + taskManager.add(Collections.singleton(task)); + when(tasks.activeTasks()).thenReturn(Collections.singleton(task)); + + assertEquals(task, taskManager.assignNextTask(taskExecutor)); + + final TaskExecutor anotherExecutor = mock(TaskExecutor.class); + assertThrows(IllegalArgumentException.class, () -> taskManager.unassignTask(task, anotherExecutor)); + } + + @Test + public void shouldNotRemoveUnlockedTask() { + taskManager.add(Collections.singleton(task)); + + assertThrows(IllegalArgumentException.class, () -> taskManager.remove(task.id())); + } + + @Test + public void shouldNotRemoveAssignedTask() { + taskManager.add(Collections.singleton(task)); + taskManager.assignNextTask(taskExecutor); + + assertThrows(IllegalArgumentException.class, () -> taskManager.remove(task.id())); + } + + @Test + public void shouldRemoveTask() { + taskManager.add(Collections.singleton(task)); + when(tasks.activeTasks()).thenReturn(Collections.singleton(task)); + when(tasks.task(task.id())).thenReturn(task); + when(tasks.contains(task.id())).thenReturn(true); + + taskManager.lockTasks(Collections.singleton(task.id())); + taskManager.remove(task.id()); + + verify(tasks).removeTask(task); + reset(tasks); + when(tasks.activeTasks()).thenReturn(Collections.emptySet()); + + assertEquals(0, taskManager.getTasks().size()); + } + + @Test + public void shouldNotAssignLockedTask() { + taskManager.add(Collections.singleton(task)); + when(tasks.activeTasks()).thenReturn(Collections.singleton(task)); + when(tasks.task(task.id())).thenReturn(task); + when(tasks.contains(task.id())).thenReturn(true); + + assertTrue(taskManager.lockTasks(Collections.singleton(task.id())).isDone()); + + assertNull(taskManager.assignNextTask(taskExecutor)); + } + + @Test + public void shouldNotAssignAnyLockedTask() { + taskManager.add(Collections.singleton(task)); + when(tasks.activeTasks()).thenReturn(Collections.singleton(task)); + when(tasks.task(task.id())).thenReturn(task); + when(tasks.contains(task.id())).thenReturn(true); + + assertTrue(taskManager.lockAllTasks().isDone()); + + assertNull(taskManager.assignNextTask(taskExecutor)); + } + + @Test + public void shouldUnassignLockingTask() { + final KafkaFutureImpl future = new KafkaFutureImpl<>(); + + taskManager.add(Collections.singleton(task)); + when(tasks.activeTasks()).thenReturn(Collections.singleton(task)); + when(tasks.task(task.id())).thenReturn(task); + when(tasks.contains(task.id())).thenReturn(true); + when(taskExecutor.unassign()).thenReturn(future); + + assertEquals(task, taskManager.assignNextTask(taskExecutor)); + + final KafkaFuture lockFuture = taskManager.lockAllTasks(); + assertFalse(lockFuture.isDone()); + + verify(taskExecutor).unassign(); + + future.complete(task); + assertTrue(lockFuture.isDone()); + } +}