Skip to content

Commit

Permalink
[FLINK-2124] [streaming] Handle 'fromElements()' for non-serializable…
Browse files Browse the repository at this point in the history
… elements. Cleanup code, and add proper tests.
  • Loading branch information
StephanEwen committed Jul 8, 2015
1 parent e4a030c commit 28713a2
Show file tree
Hide file tree
Showing 7 changed files with 442 additions and 162 deletions.
Expand Up @@ -65,6 +65,8 @@
import org.apache.flink.util.SplittableIterator;

import java.io.File;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
Expand Down Expand Up @@ -432,7 +434,7 @@ public void registerType(Class<?> type) {
/**
* Creates a new data stream that contains a sequence of numbers. This is a parallel source,
* if you manually set the parallelism to {@code 1}
* (using {@link org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator.setParallelism()})
* (using {@link org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator#setParallelism(int)})
* the generated sequence of elements is in order.
*
* @param from
Expand Down Expand Up @@ -467,52 +469,67 @@ public DataStreamSource<Long> generateSequence(long from, long to) {
*/
public <OUT> DataStreamSource<OUT> fromElements(OUT... data) {
if (data.length == 0) {
throw new IllegalArgumentException(
"fromElements needs at least one element as argument");
throw new IllegalArgumentException("fromElements needs at least one element as argument");
}

TypeInformation<OUT> typeInfo = TypeExtractor.getForObject(data[0]);

SourceFunction<OUT> function = new FromElementsFunction<OUT>(typeInfo.createSerializer(getConfig()), data);

return addSource(function, "Elements source").returns(typeInfo);
TypeInformation<OUT> typeInfo;
try {
typeInfo = TypeExtractor.getForObject(data[0]);
}
catch (Exception e) {
throw new RuntimeException("Could not create TypeInformation for type " + data[0].getClass().getName()
+ "; please specify the TypeInformation manually via "
+ "StreamExecutionEnvironment#fromElements(Collection, TypeInformation)");
}
return fromCollection(Arrays.asList(data), typeInfo);
}

/**
* Creates a data stream from the given non-empty collection. The type of the data stream is that of the
* elements in the collection.
*
* <p>
* The framework will try and determine the exact type from the collection elements. In case of generic
* <p>The framework will try and determine the exact type from the collection elements. In case of generic
* elements, it may be necessary to manually supply the type information via
* {@link #fromCollection(java.util.Collection, org.apache.flink.api.common.typeinfo.TypeInformation)}.
* <p>
* {@link #fromCollection(java.util.Collection, org.apache.flink.api.common.typeinfo.TypeInformation)}.</p>
*
* Note that this operation will result in a non-parallel data stream source, i.e. a data stream source with a
* degree of parallelism one.
* <p>Note that this operation will result in a non-parallel data stream source, i.e. a data stream source with a
* parallelism one.</p>
*
* @param data
* The collection of elements to create the data stream from
* The collection of elements to create the data stream from.
* @param <OUT>
* The type of the returned data stream
* @return The data stream representing the given collection
* The generic type of the returned data stream.
* @return
* The data stream representing the given collection
*/
public <OUT> DataStreamSource<OUT> fromCollection(Collection<OUT> data) {
Preconditions.checkNotNull(data, "Collection must not be null");
if (data.isEmpty()) {
throw new IllegalArgumentException("Collection must not be empty");
}

TypeInformation<OUT> typeInfo = TypeExtractor.getForObject(data.iterator().next());
SourceFunction<OUT> function = new FromElementsFunction<OUT>(typeInfo.createSerializer(getConfig()), data);
checkCollection(data, typeInfo.getTypeClass());
OUT first = data.iterator().next();
if (first == null) {
throw new IllegalArgumentException("Collection must not contain null elements");
}

return addSource(function, "Collection Source").returns(typeInfo);
TypeInformation<OUT> typeInfo;
try {
typeInfo = TypeExtractor.getForObject(first);
}
catch (Exception e) {
throw new RuntimeException("Could not create TypeInformation for type " + first.getClass()
+ "; please specify the TypeInformation manually via "
+ "StreamExecutionEnvironment#fromElements(Collection, TypeInformation)");
}
return fromCollection(data, typeInfo);
}

/**
* Creates a data stream from the given non-empty collection.Note that this operation will result in
* a non-parallel data stream source, i.e. a data stream source with a degree of parallelism one.
* Creates a data stream from the given non-empty collection.
*
* <p>Note that this operation will result in a non-parallel data stream source,
* i.e., a data stream source with a parallelism one.</p>
*
* @param data
* The collection of elements to create the data stream from
Expand All @@ -522,26 +539,31 @@ public <OUT> DataStreamSource<OUT> fromCollection(Collection<OUT> data) {
* The type of the returned data stream
* @return The data stream representing the given collection
*/
public <OUT> DataStreamSource<OUT> fromCollection(Collection<OUT> data, TypeInformation<OUT>
typeInfo) {
public <OUT> DataStreamSource<OUT> fromCollection(Collection<OUT> data, TypeInformation<OUT> typeInfo) {
Preconditions.checkNotNull(data, "Collection must not be null");
if (data.isEmpty()) {
throw new IllegalArgumentException("Collection must not be empty");

// must not have null elements and mixed elements
FromElementsFunction.checkCollection(data, typeInfo.getTypeClass());

SourceFunction<OUT> function;
try {
function = new FromElementsFunction<OUT>(typeInfo.createSerializer(getConfig()), data);
}

SourceFunction<OUT> function = new FromElementsFunction<OUT>(typeInfo.createSerializer(getConfig()), data);
checkCollection(data, typeInfo.getTypeClass());

return addSource(function, "Collection Source").returns(typeInfo);
catch (IOException e) {
throw new RuntimeException(e.getMessage(), e);
}
return addSource(function, "Collection Source", typeInfo);
}

/**
* Creates a data stream from the given iterator. Because the iterator will remain unmodified until the actual
* execution happens, the type of data returned by the iterator must be given explicitly in the form of the type
* class (this is due to the fact that the Java compiler erases the generic type information).
* <p>
* Note that this operation will result in a non-parallel data stream source, i.e. a data stream source with a
* degree of parallelism of one.
* Creates a data stream from the given iterator.
*
* <p>Because the iterator will remain unmodified until the actual execution happens,
* the type of data returned by the iterator must be given explicitly in the form of the type
* class (this is due to the fact that the Java compiler erases the generic type information).</p>
*
* <p>Note that this operation will result in a non-parallel data stream source, i.e.,
* a data stream source with a parallelism of one.</p>
*
* @param data
* The iterator of elements to create the data stream from
Expand All @@ -557,14 +579,16 @@ public <OUT> DataStreamSource<OUT> fromCollection(Iterator<OUT> data, Class<OUT>
}

/**
* Creates a data stream from the given iterator. Because the iterator will remain unmodified until the actual
* execution happens, the type of data returned by the iterator must be given explicitly in the form of the type
* information. This method is useful for cases where the type is generic. In that case, the type class (as
* given in
* {@link #fromCollection(java.util.Iterator, Class)} does not supply all type information.
* <p>
* Note that this operation will result in a non-parallel data stream source, i.e. a data stream source with a
* degree of parallelism one.
* Creates a data stream from the given iterator.
*
* <p>Because the iterator will remain unmodified until the actual execution happens,
* the type of data returned by the iterator must be given explicitly in the form of the type
* information. This method is useful for cases where the type is generic.
* In that case, the type class (as given in
* {@link #fromCollection(java.util.Iterator, Class)} does not supply all type information.</p>
*
* <p>Note that this operation will result in a non-parallel data stream source, i.e.,
* a data stream source with a parallelism one.</p>
*
* @param data
* The iterator of elements to create the data stream from
Expand All @@ -574,27 +598,20 @@ public <OUT> DataStreamSource<OUT> fromCollection(Iterator<OUT> data, Class<OUT>
* The type of the returned data stream
* @return The data stream representing the elements in the iterator
*/
public <OUT> DataStreamSource<OUT> fromCollection(Iterator<OUT> data, TypeInformation<OUT>
typeInfo) {
public <OUT> DataStreamSource<OUT> fromCollection(Iterator<OUT> data, TypeInformation<OUT> typeInfo) {
Preconditions.checkNotNull(data, "The iterator must not be null");

SourceFunction<OUT> function = new FromIteratorFunction<OUT>(data);
return addSource(function, "Collection Source").returns(typeInfo);
}

// private helper for passing different names
private <OUT> DataStreamSource<OUT> fromCollection(Iterator<OUT> iterator, TypeInformation<OUT>
typeInfo, String operatorName) {
return addSource(new FromIteratorFunction<OUT>(iterator), operatorName).returns(typeInfo);
return addSource(function, "Collection Source", typeInfo);
}

/**
* Creates a new data stream that contains elements in the iterator. The iterator is splittable, allowing the
* framework to create a parallel data stream source that returns the elements in the iterator.
* <p>
* Because the iterator will remain unmodified until the actual execution happens, the type of data returned by the
*
* <p>Because the iterator will remain unmodified until the actual execution happens, the type of data returned by the
* iterator must be given explicitly in the form of the type class (this is due to the fact that the Java compiler
* erases the generic type information).
* erases the generic type information).</p>
*
* @param iterator
* The iterator that produces the elements of the data stream
Expand Down Expand Up @@ -1194,19 +1211,6 @@ protected static void initializeFromFactory(StreamExecutionEnvironmentFactory ee
currentEnvironment = eef.createExecutionEnvironment();
}

private static <OUT> void checkCollection(Collection<OUT> elements, Class<OUT> viewedAs) {
Preconditions.checkNotNull(viewedAs);

for (OUT elem : elements) {
Preconditions.checkNotNull(elem, "The collection must not contain null elements.");

if (!viewedAs.isAssignableFrom(elem.getClass())) {
throw new IllegalArgumentException("The elements in the collection are not all subclasses of " +
viewedAs.getCanonicalName());
}
}
}

/**
* Returns a "closure-cleaned" version of the given function. Cleans only if closure cleaning
* is not disabled in the {@link org.apache.flink.api.common.ExecutionConfig}
Expand Down
Expand Up @@ -27,75 +27,108 @@
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.Iterator;

import java.util.Arrays;
import java.util.Collection;

/**
* A stream source function that returns a sequence of elements.
*
* <p>Upon construction, this source function serializes the elements using Flink's type information.
* That way, any object transport using Java serialization will not be affected by the serializability
* if the elements.</p>
*
* @param <T> The type of elements returned by this function.
*/
public class FromElementsFunction<T> implements SourceFunction<T> {

private static final long serialVersionUID = 1L;

/** The (de)serializer to be used for the data elements */
private final TypeSerializer<T> serializer;
private final byte[] elements;

/** The actual data elements, in serialized form */
private final byte[] elementsSerialized;

/** The number of serialized elements */
private final int numElements;

/** Flag to make the source cancelable */
private volatile boolean isRunning = true;

public FromElementsFunction(TypeSerializer<T> serializer, final T... elements) {
this(serializer, new Iterable<T>() {
@Override
public Iterator<T> iterator() {
return new Iterator<T>() {
int index = 0;

@Override
public boolean hasNext() {
return index < elements.length;
}

@Override
public T next() {
return elements[index++];
}

@Override
public void remove() {
throw new UnsupportedOperationException();
}
};
}
});

public FromElementsFunction(TypeSerializer<T> serializer, T... elements) throws IOException {
this(serializer, Arrays.asList(elements));
}

public FromElementsFunction(TypeSerializer<T> serializer, Iterable<T> elements) {
public FromElementsFunction(TypeSerializer<T> serializer, Iterable<T> elements) throws IOException {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
OutputViewDataOutputStreamWrapper wrapper = new OutputViewDataOutputStreamWrapper(new DataOutputStream(baos));

int count = 0;
try {
for (T element : elements) {
serializer.serialize(element, wrapper);
count++;
}
} catch (IOException e) {
// ByteArrayOutputStream doesn't throw IOExceptions when written to
}
// closing the DataOutputStream would just flush the ByteArrayOutputStream, which in turn doesn't do anything.
catch (Exception e) {
throw new IOException("Serializing the source elements failed: " + e.getMessage(), e);
}

this.serializer = serializer;
this.elements = baos.toByteArray();
this.elementsSerialized = baos.toByteArray();
this.numElements = count;
}

@Override
public void run(SourceContext<T> ctx) throws Exception {
T value = serializer.createInstance();
ByteArrayInputStream bais = new ByteArrayInputStream(elements);
ByteArrayInputStream bais = new ByteArrayInputStream(elementsSerialized);
DataInputView input = new InputViewDataInputStreamWrapper(new DataInputStream(bais));

while (isRunning && bais.available() > 0) {
value = serializer.deserialize(value, input);
ctx.collect(value);
int numEmitted = 0;
while (isRunning && numEmitted++ < numElements) {
T next;
try {
next = serializer.deserialize(input);
}
catch (Exception e) {
throw new IOException("Failed to deserialize an element from the source. " +
"If you are using user-defined serialization (Value and Writable types), check the " +
"serialization functions.\nSerializer is " + serializer);
}

ctx.collect(next);
}
// closing the DataInputStream would just close the ByteArrayInputStream, which doesn't do anything
}

@Override
public void cancel() {
isRunning = false;
}

// ------------------------------------------------------------------------
// Utilities
// ------------------------------------------------------------------------

/**
* Verifies that all elements in the collection are non-null, and are of the given class, or
* a subclass thereof.
*
* @param elements The collection to check.
* @param viewedAs The class to which the elements must be assignable to.
*
* @param <OUT> The generic type of the collection to be checked.
*/
public static <OUT> void checkCollection(Collection<OUT> elements, Class<OUT> viewedAs) {
for (OUT elem : elements) {
if (elem == null) {
throw new IllegalArgumentException("The collection contains a null element");
}

if (!viewedAs.isAssignableFrom(elem.getClass())) {
throw new IllegalArgumentException("The elements in the collection are not all subclasses of " +
viewedAs.getCanonicalName());
}
}
}
}

0 comments on commit 28713a2

Please sign in to comment.