From 63951adca0e8bfefd1d81b933017e9fadc5f556f Mon Sep 17 00:00:00 2001 From: Till Rohrmann Date: Mon, 7 Sep 2015 11:34:48 +0200 Subject: [PATCH 1/2] [FLINK-2631] [streaming] Fixes the StreamFold operator. Adds OutputTypeConfigurable interface to support type injection at StreamGraph creation. Adds test for non serializable fold type. Adds test to verify proper output type forwarding for OutputTypeConfigurable implementations. Adds comments --- .../api/datastream/GroupedDataStream.java | 2 +- .../streaming/api/graph/StreamGraph.java | 6 + .../api/graph/StreamGraphGenerator.java | 2 +- .../api/operators/OutputTypeConfigurable.java | 42 ++++ .../streaming/api/operators/StreamFold.java | 51 +++- .../api/operators/StreamGroupedFold.java | 28 ++- .../api/operators/StreamGroupedReduce.java | 8 +- .../streaming/api/operators/StreamReduce.java | 3 +- .../api/StreamingOperatorsITCase.java | 230 ++++++++++++++++++ .../api/graph/StreamGraphGeneratorTest.java | 61 ++++- .../api/operators/StreamGroupedFoldTest.java | 10 +- ...alaStreamingMultipleProgramsTestBase.scala | 55 +++++ .../api/scala/StreamingOperatorsITCase.scala | 116 +++++++++ 13 files changed, 588 insertions(+), 26 deletions(-) create mode 100644 flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/OutputTypeConfigurable.java create mode 100644 flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/StreamingOperatorsITCase.java create mode 100644 flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/ScalaStreamingMultipleProgramsTestBase.scala create mode 100644 flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamingOperatorsITCase.scala diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedDataStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedDataStream.java index 72ef945d0ea15..a1106bc381a5d 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedDataStream.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedDataStream.java @@ -88,7 +88,7 @@ public GroupedDataStream(DataStream dataStream, KeySelector keySele Utils.getCallLocationName(), true); return transform("Grouped Fold", outType, new StreamGroupedFold(clean(folder), - keySelector, initialValue, outType)); + keySelector, initialValue)); } /** diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java index 6474ae9d652f2..5cd42c8421950 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java @@ -46,6 +46,7 @@ import org.apache.flink.streaming.api.CheckpointingMode; import org.apache.flink.streaming.api.collector.selector.OutputSelector; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.operators.OutputTypeConfigurable; import org.apache.flink.streaming.api.operators.StreamOperator; import org.apache.flink.streaming.api.operators.StreamSource; import org.apache.flink.streaming.api.operators.TwoInputStreamOperator; @@ -205,6 +206,11 @@ public void addOperator(Integer vertexID, StreamOperator operator setSerializers(vertexID, inSerializer, null, outSerializer); + if (operatorObject instanceof OutputTypeConfigurable) { + // sets the output type which must be know at StreamGraph creation time + ((OutputTypeConfigurable) operatorObject).setOutputType(outTypeInfo, executionConfig); + } + if (LOG.isDebugEnabled()) { LOG.debug("Vertex: {}", vertexID); } diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java index 6df8cb5c80426..774c00b3b23cc 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java @@ -47,7 +47,7 @@ * *

* This traverses the tree of {@code StreamTransformations} starting from the sinks. At each - * we transformation recursively transform the inputs, then create a node in the {@code StreamGraph} + * transformation we recursively transform the inputs, then create a node in the {@code StreamGraph} * and add edges from the input Nodes to our newly created node. The transformation methods * return the IDs of the nodes in the StreamGraph that represent the input transformation. Several * IDs can be returned to be able to deal with feedback transformations and unions. diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/OutputTypeConfigurable.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/OutputTypeConfigurable.java new file mode 100644 index 0000000000000..7bc0a5076d83d --- /dev/null +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/OutputTypeConfigurable.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.operators; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.typeinfo.TypeInformation; + +/** + * Stream operators can implement this interface if they need access to the output type information + * at {@link org.apache.flink.streaming.api.graph.StreamGraph} generation. This can be useful for + * cases where the output type is specified by the returns method and, thus, after the stream + * operator has been created. + */ +public interface OutputTypeConfigurable { + + /** + * Is called by the {@link org.apache.flink.streaming.api.graph.StreamGraph#addOperator(Integer, StreamOperator, TypeInformation, TypeInformation, String)} + * method when the {@link org.apache.flink.streaming.api.graph.StreamGraph} is generated. The + * method is called with the output {@link TypeInformation} which is also used for the + * {@link org.apache.flink.streaming.runtime.tasks.StreamTask} output serializer. + * + * @param outTypeInfo Output type information of the {@link org.apache.flink.streaming.runtime.tasks.StreamTask} + * @param executionConfig Execution configuration + */ + void setOutputType(TypeInformation outTypeInfo, ExecutionConfig executionConfig); +} diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamFold.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamFold.java index a5e526408b9d2..636e7bf78a172 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamFold.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamFold.java @@ -17,27 +17,36 @@ package org.apache.flink.streaming.api.operators; +import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.functions.FoldFunction; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.memory.InputViewDataInputStreamWrapper; +import org.apache.flink.core.memory.OutputViewDataOutputStreamWrapper; import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; + public class StreamFold extends AbstractUdfStreamOperator> - implements OneInputStreamOperator { + implements OneInputStreamOperator, OutputTypeConfigurable { private static final long serialVersionUID = 1L; - private OUT accumulator; + protected transient OUT accumulator; + private byte[] serializedInitialValue; + protected TypeSerializer outTypeSerializer; - protected TypeInformation outTypeInformation; - public StreamFold(FoldFunction folder, OUT initialValue, TypeInformation outTypeInformation) { + public StreamFold(FoldFunction folder, OUT initialValue) { super(folder); this.accumulator = initialValue; - this.outTypeInformation = outTypeInformation; this.chainingStrategy = ChainingStrategy.FORCE_ALWAYS; } @@ -50,11 +59,41 @@ public void processElement(StreamRecord element) throws Exception { @Override public void open(Configuration config) throws Exception { super.open(config); - this.outTypeSerializer = outTypeInformation.createSerializer(executionConfig); + + if (serializedInitialValue == null) { + throw new RuntimeException("No initial value was serialized for the fold " + + "operator. Probably the setOutputType method was not called."); + } + + ByteArrayInputStream bais = new ByteArrayInputStream(serializedInitialValue); + InputViewDataInputStreamWrapper in = new InputViewDataInputStreamWrapper( + new DataInputStream(bais) + ); + + accumulator = outTypeSerializer.deserialize(in); } @Override public void processWatermark(Watermark mark) throws Exception { output.emitWatermark(mark); } + + @Override + public void setOutputType(TypeInformation outTypeInfo, ExecutionConfig executionConfig) { + outTypeSerializer = (TypeSerializer) outTypeInfo.createSerializer(executionConfig); + + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + OutputViewDataOutputStreamWrapper out = new OutputViewDataOutputStreamWrapper( + new DataOutputStream(baos) + ); + + try { + outTypeSerializer.serialize(accumulator, out); + } catch (IOException ioe) { + throw new RuntimeException("Unable to serialize initial value of type " + + accumulator.getClass().getSimpleName() + " of fold operator.", ioe); + } + + serializedInitialValue = baos.toByteArray(); + } } diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedFold.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedFold.java index 5272a48eb8d04..f4e44c6c82115 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedFold.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedFold.java @@ -21,8 +21,8 @@ import java.util.Map; import org.apache.flink.api.common.functions.FoldFunction; -import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; public class StreamGroupedFold extends StreamFold { @@ -30,28 +30,34 @@ public class StreamGroupedFold extends StreamFold { private static final long serialVersionUID = 1L; private KeySelector keySelector; - private Map values; - private OUT initialValue; + private transient Map values; - public StreamGroupedFold(FoldFunction folder, KeySelector keySelector, - OUT initialValue, TypeInformation outTypeInformation) { - super(folder, initialValue, outTypeInformation); + public StreamGroupedFold( + FoldFunction folder, + KeySelector keySelector, + OUT initialValue) { + super(folder, initialValue); this.keySelector = keySelector; - this.initialValue = initialValue; + } + + @Override + public void open(Configuration configuration) throws Exception { + super.open(configuration); + values = new HashMap(); } @Override public void processElement(StreamRecord element) throws Exception { Object key = keySelector.getKey(element.getValue()); - OUT accumulator = values.get(key); + OUT value = values.get(key); - if (accumulator != null) { - OUT folded = userFunction.fold(outTypeSerializer.copy(accumulator), element.getValue()); + if (value != null) { + OUT folded = userFunction.fold(outTypeSerializer.copy(value), element.getValue()); values.put(key, folded); output.collect(element.replace(folded)); } else { - OUT first = userFunction.fold(outTypeSerializer.copy(initialValue), element.getValue()); + OUT first = userFunction.fold(outTypeSerializer.copy(accumulator), element.getValue()); values.put(key, first); output.collect(element.replace(first)); } diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedReduce.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedReduce.java index 6be011e7c4aa8..7533c33297222 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedReduce.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedReduce.java @@ -29,17 +29,21 @@ public class StreamGroupedReduce extends StreamReduce { private static final long serialVersionUID = 1L; private KeySelector keySelector; - private Map values; + private transient Map values; public StreamGroupedReduce(ReduceFunction reducer, KeySelector keySelector) { super(reducer); this.keySelector = keySelector; - values = new HashMap(); } @Override public void processElement(StreamRecord element) throws Exception { Object key = keySelector.getKey(element.getValue()); + + if (values == null) { + values = new HashMap(); + } + IN currentValue = values.get(key); if (currentValue != null) { // TODO: find a way to let operators copy elements (maybe) diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamReduce.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamReduce.java index 52c07d06d0856..af562feb805a0 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamReduce.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamReduce.java @@ -26,7 +26,7 @@ public class StreamReduce extends AbstractUdfStreamOperator reducer) { super(reducer); @@ -42,7 +42,6 @@ public void processElement(StreamRecord element) throws Exception { currentValue = userFunction.reduce(currentValue, element.getValue()); } else { currentValue = element.getValue(); - } output.collect(element.replace(currentValue)); } diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/StreamingOperatorsITCase.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/StreamingOperatorsITCase.java new file mode 100644 index 0000000000000..11100a4f81221 --- /dev/null +++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/StreamingOperatorsITCase.java @@ -0,0 +1,230 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api; + +import org.apache.flink.api.common.functions.FoldFunction; +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.common.functions.RichMapFunction; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.streaming.api.collector.selector.OutputSelector; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.datastream.SplitDataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.source.SourceFunction; +import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase; +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import java.util.ArrayList; +import java.util.List; + +public class StreamingOperatorsITCase extends StreamingMultipleProgramsTestBase { + + private String resultPath1; + private String resultPath2; + private String expected1; + private String expected2; + + @Rule + public TemporaryFolder tempFolder = new TemporaryFolder(); + + @Before + public void before() throws Exception { + resultPath1 = tempFolder.newFile().toURI().toString(); + resultPath2 = tempFolder.newFile().toURI().toString(); + expected1 = ""; + expected2 = ""; + } + + @After + public void after() throws Exception { + compareResultsByLinesInMemory(expected1, resultPath1); + compareResultsByLinesInMemory(expected2, resultPath2); + } + + /** + * Tests the proper functioning of the streaming fold operator. For this purpose, a stream + * of Tuple2 is created. The stream is grouped according to the first tuple + * value. Each group is folded where the second tuple value is summed up. + * + * @throws Exception + */ + @Test + public void testFoldOperation() throws Exception { + int numElements = 10; + int numKeys = 2; + + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + DataStream> sourceStream = env.addSource(new TupleSource(numElements, numKeys)); + + SplitDataStream> splittedResult = sourceStream + .groupBy(0) + .fold(0, new FoldFunction, Integer>() { + @Override + public Integer fold(Integer accumulator, Tuple2 value) throws Exception { + return accumulator + value.f1; + } + }).map(new RichMapFunction>() { + @Override + public Tuple2 map(Integer value) throws Exception { + return new Tuple2(getRuntimeContext().getIndexOfThisSubtask(), value); + } + }).split(new OutputSelector>() { + @Override + public Iterable select(Tuple2 value) { + List output = new ArrayList<>(); + + output.add(value.f0 + ""); + + return output; + } + }); + + splittedResult.select("0").map(new MapFunction, Integer>() { + @Override + public Integer map(Tuple2 value) throws Exception { + return value.f1; + } + }).writeAsText(resultPath1, FileSystem.WriteMode.OVERWRITE); + + splittedResult.select("1").map(new MapFunction, Integer>() { + @Override + public Integer map(Tuple2 value) throws Exception { + return value.f1; + } + }).writeAsText(resultPath2, FileSystem.WriteMode.OVERWRITE); + + StringBuilder builder1 = new StringBuilder(); + StringBuilder builder2 = new StringBuilder(); + int counter1 = 0; + int counter2 = 0; + + for (int i = 0; i < numElements; i++) { + if (i % 2 == 0) { + counter1 += i; + builder1.append(counter1 + "\n"); + } else { + counter2 += i; + builder2.append(counter2 + "\n"); + } + } + + expected1 = builder1.toString(); + expected2 = builder2.toString(); + + env.execute(); + } + + /** + * Tests whether the fold operation can also be called with non Java serializable types. + */ + @Test + public void testFoldOperationWithNonJavaSerializableType() throws Exception { + final int numElements = 10; + + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + DataStream> input = env.addSource(new NonSerializableTupleSource(numElements)); + + input + .groupBy(0) + .fold( + new NonSerializable(42), + new FoldFunction, NonSerializable>() { + @Override + public NonSerializable fold(NonSerializable accumulator, Tuple2 value) throws Exception { + return new NonSerializable(accumulator.value + value.f1.value); + } + }) + .map(new MapFunction() { + @Override + public Integer map(NonSerializable value) throws Exception { + return value.value; + } + }) + .writeAsText(resultPath1, FileSystem.WriteMode.OVERWRITE); + + StringBuilder builder = new StringBuilder(); + + for (int i = 0; i < numElements; i++) { + builder.append(42 + i + "\n"); + } + + expected1 = builder.toString(); + + env.execute(); + } + + private static class NonSerializable { + // This makes the type non-serializable + private final Object obj = new Object(); + + private final int value; + + public NonSerializable(int value) { + this.value = value; + } + } + + private static class NonSerializableTupleSource implements SourceFunction> { + private final int numElements; + + public NonSerializableTupleSource(int numElements) { + this.numElements = numElements; + } + + + @Override + public void run(SourceContext> ctx) throws Exception { + for (int i = 0; i < numElements; i++) { + ctx.collect(new Tuple2(i, new NonSerializable(i))); + } + } + + @Override + public void cancel() {} + } + + private static class TupleSource implements SourceFunction> { + + private final int numElements; + private final int numKeys; + + public TupleSource(int numElements, int numKeys) { + this.numElements = numElements; + this.numKeys = numKeys; + } + + @Override + public void run(SourceContext> ctx) throws Exception { + for (int i = 0; i < numElements; i++) { + Tuple2 result = new Tuple2<>(i % numKeys, i); + ctx.collect(result); + } + } + + @Override + public void cancel() { + + } + } +} diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java index fb2ef56f2adcc..b91650458b6bb 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java @@ -17,18 +17,25 @@ */ package org.apache.flink.streaming.api.graph; +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.operators.AbstractStreamOperator; +import org.apache.flink.streaming.api.operators.OneInputStreamOperator; +import org.apache.flink.streaming.api.operators.OutputTypeConfigurable; +import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.runtime.partitioner.BroadcastPartitioner; import org.apache.flink.streaming.runtime.partitioner.GlobalPartitioner; import org.apache.flink.streaming.runtime.partitioner.RebalancePartitioner; import org.apache.flink.streaming.runtime.partitioner.ShufflePartitioner; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.streaming.util.EvenOddOutputSelector; import org.apache.flink.streaming.util.NoOpIntMap; import org.apache.flink.streaming.util.NoOpSink; import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase; -import org.apache.flink.streaming.util.TestStreamEnvironment; import org.junit.Test; import static org.junit.Assert.assertEquals; @@ -176,4 +183,56 @@ public void testVirtualTransformations2() throws Exception { } + /** + * Test whether an {@link OutputTypeConfigurable} implementation gets called with the correct + * output type. In this test case the output type must be BasicTypeInfo.INT_TYPE_INFO. + * + * @throws Exception + */ + @Test + public void testOutputTypeConfiguration() throws Exception { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + + DataStream source = env.fromElements(1, 10); + + OutputTypeConfigurableOperation outputTypeConfigurableOperation = new OutputTypeConfigurableOperation(); + + DataStream result = source.transform( + "Output type configurable operation", + BasicTypeInfo.INT_TYPE_INFO, + outputTypeConfigurableOperation); + + result.addSink(new NoOpSink()); + + StreamGraph graph = env.getStreamGraph(); + + assertEquals(BasicTypeInfo.INT_TYPE_INFO, outputTypeConfigurableOperation.getTypeInformation()); + } + + private static class OutputTypeConfigurableOperation + extends AbstractStreamOperator + implements OneInputStreamOperator, OutputTypeConfigurable { + + TypeInformation tpeInformation; + + public TypeInformation getTypeInformation() { + return tpeInformation; + } + + @Override + public void processElement(StreamRecord element) throws Exception { + output.collect(element); + } + + @Override + public void processWatermark(Watermark mark) throws Exception { + + } + + @Override + public void setOutputType(TypeInformation outTypeInfo, ExecutionConfig executionConfig) { + tpeInformation = (TypeInformation)outTypeInfo; + } + } + } diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/StreamGroupedFoldTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/StreamGroupedFoldTest.java index cb08e65ea4dab..fb3e81a281705 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/StreamGroupedFoldTest.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/StreamGroupedFoldTest.java @@ -19,6 +19,7 @@ import java.util.concurrent.ConcurrentLinkedQueue; +import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.functions.FoldFunction; import org.apache.flink.api.common.functions.RichFoldFunction; import org.apache.flink.api.common.functions.RichReduceFunction; @@ -71,7 +72,9 @@ public void testGroupedFold() throws Exception { public String getKey(Integer value) throws Exception { return value.toString(); } - }, "100", outType); + }, "100"); + + operator.setOutputType(outType, new ExecutionConfig()); OneInputStreamOperatorTestHarness testHarness = new OneInputStreamOperatorTestHarness(operator); @@ -106,7 +109,10 @@ public void testOpenClose() throws Exception { public Integer getKey(Integer value) throws Exception { return value; } - }, "init", BasicTypeInfo.STRING_TYPE_INFO); + }, "init"); + + operator.setOutputType(BasicTypeInfo.STRING_TYPE_INFO, new ExecutionConfig()); + OneInputStreamOperatorTestHarness testHarness = new OneInputStreamOperatorTestHarness(operator); long initialTime = 0L; diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/ScalaStreamingMultipleProgramsTestBase.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/ScalaStreamingMultipleProgramsTestBase.scala new file mode 100644 index 0000000000000..3342e1e8e2ced --- /dev/null +++ b/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/ScalaStreamingMultipleProgramsTestBase.scala @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.scala + +import org.apache.flink.runtime.StreamingMode +import org.apache.flink.streaming.util.TestStreamEnvironment +import org.apache.flink.test.util.{ForkableFlinkMiniCluster, TestBaseUtils} +import org.scalatest.BeforeAndAfterAll +import org.scalatest.junit.JUnitSuiteLike + +trait ScalaStreamingMultipleProgramsTestBase + extends TestBaseUtils + with JUnitSuiteLike + with BeforeAndAfterAll { + + val parallelism = 4 + var cluster: Option[ForkableFlinkMiniCluster] = None + + override protected def beforeAll(): Unit = { + val cluster = Some( + TestBaseUtils.startCluster( + 1, + parallelism, + StreamingMode.STREAMING, + false, + false, + true + ) + ) + + val clusterEnvironment = new TestStreamEnvironment(cluster.get, parallelism) + } + + override protected def afterAll(): Unit = { + cluster.foreach { + TestBaseUtils.stopCluster(_, TestBaseUtils.DEFAULT_TIMEOUT) + } + } +} diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamingOperatorsITCase.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamingOperatorsITCase.scala new file mode 100644 index 0000000000000..d5e2b7b740c65 --- /dev/null +++ b/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamingOperatorsITCase.scala @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.scala + +import org.apache.flink.api.common.functions.{RichMapFunction, FoldFunction} +import org.apache.flink.core.fs.FileSystem +import org.apache.flink.streaming.api.functions.source.SourceFunction +import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext +import org.apache.flink.test.util.TestBaseUtils +import org.junit.rules.TemporaryFolder +import org.junit.{After, Before, Rule, Test} + +class StreamingOperatorsITCase extends ScalaStreamingMultipleProgramsTestBase { + + var resultPath1: String = _ + var resultPath2: String = _ + var expected1: String = _ + var expected2: String = _ + + val _tempFolder = new TemporaryFolder() + + @Rule + def tempFolder: TemporaryFolder = _tempFolder + + @Before + def before(): Unit = { + val temp = tempFolder + resultPath1 = temp.newFile.toURI.toString + resultPath2 = temp.newFile.toURI.toString + expected1 = "" + expected2 = "" + } + + @After + def after(): Unit = { + TestBaseUtils.compareResultsByLinesInMemory(expected1, resultPath1) + TestBaseUtils.compareResultsByLinesInMemory(expected2, resultPath2) + } + + /** Tests the streaming fold operation. For this purpose a stream of Tuple[Int, Int] is created. + * The stream is grouped by the first field. For each group, the resulting stream is folded by + * summing up the second tuple field. + * + */ + @Test + def testFoldOperator(): Unit = { + val numElements = 10 + val numKeys = 2 + + val env = StreamExecutionEnvironment.getExecutionEnvironment + + env.setParallelism(2) + + val sourceStream = env.addSource(new SourceFunction[(Int, Int)] { + + override def run(ctx: SourceContext[(Int, Int)]): Unit = { + 0 until numElements foreach { + i => ctx.collect((i % numKeys, i)) + } + } + + override def cancel(): Unit = {} + }) + + val splittedResult = sourceStream + .groupBy(0) + .fold(0, new FoldFunction[(Int, Int), Int] { + override def fold(accumulator: Int, value: (Int, Int)): Int = { + accumulator + value._2 + } + }) + .map(new RichMapFunction[Int, (Int, Int)] { + override def map(value: Int): (Int, Int) = { + (getRuntimeContext.getIndexOfThisSubtask, value) + } + }) + .split{ + x => + Seq(x._1.toString) + } + + splittedResult + .select("0") + .map(_._2) + .getJavaStream + .writeAsText(resultPath1, FileSystem.WriteMode.OVERWRITE) + splittedResult + .select("1") + .map(_._2) + .getJavaStream + .writeAsText(resultPath2, FileSystem.WriteMode.OVERWRITE) + + val groupedSequence = 0 until numElements groupBy( _ % numKeys) + + expected1 = groupedSequence(0).scanLeft(0)(_ + _).tail.mkString("\n") + expected2 = groupedSequence(1).scanLeft(0)(_ + _).tail.mkString("\n") + + env.execute() + } +} From a9cb79b2a85148eeb6e6c119e5ee7f8878e98ccc Mon Sep 17 00:00:00 2001 From: Till Rohrmann Date: Tue, 8 Sep 2015 15:54:30 +0200 Subject: [PATCH 2/2] Makes OutputTypeConfigurable typed, tests that TwoInputStreamOperator is output type configurable --- .../streaming/api/graph/StreamGraph.java | 31 +++++-- .../api/operators/OutputTypeConfigurable.java | 4 +- .../streaming/api/operators/StreamFold.java | 6 +- .../api/graph/StreamGraphGeneratorTest.java | 82 +++++++++++++++++-- 4 files changed, 104 insertions(+), 19 deletions(-) diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java index 5cd42c8421950..cda5686c23d4c 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java @@ -191,8 +191,12 @@ public void addSink(Integer vertexID, StreamOperator operatorObje sinks.add(vertexID); } - public void addOperator(Integer vertexID, StreamOperator operatorObject, - TypeInformation inTypeInfo, TypeInformation outTypeInfo, String operatorName) { + public void addOperator( + Integer vertexID, + StreamOperator operatorObject, + TypeInformation inTypeInfo, + TypeInformation outTypeInfo, + String operatorName) { if (operatorObject instanceof StreamSource) { addNode(vertexID, SourceStreamTask.class, operatorObject, operatorName); @@ -207,8 +211,10 @@ public void addOperator(Integer vertexID, StreamOperator operator setSerializers(vertexID, inSerializer, null, outSerializer); if (operatorObject instanceof OutputTypeConfigurable) { + @SuppressWarnings("unchecked") + OutputTypeConfigurable outputTypeConfigurable = (OutputTypeConfigurable) operatorObject; // sets the output type which must be know at StreamGraph creation time - ((OutputTypeConfigurable) operatorObject).setOutputType(outTypeInfo, executionConfig); + outputTypeConfigurable.setOutputType(outTypeInfo, executionConfig); } if (LOG.isDebugEnabled()) { @@ -216,17 +222,28 @@ public void addOperator(Integer vertexID, StreamOperator operator } } - public void addCoOperator(Integer vertexID, - TwoInputStreamOperator taskoperatorObject, TypeInformation in1TypeInfo, - TypeInformation in2TypeInfo, TypeInformation outTypeInfo, String operatorName) { + public void addCoOperator( + Integer vertexID, + TwoInputStreamOperator taskOperatorObject, + TypeInformation in1TypeInfo, + TypeInformation in2TypeInfo, + TypeInformation outTypeInfo, + String operatorName) { - addNode(vertexID, TwoInputStreamTask.class, taskoperatorObject, operatorName); + addNode(vertexID, TwoInputStreamTask.class, taskOperatorObject, operatorName); TypeSerializer outSerializer = (outTypeInfo != null) && !(outTypeInfo instanceof MissingTypeInfo) ? outTypeInfo.createSerializer(executionConfig) : null; setSerializers(vertexID, in1TypeInfo.createSerializer(executionConfig), in2TypeInfo.createSerializer(executionConfig), outSerializer); + if (taskOperatorObject instanceof OutputTypeConfigurable) { + @SuppressWarnings("unchecked") + OutputTypeConfigurable outputTypeConfigurable = (OutputTypeConfigurable) taskOperatorObject; + // sets the output type which must be know at StreamGraph creation time + outputTypeConfigurable.setOutputType(outTypeInfo, executionConfig); + } + if (LOG.isDebugEnabled()) { LOG.debug("CO-TASK: {}", vertexID); } diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/OutputTypeConfigurable.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/OutputTypeConfigurable.java index 7bc0a5076d83d..1d059661aa0b1 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/OutputTypeConfigurable.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/OutputTypeConfigurable.java @@ -27,7 +27,7 @@ * cases where the output type is specified by the returns method and, thus, after the stream * operator has been created. */ -public interface OutputTypeConfigurable { +public interface OutputTypeConfigurable { /** * Is called by the {@link org.apache.flink.streaming.api.graph.StreamGraph#addOperator(Integer, StreamOperator, TypeInformation, TypeInformation, String)} @@ -38,5 +38,5 @@ public interface OutputTypeConfigurable { * @param outTypeInfo Output type information of the {@link org.apache.flink.streaming.runtime.tasks.StreamTask} * @param executionConfig Execution configuration */ - void setOutputType(TypeInformation outTypeInfo, ExecutionConfig executionConfig); + void setOutputType(TypeInformation outTypeInfo, ExecutionConfig executionConfig); } diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamFold.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamFold.java index 636e7bf78a172..81115f029d45c 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamFold.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamFold.java @@ -35,7 +35,7 @@ public class StreamFold extends AbstractUdfStreamOperator> - implements OneInputStreamOperator, OutputTypeConfigurable { + implements OneInputStreamOperator, OutputTypeConfigurable { private static final long serialVersionUID = 1L; @@ -79,8 +79,8 @@ public void processWatermark(Watermark mark) throws Exception { } @Override - public void setOutputType(TypeInformation outTypeInfo, ExecutionConfig executionConfig) { - outTypeSerializer = (TypeSerializer) outTypeInfo.createSerializer(executionConfig); + public void setOutputType(TypeInformation outTypeInfo, ExecutionConfig executionConfig) { + outTypeSerializer = outTypeInfo.createSerializer(executionConfig); ByteArrayOutputStream baos = new ByteArrayOutputStream(); OutputViewDataOutputStreamWrapper out = new OutputViewDataOutputStreamWrapper( diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java index b91650458b6bb..3b052740bd8fe 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java @@ -20,18 +20,22 @@ import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.typeinfo.BasicTypeInfo; import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.streaming.api.datastream.ConnectedDataStream; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.operators.AbstractStreamOperator; import org.apache.flink.streaming.api.operators.OneInputStreamOperator; +import org.apache.flink.streaming.api.operators.Output; import org.apache.flink.streaming.api.operators.OutputTypeConfigurable; +import org.apache.flink.streaming.api.operators.TwoInputStreamOperator; import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.runtime.partitioner.BroadcastPartitioner; import org.apache.flink.streaming.runtime.partitioner.GlobalPartitioner; import org.apache.flink.streaming.runtime.partitioner.RebalancePartitioner; import org.apache.flink.streaming.runtime.partitioner.ShufflePartitioner; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext; import org.apache.flink.streaming.util.EvenOddOutputSelector; import org.apache.flink.streaming.util.NoOpIntMap; import org.apache.flink.streaming.util.NoOpSink; @@ -190,15 +194,15 @@ public void testVirtualTransformations2() throws Exception { * @throws Exception */ @Test - public void testOutputTypeConfiguration() throws Exception { + public void testOutputTypeConfigurationWithOneInputTransformation() throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStream source = env.fromElements(1, 10); - OutputTypeConfigurableOperation outputTypeConfigurableOperation = new OutputTypeConfigurableOperation(); + OutputTypeConfigurableOperationWithOneInput outputTypeConfigurableOperation = new OutputTypeConfigurableOperationWithOneInput(); DataStream result = source.transform( - "Output type configurable operation", + "Single input and output type configurable operation", BasicTypeInfo.INT_TYPE_INFO, outputTypeConfigurableOperation); @@ -209,9 +213,73 @@ public void testOutputTypeConfiguration() throws Exception { assertEquals(BasicTypeInfo.INT_TYPE_INFO, outputTypeConfigurableOperation.getTypeInformation()); } - private static class OutputTypeConfigurableOperation + @Test + public void testOutputTypeConfigurationWithTwoInputTransformation() throws Exception { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + + DataStream source1 = env.fromElements(1, 10); + DataStream source2 = env.fromElements(2, 11); + + ConnectedDataStream connectedSource = source1.connect(source2); + + OutputTypeConfigurableOperationWithTwoInputs outputTypeConfigurableOperation = new OutputTypeConfigurableOperationWithTwoInputs(); + + DataStream result = connectedSource.transform( + "Two input and output type configurable operation", + BasicTypeInfo.INT_TYPE_INFO, + outputTypeConfigurableOperation); + + result.addSink(new NoOpSink()); + + StreamGraph graph = env.getStreamGraph(); + + assertEquals(BasicTypeInfo.INT_TYPE_INFO, outputTypeConfigurableOperation.getTypeInformation()); + } + + private static class OutputTypeConfigurableOperationWithTwoInputs + extends AbstractStreamOperator + implements TwoInputStreamOperator, OutputTypeConfigurable { + + TypeInformation tpeInformation; + + public TypeInformation getTypeInformation() { + return tpeInformation; + } + + @Override + public void setOutputType(TypeInformation outTypeInfo, ExecutionConfig executionConfig) { + tpeInformation = outTypeInfo; + } + + @Override + public void processElement1(StreamRecord element) throws Exception { + output.collect(element); + } + + @Override + public void processElement2(StreamRecord element) throws Exception { + output.collect(element); + } + + @Override + public void processWatermark1(Watermark mark) throws Exception { + + } + + @Override + public void processWatermark2(Watermark mark) throws Exception { + + } + + @Override + public void setup(Output output, StreamingRuntimeContext runtimeContext) { + + } + } + + private static class OutputTypeConfigurableOperationWithOneInput extends AbstractStreamOperator - implements OneInputStreamOperator, OutputTypeConfigurable { + implements OneInputStreamOperator, OutputTypeConfigurable { TypeInformation tpeInformation; @@ -230,8 +298,8 @@ public void processWatermark(Watermark mark) throws Exception { } @Override - public void setOutputType(TypeInformation outTypeInfo, ExecutionConfig executionConfig) { - tpeInformation = (TypeInformation)outTypeInfo; + public void setOutputType(TypeInformation outTypeInfo, ExecutionConfig executionConfig) { + tpeInformation = outTypeInfo; } }