Skip to content

Commit

Permalink
[FLINK-3464] Use Processing-Time Clock in Window Assigners/Triggers
Browse files Browse the repository at this point in the history
Introduces a custom TimeServiceProvider to the StreamTask.
This is responsible for defining and updating the current
processingtime for a task and handling all related action,
such as registering timers for actions to be executed in
the future.
  • Loading branch information
kl0u authored and aljoscha committed Jun 27, 2016
1 parent cb2b76d commit 4b5a789
Show file tree
Hide file tree
Showing 28 changed files with 762 additions and 108 deletions.
Expand Up @@ -425,7 +425,7 @@ private static class PeriodicWatermarkEmitter implements Triggerable {
//------------------------------------------------- //-------------------------------------------------


public void start() { public void start() {
triggerContext.registerTimer(System.currentTimeMillis() + interval, this); triggerContext.registerTimer(triggerContext.getCurrentProcessingTime() + interval, this);
} }


@Override @Override
Expand Down Expand Up @@ -454,7 +454,7 @@ public void trigger(long timestamp) throws Exception {
} }


// schedule the next watermark // schedule the next watermark
triggerContext.registerTimer(System.currentTimeMillis() + interval, this); triggerContext.registerTimer(triggerContext.getCurrentProcessingTime() + interval, this);
} }
} }
} }
Expand Up @@ -38,15 +38,16 @@
import org.apache.flink.streaming.api.operators.AbstractStreamOperator; import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.StreamingRuntimeContext; import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
import org.apache.flink.streaming.runtime.operators.Triggerable; import org.apache.flink.streaming.runtime.operators.Triggerable;
import org.apache.flink.streaming.runtime.tasks.DefaultTimeServiceProvider;
import org.apache.flink.streaming.runtime.tasks.TimeServiceProvider;
import org.apache.flink.util.Preconditions;


import java.io.Serializable; import java.io.Serializable;
import java.util.Collections; import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture; import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;


@SuppressWarnings("deprecation") @SuppressWarnings("deprecation")
public class MockRuntimeContext extends StreamingRuntimeContext { public class MockRuntimeContext extends StreamingRuntimeContext {
Expand All @@ -57,16 +58,27 @@ public class MockRuntimeContext extends StreamingRuntimeContext {
private final ExecutionConfig execConfig; private final ExecutionConfig execConfig;
private final Object checkpointLock; private final Object checkpointLock;


private ScheduledExecutorService timer; private final TimeServiceProvider timerService;

public MockRuntimeContext(int numberOfParallelSubtasks, int indexOfThisSubtask) { public MockRuntimeContext(int numberOfParallelSubtasks, int indexOfThisSubtask) {
this(numberOfParallelSubtasks, indexOfThisSubtask, new ExecutionConfig(), null); this(numberOfParallelSubtasks, indexOfThisSubtask, new ExecutionConfig(), null);
} }


public MockRuntimeContext(
int numberOfParallelSubtasks, int indexOfThisSubtask,
ExecutionConfig execConfig,
Object checkpointLock) {

this(numberOfParallelSubtasks, indexOfThisSubtask, execConfig, checkpointLock,
DefaultTimeServiceProvider.create(Executors.newSingleThreadScheduledExecutor()));
}

public MockRuntimeContext( public MockRuntimeContext(
int numberOfParallelSubtasks, int indexOfThisSubtask, int numberOfParallelSubtasks, int indexOfThisSubtask,
ExecutionConfig execConfig, ExecutionConfig execConfig,
Object checkpointLock) { Object checkpointLock,
TimeServiceProvider timerService) {

super(new MockStreamOperator(), super(new MockStreamOperator(),
new MockEnvironment("no", 4 * MemoryManager.DEFAULT_PAGE_SIZE, null, 16), new MockEnvironment("no", 4 * MemoryManager.DEFAULT_PAGE_SIZE, null, 16),
Collections.<String, Accumulator<?, ?>>emptyMap()); Collections.<String, Accumulator<?, ?>>emptyMap());
Expand All @@ -75,6 +87,7 @@ public MockRuntimeContext(
this.indexOfThisSubtask = indexOfThisSubtask; this.indexOfThisSubtask = indexOfThisSubtask;
this.execConfig = execConfig; this.execConfig = execConfig;
this.checkpointLock = checkpointLock; this.checkpointLock = checkpointLock;
this.timerService = timerService;
} }


@Override @Override
Expand Down Expand Up @@ -186,16 +199,17 @@ public <T> ListState<T> getListState(ListStateDescriptor<T> stateProperties) {
public <T> ReducingState<T> getReducingState(ReducingStateDescriptor<T> stateProperties) { public <T> ReducingState<T> getReducingState(ReducingStateDescriptor<T> stateProperties) {
throw new UnsupportedOperationException(); throw new UnsupportedOperationException();
} }


public long getCurrentProcessingTime() {
Preconditions.checkNotNull(timerService, "The processing time timer has not been initialized.");
return timerService.getCurrentProcessingTime();
}

@Override @Override
public ScheduledFuture<?> registerTimer(final long time, final Triggerable target) { public ScheduledFuture<?> registerTimer(final long time, final Triggerable target) {
if (timer == null) { Preconditions.checkNotNull(timerService, "The processing time timer has not been initialized.");
timer = Executors.newSingleThreadScheduledExecutor();
}


final long delay = Math.max(time - System.currentTimeMillis(), 0); return timerService.registerTimer(time, new Runnable() {

return timer.schedule(new Runnable() {
@Override @Override
public void run() { public void run() {
synchronized (checkpointLock) { synchronized (checkpointLock) {
Expand All @@ -207,7 +221,7 @@ public void run() {
} }
} }
} }
}, delay, TimeUnit.MILLISECONDS); });
} }


// ------------------------------------------------------------------------ // ------------------------------------------------------------------------
Expand Down
Expand Up @@ -260,6 +260,10 @@ protected ScheduledFuture<?> registerTimer(long time, Triggerable target) {
return container.registerTimer(time, target); return container.registerTimer(time, target);
} }


protected long getCurrentProcessingTime() {
return container.getCurrentProcessingTime();
}

/** /**
* Creates a partitioned state handle, using the state backend configured for this task. * Creates a partitioned state handle, using the state backend configured for this task.
* *
Expand Down
Expand Up @@ -93,7 +93,15 @@ public InputSplitProvider getInputSplitProvider() {
public ScheduledFuture<?> registerTimer(long time, Triggerable target) { public ScheduledFuture<?> registerTimer(long time, Triggerable target) {
return operator.registerTimer(time, target); return operator.registerTimer(time, target);
} }


/**
* Returns the current processing time as defined by the task's
* {@link org.apache.flink.streaming.runtime.tasks.TimeServiceProvider TimeServiceProvider}
*/
public long getCurrentProcessingTime() {
return operator.getCurrentProcessingTime();
}

// ------------------------------------------------------------------------ // ------------------------------------------------------------------------
// broadcast variables // broadcast variables
// ------------------------------------------------------------------------ // ------------------------------------------------------------------------
Expand Down
Expand Up @@ -51,7 +51,7 @@ protected EventTimeSessionWindows(long sessionTimeout) {
} }


@Override @Override
public Collection<TimeWindow> assignWindows(Object element, long timestamp) { public Collection<TimeWindow> assignWindows(Object element, long timestamp, WindowAssignerContext context) {
return Collections.singletonList(new TimeWindow(timestamp, timestamp + sessionTimeout)); return Collections.singletonList(new TimeWindow(timestamp, timestamp + sessionTimeout));
} }


Expand Down
Expand Up @@ -43,7 +43,7 @@ public class GlobalWindows extends WindowAssigner<Object, GlobalWindow> {
private GlobalWindows() {} private GlobalWindows() {}


@Override @Override
public Collection<GlobalWindow> assignWindows(Object element, long timestamp) { public Collection<GlobalWindow> assignWindows(Object element, long timestamp, WindowAssignerContext context) {
return Collections.singletonList(GlobalWindow.get()); return Collections.singletonList(GlobalWindow.get());
} }


Expand Down
Expand Up @@ -51,8 +51,9 @@ protected ProcessingTimeSessionWindows(long sessionTimeout) {
} }


@Override @Override
public Collection<TimeWindow> assignWindows(Object element, long timestamp) { public Collection<TimeWindow> assignWindows(Object element, long timestamp, WindowAssignerContext context) {
return Collections.singletonList(new TimeWindow(timestamp, timestamp + sessionTimeout)); long currentProcessingTime = context.getCurrentProcessingTime();
return Collections.singletonList(new TimeWindow(currentProcessingTime, currentProcessingTime + sessionTimeout));
} }


@Override @Override
Expand Down
Expand Up @@ -58,7 +58,7 @@ protected SlidingEventTimeWindows(long size, long slide) {
} }


@Override @Override
public Collection<TimeWindow> assignWindows(Object element, long timestamp) { public Collection<TimeWindow> assignWindows(Object element, long timestamp, WindowAssignerContext context) {
if (timestamp > Long.MIN_VALUE) { if (timestamp > Long.MIN_VALUE) {
List<TimeWindow> windows = new ArrayList<>((int) (size / slide)); List<TimeWindow> windows = new ArrayList<>((int) (size / slide));
long lastStart = timestamp - timestamp % slide; long lastStart = timestamp - timestamp % slide;
Expand Down
Expand Up @@ -55,8 +55,8 @@ private SlidingProcessingTimeWindows(long size, long slide) {
} }


@Override @Override
public Collection<TimeWindow> assignWindows(Object element, long timestamp) { public Collection<TimeWindow> assignWindows(Object element, long timestamp, WindowAssignerContext context) {
timestamp = System.currentTimeMillis(); timestamp = context.getCurrentProcessingTime();
List<TimeWindow> windows = new ArrayList<>((int) (size / slide)); List<TimeWindow> windows = new ArrayList<>((int) (size / slide));
long lastStart = timestamp - timestamp % slide; long lastStart = timestamp - timestamp % slide;
for (long start = lastStart; for (long start = lastStart;
Expand Down
Expand Up @@ -54,7 +54,7 @@ protected TumblingEventTimeWindows(long size) {
} }


@Override @Override
public Collection<TimeWindow> assignWindows(Object element, long timestamp) { public Collection<TimeWindow> assignWindows(Object element, long timestamp, WindowAssignerContext context) {
if (timestamp > Long.MIN_VALUE) { if (timestamp > Long.MIN_VALUE) {
// Long.MIN_VALUE is currently assigned when no timestamp is present // Long.MIN_VALUE is currently assigned when no timestamp is present
long start = timestamp - (timestamp % size); long start = timestamp - (timestamp % size);
Expand Down
Expand Up @@ -51,8 +51,8 @@ private TumblingProcessingTimeWindows(long size) {
} }


@Override @Override
public Collection<TimeWindow> assignWindows(Object element, long timestamp) { public Collection<TimeWindow> assignWindows(Object element, long timestamp, WindowAssignerContext context) {
final long now = System.currentTimeMillis(); final long now = context.getCurrentProcessingTime();
long start = now - (now % size); long start = now - (now % size);
return Collections.singletonList(new TimeWindow(start, start + size)); return Collections.singletonList(new TimeWindow(start, start + size));
} }
Expand Down
Expand Up @@ -50,8 +50,9 @@ public abstract class WindowAssigner<T, W extends Window> implements Serializabl
* *
* @param element The element to which windows should be assigned. * @param element The element to which windows should be assigned.
* @param timestamp The timestamp of the element. * @param timestamp The timestamp of the element.
* @param context The {@link WindowAssignerContext} in which the assigner operates.
*/ */
public abstract Collection<W> assignWindows(T element, long timestamp); public abstract Collection<W> assignWindows(T element, long timestamp, WindowAssignerContext context);


/** /**
* Returns the default trigger associated with this {@code WindowAssigner}. * Returns the default trigger associated with this {@code WindowAssigner}.
Expand Down
@@ -0,0 +1,37 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.streaming.api.windowing.assigners;

import org.apache.flink.streaming.runtime.tasks.StreamTask;

/**
* A context provided to the {@link WindowAssigner} that allows it to query the
* current processing time. This is provided to the assigner by its containing
* {@link org.apache.flink.streaming.runtime.operators.windowing.WindowOperator},
* which, in turn, gets it from the containing
* {@link org.apache.flink.streaming.runtime.tasks.StreamTask}.
*/
public abstract class WindowAssignerContext {

/**
* Returns the current processing time, as returned by
* the {@link StreamTask#getCurrentProcessingTime()}.
*/
public abstract long getCurrentProcessingTime();

}
Expand Up @@ -89,6 +89,8 @@ public TriggerResult onProcessingTime(long time, W window, TriggerContext ctx) t
@Override @Override
public void clear(W window, TriggerContext ctx) throws Exception { public void clear(W window, TriggerContext ctx) throws Exception {
ReducingState<Long> fireTimestamp = ctx.getPartitionedState(stateDesc); ReducingState<Long> fireTimestamp = ctx.getPartitionedState(stateDesc);
long timestamp = fireTimestamp.get();
ctx.deleteEventTimeTimer(timestamp);
fireTimestamp.clear(); fireTimestamp.clear();
} }


Expand Down
Expand Up @@ -29,8 +29,8 @@
import org.apache.flink.streaming.api.windowing.windows.Window; import org.apache.flink.streaming.api.windowing.windows.Window;


/** /**
* A {@link Trigger} that continuously fires based on a given time interval. The time is the current * A {@link Trigger} that continuously fires based on a given time interval as measured by
* system time. * the clock of the machine on which the job is running.
* *
* @param <W> The type of {@link Window Windows} on which this trigger can operate. * @param <W> The type of {@link Window Windows} on which this trigger can operate.
*/ */
Expand All @@ -52,7 +52,7 @@ private ContinuousProcessingTimeTrigger(long interval) {
public TriggerResult onElement(Object element, long timestamp, W window, TriggerContext ctx) throws Exception { public TriggerResult onElement(Object element, long timestamp, W window, TriggerContext ctx) throws Exception {
ReducingState<Long> fireTimestamp = ctx.getPartitionedState(stateDesc); ReducingState<Long> fireTimestamp = ctx.getPartitionedState(stateDesc);


timestamp = System.currentTimeMillis(); timestamp = ctx.getCurrentProcessingTime();


if (fireTimestamp.get() == null) { if (fireTimestamp.get() == null) {
long start = timestamp - (timestamp % interval); long start = timestamp - (timestamp % interval);
Expand Down Expand Up @@ -87,6 +87,8 @@ public TriggerResult onProcessingTime(long time, W window, TriggerContext ctx) t
@Override @Override
public void clear(W window, TriggerContext ctx) throws Exception { public void clear(W window, TriggerContext ctx) throws Exception {
ReducingState<Long> fireTimestamp = ctx.getPartitionedState(stateDesc); ReducingState<Long> fireTimestamp = ctx.getPartitionedState(stateDesc);
long timestamp = fireTimestamp.get();
ctx.deleteProcessingTimeTimer(timestamp);
fireTimestamp.clear(); fireTimestamp.clear();
} }


Expand Down
Expand Up @@ -33,7 +33,7 @@ private ProcessingTimeTrigger() {}


@Override @Override
public TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx) { public TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx) {
ctx.registerProcessingTimeTimer(window.getEnd()); ctx.registerProcessingTimeTimer(window.maxTimestamp());
return TriggerResult.CONTINUE; return TriggerResult.CONTINUE;
} }


Expand Down
Expand Up @@ -27,6 +27,7 @@
import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.metrics.MetricGroup; import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.streaming.api.windowing.windows.Window; import org.apache.flink.streaming.api.windowing.windows.Window;
import org.apache.flink.streaming.runtime.tasks.StreamTask;


import java.io.Serializable; import java.io.Serializable;


Expand Down Expand Up @@ -126,6 +127,12 @@ public void clear(W window, TriggerContext ctx) throws Exception {}
*/ */
public interface TriggerContext { public interface TriggerContext {


/**
* Returns the current processing time, as returned by
* the {@link StreamTask#getCurrentProcessingTime()}.
*/
long getCurrentProcessingTime();

/** /**
* Returns the metric group for this {@link Trigger}. This is the same metric * Returns the metric group for this {@link Trigger}. This is the same metric
* group that would be returned from {@link RuntimeContext#getMetricGroup()} in a user * group that would be returned from {@link RuntimeContext#getMetricGroup()} in a user
Expand Down Expand Up @@ -170,7 +177,7 @@ public interface TriggerContext {
void deleteEventTimeTimer(long time); void deleteEventTimeTimer(long time);


/** /**
* Retrieves an {@link State} object that can be used to interact with * Retrieves a {@link State} object that can be used to interact with
* fault-tolerant state that is scoped to the window and key of the current * fault-tolerant state that is scoped to the window and key of the current
* trigger invocation. * trigger invocation.
* *
Expand Down
Expand Up @@ -63,7 +63,7 @@
*/ */
@Internal @Internal
public class StreamInputProcessor<IN> { public class StreamInputProcessor<IN> {

private final RecordDeserializer<DeserializationDelegate<StreamElement>>[] recordDeserializers; private final RecordDeserializer<DeserializationDelegate<StreamElement>>[] recordDeserializers;


private RecordDeserializer<DeserializationDelegate<StreamElement>> currentRecordDeserializer; private RecordDeserializer<DeserializationDelegate<StreamElement>> currentRecordDeserializer;
Expand All @@ -76,7 +76,7 @@ public class StreamInputProcessor<IN> {


private boolean isFinished; private boolean isFinished;





private final long[] watermarks; private final long[] watermarks;
private long lastEmittedWatermark; private long lastEmittedWatermark;
Expand Down
Expand Up @@ -83,10 +83,10 @@ public EvictingWindowOperator(WindowAssigner<? super IN, W> windowAssigner,
@Override @Override
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
public void processElement(StreamRecord<IN> element) throws Exception { public void processElement(StreamRecord<IN> element) throws Exception {

Collection<W> elementWindows = windowAssigner.assignWindows( Collection<W> elementWindows = windowAssigner.assignWindows(
element.getValue(), element.getValue(),
element.getTimestamp()); element.getTimestamp(),
windowAssignerContext);


final K key = (K) getStateBackend().getCurrentKey(); final K key = (K) getStateBackend().getCurrentKey();


Expand Down

0 comments on commit 4b5a789

Please sign in to comment.