Skip to content

Commit

Permalink
[streaming] Temporal operator windowing syntax update
Browse files Browse the repository at this point in the history
  • Loading branch information
gyfora authored and mbalassi committed Jan 6, 2015
1 parent 92ceacd commit b0a2e4a
Show file tree
Hide file tree
Showing 18 changed files with 241 additions and 82 deletions.
Expand Up @@ -509,6 +509,10 @@ public <IN, OUT> void setInvokable(String id, StreamInvokable<IN, OUT> invokable
invokableObjects.put(id, invokableObject); invokableObjects.put(id, invokableObject);
} }


public StreamInvokable<?, ?> getInvokable(String id) {
return invokableObjects.get(id);
}

public <OUT> void setOutType(String id, TypeInformation<OUT> outType) { public <OUT> void setOutType(String id, TypeInformation<OUT> outType) {
StreamRecordSerializer<OUT> serializer = new StreamRecordSerializer<OUT>(outType); StreamRecordSerializer<OUT> serializer = new StreamRecordSerializer<OUT>(outType);
typeSerializersOut1.put(id, serializer); typeSerializersOut1.put(id, serializer);
Expand Down
Expand Up @@ -38,6 +38,8 @@
import org.apache.flink.api.java.typeutils.TupleTypeInfo; import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.api.java.typeutils.TypeExtractor; import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.streaming.api.JobGraphBuilder; import org.apache.flink.streaming.api.JobGraphBuilder;
import org.apache.flink.streaming.api.datastream.temporaloperator.StreamCrossOperator;
import org.apache.flink.streaming.api.datastream.temporaloperator.StreamJoinOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.function.aggregation.AggregationFunction; import org.apache.flink.streaming.api.function.aggregation.AggregationFunction;
import org.apache.flink.streaming.api.function.aggregation.AggregationFunction.AggregationType; import org.apache.flink.streaming.api.function.aggregation.AggregationFunction.AggregationType;
Expand Down Expand Up @@ -1180,7 +1182,7 @@ private void validateMerge(String id) {
* *
* @return The copy * @return The copy
*/ */
protected DataStream<OUT> copy() { public DataStream<OUT> copy() {
return new DataStream<OUT>(this); return new DataStream<OUT>(this);
} }


Expand Down
Expand Up @@ -38,7 +38,7 @@ protected DataStreamSink(DataStream<IN> dataStream) {
} }


@Override @Override
protected DataStreamSink<IN> copy() { public DataStreamSink<IN> copy() {
throw new RuntimeException("Data stream sinks cannot be copied"); throw new RuntimeException("Data stream sinks cannot be copied");
} }


Expand Down
Expand Up @@ -199,7 +199,7 @@ protected DataStream<OUT> setConnectionType(StreamPartitioner<OUT> partitioner)
} }


@Override @Override
protected GroupedDataStream<OUT> copy() { public GroupedDataStream<OUT> copy() {
return new GroupedDataStream<OUT>(this); return new GroupedDataStream<OUT>(this);
} }
} }
Expand Up @@ -72,7 +72,7 @@ public DataStream<IN> closeWith(DataStream<IN> iterationTail) {
} }


@Override @Override
protected IterativeDataStream<IN> copy() { public IterativeDataStream<IN> copy() {
return new IterativeDataStream<IN>(this, iterationID, waitTime); return new IterativeDataStream<IN>(this, iterationID, waitTime);
} }
} }
Expand Up @@ -186,7 +186,7 @@ public SingleOutputStreamOperator<OUT, O> distribute() {
} }


@Override @Override
protected SingleOutputStreamOperator<OUT, O> copy() { public SingleOutputStreamOperator<OUT, O> copy() {
return new SingleOutputStreamOperator<OUT, O>(this); return new SingleOutputStreamOperator<OUT, O>(this);
} }


Expand Down
Expand Up @@ -16,20 +16,24 @@
* limitations under the License. * limitations under the License.
*/ */


package org.apache.flink.streaming.api.datastream; package org.apache.flink.streaming.api.datastream.temporaloperator;

import java.util.concurrent.TimeUnit;


import org.apache.flink.api.common.functions.CrossFunction; import org.apache.flink.api.common.functions.CrossFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.operators.CrossOperator; import org.apache.flink.api.java.operators.CrossOperator;
import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.TupleTypeInfo; import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.api.java.typeutils.TypeExtractor; import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.function.co.CrossWindowFunction; import org.apache.flink.streaming.api.function.co.CrossWindowFunction;
import org.apache.flink.streaming.api.invokable.operator.co.CoWindowInvokable; import org.apache.flink.streaming.api.invokable.operator.co.CoWindowInvokable;


public class StreamCrossOperator<I1, I2> extends public class StreamCrossOperator<I1, I2> extends
TemporalOperator<I1, I2, StreamCrossOperator.CrossWindow<I1, I2>> { TemporalOperator<I1, I2, StreamCrossOperator.CrossWindow<I1, I2>> {

public StreamCrossOperator(DataStream<I1> input1, DataStream<I2> input2) { public StreamCrossOperator(DataStream<I1> input1, DataStream<I2> input2) {
super(input1, input2); super(input1, input2);
} }
Expand All @@ -47,7 +51,8 @@ protected CrossWindow<I1, I2> createNextWindowOperator() {
} }


public static class CrossWindow<I1, I2> extends public static class CrossWindow<I1, I2> extends
SingleOutputStreamOperator<Tuple2<I1, I2>, CrossWindow<I1, I2>> { SingleOutputStreamOperator<Tuple2<I1, I2>, CrossWindow<I1, I2>> implements
TemporalWindow<CrossWindow<I1, I2>> {


private StreamCrossOperator<I1, I2> op; private StreamCrossOperator<I1, I2> op;


Expand All @@ -56,6 +61,16 @@ public CrossWindow(StreamCrossOperator<I1, I2> op, DataStream<Tuple2<I1, I2>> ds
this.op = op; this.op = op;
} }


public CrossWindow<I1, I2> every(long length, TimeUnit timeUnit) {
return every(timeUnit.toMillis(length));
}

@SuppressWarnings("unchecked")
public CrossWindow<I1, I2> every(long length) {
((CoWindowInvokable<I1, I2, ?>) jobGraphBuilder.getInvokable(id)).setSlideSize(length);
return this;
}

/** /**
* Finalizes a temporal Cross transformation by applying a * Finalizes a temporal Cross transformation by applying a
* {@link CrossFunction} to each pair of crossed elements.<br/> * {@link CrossFunction} to each pair of crossed elements.<br/>
Expand Down
Expand Up @@ -16,7 +16,9 @@
* limitations under the License. * limitations under the License.
*/ */


package org.apache.flink.streaming.api.datastream; package org.apache.flink.streaming.api.datastream.temporaloperator;

import java.util.concurrent.TimeUnit;


import org.apache.flink.api.common.functions.JoinFunction; import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeinfo.TypeInformation;
Expand All @@ -26,6 +28,8 @@
import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.TupleTypeInfo; import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.api.java.typeutils.TypeExtractor; import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.function.co.JoinWindowFunction; import org.apache.flink.streaming.api.function.co.JoinWindowFunction;
import org.apache.flink.streaming.api.invokable.operator.co.CoWindowInvokable; import org.apache.flink.streaming.api.invokable.operator.co.CoWindowInvokable;
import org.apache.flink.streaming.util.keys.KeySelectorUtil; import org.apache.flink.streaming.util.keys.KeySelectorUtil;
Expand All @@ -42,7 +46,7 @@ protected JoinWindow<I1, I2> createNextWindowOperator() {
return new JoinWindow<I1, I2>(this); return new JoinWindow<I1, I2>(this);
} }


public static class JoinWindow<I1, I2> { public static class JoinWindow<I1, I2> implements TemporalWindow<JoinWindow<I1, I2>> {


private StreamJoinOperator<I1, I2> op; private StreamJoinOperator<I1, I2> op;
private TypeInformation<I1> type1; private TypeInformation<I1> type1;
Expand Down Expand Up @@ -104,6 +108,17 @@ public <K> JoinPredicate<I1, I2> where(KeySelector<I1, K> keySelector) {
return new JoinPredicate<I1, I2>(op, keySelector); return new JoinPredicate<I1, I2>(op, keySelector);
} }


@Override
public JoinWindow<I1, I2> every(long length, TimeUnit timeUnit) {
return every(timeUnit.toMillis(length));
}

@Override
public JoinWindow<I1, I2> every(long length) {
op.slideInterval = length;
return this;
}

// ---------------------------------------------------------------------------------------- // ----------------------------------------------------------------------------------------


} }
Expand Down
Expand Up @@ -16,12 +16,16 @@
* limitations under the License. * limitations under the License.
*/ */


package org.apache.flink.streaming.api.datastream; package org.apache.flink.streaming.api.datastream.temporaloperator;


import java.util.concurrent.TimeUnit;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.windowing.helper.SystemTimestamp; import org.apache.flink.streaming.api.windowing.helper.SystemTimestamp;
import org.apache.flink.streaming.api.windowing.helper.Timestamp;
import org.apache.flink.streaming.api.windowing.helper.TimestampWrapper; import org.apache.flink.streaming.api.windowing.helper.TimestampWrapper;


public abstract class TemporalOperator<I1, I2, OP> { public abstract class TemporalOperator<I1, I2, OP extends TemporalWindow<OP>> {


public final DataStream<I1> input1; public final DataStream<I1> input1;
public final DataStream<I2> input2; public final DataStream<I2> input2;
Expand All @@ -43,53 +47,71 @@ public TemporalOperator(DataStream<I1> input1, DataStream<I2> input2) {
/** /**
* Continues a temporal transformation.<br/> * Continues a temporal transformation.<br/>
* Defines the window size on which the two DataStreams will be transformed. * Defines the window size on which the two DataStreams will be transformed.
* To define sliding windows call {@link TemporalWindow#every} on the
* resulting operator.
* *
* @param windowSize * @param length
* The size of the window in milliseconds. * The size of the window in milliseconds.
* @param timeUnit
* The unit if time to be used
* @return An incomplete temporal transformation. * @return An incomplete temporal transformation.
*/ */
public OP onWindow(long windowSize) { @SuppressWarnings("unchecked")
return onWindow(windowSize, windowSize); public OP onWindow(long length, TimeUnit timeUnit) {
return onWindow(timeUnit.toMillis(length),
(TimestampWrapper<I1>) SystemTimestamp.getWrapper(),
(TimestampWrapper<I2>) SystemTimestamp.getWrapper());
} }


/** /**
* Continues a temporal transformation.<br/> * Continues a temporal transformation.<br/>
* Defines the window size on which the two DataStreams will be transformed. * Defines the window size on which the two DataStreams will be
* transformed.To define sliding windows call {@link TemporalWindow#every}
* on the resulting operator.
* *
* @param windowSize * @param windowSize
* The size of the window in milliseconds. * The size of the window in milliseconds.
* @param slideInterval * @param timeStamp1
* The slide size of the window. * The timestamp used to extract time from the elements of the
* first data stream.
* @param timeStamp2
* The timestamp used to extract time from the elements of the
* second data stream.
* @return An incomplete temporal transformation. * @return An incomplete temporal transformation.
*/ */
@SuppressWarnings("unchecked") public OP onWindow(long length, Timestamp<I1> timeStamp1, Timestamp<I2> timeStamp2) {
public OP onWindow(long windowSize, long slideInterval) { return onWindow(length, timeStamp1, timeStamp2, 0);
return onWindow(windowSize, slideInterval,
(TimestampWrapper<I1>) SystemTimestamp.getWrapper(),
(TimestampWrapper<I2>) SystemTimestamp.getWrapper());
} }


/** /**
* Continues a temporal transformation.<br/> * Continues a temporal transformation.<br/>
* Defines the window size on which the two DataStreams will be transformed. * Defines the window size on which the two DataStreams will be
* transformed.To define sliding windows call {@link TemporalWindow#every}
* on the resulting operator.
* *
* @param windowSize * @param windowSize
* The size of the window in milliseconds. * The size of the window in milliseconds.
* @param slideInterval
* The slide size of the window.
* @param timeStamp1 * @param timeStamp1
* The timestamp used to extract time from the elements of the * The timestamp used to extract time from the elements of the
* first data stream. * first data stream.
* @param timeStamp2 * @param timeStamp2
* The timestamp used to extract time from the elements of the * The timestamp used to extract time from the elements of the
* second data stream. * second data stream.
* @param startTime
* The start time to measure the first window
* @return An incomplete temporal transformation. * @return An incomplete temporal transformation.
*/ */
public OP onWindow(long windowSize, long slideInterval, TimestampWrapper<I1> timeStamp1, public OP onWindow(long length, Timestamp<I1> timeStamp1, Timestamp<I2> timeStamp2,
long startTime) {
return onWindow(length, new TimestampWrapper<I1>(timeStamp1, startTime),
new TimestampWrapper<I2>(timeStamp2, startTime));
}

private OP onWindow(long length, TimestampWrapper<I1> timeStamp1,
TimestampWrapper<I2> timeStamp2) { TimestampWrapper<I2> timeStamp2) {


this.windowSize = windowSize; this.windowSize = length;
this.slideInterval = slideInterval; this.slideInterval = length;


this.timeStamp1 = timeStamp1; this.timeStamp1 = timeStamp1;
this.timeStamp2 = timeStamp2; this.timeStamp2 = timeStamp2;
Expand Down
@@ -0,0 +1,45 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.streaming.api.datastream.temporaloperator;

import java.util.concurrent.TimeUnit;

public interface TemporalWindow<T> {

/**
* Defines the slide interval for this temporal operator
*
* @param length
* Length of the window
* @param timeUnit
* Unit of time
* @return The temporal operator with slide interval specified
*/
public T every(long length, TimeUnit timeUnit);

/**
* Defines the slide interval for this temporal operator
*
* @param length
* Length of the window
* @return The temporal operator with slide interval specified
*/
public T every(long length);

}
Expand Up @@ -190,4 +190,8 @@ protected void callUserFunction1() throws Exception {
protected void callUserFunction2() throws Exception { protected void callUserFunction2() throws Exception {
} }


public void setSlideSize(long slideSize) {
this.slideSize = slideSize;
}

} }
Expand Up @@ -35,13 +35,13 @@
*/ */
public class Time<DATA> implements WindowingHelper<DATA> { public class Time<DATA> implements WindowingHelper<DATA> {


private long length; protected long length;
private TimeUnit granularity; protected TimeUnit granularity;
private TimestampWrapper<DATA> timestampWrapper; protected TimestampWrapper<DATA> timestampWrapper;
private long delay; protected long delay;


/** /**
* Creates an helper representing a trigger which triggers every given * Creates a helper representing a trigger which triggers every given
* length or an eviction which evicts all elements older than length. * length or an eviction which evicts all elements older than length.
* *
* @param length * @param length
Expand All @@ -62,7 +62,7 @@ private Time(long length, TimeUnit timeUnit, Timestamp<DATA> timestamp, long sta
} }


/** /**
* Creates an helper representing a trigger which triggers every given * Creates a helper representing a trigger which triggers every given
* length or an eviction which evicts all elements older than length. * length or an eviction which evicts all elements older than length.
* *
* @param length * @param length
Expand Down Expand Up @@ -160,11 +160,7 @@ public Time<DATA> withDelay(long delay) {
return this; return this;
} }


private long granularityInMillis() { protected long granularityInMillis() {
if (granularity != null) { return granularity == null ? length : granularity.toMillis(length);
return this.granularity.toMillis(this.length);
} else {
return this.length;
}
} }
} }

0 comments on commit b0a2e4a

Please sign in to comment.