Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
* under the License.
*/

package org.apache.samza.task;
package org.apache.samza.container;

import java.util.ArrayDeque;
import java.util.ArrayList;
Expand All @@ -34,13 +34,18 @@
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import org.apache.samza.SamzaException;
import org.apache.samza.container.SamzaContainerMetrics;
import org.apache.samza.container.TaskInstance;
import org.apache.samza.container.TaskInstanceMetrics;
import org.apache.samza.container.TaskName;
import org.apache.samza.system.IncomingMessageEnvelope;
import org.apache.samza.system.SystemConsumers;
import org.apache.samza.system.SystemStreamPartition;
import org.apache.samza.task.CoordinatorRequests;
import org.apache.samza.scheduler.EpochTimeScheduler;
import org.apache.samza.task.ReadableCoordinator;
import org.apache.samza.task.TaskCallback;
import org.apache.samza.task.TaskCallbackFactory;
import org.apache.samza.task.TaskCallbackImpl;
import org.apache.samza.task.TaskCallbackListener;
import org.apache.samza.task.TaskCallbackManager;
import org.apache.samza.task.TaskCoordinator;
import org.apache.samza.util.HighResolutionClock;
import org.apache.samza.util.Throttleable;
import org.apache.samza.util.ThrottlingScheduler;
Expand All @@ -50,7 +55,13 @@


/**
* The RunLoop supports multithreading execution of Samza {@link AsyncStreamTask}s.
* The run loop supports both single-threaded and multi-threaded execution models.
* <p>
* If job.container.thread.pool.size &gt; 1 (multi-threaded), operations like commit, window and timer for all tasks within a container
* happen on a thread pool.
* If job.container.thread.pool.size &lt; 1 (single-threaded), operations for all tasks are multiplexed onto one execution thread.
* </p>
* Note: In both models, process/processAsync for all tasks is invoked on the run loop thread.
*/
public class RunLoop implements Runnable, Throttleable {
private static final Logger log = LoggerFactory.getLogger(RunLoop.class);
Expand Down Expand Up @@ -340,7 +351,7 @@ private enum WorkerOp {
}

/**
* The AsyncTaskWorker encapsulates the states of an {@link AsyncStreamTask}. If the task becomes ready, it
* The AsyncTaskWorker encapsulates the states of an {@link org.apache.samza.task.AsyncStreamTask}. If the task becomes ready, it
* will run the task asynchronously. It runs window and commit in the provided thread pool.
*/
private class AsyncTaskWorker implements TaskCallbackListener {
Expand Down Expand Up @@ -596,28 +607,28 @@ public void run() {
*/
@Override
public void onComplete(final TaskCallback callback) {
long workNanos = clock.nanoTime() - ((TaskCallbackImpl) callback).timeCreatedNs;
long workNanos = clock.nanoTime() - ((TaskCallbackImpl) callback).getTimeCreatedNs();
callbackExecutor.schedule(new Runnable() {
@Override
public void run() {
try {
state.doneProcess();
state.taskMetrics.asyncCallbackCompleted().inc();
TaskCallbackImpl callbackImpl = (TaskCallbackImpl) callback;
containerMetrics.processNs().update(clock.nanoTime() - callbackImpl.timeCreatedNs);
containerMetrics.processNs().update(clock.nanoTime() - callbackImpl.getTimeCreatedNs());
log.trace("Got callback complete for task {}, ssp {}",
callbackImpl.taskName, callbackImpl.envelope.getSystemStreamPartition());
callbackImpl.getTaskName(), callbackImpl.getEnvelope().getSystemStreamPartition());

List<TaskCallbackImpl> callbacksToUpdate = callbackManager.updateCallback(callbackImpl);
for (TaskCallbackImpl callbackToUpdate : callbacksToUpdate) {
IncomingMessageEnvelope envelope = callbackToUpdate.envelope;
IncomingMessageEnvelope envelope = callbackToUpdate.getEnvelope();
log.trace("Update offset for ssp {}, offset {}", envelope.getSystemStreamPartition(), envelope.getOffset());

// update offset
task.offsetManager().update(task.taskName(), envelope.getSystemStreamPartition(), envelope.getOffset());

// update coordinator
coordinatorRequests.update(callbackToUpdate.coordinator);
coordinatorRequests.update(callbackToUpdate.getCoordinator());
}
} catch (Throwable t) {
log.error("Error marking process as complete.", t);
Expand All @@ -641,7 +652,7 @@ public void onFailure(TaskCallback callback, Throwable t) {
abort(t);
// update pending count, but not offset
TaskCallbackImpl callbackImpl = (TaskCallbackImpl) callback;
log.error("Got callback failure for task {}", callbackImpl.taskName, t);
log.error("Got callback failure for task {}", callbackImpl.getTaskName(), t);
} catch (Throwable e) {
log.error("Error marking process as failed.", e);
} finally {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import org.apache.samza.SamzaException;
import org.apache.samza.config.TaskConfig;
import org.apache.samza.system.SystemConsumers;
import org.apache.samza.task.RunLoop;
import org.apache.samza.util.HighResolutionClock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,22 +24,22 @@
public interface SamzaContainerListener {

/**
* Method invoked when the {@link SamzaContainer} state is {@link org.apache.samza.SamzaContainerStatus#NOT_STARTED}
* and is about to transition to {@link org.apache.samza.SamzaContainerStatus#STARTING} to start the initialization sequence.
* Method invoked when the {@link SamzaContainer} state is {@link SamzaContainerStatus#NOT_STARTED}
* and is about to transition to {@link SamzaContainerStatus#STARTING} to start the initialization sequence.
*/
void beforeStart();

/**
* Method invoked after the {@link SamzaContainer} has successfully transitioned to
* the {@link org.apache.samza.SamzaContainerStatus#STARTED} state and is about to start the
* {@link org.apache.samza.task.RunLoop}
* the {@link SamzaContainerStatus#STARTED} state and is about to start the
* {@link RunLoop}
*/
void afterStart();

/**
* Method invoked after the {@link SamzaContainer} has successfully transitioned to
* {@link org.apache.samza.SamzaContainerStatus#STOPPED} state. Details on state transitions can be found in
* {@link org.apache.samza.SamzaContainerStatus}
* {@link SamzaContainerStatus#STOPPED} state. Details on state transitions can be found in
* {@link SamzaContainerStatus}
* <br>
* <b>Note</b>: This will be the last call after completely shutting down the SamzaContainer without any
* exceptions/errors.
Expand All @@ -48,8 +48,8 @@ public interface SamzaContainerListener {

/**
* Method invoked after the {@link SamzaContainer} has transitioned to
* {@link org.apache.samza.SamzaContainerStatus#FAILED} state. Details on state transitions can be found in
* {@link org.apache.samza.SamzaContainerStatus}
* {@link SamzaContainerStatus#FAILED} state. Details on state transitions can be found in
* {@link SamzaContainerStatus}
* <br>
* <b>Note</b>: {@link #afterFailure(Throwable)} is mutually exclusive to {@link #afterStop()}.
* @param t Throwable that caused the container failure.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.samza;
package org.apache.samza.container;


/**
Expand Down Expand Up @@ -44,12 +44,12 @@ public enum SamzaContainerStatus {

/**
* Indicates that the container is starting all the components required by the
* {@link org.apache.samza.task.RunLoop} for processing
* {@link RunLoop} for processing
*/
STARTING,

/**
* Indicates that the container started the {@link org.apache.samza.task.RunLoop}
* Indicates that the container started the {@link RunLoop}
*/
STARTED,

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,6 @@
*/
package org.apache.samza.scheduler;

import org.apache.samza.task.EpochTimeScheduler;


/**
* Delegates to {@link EpochTimeScheduler}. This is useful because it provides a write-only interface for user-facing
* purposes.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,14 @@
* under the License.
*/

package org.apache.samza.task;
package org.apache.samza.scheduler;

import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import org.apache.samza.scheduler.ScheduledCallback;

import static com.google.common.base.Preconditions.checkState;

Expand Down Expand Up @@ -81,7 +80,7 @@ public <K> void deleteTimer(K key) {
}
}

void registerListener(TimerListener listener) {
public void registerListener(TimerListener listener) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this class be in container instead of task as well?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I noticed we have a scheduler package and CallbackSchedulerImpl is the main consumer of this class.
I ended up moving it there. Let me know if you think we should consolidate the scheduler and container packages.

timerListener = listener;

if (!readyTimers.isEmpty()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
* callback is called multiple times, it will throw IllegalStateException
* to the listener.
*/
class TaskCallbackImpl implements TaskCallback, Comparable<TaskCallbackImpl> {
public class TaskCallbackImpl implements TaskCallback, Comparable<TaskCallbackImpl> {
private static final Logger log = LoggerFactory.getLogger(TaskCallbackImpl.class);

final TaskName taskName;
Expand All @@ -60,6 +60,22 @@ public TaskCallbackImpl(TaskCallbackListener listener,
this.timeCreatedNs = timeCreatedNs;
}

/**
 * Exposes the task name stored on this callback.
 *
 * @return the {@code taskName} field of this callback
 */
public TaskName getTaskName() {
  return this.taskName;
}

/**
 * Exposes the message envelope stored on this callback.
 *
 * @return the {@code envelope} field of this callback
 */
public IncomingMessageEnvelope getEnvelope() {
  return this.envelope;
}

/**
 * Exposes the coordinator stored on this callback.
 *
 * @return the {@code coordinator} field of this callback
 */
public ReadableCoordinator getCoordinator() {
  return this.coordinator;
}

/**
 * Exposes the creation timestamp recorded on this callback.
 *
 * @return the {@code timeCreatedNs} field of this callback (nanoseconds, per the field name)
 */
public long getTimeCreatedNs() {
  return this.timeCreatedNs;
}

@Override
public void complete() {
if (scheduledFuture != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
* callback events. If the callback completes with success, onComplete() will be fired.
* If the callback fails, onFailure() will be fired.
*/
interface TaskCallbackListener {
public interface TaskCallbackListener {
/**
 * Fired when the callback completes with success.
 *
 * @param callback the callback that completed
 */
void onComplete(TaskCallback callback);
/**
 * Fired when the callback fails.
 *
 * @param callback the callback that failed
 * @param t the throwable reported with the failure
 */
void onFailure(TaskCallback callback, Throwable t);
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@
* for the callbacks based on the sequence number, and updates the offsets for checkpointing
* by always moving forward to the latest contiguous callback (uses the high watermark).
*/
class TaskCallbackManager {
public class TaskCallbackManager {

private static final class TaskCallbacks {
private final Queue<TaskCallbackImpl> callbacks = new PriorityQueue<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ import org.apache.samza.table.TableManager
import org.apache.samza.task._
import org.apache.samza.util.ScalaJavaUtil.JavaOptionals
import org.apache.samza.util.{Util, _}
import org.apache.samza.{SamzaContainerStatus, SamzaException}
import org.apache.samza.SamzaException

import scala.collection.JavaConverters._

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import org.apache.samza.config.Config
import org.apache.samza.config.StreamConfig.Config2Stream
import org.apache.samza.context._
import org.apache.samza.job.model.{JobModel, TaskModel}
import org.apache.samza.scheduler.{CallbackSchedulerImpl, ScheduledCallback}
import org.apache.samza.scheduler.{CallbackSchedulerImpl, EpochTimeScheduler, ScheduledCallback}
import org.apache.samza.startpoint.Startpoint
import org.apache.samza.storage.kv.KeyValueStore
import org.apache.samza.storage.TaskStorageManager
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
* under the License.
*/

package org.apache.samza.task;
package org.apache.samza.container;

import java.util.ArrayList;
import java.util.Collections;
Expand All @@ -31,11 +31,6 @@
import org.apache.samza.Partition;
import org.apache.samza.checkpoint.Checkpoint;
import org.apache.samza.checkpoint.OffsetManager;
import org.apache.samza.container.SamzaContainerMetrics;
import org.apache.samza.container.TaskInstance;
import org.apache.samza.container.TaskInstanceExceptionHandler;
import org.apache.samza.container.TaskInstanceMetrics;
import org.apache.samza.container.TaskName;
import org.apache.samza.context.ContainerContext;
import org.apache.samza.context.JobContext;
import org.apache.samza.job.model.TaskModel;
Expand All @@ -47,6 +42,14 @@
import org.apache.samza.system.SystemConsumers;
import org.apache.samza.system.SystemStreamPartition;
import org.apache.samza.system.TestSystemConsumers;
import org.apache.samza.task.AsyncStreamTask;
import org.apache.samza.task.EndOfStreamListenerTask;
import org.apache.samza.task.MessageCollector;
import org.apache.samza.task.TaskCallback;
import org.apache.samza.task.TaskCallbackImpl;
import org.apache.samza.task.TaskCoordinator;
import org.apache.samza.task.TaskInstanceCollector;
import org.apache.samza.task.WindowableTask;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.Timeout;
Expand Down Expand Up @@ -294,7 +297,7 @@ private TestCode buildOutofOrderCallback(final TestTask task) {
return new TestCode() {
@Override
public void run(TaskCallback callback) {
IncomingMessageEnvelope envelope = ((TaskCallbackImpl) callback).envelope;
IncomingMessageEnvelope envelope = ((TaskCallbackImpl) callback).getEnvelope();
if (envelope.equals(envelope0)) {
// process first message will wait till the second one is processed
try {
Expand Down Expand Up @@ -694,7 +697,7 @@ public void testCommitBehaviourWhenAsyncCommitIsEnabled() throws InterruptedExce
final CountDownLatch firstMsgCompletionLatch = new CountDownLatch(1);
final CountDownLatch secondMsgCompletionLatch = new CountDownLatch(1);
task0.callbackHandler = callback -> {
IncomingMessageEnvelope envelope = ((TaskCallbackImpl) callback).envelope;
IncomingMessageEnvelope envelope = ((TaskCallbackImpl) callback).getEnvelope();
try {
if (envelope.equals(firstMsg)) {
firstMsgCompletionLatch.await();
Expand Down Expand Up @@ -745,7 +748,7 @@ public void testProcessBehaviourWhenAsyncCommitIsEnabled() throws InterruptedExc
CountDownLatch commitLatch = new CountDownLatch(1);
task0.commitHandler = callback -> {
TaskCallbackImpl taskCallback = (TaskCallbackImpl) callback;
if (taskCallback.envelope.equals(envelope3)) {
if (taskCallback.getEnvelope().equals(envelope3)) {
try {
commitLatch.await();
} catch (InterruptedException e) {
Expand All @@ -756,7 +759,7 @@ public void testProcessBehaviourWhenAsyncCommitIsEnabled() throws InterruptedExc

task0.callbackHandler = callback -> {
TaskCallbackImpl taskCallback = (TaskCallbackImpl) callback;
if (taskCallback.envelope.equals(envelope0)) {
if (taskCallback.getEnvelope().equals(envelope0)) {
// Both process calls have gone through while the first commit is in progress.
assertEquals(2, containerMetrics.processes().getCount());
assertEquals(0, containerMetrics.commits().getCount());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,11 @@
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.samza.SamzaContainerStatus;
import org.apache.samza.container.SamzaContainerStatus;
import org.apache.samza.SamzaException;
import org.apache.samza.config.Config;
import org.apache.samza.config.MapConfig;
import org.apache.samza.task.RunLoop;
import org.apache.samza.container.RunLoop;
import org.apache.samza.container.SamzaContainer;
import org.apache.samza.coordinator.JobCoordinator;
import org.apache.samza.job.model.ContainerModel;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
*/
package org.apache.samza.scheduler;

import org.apache.samza.task.EpochTimeScheduler;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mock;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,14 @@
* under the License.
*/

package org.apache.samza.task;
package org.apache.samza.scheduler;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import org.apache.samza.task.MessageCollector;
import org.apache.samza.task.TaskCoordinator;
import org.junit.Test;

import static org.junit.Assert.assertEquals;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ import org.apache.samza.metrics.{Gauge, MetricsReporter, Timer}
import org.apache.samza.storage.{ContainerStorageManager, TaskStorageManager}
import org.apache.samza.system._
import org.apache.samza.task.{StreamTaskFactory, TaskFactory}
import org.apache.samza.{Partition, SamzaContainerStatus}
import org.apache.samza.Partition
import org.junit.Assert._
import org.junit.{Before, Test}
import org.mockito.Matchers.{any, notNull}
Expand Down
Loading