Skip to content

Commit

Permalink
[FLINK-4973] Let LatencyMarksEmitter use StreamTask's ProcessingTimeS…
Browse files Browse the repository at this point in the history
…ervice

The LatencyMarksEmitter class uses now the StreamTask's ProcessingTimeService to schedule
latency mark emission. For that the ProcessingTimeService was extended to have the method
scheduleAtFixedRate to schedule repeated tasks. The latency mark emission is such a repeated
task.

This closes #3008.
  • Loading branch information
tillrohrmann authored and StephanEwen committed Dec 20, 2016
1 parent a26b0f0 commit ab2125b
Show file tree
Hide file tree
Showing 8 changed files with 474 additions and 123 deletions.
Expand Up @@ -23,11 +23,10 @@
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

/**
* {@link StreamOperator} for streaming sources.
Expand Down Expand Up @@ -62,8 +61,12 @@ public void run(final Object lockingObject, final Output<StreamRecord<OUT>> coll

LatencyMarksEmitter latencyEmitter = null;
if(getExecutionConfig().isLatencyTrackingEnabled()) {
latencyEmitter = new LatencyMarksEmitter<>(lockingObject, collector, getExecutionConfig().getLatencyTrackingInterval(),
getOperatorConfig().getVertexID(), getRuntimeContext().getIndexOfThisSubtask());
latencyEmitter = new LatencyMarksEmitter<>(
getProcessingTimeService(),
collector,
getExecutionConfig().getLatencyTrackingInterval(),
getOperatorConfig().getVertexID(),
getRuntimeContext().getIndexOfThisSubtask());
}

final long watermarkInterval = getRuntimeContext().getExecutionConfig().getAutoWatermarkInterval();
Expand Down Expand Up @@ -121,28 +124,35 @@ protected boolean isCanceledOrStopped() {
}

private static class LatencyMarksEmitter<OUT> {
private final ScheduledExecutorService scheduleExecutor;
private final ScheduledFuture<?> latencyMarkTimer;

public LatencyMarksEmitter(final Object lockingObject, final Output<StreamRecord<OUT>> output, long latencyTrackingInterval, final int vertexID, final int subtaskIndex) {
this.scheduleExecutor = Executors.newScheduledThreadPool(1);
this.latencyMarkTimer = scheduleExecutor.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
synchronized (lockingObject) {
output.emitLatencyMarker(new LatencyMarker(System.currentTimeMillis(), vertexID, subtaskIndex));
public LatencyMarksEmitter(
final ProcessingTimeService processingTimeService,
final Output<StreamRecord<OUT>> output,
long latencyTrackingInterval,
final int vertexID,
final int subtaskIndex) {

latencyMarkTimer = processingTimeService.scheduleAtFixedRate(
new ProcessingTimeCallback() {
@Override
public void onProcessingTime(long timestamp) throws Exception {
try {
// ProcessingTimeService callbacks are executed under the checkpointing lock
output.emitLatencyMarker(new LatencyMarker(timestamp, vertexID, subtaskIndex));
} catch (Throwable t) {
// we catch the Throwables here so that we don't trigger the processing
// timer services async exception handler
LOG.warn("Error while emitting latency marker.", t);
}
} catch (Throwable t) {
LOG.warn("Error while emitting latency marker", t);
}
}
}, 0, latencyTrackingInterval, TimeUnit.MILLISECONDS);
},
0L,
latencyTrackingInterval);
}

public void close() {
latencyMarkTimer.cancel(true);
scheduleExecutor.shutdownNow();
}
}
}
Expand Up @@ -55,6 +55,16 @@ public abstract class ProcessingTimeService {
*/
public abstract ScheduledFuture<?> registerTimer(long timestamp, ProcessingTimeCallback target);

/**
* Registers a task to be executed repeatedly at a fixed rate.
*
* @param callback to be executed after the initial delay and then after each period
* @param initialDelay initial delay to start executing callback
* @param period after the initial delay after which the callback is executed
* @return Scheduled future representing the task to be executed repeatedly
*/
public abstract ScheduledFuture<?> scheduleAtFixedRate(ProcessingTimeCallback callback, long initialDelay, long period);

/**
* Returns <tt>true</tt> if the service has been shut down, <tt>false</tt> otherwise.
*/
Expand Down
Expand Up @@ -18,6 +18,7 @@
package org.apache.flink.streaming.runtime.tasks;

import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.util.Preconditions;

import javax.annotation.Nonnull;
import java.util.concurrent.BlockingQueue;
Expand Down Expand Up @@ -123,6 +124,33 @@ else if (status == STATUS_SHUTDOWN) {
}
}

@Override
public ScheduledFuture<?> scheduleAtFixedRate(ProcessingTimeCallback callback, long initialDelay, long period) {
long nextTimestamp = getCurrentProcessingTime() + initialDelay;

// we directly try to register the timer and only react to the status on exception
// that way we save unnecessary volatile accesses for each timer
try {
return timerService.scheduleAtFixedRate(
new RepeatedTriggerTask(task, checkpointLock, callback, nextTimestamp, period),
initialDelay,
period,
TimeUnit.MILLISECONDS);
} catch (RejectedExecutionException e) {
final int status = this.status.get();
if (status == STATUS_QUIESCED) {
return new NeverCompleteFuture(initialDelay);
}
else if (status == STATUS_SHUTDOWN) {
throw new IllegalStateException("Timer service is shut down");
}
else {
// something else happened, so propagate the exception
throw e;
}
}
}

@Override
public boolean isTerminated() {
return status.get() == STATUS_SHUTDOWN;
Expand Down Expand Up @@ -196,6 +224,46 @@ public void run() {
}
}

/**
* Internal task which is repeatedly called by the processing time service.
*/
private static final class RepeatedTriggerTask implements Runnable {
private final Object lock;
private final ProcessingTimeCallback target;
private final long period;
private final AsyncExceptionHandler exceptionHandler;

private long nextTimestamp;

private RepeatedTriggerTask(
AsyncExceptionHandler exceptionHandler,
Object lock,
ProcessingTimeCallback target,
long nextTimestamp,
long period) {
this.lock = Preconditions.checkNotNull(lock);
this.target = Preconditions.checkNotNull(target);
this.period = period;
this.exceptionHandler = Preconditions.checkNotNull(exceptionHandler);

this.nextTimestamp = nextTimestamp;
}

@Override
public void run() {
try {
synchronized (lock) {
target.onProcessingTime(nextTimestamp);
}

nextTimestamp += period;
} catch (Throwable t) {
TimerException asyncException = new TimerException(t);
exceptionHandler.handleAsyncException("Caught exception while processing repeated timer task.", asyncException);
}
}
}

// ------------------------------------------------------------------------

private static final class NeverCompleteFuture implements ScheduledFuture<Object> {
Expand Down

0 comments on commit ab2125b

Please sign in to comment.