Skip to content

Commit

Permalink
[streaming] Streaming API grouping rework to use batch api Keys
Browse files Browse the repository at this point in the history
  • Loading branch information
gyfora committed Jan 2, 2015
1 parent 1d019b9 commit 8bf9416
Show file tree
Hide file tree
Showing 26 changed files with 378 additions and 616 deletions.
Expand Up @@ -33,6 +33,7 @@
import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.ClosureCleaner; import org.apache.flink.api.java.ClosureCleaner;
import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.operators.Keys;
import org.apache.flink.api.java.tuple.Tuple; import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.typeutils.TupleTypeInfo; import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.api.java.typeutils.TypeExtractor; import org.apache.flink.api.java.typeutils.TypeExtractor;
Expand Down Expand Up @@ -66,8 +67,7 @@
import org.apache.flink.streaming.partitioner.FieldsPartitioner; import org.apache.flink.streaming.partitioner.FieldsPartitioner;
import org.apache.flink.streaming.partitioner.ShufflePartitioner; import org.apache.flink.streaming.partitioner.ShufflePartitioner;
import org.apache.flink.streaming.partitioner.StreamPartitioner; import org.apache.flink.streaming.partitioner.StreamPartitioner;
import org.apache.flink.streaming.util.keys.FieldsKeySelector; import org.apache.flink.streaming.util.keys.KeySelectorUtil;
import org.apache.flink.streaming.util.keys.PojoKeySelector;


/** /**
* A DataStream represents a stream of elements of the same type. A DataStream * A DataStream represents a stream of elements of the same type. A DataStream
Expand Down Expand Up @@ -245,9 +245,11 @@ public <R> ConnectedDataStream<OUT, R> connect(DataStream<R> dataStream) {
* @return The grouped {@link DataStream} * @return The grouped {@link DataStream}
*/ */
public GroupedDataStream<OUT> groupBy(int... fields) { public GroupedDataStream<OUT> groupBy(int... fields) {

if (getType() instanceof BasicArrayTypeInfo || getType() instanceof PrimitiveArrayTypeInfo) {
return groupBy(FieldsKeySelector.getSelector(getType(), fields)); return groupBy(new KeySelectorUtil.ArrayKeySelector<OUT>(fields));

} else {
return groupBy(new Keys.ExpressionKeys<OUT>(fields, getType()));
}
} }


/** /**
Expand All @@ -264,7 +266,7 @@ public GroupedDataStream<OUT> groupBy(int... fields) {
**/ **/
public GroupedDataStream<OUT> groupBy(String... fields) { public GroupedDataStream<OUT> groupBy(String... fields) {


return groupBy(new PojoKeySelector<OUT>(getType(), fields)); return groupBy(new Keys.ExpressionKeys<OUT>(fields, getType()));


} }


Expand All @@ -282,6 +284,11 @@ public GroupedDataStream<OUT> groupBy(KeySelector<OUT, ?> keySelector) {
return new GroupedDataStream<OUT>(this, clean(keySelector)); return new GroupedDataStream<OUT>(this, clean(keySelector));
} }


private GroupedDataStream<OUT> groupBy(Keys<OUT> keys) {
return new GroupedDataStream<OUT>(this, clean(KeySelectorUtil.getSelectorForKeys(keys,
getType())));
}

/** /**
* Sets the partitioning of the {@link DataStream} so that the output is * Sets the partitioning of the {@link DataStream} so that the output is
* partitioned by the selected fields. This setting only affects how the * partitioned by the selected fields. This setting only affects how the
Expand All @@ -293,9 +300,11 @@ public GroupedDataStream<OUT> groupBy(KeySelector<OUT, ?> keySelector) {
* @return The DataStream with fields partitioning set. * @return The DataStream with fields partitioning set.
*/ */
public DataStream<OUT> partitionBy(int... fields) { public DataStream<OUT> partitionBy(int... fields) {

if (getType() instanceof BasicArrayTypeInfo || getType() instanceof PrimitiveArrayTypeInfo) {
return setConnectionType(new FieldsPartitioner<OUT>(FieldsKeySelector.getSelector( return partitionBy(new KeySelectorUtil.ArrayKeySelector<OUT>(fields));
getType(), fields))); } else {
return partitionBy(new Keys.ExpressionKeys<OUT>(fields, getType()));
}
} }


/** /**
Expand All @@ -309,9 +318,11 @@ public DataStream<OUT> partitionBy(int... fields) {
* @return The DataStream with fields partitioning set. * @return The DataStream with fields partitioning set.
*/ */
public DataStream<OUT> partitionBy(String... fields) { public DataStream<OUT> partitionBy(String... fields) {
return partitionBy(new Keys.ExpressionKeys<OUT>(fields, getType()));
}


return setConnectionType(new FieldsPartitioner<OUT>(new PojoKeySelector<OUT>(getType(), private DataStream<OUT> partitionBy(Keys<OUT> keys) {
fields))); return partitionBy(KeySelectorUtil.getSelectorForKeys(keys, getType()));
} }


/** /**
Expand Down Expand Up @@ -411,7 +422,7 @@ public IterativeDataStream<OUT> iterate() {
* the data stream that will be fed back and used as the input for the * the data stream that will be fed back and used as the input for the
* iteration head. A common usage pattern for streaming iterations is to use * iteration head. A common usage pattern for streaming iterations is to use
* output splitting to send a part of the closing data stream to the head. * output splitting to send a part of the closing data stream to the head.
* Refer to {@link SingleOutputStreamOperator#split(OutputSelector)} for * Refer to {@link SingleOutputStreamOperator#split(outputSelector)} for
* more information. * more information.
* <p> * <p>
* The iteration edge will be partitioned the same way as the first input of * The iteration edge will be partitioned the same way as the first input of
Expand Down Expand Up @@ -549,7 +560,7 @@ public StreamProjection<OUT> project(int... fieldIndexes) {
* {@link StreamCrossOperator#onWindow} should be called to define the * {@link StreamCrossOperator#onWindow} should be called to define the
* window. * window.
* <p> * <p>
* Call {@link StreamCrossOperator.CrossWindow#with(CrossFunction)} to * Call {@link StreamCrossOperator.CrossWindow#with(crossFunction)} to
* define a custom cross function. * define a custom cross function.
* *
* @param dataStreamToCross * @param dataStreamToCross
Expand All @@ -572,7 +583,7 @@ public <IN2> StreamCrossOperator<OUT, IN2> cross(DataStream<IN2> dataStreamToCro
* window, and then the {@link StreamJoinOperator.JoinWindow#where} and * window, and then the {@link StreamJoinOperator.JoinWindow#where} and
* {@link StreamJoinOperator.JoinPredicate#equalTo} can be used to define * {@link StreamJoinOperator.JoinPredicate#equalTo} can be used to define
* the join keys.</p> The user can also use the * the join keys.</p> The user can also use the
* {@link StreamJoinOperator.JoinedStream#with(JoinFunction)} to apply * {@link StreamJoinOperator.JoinedStream#with(joinFunction)} to apply
* a custom join function. * a custom join function.
* *
* @param other * @param other
Expand Down
Expand Up @@ -21,14 +21,14 @@
import org.apache.flink.api.common.functions.JoinFunction; import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.operators.Keys;
import org.apache.flink.api.java.tuple.Tuple; import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.TupleTypeInfo; import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.api.java.typeutils.TypeExtractor; import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.streaming.api.function.co.JoinWindowFunction; import org.apache.flink.streaming.api.function.co.JoinWindowFunction;
import org.apache.flink.streaming.api.invokable.operator.co.CoWindowInvokable; import org.apache.flink.streaming.api.invokable.operator.co.CoWindowInvokable;
import org.apache.flink.streaming.util.keys.FieldsKeySelector; import org.apache.flink.streaming.util.keys.KeySelectorUtil;
import org.apache.flink.streaming.util.keys.PojoKeySelector;


public class StreamJoinOperator<I1, I2> extends public class StreamJoinOperator<I1, I2> extends
TemporalOperator<I1, I2, StreamJoinOperator.JoinWindow<I1, I2>> { TemporalOperator<I1, I2, StreamJoinOperator.JoinWindow<I1, I2>> {
Expand All @@ -45,9 +45,11 @@ protected JoinWindow<I1, I2> createNextWindowOperator() {
public static class JoinWindow<I1, I2> { public static class JoinWindow<I1, I2> {


private StreamJoinOperator<I1, I2> op; private StreamJoinOperator<I1, I2> op;
private TypeInformation<I1> type1;


private JoinWindow(StreamJoinOperator<I1, I2> operator) { private JoinWindow(StreamJoinOperator<I1, I2> operator) {
this.op = operator; this.op = operator;
this.type1 = op.input1.getType();
} }


/** /**
Expand All @@ -64,8 +66,8 @@ private JoinWindow(StreamJoinOperator<I1, I2> operator) {
* {@link JoinPredicate#equalTo} to continue the Join. * {@link JoinPredicate#equalTo} to continue the Join.
*/ */
public JoinPredicate<I1, I2> where(int... fields) { public JoinPredicate<I1, I2> where(int... fields) {
return new JoinPredicate<I1, I2>(op, FieldsKeySelector.getSelector(op.input1.getType(), return new JoinPredicate<I1, I2>(op, KeySelectorUtil.getSelectorForKeys(
fields)); new Keys.ExpressionKeys<I1>(fields, type1), type1));
} }


/** /**
Expand All @@ -81,8 +83,8 @@ public JoinPredicate<I1, I2> where(int... fields) {
* {@link JoinPredicate#equalTo} to continue the Join. * {@link JoinPredicate#equalTo} to continue the Join.
*/ */
public JoinPredicate<I1, I2> where(String... fields) { public JoinPredicate<I1, I2> where(String... fields) {
return new JoinPredicate<I1, I2>(op, new PojoKeySelector<I1>(op.input1.getType(), return new JoinPredicate<I1, I2>(op, KeySelectorUtil.getSelectorForKeys(
fields)); new Keys.ExpressionKeys<I1>(fields, type1), type1));
} }


/** /**
Expand Down Expand Up @@ -114,13 +116,15 @@ public <K> JoinPredicate<I1, I2> where(KeySelector<I1, K> keySelector) {
*/ */
public static class JoinPredicate<I1, I2> { public static class JoinPredicate<I1, I2> {


public StreamJoinOperator<I1, I2> op; private StreamJoinOperator<I1, I2> op;
public KeySelector<I1, ?> keys1; private KeySelector<I1, ?> keys1;
public KeySelector<I2, ?> keys2; private KeySelector<I2, ?> keys2;
private TypeInformation<I2> type2;


private JoinPredicate(StreamJoinOperator<I1, I2> operator, KeySelector<I1, ?> keys1) { private JoinPredicate(StreamJoinOperator<I1, I2> operator, KeySelector<I1, ?> keys1) {
this.op = operator; this.op = operator;
this.keys1 = keys1; this.keys1 = keys1;
this.type2 = op.input2.getType();
} }


/** /**
Expand All @@ -138,7 +142,8 @@ private JoinPredicate(StreamJoinOperator<I1, I2> operator, KeySelector<I1, ?> ke
* apply a custom wrapping * apply a custom wrapping
*/ */
public JoinedStream<I1, I2> equalTo(int... fields) { public JoinedStream<I1, I2> equalTo(int... fields) {
keys2 = FieldsKeySelector.getSelector(op.input2.getType(), fields); keys2 = KeySelectorUtil.getSelectorForKeys(new Keys.ExpressionKeys<I2>(fields, type2),
type2);
return createJoinOperator(); return createJoinOperator();
} }


Expand All @@ -156,7 +161,8 @@ public JoinedStream<I1, I2> equalTo(int... fields) {
* apply a custom wrapping * apply a custom wrapping
*/ */
public JoinedStream<I1, I2> equalTo(String... fields) { public JoinedStream<I1, I2> equalTo(String... fields) {
this.keys2 = new PojoKeySelector<I2>(op.input2.getType(), fields); this.keys2 = KeySelectorUtil.getSelectorForKeys(new Keys.ExpressionKeys<I2>(fields,
type2), type2);
return createJoinOperator(); return createJoinOperator();
} }


Expand Down
Expand Up @@ -41,8 +41,12 @@ public JoinWindowFunction(KeySelector<IN1, ?> keySelector1, KeySelector<IN2, ?>
@Override @Override
public void coWindow(List<IN1> first, List<IN2> second, Collector<OUT> out) throws Exception { public void coWindow(List<IN1> first, List<IN2> second, Collector<OUT> out) throws Exception {
for (IN1 item1 : first) { for (IN1 item1 : first) {
Object key1 = keySelector1.getKey(item1);

for (IN2 item2 : second) { for (IN2 item2 : second) {
if (keySelector1.getKey(item1).equals(keySelector2.getKey(item2))) { Object key2 = keySelector2.getKey(item2);

if (key1.equals(key2)) {
out.collect(joinFunction.join(item1, item2)); out.collect(joinFunction.join(item1, item2));
} }
} }
Expand Down
Expand Up @@ -59,7 +59,7 @@ public static void runOnMiniCluster(JobGraph jobGraph, int degreeOfParallelism,
exec = new LocalFlinkMiniCluster(configuration, true); exec = new LocalFlinkMiniCluster(configuration, true);
ActorRef jobClient = exec.getJobClient(); ActorRef jobClient = exec.getJobClient();


JobClient.submitJobAndWait(jobGraph, false, jobClient, exec.timeout()); JobClient.submitJobAndWait(jobGraph, true, jobClient, exec.timeout());


} catch (Exception e) { } catch (Exception e) {
throw e; throw e;
Expand Down

This file was deleted.

This file was deleted.

0 comments on commit 8bf9416

Please sign in to comment.