Skip to content

Commit

Permalink
[FLINK-4494] Expose the TimeServiceProvider from the Task to each Ope…
Browse files Browse the repository at this point in the history
…rator.
  • Loading branch information
kl0u authored and aljoscha committed Sep 23, 2016
1 parent 4779c7e commit ffff299
Show file tree
Hide file tree
Showing 20 changed files with 177 additions and 304 deletions.
Expand Up @@ -35,14 +35,12 @@
import org.apache.flink.runtime.state.VoidNamespaceSerializer; import org.apache.flink.runtime.state.VoidNamespaceSerializer;
import org.apache.flink.streaming.api.graph.StreamConfig; import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.operators.Triggerable; import org.apache.flink.streaming.runtime.tasks.TimeServiceProvider;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.StreamTask; import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;


import java.util.concurrent.ScheduledFuture;

/** /**
* Base class for all stream operators. Operators that contain a user function should extend the class * Base class for all stream operators. Operators that contain a user function should extend the class
* {@link AbstractUdfStreamOperator} instead (which is a specialized subclass of this class). * {@link AbstractUdfStreamOperator} instead (which is a specialized subclass of this class).
Expand Down Expand Up @@ -230,18 +228,11 @@ public <K> KeyedStateBackend<K> getStateBackend() {
} }


/** /**
* Register a timer callback. At the specified time the provided {@link Triggerable} will * Returns the {@link TimeServiceProvider} responsible for getting the current
* be invoked. This call is guaranteed to not happen concurrently with method calls on the operator. * processing time and registering timers.
*
* @param time The absolute time in milliseconds.
* @param target The target to be triggered.
*/ */
protected ScheduledFuture<?> registerTimer(long time, Triggerable target) { protected TimeServiceProvider getTimerService() {
return container.registerTimer(time, target); return container.getTimerService();
}

protected long getCurrentProcessingTime() {
return container.getCurrentProcessingTime();
} }


/** /**
Expand Down
Expand Up @@ -23,6 +23,7 @@
import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.operators.Triggerable; import org.apache.flink.streaming.runtime.operators.Triggerable;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.TimeServiceProvider;


import java.util.concurrent.ScheduledFuture; import java.util.concurrent.ScheduledFuture;


Expand Down Expand Up @@ -189,6 +190,7 @@ public void close() {}
public static class AutomaticWatermarkContext<T> implements SourceFunction.SourceContext<T> { public static class AutomaticWatermarkContext<T> implements SourceFunction.SourceContext<T> {


private final StreamSource<?, ?> owner; private final StreamSource<?, ?> owner;
private final TimeServiceProvider timeService;
private final Object lockingObject; private final Object lockingObject;
private final Output<StreamRecord<T>> output; private final Output<StreamRecord<T>> output;
private final StreamRecord<T> reuse; private final StreamRecord<T> reuse;
Expand All @@ -209,22 +211,23 @@ public AutomaticWatermarkContext(
} }


this.owner = owner; this.owner = owner;
this.timeService = owner.getTimerService();
this.lockingObject = lockingObjectParam; this.lockingObject = lockingObjectParam;
this.output = outputParam; this.output = outputParam;
this.watermarkInterval = watermarkInterval; this.watermarkInterval = watermarkInterval;
this.reuse = new StreamRecord<T>(null); this.reuse = new StreamRecord<T>(null);


long now = owner.getCurrentProcessingTime(); long now = this.timeService.getCurrentProcessingTime();
this.watermarkTimer = owner.registerTimer(now + watermarkInterval, this.watermarkTimer = this.timeService.registerTimer(now + watermarkInterval,
new WatermarkEmittingTask(owner, lockingObjectParam, outputParam)); new WatermarkEmittingTask(this.timeService, lockingObjectParam, outputParam));
} }


@Override @Override
public void collect(T element) { public void collect(T element) {
owner.checkAsyncException(); owner.checkAsyncException();


synchronized (lockingObject) { synchronized (lockingObject) {
final long currentTime = owner.getCurrentProcessingTime(); final long currentTime = this.timeService.getCurrentProcessingTime();
output.collect(reuse.replace(element, currentTime)); output.collect(reuse.replace(element, currentTime));


// this is to avoid lock contention in the lockingObject by // this is to avoid lock contention in the lockingObject by
Expand Down Expand Up @@ -276,19 +279,19 @@ public void close() {


private class WatermarkEmittingTask implements Triggerable { private class WatermarkEmittingTask implements Triggerable {


private final StreamSource<?, ?> owner; private final TimeServiceProvider timeService;
private final Object lockingObject; private final Object lockingObject;
private final Output<StreamRecord<T>> output; private final Output<StreamRecord<T>> output;


private WatermarkEmittingTask(StreamSource<?, ?> src, Object lock, Output<StreamRecord<T>> output) { private WatermarkEmittingTask(TimeServiceProvider timeService, Object lock, Output<StreamRecord<T>> output) {
this.owner = src; this.timeService = timeService;
this.lockingObject = lock; this.lockingObject = lock;
this.output = output; this.output = output;
} }


@Override @Override
public void trigger(long timestamp) { public void trigger(long timestamp) {
final long currentTime = owner.getCurrentProcessingTime(); final long currentTime = this.timeService.getCurrentProcessingTime();


if (currentTime > nextWatermarkTime) { if (currentTime > nextWatermarkTime) {
// align the watermarks across all machines. this will ensure that we // align the watermarks across all machines. this will ensure that we
Expand All @@ -304,8 +307,8 @@ public void trigger(long timestamp) {
} }
} }


owner.registerTimer(owner.getCurrentProcessingTime() + watermarkInterval, this.timeService.registerTimer(this.timeService.getCurrentProcessingTime() + watermarkInterval,
new WatermarkEmittingTask(owner, lockingObject, output)); new WatermarkEmittingTask(this.timeService, lockingObject, output));
} }
} }
} }
Expand Down
Expand Up @@ -83,25 +83,6 @@ public InputSplitProvider getInputSplitProvider() {
return taskEnvironment.getInputSplitProvider(); return taskEnvironment.getInputSplitProvider();
} }


/**
* Register a timer callback. At the specified time the {@link Triggerable } will be invoked.
* This call is guaranteed to not happen concurrently with method calls on the operator.
*
* @param time The absolute time in milliseconds.
* @param target The target to be triggered.
*/
public ScheduledFuture<?> registerTimer(long time, Triggerable target) {
return operator.registerTimer(time, target);
}

/**
* Returns the current processing time as defined by the task's
* {@link org.apache.flink.streaming.runtime.tasks.TimeServiceProvider TimeServiceProvider}
*/
public long getCurrentProcessingTime() {
return operator.getCurrentProcessingTime();
}

// ------------------------------------------------------------------------ // ------------------------------------------------------------------------
// broadcast variables // broadcast variables
// ------------------------------------------------------------------------ // ------------------------------------------------------------------------
Expand Down
Expand Up @@ -24,7 +24,6 @@
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.triggers.Trigger; import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.windows.Window; import org.apache.flink.streaming.api.windowing.windows.Window;
import org.apache.flink.streaming.runtime.tasks.StreamTask;


import java.io.Serializable; import java.io.Serializable;


Expand Down Expand Up @@ -85,8 +84,7 @@ public abstract class WindowAssigner<T, W extends Window> implements Serializabl
public abstract static class WindowAssignerContext { public abstract static class WindowAssignerContext {


/** /**
* Returns the current processing time, as returned by * Returns the current processing time.
* the {@link StreamTask#getCurrentProcessingTime()}.
*/ */
public abstract long getCurrentProcessingTime(); public abstract long getCurrentProcessingTime();


Expand Down
Expand Up @@ -27,7 +27,6 @@
import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.metrics.MetricGroup; import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.streaming.api.windowing.windows.Window; import org.apache.flink.streaming.api.windowing.windows.Window;
import org.apache.flink.streaming.runtime.tasks.StreamTask;


import java.io.Serializable; import java.io.Serializable;


Expand Down Expand Up @@ -128,8 +127,7 @@ public void clear(W window, TriggerContext ctx) throws Exception {}
public interface TriggerContext { public interface TriggerContext {


/** /**
* Returns the current processing time, as returned by * Returns the current processing time.
* the {@link StreamTask#getCurrentProcessingTime()}.
*/ */
long getCurrentProcessingTime(); long getCurrentProcessingTime();


Expand Down
Expand Up @@ -54,9 +54,9 @@ public void open() throws Exception {
super.open(); super.open();
watermarkInterval = getExecutionConfig().getAutoWatermarkInterval(); watermarkInterval = getExecutionConfig().getAutoWatermarkInterval();
if (watermarkInterval > 0) { if (watermarkInterval > 0) {
registerTimer(System.currentTimeMillis() + watermarkInterval, this); long now = getTimerService().getCurrentProcessingTime();
getTimerService().registerTimer(now + watermarkInterval, this);
} }

currentWatermark = Long.MIN_VALUE; currentWatermark = Long.MIN_VALUE;
} }


Expand All @@ -74,14 +74,15 @@ public void processElement(StreamRecord<T> element) throws Exception {
@Override @Override
public void trigger(long timestamp) throws Exception { public void trigger(long timestamp) throws Exception {
// register next timer // register next timer
registerTimer(System.currentTimeMillis() + watermarkInterval, this);
long newWatermark = userFunction.getCurrentWatermark(); long newWatermark = userFunction.getCurrentWatermark();

if (newWatermark > currentWatermark) { if (newWatermark > currentWatermark) {
currentWatermark = newWatermark; currentWatermark = newWatermark;
// emit watermark // emit watermark
output.emitWatermark(new Watermark(currentWatermark)); output.emitWatermark(new Watermark(currentWatermark));
} }

long now = getTimerService().getCurrentProcessingTime();
getTimerService().registerTimer(now + watermarkInterval, this);
} }


@Override @Override
Expand Down
Expand Up @@ -54,7 +54,8 @@ public void open() throws Exception {
watermarkInterval = getExecutionConfig().getAutoWatermarkInterval(); watermarkInterval = getExecutionConfig().getAutoWatermarkInterval();


if (watermarkInterval > 0) { if (watermarkInterval > 0) {
registerTimer(System.currentTimeMillis() + watermarkInterval, this); long now = getTimerService().getCurrentProcessingTime();
getTimerService().registerTimer(now + watermarkInterval, this);
} }
} }


Expand All @@ -76,7 +77,8 @@ public void trigger(long timestamp) throws Exception {
output.emitWatermark(newWatermark); output.emitWatermark(newWatermark);
} }


registerTimer(System.currentTimeMillis() + watermarkInterval, this); long now = getTimerService().getCurrentProcessingTime();
getTimerService().registerTimer(now + watermarkInterval, this);
} }


@Override @Override
Expand Down
Expand Up @@ -125,7 +125,7 @@ public void open() throws Exception {


// decide when to first compute the window and when to slide it // decide when to first compute the window and when to slide it
// the values should align with the start of time (that is, the UNIX epoch, not the big bang) // the values should align with the start of time (that is, the UNIX epoch, not the big bang)
final long now = getRuntimeContext().getCurrentProcessingTime(); final long now = getTimerService().getCurrentProcessingTime();
nextEvaluationTime = now + windowSlide - (now % windowSlide); nextEvaluationTime = now + windowSlide - (now % windowSlide);
nextSlideTime = now + paneSize - (now % paneSize); nextSlideTime = now + paneSize - (now % paneSize);


Expand Down Expand Up @@ -164,9 +164,9 @@ public void open() throws Exception {
nextPastTriggerTime = Math.min(nextPastEvaluationTime, nextPastSlideTime); nextPastTriggerTime = Math.min(nextPastEvaluationTime, nextPastSlideTime);
} }
} }

// make sure the first window happens // make sure the first window happens
registerTimer(firstTriggerTime, this); getTimerService().registerTimer(firstTriggerTime, this);
} }


@Override @Override
Expand Down Expand Up @@ -230,7 +230,7 @@ public void trigger(long timestamp) throws Exception {
} }


long nextTriggerTime = Math.min(nextEvaluationTime, nextSlideTime); long nextTriggerTime = Math.min(nextEvaluationTime, nextSlideTime);
registerTimer(nextTriggerTime, this); getTimerService().registerTimer(nextTriggerTime, this);
} }


private void computeWindow(long timestamp) throws Exception { private void computeWindow(long timestamp) throws Exception {
Expand Down
Expand Up @@ -255,7 +255,7 @@ public final void open() throws Exception {
windowAssignerContext = new WindowAssigner.WindowAssignerContext() { windowAssignerContext = new WindowAssigner.WindowAssignerContext() {
@Override @Override
public long getCurrentProcessingTime() { public long getCurrentProcessingTime() {
return WindowOperator.this.getCurrentProcessingTime(); return WindowOperator.this.getTimerService().getCurrentProcessingTime();
} }
}; };


Expand Down Expand Up @@ -721,7 +721,7 @@ public <S extends State> S getPartitionedState(StateDescriptor<S, ?> stateDescri


@Override @Override
public long getCurrentProcessingTime() { public long getCurrentProcessingTime() {
return WindowOperator.this.getCurrentProcessingTime(); return WindowOperator.this.getTimerService().getCurrentProcessingTime();
} }


@Override @Override
Expand All @@ -732,7 +732,8 @@ public void registerProcessingTimeTimer(long time) {
processingTimeTimersQueue.add(timer); processingTimeTimersQueue.add(timer);
//If this is the first timer added for this timestamp register a TriggerTask //If this is the first timer added for this timestamp register a TriggerTask
if (processingTimeTimerTimestamps.add(time, 1) == 0) { if (processingTimeTimerTimestamps.add(time, 1) == 0) {
ScheduledFuture<?> scheduledFuture = WindowOperator.this.registerTimer(time, WindowOperator.this); ScheduledFuture<?> scheduledFuture = WindowOperator.this.getTimerService()
.registerTimer(time, WindowOperator.this);
processingTimeTimerFutures.put(time, scheduledFuture); processingTimeTimerFutures.put(time, scheduledFuture);
} }
} }
Expand Down
Expand Up @@ -47,7 +47,6 @@
import org.apache.flink.streaming.api.operators.Output; import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.api.operators.StreamOperator; import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.runtime.io.RecordWriterOutput; import org.apache.flink.streaming.runtime.io.RecordWriterOutput;
import org.apache.flink.streaming.runtime.operators.Triggerable;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;


import org.slf4j.Logger; import org.slf4j.Logger;
Expand All @@ -65,7 +64,6 @@
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.RunnableFuture; import java.util.concurrent.RunnableFuture;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.ScheduledThreadPoolExecutor;


/** /**
Expand Down Expand Up @@ -207,16 +205,6 @@ public void setTimeService(TimeServiceProvider timeProvider) {
timerService = timeProvider; timerService = timeProvider;
} }


/**
* Returns the current processing time.
*/
public long getCurrentProcessingTime() {
if (timerService == null) {
throw new IllegalStateException("The timer service has not been initialized.");
}
return timerService.getCurrentProcessingTime();
}

@Override @Override
public final void invoke() throws Exception { public final void invoke() throws Exception {


Expand Down Expand Up @@ -825,13 +813,14 @@ private String createOperatorIdentifier(StreamOperator<?> operator, int vertexId
} }


/** /**
* Registers a timer. * Returns the {@link TimeServiceProvider} responsible for telling the current
* processing time and registering timers.
*/ */
public ScheduledFuture<?> registerTimer(final long timestamp, final Triggerable target) { public TimeServiceProvider getTimerService() {
if (timerService == null) { if (timerService == null) {
throw new IllegalStateException("The timer service has not been initialized."); throw new IllegalStateException("The timer service has not been initialized.");
} }
return timerService.registerTimer(timestamp, target); return timerService;
} }


/** /**
Expand Down
Expand Up @@ -45,12 +45,9 @@
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collections; import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.concurrent.ScheduledFuture;


import static org.junit.Assert.*; import static org.junit.Assert.*;


import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyLong;
import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when; import static org.mockito.Mockito.when;
Expand Down Expand Up @@ -241,30 +238,15 @@ private static <T> void setupSourceOperator(StreamSource<T, ?> operator,
when(mockTask.getExecutionConfig()).thenReturn(executionConfig); when(mockTask.getExecutionConfig()).thenReturn(executionConfig);
when(mockTask.getAccumulatorMap()).thenReturn(Collections.<String, Accumulator<?, ?>>emptyMap()); when(mockTask.getAccumulatorMap()).thenReturn(Collections.<String, Accumulator<?, ?>>emptyMap());


doAnswer(new Answer<ScheduledFuture>() { doAnswer(new Answer<TimeServiceProvider>() {
@Override @Override
public ScheduledFuture answer(InvocationOnMock invocation) throws Throwable { public TimeServiceProvider answer(InvocationOnMock invocation) throws Throwable {
final long execTime = (Long) invocation.getArguments()[0];
final Triggerable target = (Triggerable) invocation.getArguments()[1];

if (timeProvider == null) {
throw new RuntimeException("The time provider is null");
}

timeProvider.registerTimer(execTime, target);
return null;
}
}).when(mockTask).registerTimer(anyLong(), any(Triggerable.class));

doAnswer(new Answer<Long>() {
@Override
public Long answer(InvocationOnMock invocation) throws Throwable {
if (timeProvider == null) { if (timeProvider == null) {
throw new RuntimeException("The time provider is null"); throw new RuntimeException("The time provider is null.");
} }
return timeProvider.getCurrentProcessingTime(); return timeProvider;
} }
}).when(mockTask).getCurrentProcessingTime(); }).when(mockTask).getTimerService();


operator.setup(mockTask, cfg, (Output<StreamRecord<T>>) mock(Output.class)); operator.setup(mockTask, cfg, (Output<StreamRecord<T>>) mock(Output.class));
} }
Expand Down

0 comments on commit ffff299

Please sign in to comment.