From 171bfd115572b3d11962c28b48db3f2ceefd8280 Mon Sep 17 00:00:00 2001 From: Johannes Reifferscheid Date: Wed, 17 Jun 2015 15:35:37 +0200 Subject: [PATCH 1/4] [FLINK-2124] [streaming] Fix behavior of FromElementsFunction when T isn't Serializable rewrite FromElementsFunction to work with arbitrary types. The elements are serialized to a byte array (using a TypeSerializer) when the FromElementsFunction is constructed and deserialized when run is called. --- .../StreamExecutionEnvironment.java | 6 +- .../source/FromElementsFunction.java | 60 +++++++++++++++---- .../streaming/api/SourceFunctionTest.java | 18 ++++-- 3 files changed, 62 insertions(+), 22 deletions(-) diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java index 5e7be8dfcb553..f98a9f09c6e6e 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java @@ -473,7 +473,7 @@ public DataStreamSource fromElements(OUT... data) { TypeInformation typeInfo = TypeExtractor.getForObject(data[0]); - SourceFunction function = new FromElementsFunction(data); + SourceFunction function = new FromElementsFunction(typeInfo.createSerializer(getConfig()), data); return addSource(function, "Elements source").returns(typeInfo); } @@ -504,7 +504,7 @@ public DataStreamSource fromCollection(Collection data) { } TypeInformation typeInfo = TypeExtractor.getForObject(data.iterator().next()); - SourceFunction function = new FromElementsFunction(data); + SourceFunction function = new FromElementsFunction(typeInfo.createSerializer(getConfig()), data); checkCollection(data, typeInfo.getTypeClass()); return addSource(function, "Collection Source").returns(typeInfo); @@ -529,7 +529,7 @@ public DataStreamSource fromCollection(Collection data, TypeInfo throw new IllegalArgumentException("Collection must not be empty"); } - SourceFunction function = new FromElementsFunction(data); + SourceFunction function = new FromElementsFunction(typeInfo.createSerializer(getConfig()), data); checkCollection(data, typeInfo.getTypeClass()); return addSource(function, "Collection Source").returns(typeInfo); diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FromElementsFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FromElementsFunction.java index 736cc73de226b..67c82b266d8b6 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FromElementsFunction.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FromElementsFunction.java @@ -17,37 +17,71 @@ package org.apache.flink.streaming.api.functions.source; -import java.util.Arrays; -import java.util.Collection; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.InputViewDataInputStreamWrapper; +import org.apache.flink.core.memory.OutputViewDataOutputStreamWrapper; + +import java.io.*; import java.util.Iterator; public class FromElementsFunction implements SourceFunction { private static final long serialVersionUID = 1L; - private Iterable iterable; + private final TypeSerializer serializer; + private final byte[] elements; private volatile boolean isRunning = true; - public FromElementsFunction(T... elements) { - this.iterable = Arrays.asList(elements); - } + public FromElementsFunction(TypeSerializer serializer, final T... elements) { + this(serializer, new Iterable() { + @Override + public Iterator iterator() { + return new Iterator() { + int index = 0; + + @Override + public boolean hasNext() { + return index < elements.length; + } - public FromElementsFunction(Collection elements) { - this.iterable = elements; + @Override + public T next() { + return elements[index++]; + } + }; + } + }); } - public FromElementsFunction(Iterable elements) { - this.iterable = elements; + public FromElementsFunction(TypeSerializer serializer, Iterable elements) { + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + OutputViewDataOutputStreamWrapper wrapper = new OutputViewDataOutputStreamWrapper(new DataOutputStream(baos)); + + try { + for (T element : elements) + serializer.serialize(element, wrapper); + } catch (IOException e) { + // ByteArrayOutputStream doesn't throw IOExceptions when written to + } + // closing the DataOutputStream would just flush the ByteArrayOutputStream, which in turn doesn't do anything. + + this.serializer = serializer; + this.elements = baos.toByteArray(); } @Override public void run(SourceContext ctx) throws Exception { - Iterator it = iterable.iterator(); + T value = serializer.createInstance(); + ByteArrayInputStream bais = new ByteArrayInputStream(elements); + DataInputView input = new InputViewDataInputStreamWrapper(new DataInputStream(bais)); - while (isRunning && it.hasNext()) { - ctx.collect(it.next()); + while (isRunning && bais.available() > 0) { + value = serializer.deserialize(value, input); + ctx.collect(value); } + // closing the DataOutputStream would just close the ByteArrayInputStream, which doesn't do anything } @Override diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/SourceFunctionTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/SourceFunctionTest.java index 7a78205ea606b..f0fe63d91ae60 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/SourceFunctionTest.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/SourceFunctionTest.java @@ -23,6 +23,8 @@ import java.util.Arrays; import java.util.List; +import org.apache.flink.api.common.typeutils.base.IntSerializer; +import org.apache.flink.runtime.testutils.CommonTestUtils; import org.apache.flink.streaming.api.functions.source.FromElementsFunction; import org.apache.flink.streaming.api.functions.source.StatefulSequenceSource; import org.apache.flink.streaming.util.SourceFunctionUtil; @@ -33,18 +35,22 @@ public class SourceFunctionTest { @Test public void fromElementsTest() throws Exception { List expectedList = Arrays.asList(1, 2, 3); - List actualList = SourceFunctionUtil.runSourceFunction(new FromElementsFunction( - 1, - 2, - 3)); + List actualList = SourceFunctionUtil.runSourceFunction(CommonTestUtils.createCopySerializable( + new FromElementsFunction( + IntSerializer.INSTANCE, + 1, + 2, + 3))); assertEquals(expectedList, actualList); } @Test public void fromCollectionTest() throws Exception { List expectedList = Arrays.asList(1, 2, 3); - List actualList = SourceFunctionUtil.runSourceFunction(new FromElementsFunction( - Arrays.asList(1, 2, 3))); + List actualList = SourceFunctionUtil.runSourceFunction( + CommonTestUtils.createCopySerializable(new FromElementsFunction( + IntSerializer.INSTANCE, + Arrays.asList(1, 2, 3)))); assertEquals(expectedList, actualList); } From dea7a8b37b4b485055be3a880cb12c3dff37b106 Mon Sep 17 00:00:00 2001 From: Johannes Reifferscheid Date: Thu, 18 Jun 2015 10:55:29 +0200 Subject: [PATCH 2/4] Fix typo, code style issues --- .../api/functions/source/FromElementsFunction.java | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FromElementsFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FromElementsFunction.java index 67c82b266d8b6..7dc18b085cd4c 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FromElementsFunction.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FromElementsFunction.java @@ -22,7 +22,11 @@ import org.apache.flink.core.memory.InputViewDataInputStreamWrapper; import org.apache.flink.core.memory.OutputViewDataOutputStreamWrapper; -import java.io.*; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; import java.util.Iterator; public class FromElementsFunction implements SourceFunction { @@ -60,8 +64,9 @@ public FromElementsFunction(TypeSerializer serializer, Iterable elements) OutputViewDataOutputStreamWrapper wrapper = new OutputViewDataOutputStreamWrapper(new DataOutputStream(baos)); try { - for (T element : elements) + for (T element : elements) { serializer.serialize(element, wrapper); + } } catch (IOException e) { // ByteArrayOutputStream doesn't throw IOExceptions when written to } @@ -81,7 +86,7 @@ public void run(SourceContext ctx) throws Exception { value = serializer.deserialize(value, input); ctx.collect(value); } - // closing the DataOutputStream would just close the ByteArrayInputStream, which doesn't do anything + // closing the DataInputStream would just close the ByteArrayInputStream, which doesn't do anything } @Override From 2d76c590c66755c38aeb91d9b0cf70dc1ba63dec Mon Sep 17 00:00:00 2001 From: Johannes Reifferscheid Date: Thu, 18 Jun 2015 12:12:18 +0200 Subject: [PATCH 3/4] Fix java 6,7 compatibility --- .../streaming/api/functions/source/FromElementsFunction.java | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FromElementsFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FromElementsFunction.java index 7dc18b085cd4c..63eb0ad66ef7f 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FromElementsFunction.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FromElementsFunction.java @@ -54,6 +54,11 @@ public boolean hasNext() { public T next() { return elements[index++]; } + + @Override + public void remove() { + throw new UnsupportedOperationException(); + } }; } }); From 6514bc6ed7289f286752819ac31970deebf2a63d Mon Sep 17 00:00:00 2001 From: Johannes Reifferscheid Date: Thu, 18 Jun 2015 12:18:46 +0200 Subject: [PATCH 4/4] Fix FromElementsFunction usage in StreamExecutionEnvironment.scala --- .../streaming/api/scala/StreamExecutionEnvironment.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala index 968e07f9942e0..7215a4db14d2f 100644 --- a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala +++ b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala @@ -284,8 +284,8 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) { require(data != null, "Data must not be null.") val typeInfo = implicitly[TypeInformation[T]] - val sourceFunction = new FromElementsFunction[T](scala.collection.JavaConversions - .asJavaCollection(data)) + val sourceFunction = new FromElementsFunction[T](typeInfo.createSerializer(getConfig), + scala.collection.JavaConversions.asJavaCollection(data)) javaEnv.addSource(sourceFunction).returns(typeInfo) }