Skip to content

Commit

Permalink
[hotfix] [streaming] Handle rich functions properly in aligned time w…
Browse files Browse the repository at this point in the history
…indows
  • Loading branch information
StephanEwen committed Oct 2, 2015
1 parent 7390201 commit 6e0e67d
Show file tree
Hide file tree
Showing 10 changed files with 101 additions and 57 deletions.
Expand Up @@ -34,6 +34,7 @@
import org.apache.flink.streaming.api.windowing.evictors.Evictor; import org.apache.flink.streaming.api.windowing.evictors.Evictor;
import org.apache.flink.streaming.api.windowing.triggers.ProcessingTimeTrigger; import org.apache.flink.streaming.api.windowing.triggers.ProcessingTimeTrigger;
import org.apache.flink.streaming.api.windowing.triggers.Trigger; import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.api.windowing.windows.Window; import org.apache.flink.streaming.api.windowing.windows.Window;
import org.apache.flink.streaming.runtime.operators.windowing.AccumulatingProcessingTimeWindowOperator; import org.apache.flink.streaming.runtime.operators.windowing.AccumulatingProcessingTimeWindowOperator;
import org.apache.flink.streaming.runtime.operators.windowing.AggregatingProcessingTimeWindowOperator; import org.apache.flink.streaming.runtime.operators.windowing.AggregatingProcessingTimeWindowOperator;
Expand Down Expand Up @@ -248,7 +249,7 @@ private <R> DataStream<R> createFastTimeOperatorIfValid(
} }
else if (function instanceof KeyedWindowFunction) { else if (function instanceof KeyedWindowFunction) {
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
KeyedWindowFunction<T, R, K, Window> wf = (KeyedWindowFunction<T, R, K, Window>) function; KeyedWindowFunction<T, R, K, TimeWindow> wf = (KeyedWindowFunction<T, R, K, TimeWindow>) function;


OneInputStreamOperator<T, R> op = new AccumulatingProcessingTimeWindowOperator<>( OneInputStreamOperator<T, R> op = new AccumulatingProcessingTimeWindowOperator<>(
wf, input.getKeySelector(), windowLength, windowSlide); wf, input.getKeySelector(), windowLength, windowSlide);
Expand All @@ -273,7 +274,7 @@ else if (function instanceof KeyedWindowFunction) {
} }
else if (function instanceof KeyedWindowFunction) { else if (function instanceof KeyedWindowFunction) {
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
KeyedWindowFunction<T, R, K, Window> wf = (KeyedWindowFunction<T, R, K, Window>) function; KeyedWindowFunction<T, R, K, TimeWindow> wf = (KeyedWindowFunction<T, R, K, TimeWindow>) function;


OneInputStreamOperator<T, R> op = new AccumulatingProcessingTimeWindowOperator<>( OneInputStreamOperator<T, R> op = new AccumulatingProcessingTimeWindowOperator<>(
wf, input.getKeySelector(), windowLength, windowSlide); wf, input.getKeySelector(), windowLength, windowSlide);
Expand Down
Expand Up @@ -24,7 +24,10 @@
import org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext; import org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext;


/** /**
* Base class for operators that do not contain a user-defined function. * Base class for all stream operators.
*
* Operators that contain a user function should extend the class
* {@link AbstractUdfStreamOperator} instead (which is a specialized subclass of this class).
* *
* @param <OUT> The output type of the operator * @param <OUT> The output type of the operator
*/ */
Expand Down
Expand Up @@ -22,6 +22,7 @@
import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;
import java.util.Map.Entry; import java.util.Map.Entry;
import java.util.Objects;


import org.apache.flink.api.common.functions.Function; import org.apache.flink.api.common.functions.Function;
import org.apache.flink.api.common.functions.util.FunctionUtils; import org.apache.flink.api.common.functions.util.FunctionUtils;
Expand All @@ -41,29 +42,45 @@


/** /**
* This is used as the base class for operators that have a user-defined * This is used as the base class for operators that have a user-defined
* function. * function. This class handles the opening and closing of the user-defined functions,
* as part of the operator life cycle.
* *
* @param <OUT> * @param <OUT>
* The output type of the operator * The output type of the operator
* @param <F> * @param <F>
* The type of the user function * The type of the user function
*/ */
public abstract class AbstractUdfStreamOperator<OUT, F extends Function & Serializable> public abstract class AbstractUdfStreamOperator<OUT, F extends Function>
extends AbstractStreamOperator<OUT> implements StatefulStreamOperator<OUT> { extends AbstractStreamOperator<OUT> implements StatefulStreamOperator<OUT> {


private static final long serialVersionUID = 1L; private static final long serialVersionUID = 1L;


private static final Logger LOG = LoggerFactory.getLogger(AbstractUdfStreamOperator.class); private static final Logger LOG = LoggerFactory.getLogger(AbstractUdfStreamOperator.class);




/** the user function */
protected final F userFunction; protected final F userFunction;


/** Flag to prevent duplicate function.close() calls in close() and dispose() */
private boolean functionsClosed = false; private boolean functionsClosed = false;



public AbstractUdfStreamOperator(F userFunction) { public AbstractUdfStreamOperator(F userFunction) {
this.userFunction = userFunction; this.userFunction = Objects.requireNonNull(userFunction);
} }


/**
* Gets the user function executed in this operator.
* @return The user function of this operator.
*/
public F getUserFunction() {
return userFunction;
}

// ------------------------------------------------------------------------
// operator life cycle
// ------------------------------------------------------------------------

@Override @Override
public final void setup(Output<StreamRecord<OUT>> output, StreamingRuntimeContext runtimeContext) { public final void setup(Output<StreamRecord<OUT>> output, StreamingRuntimeContext runtimeContext) {
super.setup(output, runtimeContext); super.setup(output, runtimeContext);
Expand Down Expand Up @@ -97,6 +114,10 @@ public void dispose() {
} }
} }


// ------------------------------------------------------------------------
// checkpointing and recovery
// ------------------------------------------------------------------------

@Override @Override
@SuppressWarnings({ "unchecked", "rawtypes" }) @SuppressWarnings({ "unchecked", "rawtypes" })
public void restoreInitialState(Tuple2<StateHandle<Serializable>, Map<String, OperatorStateHandle>> snapshots) throws Exception { public void restoreInitialState(Tuple2<StateHandle<Serializable>, Map<String, OperatorStateHandle>> snapshots) throws Exception {
Expand Down Expand Up @@ -170,8 +191,4 @@ public void notifyCheckpointComplete(long checkpointId) throws Exception {
} }
} }
} }

public F getUserFunction() {
return userFunction;
}
} }
Expand Up @@ -24,7 +24,7 @@
import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.util.MathUtils; import org.apache.flink.runtime.util.MathUtils;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator; import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator; import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.TimestampedCollector; import org.apache.flink.streaming.api.operators.TimestampedCollector;
import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.api.watermark.Watermark;
Expand All @@ -33,7 +33,8 @@
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;




public abstract class AbstractAlignedProcessingTimeWindowOperator<KEY, IN, OUT> extends AbstractStreamOperator<OUT> public abstract class AbstractAlignedProcessingTimeWindowOperator<KEY, IN, OUT, F extends Function>
extends AbstractUdfStreamOperator<OUT, F>
implements OneInputStreamOperator<IN, OUT>, Triggerable { implements OneInputStreamOperator<IN, OUT>, Triggerable {


private static final long serialVersionUID = 3245500864882459867L; private static final long serialVersionUID = 3245500864882459867L;
Expand All @@ -60,11 +61,13 @@ public abstract class AbstractAlignedProcessingTimeWindowOperator<KEY, IN, OUT>
private transient long nextSlideTime; private transient long nextSlideTime;


protected AbstractAlignedProcessingTimeWindowOperator( protected AbstractAlignedProcessingTimeWindowOperator(
Function function, F function,
KeySelector<IN, KEY> keySelector, KeySelector<IN, KEY> keySelector,
long windowLength, long windowLength,
long windowSlide) long windowSlide)
{ {
super(function);

if (function == null || keySelector == null) { if (function == null || keySelector == null) {
throw new NullPointerException(); throw new NullPointerException();
} }
Expand Down Expand Up @@ -103,6 +106,8 @@ protected AbstractAlignedProcessingTimeWindowOperator(


@Override @Override
public void open(Configuration parameters) throws Exception { public void open(Configuration parameters) throws Exception {
super.open(parameters);

out = new TimestampedCollector<>(output); out = new TimestampedCollector<>(output);


// create the panes that gather the elements per slide // create the panes that gather the elements per slide
Expand All @@ -119,6 +124,8 @@ public void open(Configuration parameters) throws Exception {


@Override @Override
public void close() throws Exception { public void close() throws Exception {
super.close();

final long finalWindowTimestamp = nextEvaluationTime; final long finalWindowTimestamp = nextEvaluationTime;


// early stop the triggering thread, so it does not attempt to return any more data // early stop the triggering thread, so it does not attempt to return any more data
Expand All @@ -130,12 +137,17 @@ public void close() throws Exception {


@Override @Override
public void dispose() { public void dispose() {
super.dispose();

// acquire the lock during shutdown, to prevent trigger calls at the same time // acquire the lock during shutdown, to prevent trigger calls at the same time
// fail-safe stop of the triggering thread (in case of an error) // fail-safe stop of the triggering thread (in case of an error)
stopTriggers(); stopTriggers();


// release the panes // release the panes. panes may still be null if dispose is called
panes.dispose(); // after open() failed
if (panes != null) {
panes.dispose();
}
} }


private void stopTriggers() { private void stopTriggers() {
Expand Down
Expand Up @@ -21,17 +21,18 @@
import org.apache.flink.api.common.functions.Function; import org.apache.flink.api.common.functions.Function;
import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.functions.windowing.KeyedWindowFunction; import org.apache.flink.streaming.api.functions.windowing.KeyedWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.api.windowing.windows.Window; import org.apache.flink.streaming.api.windowing.windows.Window;




public class AccumulatingProcessingTimeWindowOperator<KEY, IN, OUT> public class AccumulatingProcessingTimeWindowOperator<KEY, IN, OUT>
extends AbstractAlignedProcessingTimeWindowOperator<KEY, IN, OUT> { extends AbstractAlignedProcessingTimeWindowOperator<KEY, IN, OUT, KeyedWindowFunction<IN, OUT, KEY, TimeWindow>> {


private static final long serialVersionUID = 7305948082830843475L; private static final long serialVersionUID = 7305948082830843475L;




public AccumulatingProcessingTimeWindowOperator( public AccumulatingProcessingTimeWindowOperator(
KeyedWindowFunction<IN, OUT, KEY, Window> function, KeyedWindowFunction<IN, OUT, KEY, TimeWindow> function,
KeySelector<IN, KEY> keySelector, KeySelector<IN, KEY> keySelector,
long windowLength, long windowLength,
long windowSlide) long windowSlide)
Expand Down
Expand Up @@ -23,7 +23,7 @@
import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.api.java.functions.KeySelector;


public class AggregatingProcessingTimeWindowOperator<KEY, IN> public class AggregatingProcessingTimeWindowOperator<KEY, IN>
extends AbstractAlignedProcessingTimeWindowOperator<KEY, IN, IN> { extends AbstractAlignedProcessingTimeWindowOperator<KEY, IN, IN, ReduceFunction<IN>> {


private static final long serialVersionUID = 7305948082830843475L; private static final long serialVersionUID = 7305948082830843475L;


Expand Down
Expand Up @@ -86,6 +86,9 @@
*/ */
public abstract class StreamTask<OUT, O extends StreamOperator<OUT>> extends AbstractInvokable implements StatefulTask<StateHandle<Serializable>> { public abstract class StreamTask<OUT, O extends StreamOperator<OUT>> extends AbstractInvokable implements StatefulTask<StateHandle<Serializable>> {


/** The thread group that holds all trigger timer threads */
public static final ThreadGroup TRIGGER_THREAD_GROUP = new ThreadGroup("Triggers");

private static final Logger LOG = LoggerFactory.getLogger(StreamTask.class); private static final Logger LOG = LoggerFactory.getLogger(StreamTask.class);


/** /**
Expand All @@ -104,9 +107,6 @@ public abstract class StreamTask<OUT, O extends StreamOperator<OUT>> extends Abs


protected ClassLoader userClassLoader; protected ClassLoader userClassLoader;


/** The thread group that holds all trigger timer threads */
public static final ThreadGroup TRIGGER_THREAD_GROUP = new ThreadGroup("Triggers");

/** The executor service that */ /** The executor service that */
private ScheduledExecutorService timerService; private ScheduledExecutorService timerService;


Expand Down
Expand Up @@ -18,15 +18,18 @@


package org.apache.flink.streaming.runtime.operators; package org.apache.flink.streaming.runtime.operators;


import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo; import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter; import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
import org.apache.flink.streaming.api.graph.StreamConfig; import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.StreamMap; import org.apache.flink.streaming.api.operators.StreamMap;
import org.apache.flink.streaming.runtime.tasks.OneInputStreamTask; import org.apache.flink.streaming.runtime.tasks.OneInputStreamTask;
import org.apache.flink.streaming.runtime.tasks.OneInputStreamTaskTestHarness; import org.apache.flink.streaming.runtime.tasks.OneInputStreamTaskTestHarness;
import org.apache.flink.streaming.runtime.tasks.StreamTask; import org.apache.flink.streaming.runtime.tasks.StreamTask;

import org.junit.Test; import org.junit.Test;
import org.junit.runner.RunWith; import org.junit.runner.RunWith;

import org.powermock.core.classloader.annotations.PrepareForTest; import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner; import org.powermock.modules.junit4.PowerMockRunner;


Expand All @@ -38,7 +41,8 @@
* Tests for the timer service of {@link org.apache.flink.streaming.runtime.tasks.StreamTask}. * Tests for the timer service of {@link org.apache.flink.streaming.runtime.tasks.StreamTask}.
*/ */
@RunWith(PowerMockRunner.class) @RunWith(PowerMockRunner.class)
@PrepareForTest({ResultPartitionWriter.class}) @PrepareForTest(ResultPartitionWriter.class)
@SuppressWarnings("serial")
public class StreamTaskTimerTest { public class StreamTaskTimerTest {


@Test @Test
Expand All @@ -47,7 +51,8 @@ public void testOpenCloseAndTimestamps() throws Exception {
final OneInputStreamTaskTestHarness<String, String> testHarness = new OneInputStreamTaskTestHarness<>(mapTask, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO); final OneInputStreamTaskTestHarness<String, String> testHarness = new OneInputStreamTaskTestHarness<>(mapTask, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);


StreamConfig streamConfig = testHarness.getStreamConfig(); StreamConfig streamConfig = testHarness.getStreamConfig();
StreamMap<String, String> mapOperator = new StreamMap<>(null);
StreamMap<String, String> mapOperator = new StreamMap<>(new DummyMapFunction<String>());
streamConfig.setStreamOperator(mapOperator); streamConfig.setStreamOperator(mapOperator);


testHarness.invoke(); testHarness.invoke();
Expand Down Expand Up @@ -77,12 +82,11 @@ public void trigger(long timestamp) {}
@Test @Test
public void checkScheduledTimestampe() { public void checkScheduledTimestampe() {
try { try {

final OneInputStreamTask<String, String> mapTask = new OneInputStreamTask<>(); final OneInputStreamTask<String, String> mapTask = new OneInputStreamTask<>();
final OneInputStreamTaskTestHarness<String, String> testHarness = new OneInputStreamTaskTestHarness<>(mapTask, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO); final OneInputStreamTaskTestHarness<String, String> testHarness = new OneInputStreamTaskTestHarness<>(mapTask, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);


StreamConfig streamConfig = testHarness.getStreamConfig(); StreamConfig streamConfig = testHarness.getStreamConfig();
StreamMap<String, String> mapOperator = new StreamMap<>(null); StreamMap<String, String> mapOperator = new StreamMap<>(new DummyMapFunction<String>());
streamConfig.setStreamOperator(mapOperator); streamConfig.setStreamOperator(mapOperator);


testHarness.invoke(); testHarness.invoke();
Expand Down Expand Up @@ -162,4 +166,11 @@ public void trigger(long timestamp) {
} }
} }
} }

// ------------------------------------------------------------------------

public static class DummyMapFunction<T> implements MapFunction<T, T> {
@Override
public T map(T value) { return value; }
}
} }

0 comments on commit 6e0e67d

Please sign in to comment.