diff --git a/samza-core/src/main/java/org/apache/samza/task/RunLoop.java b/samza-core/src/main/java/org/apache/samza/container/RunLoop.java similarity index 95% rename from samza-core/src/main/java/org/apache/samza/task/RunLoop.java rename to samza-core/src/main/java/org/apache/samza/container/RunLoop.java index 004c31dda1..a509a27abb 100644 --- a/samza-core/src/main/java/org/apache/samza/task/RunLoop.java +++ b/samza-core/src/main/java/org/apache/samza/container/RunLoop.java @@ -17,7 +17,7 @@ * under the License. */ -package org.apache.samza.task; +package org.apache.samza.container; import java.util.ArrayDeque; import java.util.ArrayList; @@ -34,13 +34,18 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; import org.apache.samza.SamzaException; -import org.apache.samza.container.SamzaContainerMetrics; -import org.apache.samza.container.TaskInstance; -import org.apache.samza.container.TaskInstanceMetrics; -import org.apache.samza.container.TaskName; import org.apache.samza.system.IncomingMessageEnvelope; import org.apache.samza.system.SystemConsumers; import org.apache.samza.system.SystemStreamPartition; +import org.apache.samza.task.CoordinatorRequests; +import org.apache.samza.scheduler.EpochTimeScheduler; +import org.apache.samza.task.ReadableCoordinator; +import org.apache.samza.task.TaskCallback; +import org.apache.samza.task.TaskCallbackFactory; +import org.apache.samza.task.TaskCallbackImpl; +import org.apache.samza.task.TaskCallbackListener; +import org.apache.samza.task.TaskCallbackManager; +import org.apache.samza.task.TaskCoordinator; import org.apache.samza.util.HighResolutionClock; import org.apache.samza.util.Throttleable; import org.apache.samza.util.ThrottlingScheduler; @@ -50,7 +55,13 @@ /** - * The RunLoop supports multithreading execution of Samza {@link AsyncStreamTask}s. + * The run loop supports both single-threaded and multi-threaded execution models. + *

+ * If job.container.thread.pool.size > 1 (multi-threaded), operations like commit, window and timer for all tasks within a container + * happen on a thread pool. + * If job.container.thread.pool.size < 1 (single-threaded), operations for all tasks are multiplexed onto one execution thread. + *

. + * Note: In both models, process/processAsync for all tasks is invoked on the run loop thread. */ public class RunLoop implements Runnable, Throttleable { private static final Logger log = LoggerFactory.getLogger(RunLoop.class); @@ -340,7 +351,7 @@ private enum WorkerOp { } /** - * The AsyncTaskWorker encapsulates the states of an {@link AsyncStreamTask}. If the task becomes ready, it + * The AsyncTaskWorker encapsulates the states of an {@link org.apache.samza.task.AsyncStreamTask}. If the task becomes ready, it * will run the task asynchronously. It runs window and commit in the provided thread pool. */ private class AsyncTaskWorker implements TaskCallbackListener { @@ -596,7 +607,7 @@ public void run() { */ @Override public void onComplete(final TaskCallback callback) { - long workNanos = clock.nanoTime() - ((TaskCallbackImpl) callback).timeCreatedNs; + long workNanos = clock.nanoTime() - ((TaskCallbackImpl) callback).getTimeCreatedNs(); callbackExecutor.schedule(new Runnable() { @Override public void run() { @@ -604,20 +615,20 @@ public void run() { state.doneProcess(); state.taskMetrics.asyncCallbackCompleted().inc(); TaskCallbackImpl callbackImpl = (TaskCallbackImpl) callback; - containerMetrics.processNs().update(clock.nanoTime() - callbackImpl.timeCreatedNs); + containerMetrics.processNs().update(clock.nanoTime() - callbackImpl.getTimeCreatedNs()); log.trace("Got callback complete for task {}, ssp {}", - callbackImpl.taskName, callbackImpl.envelope.getSystemStreamPartition()); + callbackImpl.getTaskName(), callbackImpl.getEnvelope().getSystemStreamPartition()); List callbacksToUpdate = callbackManager.updateCallback(callbackImpl); for (TaskCallbackImpl callbackToUpdate : callbacksToUpdate) { - IncomingMessageEnvelope envelope = callbackToUpdate.envelope; + IncomingMessageEnvelope envelope = callbackToUpdate.getEnvelope(); log.trace("Update offset for ssp {}, offset {}", envelope.getSystemStreamPartition(), envelope.getOffset()); // update offset 
task.offsetManager().update(task.taskName(), envelope.getSystemStreamPartition(), envelope.getOffset()); // update coordinator - coordinatorRequests.update(callbackToUpdate.coordinator); + coordinatorRequests.update(callbackToUpdate.getCoordinator()); } } catch (Throwable t) { log.error("Error marking process as complete.", t); @@ -641,7 +652,7 @@ public void onFailure(TaskCallback callback, Throwable t) { abort(t); // update pending count, but not offset TaskCallbackImpl callbackImpl = (TaskCallbackImpl) callback; - log.error("Got callback failure for task {}", callbackImpl.taskName, t); + log.error("Got callback failure for task {}", callbackImpl.getTaskName(), t); } catch (Throwable e) { log.error("Error marking process as failed.", e); } finally { diff --git a/samza-core/src/main/java/org/apache/samza/container/RunLoopFactory.java b/samza-core/src/main/java/org/apache/samza/container/RunLoopFactory.java index d1280fade2..b50a270ebd 100644 --- a/samza-core/src/main/java/org/apache/samza/container/RunLoopFactory.java +++ b/samza-core/src/main/java/org/apache/samza/container/RunLoopFactory.java @@ -22,7 +22,6 @@ import org.apache.samza.SamzaException; import org.apache.samza.config.TaskConfig; import org.apache.samza.system.SystemConsumers; -import org.apache.samza.task.RunLoop; import org.apache.samza.util.HighResolutionClock; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/samza-core/src/main/java/org/apache/samza/container/SamzaContainerListener.java b/samza-core/src/main/java/org/apache/samza/container/SamzaContainerListener.java index 87321b7648..db532c5be8 100644 --- a/samza-core/src/main/java/org/apache/samza/container/SamzaContainerListener.java +++ b/samza-core/src/main/java/org/apache/samza/container/SamzaContainerListener.java @@ -24,22 +24,22 @@ public interface SamzaContainerListener { /** - * Method invoked when the {@link SamzaContainer} state is {@link org.apache.samza.SamzaContainerStatus#NOT_STARTED} - * and is about to 
transition to {@link org.apache.samza.SamzaContainerStatus#STARTING} to start the initialization sequence. + * Method invoked when the {@link SamzaContainer} state is {@link SamzaContainerStatus#NOT_STARTED} + * and is about to transition to {@link SamzaContainerStatus#STARTING} to start the initialization sequence. */ void beforeStart(); /** * Method invoked after the {@link SamzaContainer} has successfully transitioned to - * the {@link org.apache.samza.SamzaContainerStatus#STARTED} state and is about to start the - * {@link org.apache.samza.task.RunLoop} + * the {@link SamzaContainerStatus#STARTED} state and is about to start the + * {@link RunLoop} */ void afterStart(); /** * Method invoked after the {@link SamzaContainer} has successfully transitioned to - * {@link org.apache.samza.SamzaContainerStatus#STOPPED} state. Details on state transitions can be found in - * {@link org.apache.samza.SamzaContainerStatus} + * {@link SamzaContainerStatus#STOPPED} state. Details on state transitions can be found in + * {@link SamzaContainerStatus} *
* Note: This will be the last call after completely shutting down the SamzaContainer without any * exceptions/errors. @@ -48,8 +48,8 @@ public interface SamzaContainerListener { /** * Method invoked after the {@link SamzaContainer} has transitioned to - * {@link org.apache.samza.SamzaContainerStatus#FAILED} state. Details on state transitions can be found in - * {@link org.apache.samza.SamzaContainerStatus} + * {@link SamzaContainerStatus#FAILED} state. Details on state transitions can be found in + * {@link SamzaContainerStatus} *
* Note: {@link #afterFailure(Throwable)} is mutually exclusive to {@link #afterStop()}. * @param t Throwable that caused the container failure. diff --git a/samza-core/src/main/java/org/apache/samza/SamzaContainerStatus.java b/samza-core/src/main/java/org/apache/samza/container/SamzaContainerStatus.java similarity index 93% rename from samza-core/src/main/java/org/apache/samza/SamzaContainerStatus.java rename to samza-core/src/main/java/org/apache/samza/container/SamzaContainerStatus.java index 3b18138835..878054ea8f 100644 --- a/samza-core/src/main/java/org/apache/samza/SamzaContainerStatus.java +++ b/samza-core/src/main/java/org/apache/samza/container/SamzaContainerStatus.java @@ -16,7 +16,7 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.samza; +package org.apache.samza.container; /** @@ -44,12 +44,12 @@ public enum SamzaContainerStatus { /** * Indicates that the container is starting all the components required by the - * {@link org.apache.samza.task.RunLoop} for processing + * {@link RunLoop} for processing */ STARTING, /** - * Indicates that the container started the {@link org.apache.samza.task.RunLoop} + * Indicates that the container started the {@link RunLoop} */ STARTED, diff --git a/samza-core/src/main/java/org/apache/samza/scheduler/CallbackSchedulerImpl.java b/samza-core/src/main/java/org/apache/samza/scheduler/CallbackSchedulerImpl.java index 1f87c7c664..fe8e22745e 100644 --- a/samza-core/src/main/java/org/apache/samza/scheduler/CallbackSchedulerImpl.java +++ b/samza-core/src/main/java/org/apache/samza/scheduler/CallbackSchedulerImpl.java @@ -18,9 +18,6 @@ */ package org.apache.samza.scheduler; -import org.apache.samza.task.EpochTimeScheduler; - - /** * Delegates to {@link EpochTimeScheduler}. This is useful because it provides a write-only interface for user-facing * purposes. 
diff --git a/samza-core/src/main/java/org/apache/samza/task/EpochTimeScheduler.java b/samza-core/src/main/java/org/apache/samza/scheduler/EpochTimeScheduler.java similarity index 97% rename from samza-core/src/main/java/org/apache/samza/task/EpochTimeScheduler.java rename to samza-core/src/main/java/org/apache/samza/scheduler/EpochTimeScheduler.java index 820a6aed6b..ddc5b29816 100644 --- a/samza-core/src/main/java/org/apache/samza/task/EpochTimeScheduler.java +++ b/samza-core/src/main/java/org/apache/samza/scheduler/EpochTimeScheduler.java @@ -17,7 +17,7 @@ * under the License. */ -package org.apache.samza.task; +package org.apache.samza.scheduler; import java.util.Map; import java.util.TreeMap; @@ -25,7 +25,6 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; -import org.apache.samza.scheduler.ScheduledCallback; import static com.google.common.base.Preconditions.checkState; @@ -81,7 +80,7 @@ public void deleteTimer(K key) { } } - void registerListener(TimerListener listener) { + public void registerListener(TimerListener listener) { timerListener = listener; if (!readyTimers.isEmpty()) { diff --git a/samza-core/src/main/java/org/apache/samza/task/TaskCallbackImpl.java b/samza-core/src/main/java/org/apache/samza/task/TaskCallbackImpl.java index 5c178aa6e4..0ba20324d1 100644 --- a/samza-core/src/main/java/org/apache/samza/task/TaskCallbackImpl.java +++ b/samza-core/src/main/java/org/apache/samza/task/TaskCallbackImpl.java @@ -34,7 +34,7 @@ * callback is called multiple times, it will throw IllegalStateException * to the listener. 
*/ -class TaskCallbackImpl implements TaskCallback, Comparable { +public class TaskCallbackImpl implements TaskCallback, Comparable { private static final Logger log = LoggerFactory.getLogger(TaskCallbackImpl.class); final TaskName taskName; @@ -60,6 +60,22 @@ public TaskCallbackImpl(TaskCallbackListener listener, this.timeCreatedNs = timeCreatedNs; } + public TaskName getTaskName() { + return taskName; + } + + public IncomingMessageEnvelope getEnvelope() { + return envelope; + } + + public ReadableCoordinator getCoordinator() { + return coordinator; + } + + public long getTimeCreatedNs() { + return timeCreatedNs; + } + @Override public void complete() { if (scheduledFuture != null) { diff --git a/samza-core/src/main/java/org/apache/samza/task/TaskCallbackListener.java b/samza-core/src/main/java/org/apache/samza/task/TaskCallbackListener.java index de4ee58bd6..77dad9858f 100644 --- a/samza-core/src/main/java/org/apache/samza/task/TaskCallbackListener.java +++ b/samza-core/src/main/java/org/apache/samza/task/TaskCallbackListener.java @@ -24,7 +24,7 @@ * callback events. If the callback completes with success, onComplete() will be fired. * If the callback fails, onFailure() will be fired. */ -interface TaskCallbackListener { +public interface TaskCallbackListener { void onComplete(TaskCallback callback); void onFailure(TaskCallback callback, Throwable t); } \ No newline at end of file diff --git a/samza-core/src/main/java/org/apache/samza/task/TaskCallbackManager.java b/samza-core/src/main/java/org/apache/samza/task/TaskCallbackManager.java index 9d87e6e642..2d49de7402 100644 --- a/samza-core/src/main/java/org/apache/samza/task/TaskCallbackManager.java +++ b/samza-core/src/main/java/org/apache/samza/task/TaskCallbackManager.java @@ -41,7 +41,7 @@ * for the callbacks based on the sequence number, and updates the offsets for checkpointing * by always moving forward to the latest contiguous callback (uses the high watermark). 
*/ -class TaskCallbackManager { +public class TaskCallbackManager { private static final class TaskCallbacks { private final Queue callbacks = new PriorityQueue<>(); diff --git a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala index 1b356f6611..f1d9d1c81d 100644 --- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala +++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala @@ -55,7 +55,7 @@ import org.apache.samza.table.TableManager import org.apache.samza.task._ import org.apache.samza.util.ScalaJavaUtil.JavaOptionals import org.apache.samza.util.{Util, _} -import org.apache.samza.{SamzaContainerStatus, SamzaException} +import org.apache.samza.SamzaException import scala.collection.JavaConverters._ diff --git a/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala b/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala index ae6db2285b..884244e07a 100644 --- a/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala +++ b/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala @@ -30,7 +30,7 @@ import org.apache.samza.config.Config import org.apache.samza.config.StreamConfig.Config2Stream import org.apache.samza.context._ import org.apache.samza.job.model.{JobModel, TaskModel} -import org.apache.samza.scheduler.{CallbackSchedulerImpl, ScheduledCallback} +import org.apache.samza.scheduler.{CallbackSchedulerImpl, EpochTimeScheduler, ScheduledCallback} import org.apache.samza.startpoint.Startpoint import org.apache.samza.storage.kv.KeyValueStore import org.apache.samza.storage.TaskStorageManager diff --git a/samza-core/src/test/java/org/apache/samza/task/TestRunLoop.java b/samza-core/src/test/java/org/apache/samza/container/TestRunLoop.java similarity index 98% rename from samza-core/src/test/java/org/apache/samza/task/TestRunLoop.java rename to 
samza-core/src/test/java/org/apache/samza/container/TestRunLoop.java index 1aa43e5a23..41e55edf03 100644 --- a/samza-core/src/test/java/org/apache/samza/task/TestRunLoop.java +++ b/samza-core/src/test/java/org/apache/samza/container/TestRunLoop.java @@ -17,7 +17,7 @@ * under the License. */ -package org.apache.samza.task; +package org.apache.samza.container; import java.util.ArrayList; import java.util.Collections; @@ -31,11 +31,6 @@ import org.apache.samza.Partition; import org.apache.samza.checkpoint.Checkpoint; import org.apache.samza.checkpoint.OffsetManager; -import org.apache.samza.container.SamzaContainerMetrics; -import org.apache.samza.container.TaskInstance; -import org.apache.samza.container.TaskInstanceExceptionHandler; -import org.apache.samza.container.TaskInstanceMetrics; -import org.apache.samza.container.TaskName; import org.apache.samza.context.ContainerContext; import org.apache.samza.context.JobContext; import org.apache.samza.job.model.TaskModel; @@ -47,6 +42,14 @@ import org.apache.samza.system.SystemConsumers; import org.apache.samza.system.SystemStreamPartition; import org.apache.samza.system.TestSystemConsumers; +import org.apache.samza.task.AsyncStreamTask; +import org.apache.samza.task.EndOfStreamListenerTask; +import org.apache.samza.task.MessageCollector; +import org.apache.samza.task.TaskCallback; +import org.apache.samza.task.TaskCallbackImpl; +import org.apache.samza.task.TaskCoordinator; +import org.apache.samza.task.TaskInstanceCollector; +import org.apache.samza.task.WindowableTask; import org.junit.Rule; import org.junit.Test; import org.junit.rules.Timeout; @@ -294,7 +297,7 @@ private TestCode buildOutofOrderCallback(final TestTask task) { return new TestCode() { @Override public void run(TaskCallback callback) { - IncomingMessageEnvelope envelope = ((TaskCallbackImpl) callback).envelope; + IncomingMessageEnvelope envelope = ((TaskCallbackImpl) callback).getEnvelope(); if (envelope.equals(envelope0)) { // process first message 
will wait till the second one is processed try { @@ -694,7 +697,7 @@ public void testCommitBehaviourWhenAsyncCommitIsEnabled() throws InterruptedExce final CountDownLatch firstMsgCompletionLatch = new CountDownLatch(1); final CountDownLatch secondMsgCompletionLatch = new CountDownLatch(1); task0.callbackHandler = callback -> { - IncomingMessageEnvelope envelope = ((TaskCallbackImpl) callback).envelope; + IncomingMessageEnvelope envelope = ((TaskCallbackImpl) callback).getEnvelope(); try { if (envelope.equals(firstMsg)) { firstMsgCompletionLatch.await(); @@ -745,7 +748,7 @@ public void testProcessBehaviourWhenAsyncCommitIsEnabled() throws InterruptedExc CountDownLatch commitLatch = new CountDownLatch(1); task0.commitHandler = callback -> { TaskCallbackImpl taskCallback = (TaskCallbackImpl) callback; - if (taskCallback.envelope.equals(envelope3)) { + if (taskCallback.getEnvelope().equals(envelope3)) { try { commitLatch.await(); } catch (InterruptedException e) { @@ -756,7 +759,7 @@ public void testProcessBehaviourWhenAsyncCommitIsEnabled() throws InterruptedExc task0.callbackHandler = callback -> { TaskCallbackImpl taskCallback = (TaskCallbackImpl) callback; - if (taskCallback.envelope.equals(envelope0)) { + if (taskCallback.getEnvelope().equals(envelope0)) { // Both the process call has gone through when the first commit is in progress. 
assertEquals(2, containerMetrics.processes().getCount()); assertEquals(0, containerMetrics.commits().getCount()); diff --git a/samza-core/src/test/java/org/apache/samza/processor/TestStreamProcessor.java b/samza-core/src/test/java/org/apache/samza/processor/TestStreamProcessor.java index 75c39acc3b..d5bce16b1d 100644 --- a/samza-core/src/test/java/org/apache/samza/processor/TestStreamProcessor.java +++ b/samza-core/src/test/java/org/apache/samza/processor/TestStreamProcessor.java @@ -27,11 +27,11 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; -import org.apache.samza.SamzaContainerStatus; +import org.apache.samza.container.SamzaContainerStatus; import org.apache.samza.SamzaException; import org.apache.samza.config.Config; import org.apache.samza.config.MapConfig; -import org.apache.samza.task.RunLoop; +import org.apache.samza.container.RunLoop; import org.apache.samza.container.SamzaContainer; import org.apache.samza.coordinator.JobCoordinator; import org.apache.samza.job.model.ContainerModel; diff --git a/samza-core/src/test/java/org/apache/samza/scheduler/TestCallbackSchedulerImpl.java b/samza-core/src/test/java/org/apache/samza/scheduler/TestCallbackSchedulerImpl.java index 649a4e48e7..58ef1c1aec 100644 --- a/samza-core/src/test/java/org/apache/samza/scheduler/TestCallbackSchedulerImpl.java +++ b/samza-core/src/test/java/org/apache/samza/scheduler/TestCallbackSchedulerImpl.java @@ -18,7 +18,6 @@ */ package org.apache.samza.scheduler; -import org.apache.samza.task.EpochTimeScheduler; import org.junit.Before; import org.junit.Test; import org.mockito.Mock; diff --git a/samza-core/src/test/java/org/apache/samza/task/TestEpochTimeScheduler.java b/samza-core/src/test/java/org/apache/samza/scheduler/TestEpochTimeScheduler.java similarity index 97% rename from samza-core/src/test/java/org/apache/samza/task/TestEpochTimeScheduler.java rename to 
samza-core/src/test/java/org/apache/samza/scheduler/TestEpochTimeScheduler.java index da137e68b9..5db908c45e 100644 --- a/samza-core/src/test/java/org/apache/samza/task/TestEpochTimeScheduler.java +++ b/samza-core/src/test/java/org/apache/samza/scheduler/TestEpochTimeScheduler.java @@ -17,12 +17,14 @@ * under the License. */ -package org.apache.samza.task; +package org.apache.samza.scheduler; import java.util.ArrayList; import java.util.List; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; +import org.apache.samza.task.MessageCollector; +import org.apache.samza.task.TaskCoordinator; import org.junit.Test; import static org.junit.Assert.assertEquals; diff --git a/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala b/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala index e75fe544ad..0d267a124e 100644 --- a/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala +++ b/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala @@ -31,7 +31,7 @@ import org.apache.samza.metrics.{Gauge, MetricsReporter, Timer} import org.apache.samza.storage.{ContainerStorageManager, TaskStorageManager} import org.apache.samza.system._ import org.apache.samza.task.{StreamTaskFactory, TaskFactory} -import org.apache.samza.{Partition, SamzaContainerStatus} +import org.apache.samza.Partition import org.junit.Assert._ import org.junit.{Before, Test} import org.mockito.Matchers.{any, notNull} diff --git a/samza-core/src/test/scala/org/apache/samza/processor/StreamProcessorTestUtils.scala b/samza-core/src/test/scala/org/apache/samza/processor/StreamProcessorTestUtils.scala index 5979bc4d71..5ab76355d6 100644 --- a/samza-core/src/test/scala/org/apache/samza/processor/StreamProcessorTestUtils.scala +++ b/samza-core/src/test/scala/org/apache/samza/processor/StreamProcessorTestUtils.scala @@ -29,7 +29,7 @@ import org.apache.samza.serializers.SerdeManager 
import org.apache.samza.storage.ContainerStorageManager import org.apache.samza.system._ import org.apache.samza.system.chooser.RoundRobinChooser -import org.apache.samza.task.{RunLoop, StreamTask, TaskInstanceCollector} +import org.apache.samza.task.{StreamTask, TaskInstanceCollector} import org.mockito.Mockito