Skip to content

Commit

Permalink
[FLINK-2124] [streaming] Fix behavior of FromElementsFunction when T …
Browse files Browse the repository at this point in the history
…isn't Serializable

This change rewrites the FromElementsFunction to work with arbitrary types. The elements
are serialized to a byte array (using a TypeSerializer) when the FromElementsFunction is
constructed and deserialized when run is called.

This closes #848
  • Loading branch information
jreiffers authored and StephanEwen committed Jul 8, 2015
1 parent 0c49891 commit e4a030c
Show file tree
Hide file tree
Showing 4 changed files with 74 additions and 24 deletions.
Expand Up @@ -473,7 +473,7 @@ public <OUT> DataStreamSource<OUT> fromElements(OUT... data) {


TypeInformation<OUT> typeInfo = TypeExtractor.getForObject(data[0]); TypeInformation<OUT> typeInfo = TypeExtractor.getForObject(data[0]);


SourceFunction<OUT> function = new FromElementsFunction<OUT>(data); SourceFunction<OUT> function = new FromElementsFunction<OUT>(typeInfo.createSerializer(getConfig()), data);


return addSource(function, "Elements source").returns(typeInfo); return addSource(function, "Elements source").returns(typeInfo);
} }
Expand Down Expand Up @@ -504,7 +504,7 @@ public <OUT> DataStreamSource<OUT> fromCollection(Collection<OUT> data) {
} }


TypeInformation<OUT> typeInfo = TypeExtractor.getForObject(data.iterator().next()); TypeInformation<OUT> typeInfo = TypeExtractor.getForObject(data.iterator().next());
SourceFunction<OUT> function = new FromElementsFunction<OUT>(data); SourceFunction<OUT> function = new FromElementsFunction<OUT>(typeInfo.createSerializer(getConfig()), data);
checkCollection(data, typeInfo.getTypeClass()); checkCollection(data, typeInfo.getTypeClass());


return addSource(function, "Collection Source").returns(typeInfo); return addSource(function, "Collection Source").returns(typeInfo);
Expand All @@ -529,7 +529,7 @@ public <OUT> DataStreamSource<OUT> fromCollection(Collection<OUT> data, TypeInfo
throw new IllegalArgumentException("Collection must not be empty"); throw new IllegalArgumentException("Collection must not be empty");
} }


SourceFunction<OUT> function = new FromElementsFunction<OUT>(data); SourceFunction<OUT> function = new FromElementsFunction<OUT>(typeInfo.createSerializer(getConfig()), data);
checkCollection(data, typeInfo.getTypeClass()); checkCollection(data, typeInfo.getTypeClass());


return addSource(function, "Collection Source").returns(typeInfo); return addSource(function, "Collection Source").returns(typeInfo);
Expand Down
Expand Up @@ -17,37 +17,81 @@


package org.apache.flink.streaming.api.functions.source; package org.apache.flink.streaming.api.functions.source;


import java.util.Arrays; import org.apache.flink.api.common.typeutils.TypeSerializer;
import java.util.Collection; import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.InputViewDataInputStreamWrapper;
import org.apache.flink.core.memory.OutputViewDataOutputStreamWrapper;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.Iterator; import java.util.Iterator;


public class FromElementsFunction<T> implements SourceFunction<T> { public class FromElementsFunction<T> implements SourceFunction<T> {


private static final long serialVersionUID = 1L; private static final long serialVersionUID = 1L;


private Iterable<T> iterable; private final TypeSerializer<T> serializer;
private final byte[] elements;


private volatile boolean isRunning = true; private volatile boolean isRunning = true;


public FromElementsFunction(T... elements) { public FromElementsFunction(TypeSerializer<T> serializer, final T... elements) {
this.iterable = Arrays.asList(elements); this(serializer, new Iterable<T>() {
} @Override
public Iterator<T> iterator() {
return new Iterator<T>() {
int index = 0;

@Override
public boolean hasNext() {
return index < elements.length;
}

@Override
public T next() {
return elements[index++];
}


public FromElementsFunction(Collection<T> elements) { @Override
this.iterable = elements; public void remove() {
throw new UnsupportedOperationException();
}
};
}
});
} }


public FromElementsFunction(Iterable<T> elements) { public FromElementsFunction(TypeSerializer<T> serializer, Iterable<T> elements) {
this.iterable = elements; ByteArrayOutputStream baos = new ByteArrayOutputStream();
OutputViewDataOutputStreamWrapper wrapper = new OutputViewDataOutputStreamWrapper(new DataOutputStream(baos));

try {
for (T element : elements) {
serializer.serialize(element, wrapper);
}
} catch (IOException e) {
// ByteArrayOutputStream doesn't throw IOExceptions when written to
}
// closing the DataOutputStream would just flush the ByteArrayOutputStream, which in turn doesn't do anything.

this.serializer = serializer;
this.elements = baos.toByteArray();
} }


@Override @Override
public void run(SourceContext<T> ctx) throws Exception { public void run(SourceContext<T> ctx) throws Exception {
Iterator<T> it = iterable.iterator(); T value = serializer.createInstance();
ByteArrayInputStream bais = new ByteArrayInputStream(elements);
DataInputView input = new InputViewDataInputStreamWrapper(new DataInputStream(bais));


while (isRunning && it.hasNext()) { while (isRunning && bais.available() > 0) {
ctx.collect(it.next()); value = serializer.deserialize(value, input);
ctx.collect(value);
} }
// closing the DataInputStream would just close the ByteArrayInputStream, which doesn't do anything
} }


@Override @Override
Expand Down
Expand Up @@ -23,6 +23,8 @@
import java.util.Arrays; import java.util.Arrays;
import java.util.List; import java.util.List;


import org.apache.flink.api.common.typeutils.base.IntSerializer;
import org.apache.flink.runtime.testutils.CommonTestUtils;
import org.apache.flink.streaming.api.functions.source.FromElementsFunction; import org.apache.flink.streaming.api.functions.source.FromElementsFunction;
import org.apache.flink.streaming.api.functions.source.StatefulSequenceSource; import org.apache.flink.streaming.api.functions.source.StatefulSequenceSource;
import org.apache.flink.streaming.util.SourceFunctionUtil; import org.apache.flink.streaming.util.SourceFunctionUtil;
Expand All @@ -33,18 +35,22 @@ public class SourceFunctionTest {
@Test @Test
public void fromElementsTest() throws Exception { public void fromElementsTest() throws Exception {
List<Integer> expectedList = Arrays.asList(1, 2, 3); List<Integer> expectedList = Arrays.asList(1, 2, 3);
List<Integer> actualList = SourceFunctionUtil.runSourceFunction(new FromElementsFunction<Integer>( List<Integer> actualList = SourceFunctionUtil.runSourceFunction(CommonTestUtils.createCopySerializable(
1, new FromElementsFunction<Integer>(
2, IntSerializer.INSTANCE,
3)); 1,
2,
3)));
assertEquals(expectedList, actualList); assertEquals(expectedList, actualList);
} }


@Test @Test
public void fromCollectionTest() throws Exception { public void fromCollectionTest() throws Exception {
List<Integer> expectedList = Arrays.asList(1, 2, 3); List<Integer> expectedList = Arrays.asList(1, 2, 3);
List<Integer> actualList = SourceFunctionUtil.runSourceFunction(new FromElementsFunction<Integer>( List<Integer> actualList = SourceFunctionUtil.runSourceFunction(
Arrays.asList(1, 2, 3))); CommonTestUtils.createCopySerializable(new FromElementsFunction<Integer>(
IntSerializer.INSTANCE,
Arrays.asList(1, 2, 3))));
assertEquals(expectedList, actualList); assertEquals(expectedList, actualList);
} }


Expand Down
Expand Up @@ -284,8 +284,8 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) {
require(data != null, "Data must not be null.") require(data != null, "Data must not be null.")
val typeInfo = implicitly[TypeInformation[T]] val typeInfo = implicitly[TypeInformation[T]]


val sourceFunction = new FromElementsFunction[T](scala.collection.JavaConversions val sourceFunction = new FromElementsFunction[T](typeInfo.createSerializer(getConfig),
.asJavaCollection(data)) scala.collection.JavaConversions.asJavaCollection(data))


javaEnv.addSource(sourceFunction).returns(typeInfo) javaEnv.addSource(sourceFunction).returns(typeInfo)
} }
Expand Down

0 comments on commit e4a030c

Please sign in to comment.