diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java index f98a9f09c6e6e..58348e38d8e95 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java @@ -65,6 +65,8 @@ import org.apache.flink.util.SplittableIterator; import java.io.File; +import java.io.IOException; +import java.util.Arrays; import java.util.Collection; import java.util.Iterator; import java.util.List; @@ -432,7 +434,7 @@ public void registerType(Class type) { /** * Creates a new data stream that contains a sequence of numbers. This is a parallel source, * if you manually set the parallelism to {@code 1} - * (using {@link org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator.setParallelism()}) + * (using {@link org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator#setParallelism(int)}) * the generated sequence of elements is in order. * * @param from @@ -467,35 +469,38 @@ public DataStreamSource generateSequence(long from, long to) { */ public DataStreamSource fromElements(OUT... data) { if (data.length == 0) { - throw new IllegalArgumentException( - "fromElements needs at least one element as argument"); + throw new IllegalArgumentException("fromElements needs at least one element as argument"); } - TypeInformation typeInfo = TypeExtractor.getForObject(data[0]); - - SourceFunction function = new FromElementsFunction(typeInfo.createSerializer(getConfig()), data); - - return addSource(function, "Elements source").returns(typeInfo); + TypeInformation typeInfo; + try { + typeInfo = TypeExtractor.getForObject(data[0]); + } + catch (Exception e) { + throw new RuntimeException("Could not create TypeInformation for type " + data[0].getClass().getName() + + "; please specify the TypeInformation manually via " + + "StreamExecutionEnvironment#fromElements(Collection, TypeInformation)"); + } + return fromCollection(Arrays.asList(data), typeInfo); } /** * Creates a data stream from the given non-empty collection. The type of the data stream is that of the * elements in the collection. * - *
<p/>
- * The framework will try and determine the exact type from the collection elements. In case of generic
+ * <p>The framework will try and determine the exact type from the collection elements. In case of generic
 * elements, it may be necessary to manually supply the type information via
- * {@link #fromCollection(java.util.Collection, org.apache.flink.api.common.typeinfo.TypeInformation)}.
- * <p/>
+ * {@link #fromCollection(java.util.Collection, org.apache.flink.api.common.typeinfo.TypeInformation)}.</p>
 *
- * Note that this operation will result in a non-parallel data stream source, i.e. a data stream source with a
- * degree of parallelism one.
+ * <p>Note that this operation will result in a non-parallel data stream source, i.e. a data stream source with a
+ * parallelism of one.</p>
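// ---------------------------------------------------------------------------
// Editor's note: illustrative sketch, not part of the patch. It shows typical
// use of the collection sources documented above; the class name and values
// are hypothetical, the API calls are the ones this change touches.
// ---------------------------------------------------------------------------
import java.util.Arrays;

import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CollectionSourceSketch {

	public static void main(String[] args) throws Exception {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

		// the element type is extracted from the first element
		DataStream<String> names = env.fromElements("peter", "paul", "mary");

		// explicit TypeInformation, for element types the extractor cannot determine
		DataStream<Integer> numbers = env.fromCollection(
				Arrays.asList(1, 2, 3), BasicTypeInfo.INT_TYPE_INFO);

		names.print();
		numbers.print();
		env.execute("Collection sources sketch");
	}
}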
* * @param data - * The collection of elements to create the data stream from + * The collection of elements to create the data stream from. * @param - * The type of the returned data stream - * @return The data stream representing the given collection + * The generic type of the returned data stream. + * @return + * The data stream representing the given collection */ public DataStreamSource fromCollection(Collection data) { Preconditions.checkNotNull(data, "Collection must not be null"); @@ -503,16 +508,28 @@ public DataStreamSource fromCollection(Collection data) { throw new IllegalArgumentException("Collection must not be empty"); } - TypeInformation typeInfo = TypeExtractor.getForObject(data.iterator().next()); - SourceFunction function = new FromElementsFunction(typeInfo.createSerializer(getConfig()), data); - checkCollection(data, typeInfo.getTypeClass()); + OUT first = data.iterator().next(); + if (first == null) { + throw new IllegalArgumentException("Collection must not contain null elements"); + } - return addSource(function, "Collection Source").returns(typeInfo); + TypeInformation typeInfo; + try { + typeInfo = TypeExtractor.getForObject(first); + } + catch (Exception e) { + throw new RuntimeException("Could not create TypeInformation for type " + first.getClass() + + "; please specify the TypeInformation manually via " + + "StreamExecutionEnvironment#fromElements(Collection, TypeInformation)"); + } + return fromCollection(data, typeInfo); } /** - * Creates a data stream from the given non-empty collection.Note that this operation will result in - * a non-parallel data stream source, i.e. a data stream source with a degree of parallelism one. + * Creates a data stream from the given non-empty collection. + * + *
<p>Note that this operation will result in a non-parallel data stream source,
+ * i.e., a data stream source with a parallelism of one.</p>
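// ---------------------------------------------------------------------------
// Editor's note: illustrative sketch, not part of the patch. It shows the
// element validation performed by FromElementsFunction.checkCollection(),
// which this overload now calls; the class name is hypothetical.
// ---------------------------------------------------------------------------
import java.util.Arrays;

import org.apache.flink.streaming.api.functions.source.FromElementsFunction;

public class CheckCollectionSketch {

	public static void main(String[] args) {
		// passes: all elements are non-null Strings
		FromElementsFunction.checkCollection(Arrays.asList("a", "b"), String.class);

		// throws IllegalArgumentException: null elements are rejected
		FromElementsFunction.checkCollection(Arrays.asList("a", null), String.class);
	}
}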
* * @param data * The collection of elements to create the data stream from @@ -522,26 +539,31 @@ public DataStreamSource fromCollection(Collection data) { * The type of the returned data stream * @return The data stream representing the given collection */ - public DataStreamSource fromCollection(Collection data, TypeInformation - typeInfo) { + public DataStreamSource fromCollection(Collection data, TypeInformation typeInfo) { Preconditions.checkNotNull(data, "Collection must not be null"); - if (data.isEmpty()) { - throw new IllegalArgumentException("Collection must not be empty"); + + // must not have null elements and mixed elements + FromElementsFunction.checkCollection(data, typeInfo.getTypeClass()); + + SourceFunction function; + try { + function = new FromElementsFunction(typeInfo.createSerializer(getConfig()), data); } - - SourceFunction function = new FromElementsFunction(typeInfo.createSerializer(getConfig()), data); - checkCollection(data, typeInfo.getTypeClass()); - - return addSource(function, "Collection Source").returns(typeInfo); + catch (IOException e) { + throw new RuntimeException(e.getMessage(), e); + } + return addSource(function, "Collection Source", typeInfo); } /** - * Creates a data stream from the given iterator. Because the iterator will remain unmodified until the actual - * execution happens, the type of data returned by the iterator must be given explicitly in the form of the type - * class (this is due to the fact that the Java compiler erases the generic type information). - *
<p/>
- * Note that this operation will result in a non-parallel data stream source, i.e. a data stream source with a
- * degree of parallelism of one.
+ * Creates a data stream from the given iterator.
+ *
+ * <p>Because the iterator will remain unmodified until the actual execution happens,
+ * the type of data returned by the iterator must be given explicitly in the form of the type
+ * class (this is due to the fact that the Java compiler erases the generic type information).</p>
+ *
+ * <p>Note that this operation will result in a non-parallel data stream source, i.e.,
+ * a data stream source with a parallelism of one.</p>
* * @param data * The iterator of elements to create the data stream from @@ -557,14 +579,16 @@ public DataStreamSource fromCollection(Iterator data, Class } /** - * Creates a data stream from the given iterator. Because the iterator will remain unmodified until the actual - * execution happens, the type of data returned by the iterator must be given explicitly in the form of the type - * information. This method is useful for cases where the type is generic. In that case, the type class (as - * given in - * {@link #fromCollection(java.util.Iterator, Class)} does not supply all type information. - *
<p/>
- * Note that this operation will result in a non-parallel data stream source, i.e. a data stream source with a
- * degree of parallelism one.
+ * Creates a data stream from the given iterator.
+ *
+ * <p>Because the iterator will remain unmodified until the actual execution happens,
+ * the type of data returned by the iterator must be given explicitly in the form of the type
+ * information. This method is useful for cases where the type is generic.
+ * In that case, the type class (as given in
+ * {@link #fromCollection(java.util.Iterator, Class)} does not supply all type information.</p>
+ *
+ * <p>Note that this operation will result in a non-parallel data stream source, i.e.,
+ * a data stream source with a parallelism of one.</p>
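// ---------------------------------------------------------------------------
// Editor's note: illustrative sketch, not part of the patch. It shows a
// non-parallel source built from a serializable iterator, with the element
// type given explicitly; the iterator class is hypothetical.
// ---------------------------------------------------------------------------
import java.io.Serializable;
import java.util.Iterator;
import java.util.NoSuchElementException;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class IteratorSourceSketch {

	/** A serializable iterator producing the numbers 0..99. */
	public static class CountingIterator implements Iterator<Long>, Serializable {
		private long next;

		@Override
		public boolean hasNext() {
			return next < 100;
		}

		@Override
		public Long next() {
			if (!hasNext()) {
				throw new NoSuchElementException();
			}
			return next++;
		}

		@Override
		public void remove() {
			throw new UnsupportedOperationException();
		}
	}

	public static void main(String[] args) throws Exception {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

		// generics are erased, so the element class is passed explicitly
		DataStream<Long> numbers = env.fromCollection(new CountingIterator(), Long.class);

		numbers.print();
		env.execute("Iterator source sketch");
	}
}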
* * @param data * The iterator of elements to create the data stream from @@ -574,27 +598,20 @@ public DataStreamSource fromCollection(Iterator data, Class * The type of the returned data stream * @return The data stream representing the elements in the iterator */ - public DataStreamSource fromCollection(Iterator data, TypeInformation - typeInfo) { + public DataStreamSource fromCollection(Iterator data, TypeInformation typeInfo) { Preconditions.checkNotNull(data, "The iterator must not be null"); SourceFunction function = new FromIteratorFunction(data); - return addSource(function, "Collection Source").returns(typeInfo); - } - - // private helper for passing different names - private DataStreamSource fromCollection(Iterator iterator, TypeInformation - typeInfo, String operatorName) { - return addSource(new FromIteratorFunction(iterator), operatorName).returns(typeInfo); + return addSource(function, "Collection Source", typeInfo); } /** * Creates a new data stream that contains elements in the iterator. The iterator is splittable, allowing the * framework to create a parallel data stream source that returns the elements in the iterator. - *
<p/>
- * Because the iterator will remain unmodified until the actual execution happens, the type of data returned by the
+ *
+ * <p>Because the iterator will remain unmodified until the actual execution happens, the type of data returned by the
 * iterator must be given explicitly in the form of the type class (this is due to the fact that the Java compiler
- * erases the generic type information).
+ * erases the generic type information).</p>
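// ---------------------------------------------------------------------------
// Editor's note: illustrative sketch, not part of the patch. It shows the
// parallel counterpart: a SplittableIterator is split across the parallel
// source instances, so setParallelism() greater than one is legal here.
// NumberSequenceIterator is the SplittableIterator from flink-core.
// ---------------------------------------------------------------------------
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.NumberSequenceIterator;

public class ParallelCollectionSketch {

	public static void main(String[] args) throws Exception {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

		// the framework splits the iterator across four parallel source instances
		DataStream<Long> numbers = env
				.fromParallelCollection(new NumberSequenceIterator(0L, 1000L), BasicTypeInfo.LONG_TYPE_INFO)
				.setParallelism(4);

		numbers.print();
		env.execute("Parallel collection sketch");
	}
}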
* * @param iterator * The iterator that produces the elements of the data stream @@ -1194,19 +1211,6 @@ protected static void initializeFromFactory(StreamExecutionEnvironmentFactory ee currentEnvironment = eef.createExecutionEnvironment(); } - private static void checkCollection(Collection elements, Class viewedAs) { - Preconditions.checkNotNull(viewedAs); - - for (OUT elem : elements) { - Preconditions.checkNotNull(elem, "The collection must not contain null elements."); - - if (!viewedAs.isAssignableFrom(elem.getClass())) { - throw new IllegalArgumentException("The elements in the collection are not all subclasses of " + - viewedAs.getCanonicalName()); - } - } - } - /** * Returns a "closure-cleaned" version of the given function. Cleans only if closure cleaning * is not disabled in the {@link org.apache.flink.api.common.ExecutionConfig} diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FromElementsFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FromElementsFunction.java index 63eb0ad66ef7f..394fa7749ac33 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FromElementsFunction.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FromElementsFunction.java @@ -27,75 +27,108 @@ import java.io.DataInputStream; import java.io.DataOutputStream; import java.io.IOException; -import java.util.Iterator; - +import java.util.Arrays; +import java.util.Collection; + +/** + * A stream source function that returns a sequence of elements. + * + *
<p>Upon construction, this source function serializes the elements using Flink's type information.
+ * That way, any object transport using Java serialization will not be affected by the serializability
+ * of the elements.</p>
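// ---------------------------------------------------------------------------
// Editor's note: illustrative sketch, not part of the patch. Because the
// constructor serializes the elements eagerly (as described above), only the
// byte array and the TypeSerializer travel with the function; the elements
// themselves never need to be Java-serializable.
// ---------------------------------------------------------------------------
import java.util.Arrays;

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.streaming.api.functions.source.FromElementsFunction;

public class FromElementsSketch {

	public static void main(String[] args) throws Exception {
		// serialization happens here, in the constructor (may throw IOException)
		FromElementsFunction<String> source = new FromElementsFunction<String>(
				BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
				Arrays.asList("a", "b", "c"));

		// at runtime, run(SourceContext) deserializes and emits the elements in order
	}
}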
+ * + * @param The type of elements returned by this function. + */ public class FromElementsFunction implements SourceFunction { private static final long serialVersionUID = 1L; + /** The (de)serializer to be used for the data elements */ private final TypeSerializer serializer; - private final byte[] elements; + + /** The actual data elements, in serialized form */ + private final byte[] elementsSerialized; + + /** The number of serialized elements */ + private final int numElements; + /** Flag to make the source cancelable */ private volatile boolean isRunning = true; - public FromElementsFunction(TypeSerializer serializer, final T... elements) { - this(serializer, new Iterable() { - @Override - public Iterator iterator() { - return new Iterator() { - int index = 0; - - @Override - public boolean hasNext() { - return index < elements.length; - } - - @Override - public T next() { - return elements[index++]; - } - - @Override - public void remove() { - throw new UnsupportedOperationException(); - } - }; - } - }); + + public FromElementsFunction(TypeSerializer serializer, T... elements) throws IOException { + this(serializer, Arrays.asList(elements)); } - - public FromElementsFunction(TypeSerializer serializer, Iterable elements) { + + public FromElementsFunction(TypeSerializer serializer, Iterable elements) throws IOException { ByteArrayOutputStream baos = new ByteArrayOutputStream(); OutputViewDataOutputStreamWrapper wrapper = new OutputViewDataOutputStreamWrapper(new DataOutputStream(baos)); + int count = 0; try { for (T element : elements) { serializer.serialize(element, wrapper); + count++; } - } catch (IOException e) { - // ByteArrayOutputStream doesn't throw IOExceptions when written to } - // closing the DataOutputStream would just flush the ByteArrayOutputStream, which in turn doesn't do anything. + catch (Exception e) { + throw new IOException("Serializing the source elements failed: " + e.getMessage(), e); + } this.serializer = serializer; - this.elements = baos.toByteArray(); + this.elementsSerialized = baos.toByteArray(); + this.numElements = count; } @Override public void run(SourceContext ctx) throws Exception { - T value = serializer.createInstance(); - ByteArrayInputStream bais = new ByteArrayInputStream(elements); + ByteArrayInputStream bais = new ByteArrayInputStream(elementsSerialized); DataInputView input = new InputViewDataInputStreamWrapper(new DataInputStream(bais)); - while (isRunning && bais.available() > 0) { - value = serializer.deserialize(value, input); - ctx.collect(value); + int numEmitted = 0; + while (isRunning && numEmitted++ < numElements) { + T next; + try { + next = serializer.deserialize(input); + } + catch (Exception e) { + throw new IOException("Failed to deserialize an element from the source. " + + "If you are using user-defined serialization (Value and Writable types), check the " + + "serialization functions.\nSerializer is " + serializer); + } + + ctx.collect(next); } - // closing the DataInputStream would just close the ByteArrayInputStream, which doesn't do anything } @Override public void cancel() { isRunning = false; } + + // ------------------------------------------------------------------------ + // Utilities + // ------------------------------------------------------------------------ + + /** + * Verifies that all elements in the collection are non-null, and are of the given class, or + * a subclass thereof. + * + * @param elements The collection to check. + * @param viewedAs The class to which the elements must be assignable to. 
+ * + * @param The generic type of the collection to be checked. + */ + public static void checkCollection(Collection elements, Class viewedAs) { + for (OUT elem : elements) { + if (elem == null) { + throw new IllegalArgumentException("The collection contains a null element"); + } + + if (!viewedAs.isAssignableFrom(elem.getClass())) { + throw new IllegalArgumentException("The elements in the collection are not all subclasses of " + + viewedAs.getCanonicalName()); + } + } + } } diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/SourceFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/SourceFunction.java index 921a33bceae76..58ee1daebf1dc 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/SourceFunction.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/SourceFunction.java @@ -45,11 +45,10 @@ * {@code * public class ExampleSource implements SourceFunction, Checkpointed { * private long count = 0L; - * private volatile boolean isRunning; + * private volatile boolean isRunning = true; * * @Override * public void run(SourceContext ctx) { - * isRunning = true; * while (isRunning && count < 1000) { * synchronized (ctx.getCheckpointLock()) { * ctx.collect(count); @@ -104,16 +103,20 @@ public interface SourceFunction extends Function, Serializable { * * @param The type of the elements produced by the source. */ - interface SourceContext { + public static interface SourceContext { /** * Emits one element from the source. + * + * @param element The element to emit. */ void collect(T element); /** * Returns the checkpoint lock. Please refer to the explanation about checkpointed sources * in {@link org.apache.flink.streaming.api.functions.source.SourceFunction}. + * + * @return The object to use the lock. 
*/ Object getCheckpointLock(); } diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/StreamExecutionEnvironmentTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/StreamExecutionEnvironmentTest.java index 7373276d21061..e2fe599129cf5 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/StreamExecutionEnvironmentTest.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/StreamExecutionEnvironmentTest.java @@ -24,14 +24,14 @@ import java.util.Arrays; import java.util.Iterator; import java.util.List; +import java.util.NoSuchElementException; +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.api.java.typeutils.TypeExtractor; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.source.FromElementsFunction; -import org.apache.flink.streaming.api.functions.source.FromSplittableIteratorFunction; import org.apache.flink.streaming.api.functions.source.SourceFunction; import org.apache.flink.streaming.api.functions.source.StatefulSequenceSource; import org.apache.flink.streaming.api.graph.StreamGraph; @@ -49,27 +49,33 @@ public class StreamExecutionEnvironmentTest { @Test @SuppressWarnings("unchecked") public void testFromCollectionParallelism() { - TypeInformation typeInfo = TypeExtractor.getForClass(Object.class); - StreamExecutionEnvironment env = new TestStreamEnvironment(PARALLELISM, MEMORYSIZE); - boolean seenExpectedException = false; - try { - DataStream dataStream1 = env.fromCollection(new DummySplittableIterator(), typeInfo) - .setParallelism(4); - } catch (IllegalArgumentException e) { - seenExpectedException = true; + TypeInformation typeInfo = BasicTypeInfo.INT_TYPE_INFO; + StreamExecutionEnvironment env = new TestStreamEnvironment(PARALLELISM, MEMORYSIZE); + + DataStreamSource dataStream1 = env.fromCollection(new DummySplittableIterator(), typeInfo); + + try { + dataStream1.setParallelism(4); + fail("should throw an exception"); + } + catch (IllegalArgumentException e) { + // expected + } + + env.fromParallelCollection(new DummySplittableIterator(), typeInfo).setParallelism(4); + + String plan = env.getExecutionPlan(); + + assertTrue("Parallelism for dataStream1 is not right.", + plan.contains("\"contents\":\"Collection Source\",\"parallelism\":1")); + assertTrue("Parallelism for dataStream2 is not right.", + plan.contains("\"contents\":\"Parallel Collection Source\",\"parallelism\":4")); + } + catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); } - - DataStream dataStream2 = env.fromParallelCollection(new DummySplittableIterator(), typeInfo) - .setParallelism(4); - - String plan = env.getExecutionPlan(); - - assertTrue("Expected Exception for setting parallelism was not thrown.", seenExpectedException); - assertTrue("Parallelism for dataStream1 is not right.", - plan.contains("\"contents\":\"Collection Source\",\"parallelism\":1")); - assertTrue("Parallelism for dataStream2 is not right.", - plan.contains("\"contents\":\"Parallel Collection Source\",\"parallelism\":4")); } @Test @@ -119,12 +125,13 @@ private static SourceFunction 
getFunctionFromDataSource(DataStreamSource< return (SourceFunction) operator.getUserFunction(); } - public static class DummySplittableIterator extends SplittableIterator { + public static class DummySplittableIterator extends SplittableIterator { private static final long serialVersionUID = 1312752876092210499L; + @SuppressWarnings("unchecked") @Override - public Iterator[] split(int numPartitions) { - return new Iterator[0]; + public Iterator[] split(int numPartitions) { + return (Iterator[]) new Iterator[0]; } @Override @@ -138,8 +145,8 @@ public boolean hasNext() { } @Override - public Object next() { - return null; + public T next() { + throw new NoSuchElementException(); } @Override diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/FromElementsFunctionTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/FromElementsFunctionTest.java new file mode 100644 index 0000000000000..db91b33607df4 --- /dev/null +++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/FromElementsFunctionTest.java @@ -0,0 +1,191 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.functions; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.typeutils.TypeExtractor; +import org.apache.flink.api.java.typeutils.ValueTypeInfo; +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.DataOutputView; +import org.apache.flink.streaming.api.functions.source.FromElementsFunction; +import org.apache.flink.types.Value; +import org.apache.flink.util.ExceptionUtils; +import org.junit.Test; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +import static org.junit.Assert.*; + +/** + * Tests for the {@link org.apache.flink.streaming.api.functions.source.FromElementsFunction}. 
+ */ +public class FromElementsFunctionTest { + + @Test + public void testStrings() { + try { + String[] data = { "Oh", "boy", "what", "a", "show", "!"}; + + FromElementsFunction source = new FromElementsFunction( + BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()), data); + + List result = new ArrayList(); + source.run(new ListSourceContext(result)); + + assertEquals(Arrays.asList(data), result); + } + catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } + + @Test + public void testNonJavaSerializableType() { + try { + MyPojo[] data = { new MyPojo(1, 2), new MyPojo(3, 4), new MyPojo(5, 6) }; + + FromElementsFunction source = new FromElementsFunction( + TypeExtractor.getForClass(MyPojo.class).createSerializer(new ExecutionConfig()), data); + + List result = new ArrayList(); + source.run(new ListSourceContext(result)); + + assertEquals(Arrays.asList(data), result); + } + catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } + + @Test + public void testSerializationError() { + try { + TypeInformation info = + new ValueTypeInfo(SerializationErrorType.class); + + try { + new FromElementsFunction( + info.createSerializer(new ExecutionConfig()), new SerializationErrorType()); + + fail("should fail with an exception"); + } + catch (IOException e) { + assertTrue(ExceptionUtils.stringifyException(e).contains("test exception")); + } + } + catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } + + @Test + public void testDeSerializationError() { + try { + TypeInformation info = + new ValueTypeInfo(DeserializeTooMuchType.class); + + FromElementsFunction source = new FromElementsFunction( + info.createSerializer(new ExecutionConfig()), new DeserializeTooMuchType()); + + try { + source.run(new ListSourceContext(new ArrayList())); + fail("should fail with an exception"); + } + catch (IOException e) { + assertTrue(ExceptionUtils.stringifyException(e).contains("user-defined serialization")); + } + } + catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } + + + // ------------------------------------------------------------------------ + // Test Types + // ------------------------------------------------------------------------ + + public static class MyPojo { + + public long val1; + public int val2; + + public MyPojo() {} + + public MyPojo(long val1, int val2) { + this.val1 = val1; + this.val2 = val2; + } + + @Override + public int hashCode() { + return this.val2; + } + + @Override + public boolean equals(Object obj) { + if (obj instanceof MyPojo) { + MyPojo that = (MyPojo) obj; + return this.val1 == that.val1 && this.val2 == that.val2; + } + else { + return false; + } + } + } + + public static class SerializationErrorType implements Value { + + private static final long serialVersionUID = -6037206294939421807L; + + @Override + public void write(DataOutputView out) throws IOException { + throw new IOException("test exception"); + } + + @Override + public void read(DataInputView in) throws IOException { + throw new IOException("test exception"); + } + } + + public static class DeserializeTooMuchType implements Value { + + private static final long serialVersionUID = -6037206294939421807L; + + @Override + public void write(DataOutputView out) throws IOException { + out.writeInt(42); + } + + @Override + public void read(DataInputView in) throws IOException { + in.readLong(); + } + } +} diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/ListSourceContext.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/ListSourceContext.java new file mode 100644 index 0000000000000..e718633bc6ead --- /dev/null +++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/ListSourceContext.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.functions; + +import org.apache.flink.streaming.api.functions.source.SourceFunction; + +import java.util.List; + +/** + * Mock context that collects elements in a List. + * + * @param Type of the collected elements. + */ +public class ListSourceContext implements SourceFunction.SourceContext { + + private final Object lock = new Object(); + + private final List target; + + + public ListSourceContext(List target) { + this.target = target; + } + + @Override + public void collect(T element) { + target.add(element); + } + + @Override + public Object getCheckpointLock() { + return lock; + } +} diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala index 7215a4db14d2f..70e652f3edec9 100644 --- a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala +++ b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala @@ -27,7 +27,7 @@ import org.apache.flink.runtime.state.StateHandleProvider import org.apache.flink.streaming.api.environment.{StreamExecutionEnvironment => JavaEnv} import org.apache.flink.streaming.api.functions.source.FileMonitoringFunction.WatchType import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext -import org.apache.flink.streaming.api.functions.source.{FromElementsFunction, SourceFunction} +import org.apache.flink.streaming.api.functions.source.SourceFunction import org.apache.flink.types.StringValue import org.apache.flink.util.SplittableIterator @@ -47,7 +47,7 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) { * Sets the parallelism for operations executed through this environment. * Setting a parallelism of x here will cause all operators (such as join, map, reduce) to run * with x parallel instances. This value can be overridden by specific operations using - * [[DataStream.setParallelism]]. + * [[DataStream#setParallelism(int)]]. 
* @deprecated Please use [[setParallelism]] */ @deprecated @@ -57,7 +57,7 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) { /** * Returns the default parallelism for this execution environment. Note that this - * value can be overridden by individual operations using [[DataStream.setParallelism]] + * value can be overridden by individual operations using [[DataStream#setParallelism(int)]] * @deprecated Please use [[getParallelism]] */ @deprecated @@ -67,7 +67,7 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) { * Sets the parallelism for operations executed through this environment. * Setting a parallelism of x here will cause all operators (such as join, map, reduce) to run * with x parallel instances. This value can be overridden by specific operations using - * [[DataStream.setParallelism]]. + * [[DataStream#setParallelism(int)]]. */ def setParallelism(parallelism: Int): Unit = { javaEnv.setParallelism(parallelism) @@ -75,7 +75,7 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) { /** * Returns the default parallelism for this execution environment. Note that this - * value can be overridden by individual operations using [[DataStream.setParallelism]] + * value can be overridden by individual operations using [[DataStream#setParallelism(int)]] */ def getParallelism = javaEnv.getParallelism @@ -86,15 +86,10 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) { * can result in three logical modes: * *
<ul>
- *   <li>
- *     A positive integer triggers flushing periodically by that integer</li>
- *   <li>
- *     0 triggers flushing after every record thus minimizing latency</li>
- *   <li>
- *     -1 triggers flushing only when the output buffer is full thus maximizing
- *     throughput</li>
+ *   <li>A positive integer triggers flushing periodically by that integer</li>
+ *   <li>0 triggers flushing after every record thus minimizing latency</li>
+ *   <li>-1 triggers flushing only when the output buffer is full thus maximizing throughput</li>
 * </ul>
- * */ def setBufferTimeout(timeoutMillis: Long): StreamExecutionEnvironment = { javaEnv.setBufferTimeout(timeoutMillis) @@ -127,7 +122,7 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) { * * Setting this option assumes that the job is used in production and thus if not stated * explicitly otherwise with calling with the - * {@link #setNumberOfExecutionRetries(int numberOfExecutionRetries)} method in case of + * [[setNumberOfExecutionRetries(int)]] method in case of * failure the job will be resubmitted to the cluster indefinitely. */ @deprecated @@ -142,7 +137,7 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) { * * Setting this option assumes that the job is used in production and thus if not stated * explicitly otherwise with calling with the - * {@link #setNumberOfExecutionRetries(int numberOfExecutionRetries)} method in case of + * [[setNumberOfExecutionRetries(int)]] method in case of * failure the job will be resubmitted to the cluster indefinitely. */ def enableCheckpointing(interval : Long) : StreamExecutionEnvironment = { @@ -156,7 +151,7 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) { * * Setting this option assumes that the job is used in production and thus if not stated * explicitly otherwise with calling with the - * {@link #setNumberOfExecutionRetries(int numberOfExecutionRetries)} method in case of + * [[setNumberOfExecutionRetries(int)]] method in case of * failure the job will be resubmitted to the cluster indefinitely. */ def enableCheckpointing() : StreamExecutionEnvironment = { @@ -284,10 +279,7 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) { require(data != null, "Data must not be null.") val typeInfo = implicitly[TypeInformation[T]] - val sourceFunction = new FromElementsFunction[T](typeInfo.createSerializer(getConfig), - scala.collection.JavaConversions.asJavaCollection(data)) - - javaEnv.addSource(sourceFunction).returns(typeInfo) + javaEnv.fromCollection(scala.collection.JavaConversions.asJavaCollection(data), typeInfo) } /** @@ -460,7 +452,7 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) { def getExecutionPlan = javaEnv.getExecutionPlan /** - * Getter of the {@link org.apache.flink.streaming.api.graph.StreamGraph} of the streaming job. + * Getter of the [[org.apache.flink.streaming.api.graph.StreamGraph]] of the streaming job. * * @return The StreamGraph representing the transformations */ @@ -468,7 +460,7 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) { /** * Returns a "closure-cleaned" version of the given function. Cleans only if closure cleaning - * is not disabled in the {@link org.apache.flink.api.common.ExecutionConfig} + * is not disabled in the [[org.apache.flink.api.common.ExecutionConfig]] */ private[flink] def scalaClean[F <: AnyRef](f: F): F = { if (getConfig.isClosureCleanerEnabled) { @@ -484,7 +476,7 @@ object StreamExecutionEnvironment { /** * Sets the default parallelism that will be used for the local execution - * environment created by {@link #createLocalEnvironment()}. + * environment created by [[createLocalEnvironment()]]. * * @param parallelism * The parallelism to use as the default local parallelism.
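// ---------------------------------------------------------------------------
// Editor's note: illustrative sketch, not part of the patch. It demonstrates,
// on the Java API, the three setBufferTimeout() modes listed in the Scala
// documentation above; the class name is hypothetical.
// ---------------------------------------------------------------------------
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class BufferTimeoutSketch {

	public static void main(String[] args) {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

		env.setBufferTimeout(100);  // positive: flush buffers at least every 100 ms
		env.setBufferTimeout(0);    // zero: flush after every record (lowest latency)
		env.setBufferTimeout(-1);   // -1: flush only when a buffer is full (highest throughput)
	}
}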