From fe39e6ccba6396b53c3a9c2fce5f098d026a3f48 Mon Sep 17 00:00:00 2001 From: Dawid Wysakowicz Date: Fri, 14 Jul 2017 10:37:49 +0200 Subject: [PATCH 1/3] [FLINK-7190] Activate checkstyle flink-java/* --- .../apache/flink/api/java/ClosureCleaner.java | 44 +- .../flink/api/java/CollectionEnvironment.java | 5 +- .../org/apache/flink/api/java/DataSet.java | 879 +++++++++--------- .../flink/api/java/ExecutionEnvironment.java | 294 +++--- .../api/java/ExecutionEnvironmentFactory.java | 4 +- .../flink/api/java/LocalEnvironment.java | 34 +- .../flink/api/java/RemoteEnvironment.java | 42 +- .../java/org/apache/flink/api/java/Utils.java | 30 +- .../common/io/SequentialFormatTestBase.java | 36 +- .../api/common/io/SerializedFormatTest.java | 8 +- .../CollectionExecutionAccumulatorsTest.java | 36 +- .../CollectionExecutionIterationTest.java | 50 +- ...ionExecutionWithBroadcastVariableTest.java | 66 +- .../base/CoGroupOperatorCollectionTest.java | 5 +- .../base/GroupReduceOperatorTest.java | 21 +- .../base/InnerJoinOperatorBaseTest.java | 15 +- .../operators/base/ReduceOperatorTest.java | 14 +- .../api/java/MultipleInvokationsTest.java | 27 +- .../flink/api/java/TypeExtractionTest.java | 15 +- .../flink/api/java/tuple/TupleGenerator.java | 69 +- 20 files changed, 890 insertions(+), 804 deletions(-) diff --git a/flink-java/src/main/java/org/apache/flink/api/java/ClosureCleaner.java b/flink-java/src/main/java/org/apache/flink/api/java/ClosureCleaner.java index 2f22a7583e7bd..dd4b5c54395e2 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/ClosureCleaner.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/ClosureCleaner.java @@ -26,7 +26,6 @@ import org.objectweb.asm.ClassVisitor; import org.objectweb.asm.MethodVisitor; import org.objectweb.asm.Opcodes; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -41,20 +40,20 @@ */ @Internal public class ClosureCleaner { - - private static Logger LOG = LoggerFactory.getLogger(ClosureCleaner.class); - + + 
private static final Logger LOG = LoggerFactory.getLogger(ClosureCleaner.class); + /** * Tries to clean the closure of the given object, if the object is a non-static inner * class. - * + * * @param func The object whose closure should be cleaned. * @param checkSerializable Flag to indicate whether serializability should be checked after * the closure cleaning attempt. - * + * * @throws InvalidProgramException Thrown, if 'checkSerializable' is true, and the object was * not serializable after the closure cleaning. - * + * * @throws RuntimeException A RuntimeException may be thrown, if the code of the class could not * be loaded, in order to process during teh closure cleaning. */ @@ -62,32 +61,31 @@ public static void clean(Object func, boolean checkSerializable) { if (func == null) { return; } - + final Class cls = func.getClass(); // First find the field name of the "this$0" field, this can // be "this$x" depending on the nesting boolean closureAccessed = false; - + for (Field f: cls.getDeclaredFields()) { if (f.getName().startsWith("this$")) { // found a closure referencing field - now try to clean closureAccessed |= cleanThis0(func, cls, f.getName()); } } - + if (checkSerializable) { try { InstantiationUtil.serializeObject(func); } catch (Exception e) { String functionType = getSuperClassOrInterfaceName(func.getClass()); - + String msg = functionType == null ? (func + " is not serializable.") : ("The implementation of the " + functionType + " is not serializable."); - - + if (closureAccessed) { msg += " The implementation accesses fields of its enclosing class, which is " + "a common reason for non-serializability. 
" + @@ -96,7 +94,7 @@ public static void clean(Object func, boolean checkSerializable) { } else { msg += " The object probably contains or references non serializable fields."; } - + throw new InvalidProgramException(msg, e); } } @@ -109,14 +107,14 @@ public static void ensureSerializable(Object obj) { throw new InvalidProgramException("Object " + obj + " is not serializable", e); } } - + private static boolean cleanThis0(Object func, Class cls, String this0Name) { - + This0AccessFinder this0Finder = new This0AccessFinder(this0Name); getClassReader(cls).accept(this0Finder, 0); - + final boolean accessesClosure = this0Finder.isThis0Accessed(); - + if (LOG.isDebugEnabled()) { LOG.debug(this0Name + " is accessed: " + accessesClosure); } @@ -129,7 +127,7 @@ private static boolean cleanThis0(Object func, Class cls, String this0Name) { // has no this$0, just return throw new RuntimeException("Could not set " + this0Name + ": " + e); } - + try { this0.setAccessible(true); this0.set(func, null); @@ -139,10 +137,10 @@ private static boolean cleanThis0(Object func, Class cls, String this0Name) { throw new RuntimeException("Could not set " + this0Name + " to null. 
" + e.getMessage(), e); } } - + return accessesClosure; } - + private static ClassReader getClassReader(Class cls) { String className = cls.getName().replaceFirst("^.*\\.", "") + ".class"; try { @@ -151,8 +149,7 @@ private static ClassReader getClassReader(Class cls) { throw new RuntimeException("Could not create ClassReader: " + e.getMessage(), e); } } - - + private static String getSuperClassOrInterfaceName(Class cls) { Class superclass = cls.getSuperclass(); if (superclass.getName().startsWith("org.apache.flink")) { @@ -176,7 +173,6 @@ class This0AccessFinder extends ClassVisitor { private final String this0Name; private boolean isThis0Accessed; - public This0AccessFinder(String this0Name) { super(Opcodes.ASM5); diff --git a/flink-java/src/main/java/org/apache/flink/api/java/CollectionEnvironment.java b/flink-java/src/main/java/org/apache/flink/api/java/CollectionEnvironment.java index 0d6628640d74d..bcfaac037c5fc 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/CollectionEnvironment.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/CollectionEnvironment.java @@ -23,6 +23,9 @@ import org.apache.flink.api.common.Plan; import org.apache.flink.api.common.operators.CollectionExecutor; +/** + * Version of {@link ExecutionEnvironment} that allows serial, local, collection-based executions of Flink programs. 
+ */ @PublicEvolving public class CollectionEnvironment extends ExecutionEnvironment { @@ -40,7 +43,7 @@ public JobExecutionResult execute(String jobName) throws Exception { public int getParallelism() { return 1; // always serial } - + @Override public String getExecutionPlan() throws Exception { throw new UnsupportedOperationException("Execution plans are not used for collection-based execution."); diff --git a/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java b/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java index e3b2ec26bef02..3dd4f6a82166f 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java @@ -18,8 +18,8 @@ package org.apache.flink.api.java; -import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.annotation.Public; +import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.api.common.InvalidProgramException; import org.apache.flink.api.common.JobExecutionResult; import org.apache.flink.api.common.accumulators.SerializedListAccumulator; @@ -34,6 +34,7 @@ import org.apache.flink.api.common.functions.ReduceFunction; import org.apache.flink.api.common.io.FileOutputFormat; import org.apache.flink.api.common.io.OutputFormat; +import org.apache.flink.api.common.operators.Keys; import org.apache.flink.api.common.operators.Order; import org.apache.flink.api.common.operators.base.CrossOperatorBase.CrossHint; import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint; @@ -64,7 +65,6 @@ import org.apache.flink.api.java.operators.GroupReduceOperator; import org.apache.flink.api.java.operators.IterativeDataSet; import org.apache.flink.api.java.operators.JoinOperator.JoinOperatorSets; -import org.apache.flink.api.common.operators.Keys; import org.apache.flink.api.java.operators.MapOperator; import org.apache.flink.api.java.operators.MapPartitionOperator; import 
org.apache.flink.api.java.operators.PartitionOperator; @@ -93,8 +93,9 @@ import java.util.List; /** - * A DataSet represents a collection of elements of the same type.
- * A DataSet can be transformed into another DataSet by applying a transformation as for example + * A DataSet represents a collection of elements of the same type. + * + *

A DataSet can be transformed into another DataSet by applying a transformation as for example *

+ * + * @param fields One or more field positions on which the DataSet will be grouped. * @return A Grouping on which a transformation needs to be applied to obtain a transformed DataSet. - * + * * @see Tuple * @see UnsortedGrouping * @see AggregateOperator @@ -718,44 +727,46 @@ public UnsortedGrouping groupBy(int... fields) { public UnsortedGrouping groupBy(String... fields) { return new UnsortedGrouping<>(this, new Keys.ExpressionKeys<>(fields, getType())); } - + // -------------------------------------------------------------------------------------------- // Joining // -------------------------------------------------------------------------------------------- - - /** - * Initiates a Join transformation.
- * A Join transformation joins the elements of two - * {@link DataSet DataSets} on key equality and provides multiple ways to combine - * joining elements into one DataSet.
- * - * This method returns a {@link JoinOperatorSets} on which one of the {@code where} methods + + /** + * Initiates a Join transformation. + * + *

A Join transformation joins the elements of two + * {@link DataSet DataSets} on key equality and provides multiple ways to combine + * joining elements into one DataSet. + * + *

This method returns a {@link JoinOperatorSets} on which one of the {@code where} methods * can be called to define the join key of the first joining (i.e., this) DataSet. - * + * * @param other The other DataSet with which this DataSet is joined. * @return A JoinOperatorSets to continue the definition of the Join transformation. - * + * * @see JoinOperatorSets * @see DataSet */ public JoinOperatorSets join(DataSet other) { return new JoinOperatorSets<>(this, other); } - + /** - * Initiates a Join transformation.
- * A Join transformation joins the elements of two - * {@link DataSet DataSets} on key equality and provides multiple ways to combine - * joining elements into one DataSet.
- * - * This method returns a {@link JoinOperatorSets} on which one of the {@code where} methods + * Initiates a Join transformation. + * + *

A Join transformation joins the elements of two + * {@link DataSet DataSets} on key equality and provides multiple ways to combine + * joining elements into one DataSet. + * + *

This method returns a {@link JoinOperatorSets} on which one of the {@code where} methods * can be called to define the join key of the first joining (i.e., this) DataSet. - * + * * @param other The other DataSet with which this DataSet is joined. * @param strategy The strategy that should be used execute the join. If {@code null} is given, then the * optimizer will pick the join strategy. * @return A JoinOperatorSets to continue the definition of the Join transformation. - * + * * @see JoinOperatorSets * @see DataSet */ @@ -764,39 +775,45 @@ public JoinOperatorSets join(DataSet other, JoinHint strategy) { } /** - * Initiates a Join transformation.
- * A Join transformation joins the elements of two - * {@link DataSet DataSets} on key equality and provides multiple ways to combine - * joining elements into one DataSet.
- * This method also gives the hint to the optimizer that the second DataSet to join is much - * smaller than the first one.
- * This method returns a {@link JoinOperatorSets} on which + * Initiates a Join transformation. + * + *

A Join transformation joins the elements of two + * {@link DataSet DataSets} on key equality and provides multiple ways to combine + * joining elements into one DataSet. + * + *

This method also gives the hint to the optimizer that the second DataSet to join is much + * smaller than the first one. + * + *

This method returns a {@link JoinOperatorSets} on which * {@link JoinOperatorSets#where(String...)} needs to be called to define the join key of the first * joining (i.e., this) DataSet. - * + * * @param other The other DataSet with which this DataSet is joined. * @return A JoinOperatorSets to continue the definition of the Join transformation. - * + * * @see JoinOperatorSets * @see DataSet */ public JoinOperatorSets joinWithTiny(DataSet other) { return new JoinOperatorSets<>(this, other, JoinHint.BROADCAST_HASH_SECOND); } - + /** - * Initiates a Join transformation.
- * A Join transformation joins the elements of two - * {@link DataSet DataSets} on key equality and provides multiple ways to combine - * joining elements into one DataSet.
- * This method also gives the hint to the optimizer that the second DataSet to join is much - * larger than the first one.
- * This method returns a {@link JoinOperatorSets} on which one of the {@code where} methods + * Initiates a Join transformation. + * + *

A Join transformation joins the elements of two + * {@link DataSet DataSets} on key equality and provides multiple ways to combine + * joining elements into one DataSet. + * + *

This method also gives the hint to the optimizer that the second DataSet to join is much + * larger than the first one. + * + *

This method returns a {@link JoinOperatorSets} on which one of the {@code where} methods * can be called to define the join key of the first joining (i.e., this) DataSet. - * + * * @param other The other DataSet with which this DataSet is joined. * @return A JoinOperatorSet to continue the definition of the Join transformation. - * + * * @see JoinOperatorSets * @see DataSet */ @@ -805,11 +822,13 @@ public JoinOperatorSets joinWithHuge(DataSet other) { } /** - * Initiates a Left Outer Join transformation.
- * An Outer Join transformation joins two elements of two + * Initiates a Left Outer Join transformation. + * + *

An Outer Join transformation joins two elements of two * {@link DataSet DataSets} on key equality and provides multiple ways to combine - * joining elements into one DataSet.
- * Elements of the left DataSet (i.e. {@code this}) that do not have a matching + * joining elements into one DataSet. + * + *

Elements of the left DataSet (i.e. {@code this}) that do not have a matching * element on the other side are joined with {@code null} and emitted to the * resulting DataSet. * @@ -824,11 +843,13 @@ public JoinOperatorSetsBase leftOuterJoin(DataSet other) { } /** - * Initiates a Left Outer Join transformation.
- * An Outer Join transformation joins two elements of two + * Initiates a Left Outer Join transformation. + * + *

An Outer Join transformation joins two elements of two * {@link DataSet DataSets} on key equality and provides multiple ways to combine - * joining elements into one DataSet.
- * Elements of the left DataSet (i.e. {@code this}) that do not have a matching + * joining elements into one DataSet. + * + *

Elements of the left DataSet (i.e. {@code this}) that do not have a matching * element on the other side are joined with {@code null} and emitted to the * resulting DataSet. * @@ -849,17 +870,19 @@ public JoinOperatorSetsBase leftOuterJoin(DataSet other, JoinHint s case BROADCAST_HASH_SECOND: return new JoinOperatorSetsBase<>(this, other, strategy, JoinType.LEFT_OUTER); default: - throw new InvalidProgramException("Invalid JoinHint for LeftOuterJoin: "+strategy); + throw new InvalidProgramException("Invalid JoinHint for LeftOuterJoin: " + strategy); } } /** - * Initiates a Right Outer Join transformation.
- * An Outer Join transformation joins two elements of two + * Initiates a Right Outer Join transformation. + * + *

An Outer Join transformation joins two elements of two * {@link DataSet DataSets} on key equality and provides multiple ways to combine - * joining elements into one DataSet.
- * Elements of the right DataSet (i.e. {@code other}) that do not have a matching + * joining elements into one DataSet. + * + *

Elements of the right DataSet (i.e. {@code other}) that do not have a matching * element on {@code this} side are joined with {@code null} and emitted to the * resulting DataSet. * @@ -874,11 +897,13 @@ public JoinOperatorSetsBase rightOuterJoin(DataSet other) { } /** - * Initiates a Right Outer Join transformation.
- * An Outer Join transformation joins two elements of two + * Initiates a Right Outer Join transformation. + * + *

An Outer Join transformation joins two elements of two * {@link DataSet DataSets} on key equality and provides multiple ways to combine - * joining elements into one DataSet.
- * Elements of the right DataSet (i.e. {@code other}) that do not have a matching + * joining elements into one DataSet. + * + *

Elements of the right DataSet (i.e. {@code other}) that do not have a matching * element on {@code this} side are joined with {@code null} and emitted to the * resulting DataSet. * @@ -899,16 +924,18 @@ public JoinOperatorSetsBase rightOuterJoin(DataSet other, JoinHint case BROADCAST_HASH_FIRST: return new JoinOperatorSetsBase<>(this, other, strategy, JoinType.RIGHT_OUTER); default: - throw new InvalidProgramException("Invalid JoinHint for RightOuterJoin: "+strategy); + throw new InvalidProgramException("Invalid JoinHint for RightOuterJoin: " + strategy); } } /** - * Initiates a Full Outer Join transformation.
- * An Outer Join transformation joins two elements of two + * Initiates a Full Outer Join transformation. + * + *

An Outer Join transformation joins two elements of two * {@link DataSet DataSets} on key equality and provides multiple ways to combine - * joining elements into one DataSet.
- * Elements of both DataSets that do not have a matching + * joining elements into one DataSet. + * + *

Elements of both DataSets that do not have a matching * element on the opposing side are joined with {@code null} and emitted to the * resulting DataSet. * @@ -923,11 +950,13 @@ public JoinOperatorSetsBase fullOuterJoin(DataSet other) { } /** - * Initiates a Full Outer Join transformation.
- * An Outer Join transformation joins two elements of two + * Initiates a Full Outer Join transformation. + * + *

An Outer Join transformation joins two elements of two * {@link DataSet DataSets} on key equality and provides multiple ways to combine - * joining elements into one DataSet.
- * Elements of both DataSets that do not have a matching + * joining elements into one DataSet. + * + *

Elements of both DataSets that do not have a matching * element on the opposing side are joined with {@code null} and emitted to the * resulting DataSet. * @@ -947,30 +976,32 @@ public JoinOperatorSetsBase fullOuterJoin(DataSet other, JoinHint s case REPARTITION_HASH_SECOND: return new JoinOperatorSetsBase<>(this, other, strategy, JoinType.FULL_OUTER); default: - throw new InvalidProgramException("Invalid JoinHint for FullOuterJoin: "+strategy); + throw new InvalidProgramException("Invalid JoinHint for FullOuterJoin: " + strategy); } } - // -------------------------------------------------------------------------------------------- // Co-Grouping // -------------------------------------------------------------------------------------------- /** - * Initiates a CoGroup transformation.
- * A CoGroup transformation combines the elements of - * two {@link DataSet DataSets} into one DataSet. It groups each DataSet individually on a key and + * Initiates a CoGroup transformation. + * + *

A CoGroup transformation combines the elements of + * two {@link DataSet DataSets} into one DataSet. It groups each DataSet individually on a key and * gives groups of both DataSets with equal keys together into a {@link org.apache.flink.api.common.functions.RichCoGroupFunction}. * If a DataSet has a group with no matching key in the other DataSet, the CoGroupFunction - * is called with an empty group for the non-existing group.
- * The CoGroupFunction can iterate over the elements of both groups and return any number - * of elements including none.
- * This method returns a {@link CoGroupOperatorSets} on which one of the {@code where} methods + * is called with an empty group for the non-existing group. + * + *

The CoGroupFunction can iterate over the elements of both groups and return any number + * of elements including none. + * + *

This method returns a {@link CoGroupOperatorSets} on which one of the {@code where} methods * can be called to define the join key of the first joining (i.e., this) DataSet. - * + * * @param other The other DataSet of the CoGroup transformation. * @return A CoGroupOperatorSets to continue the definition of the CoGroup transformation. - * + * * @see CoGroupOperatorSets * @see CoGroupOperator * @see DataSet @@ -984,37 +1015,39 @@ public CoGroupOperator.CoGroupOperatorSets coGroup(DataSet other) { // -------------------------------------------------------------------------------------------- /** - * Continues a Join transformation and defines the {@link Tuple} fields of the second join - * {@link DataSet} that should be used as join keys.
- * Note: Fields can only be selected as join keys on Tuple DataSets.
- * - * The resulting {@link DefaultJoin} wraps each pair of joining elements into a {@link Tuple2}, with - * the element of the first input being the first field of the tuple and the element of the - * second input being the second field of the tuple. - * + * Continues a Join transformation and defines the {@link Tuple} fields of the second join + * {@link DataSet} that should be used as join keys. + * + *

Note: Fields can only be selected as join keys on Tuple DataSets. + * + *

The resulting {@link DefaultJoin} wraps each pair of joining elements into a {@link Tuple2}, with + * the element of the first input being the first field of the tuple and the element of the + * second input being the second field of the tuple. + * * @param fields The indexes of the Tuple fields of the second join DataSet that should be used as keys. * @return A DefaultJoin that represents the joined DataSet. */ - + /** - * Initiates a Cross transformation.
- * A Cross transformation combines the elements of two - * {@link DataSet DataSets} into one DataSet. It builds all pair combinations of elements of + * Initiates a Cross transformation. + * + *

A Cross transformation combines the elements of two + * {@link DataSet DataSets} into one DataSet. It builds all pair combinations of elements of * both DataSets, i.e., it builds a Cartesian product. - * - *

- * The resulting {@link org.apache.flink.api.java.operators.CrossOperator.DefaultCross} wraps each pair of crossed elements into a {@link Tuple2}, with - * the element of the first input being the first field of the tuple and the element of the + * + * + *

The resulting {@link org.apache.flink.api.java.operators.CrossOperator.DefaultCross} wraps each pair of crossed elements into a {@link Tuple2}, with + * the element of the first input being the first field of the tuple and the element of the * second input being the second field of the tuple. - * - *

- * Call {@link org.apache.flink.api.java.operators.CrossOperator.DefaultCross#with(org.apache.flink.api.common.functions.CrossFunction)} to define a + * + * + *

Call {@link org.apache.flink.api.java.operators.CrossOperator.DefaultCross#with(org.apache.flink.api.common.functions.CrossFunction)} to define a * {@link org.apache.flink.api.common.functions.CrossFunction} which is called for - * each pair of crossed elements. The CrossFunction returns a exactly one element for each pair of input elements.
- * - * @param other The other DataSet with which this DataSet is crossed. + * each pair of crossed elements. The CrossFunction returns a exactly one element for each pair of input elements. + * + * @param other The other DataSet with which this DataSet is crossed. * @return A DefaultCross that returns a Tuple2 for each pair of crossed elements. - * + * * @see org.apache.flink.api.java.operators.CrossOperator.DefaultCross * @see org.apache.flink.api.common.functions.CrossFunction * @see DataSet @@ -1023,28 +1056,29 @@ public CoGroupOperator.CoGroupOperatorSets coGroup(DataSet other) { public CrossOperator.DefaultCross cross(DataSet other) { return new CrossOperator.DefaultCross<>(this, other, CrossHint.OPTIMIZER_CHOOSES, Utils.getCallLocationName()); } - + /** - * Initiates a Cross transformation.
- * A Cross transformation combines the elements of two - * {@link DataSet DataSets} into one DataSet. It builds all pair combinations of elements of + * Initiates a Cross transformation. + * + *

A Cross transformation combines the elements of two + * {@link DataSet DataSets} into one DataSet. It builds all pair combinations of elements of * both DataSets, i.e., it builds a Cartesian product. * This method also gives the hint to the optimizer that the second DataSet to cross is much * smaller than the first one. - * - *

- * The resulting {@link org.apache.flink.api.java.operators.CrossOperator.DefaultCross} wraps each pair of crossed elements into a {@link Tuple2}, with - * the element of the first input being the first field of the tuple and the element of the + * + * + *

The resulting {@link org.apache.flink.api.java.operators.CrossOperator.DefaultCross} wraps each pair of crossed elements into a {@link Tuple2}, with + * the element of the first input being the first field of the tuple and the element of the * second input being the second field of the tuple. - * - *

- * Call {@link org.apache.flink.api.java.operators.CrossOperator.DefaultCross#with(org.apache.flink.api.common.functions.CrossFunction)} to define a + * + * + *

Call {@link org.apache.flink.api.java.operators.CrossOperator.DefaultCross#with(org.apache.flink.api.common.functions.CrossFunction)} to define a * {@link org.apache.flink.api.common.functions.CrossFunction} which is called for - * each pair of crossed elements. The CrossFunction returns a exactly one element for each pair of input elements.
- * - * @param other The other DataSet with which this DataSet is crossed. + * each pair of crossed elements. The CrossFunction returns a exactly one element for each pair of input elements. + * + * @param other The other DataSet with which this DataSet is crossed. * @return A DefaultCross that returns a Tuple2 for each pair of crossed elements. - * + * * @see org.apache.flink.api.java.operators.CrossOperator.DefaultCross * @see org.apache.flink.api.common.functions.CrossFunction * @see DataSet @@ -1053,28 +1087,29 @@ public CrossOperator.DefaultCross cross(DataSet other) { public CrossOperator.DefaultCross crossWithTiny(DataSet other) { return new CrossOperator.DefaultCross<>(this, other, CrossHint.SECOND_IS_SMALL, Utils.getCallLocationName()); } - + /** - * Initiates a Cross transformation.
- * A Cross transformation combines the elements of two - * {@link DataSet DataSets} into one DataSet. It builds all pair combinations of elements of + * Initiates a Cross transformation. + * + *

A Cross transformation combines the elements of two + * {@link DataSet DataSets} into one DataSet. It builds all pair combinations of elements of * both DataSets, i.e., it builds a Cartesian product. * This method also gives the hint to the optimizer that the second DataSet to cross is much * larger than the first one. - * - *

- * The resulting {@link org.apache.flink.api.java.operators.CrossOperator.DefaultCross} wraps each pair of crossed elements into a {@link Tuple2}, with - * the element of the first input being the first field of the tuple and the element of the + * + * + *

The resulting {@link org.apache.flink.api.java.operators.CrossOperator.DefaultCross} wraps each pair of crossed elements into a {@link Tuple2}, with + * the element of the first input being the first field of the tuple and the element of the * second input being the second field of the tuple. - * - *

- * Call {@link org.apache.flink.api.java.operators.CrossOperator.DefaultCross#with(org.apache.flink.api.common.functions.CrossFunction)} to define a + * + * + *

Call {@link org.apache.flink.api.java.operators.CrossOperator.DefaultCross#with(org.apache.flink.api.common.functions.CrossFunction)} to define a * {@link org.apache.flink.api.common.functions.CrossFunction} which is called for - * each pair of crossed elements. The CrossFunction returns a exactly one element for each pair of input elements.
- * - * @param other The other DataSet with which this DataSet is crossed. + * each pair of crossed elements. The CrossFunction returns a exactly one element for each pair of input elements. + * + * @param other The other DataSet with which this DataSet is crossed. * @return A DefaultCross that returns a Tuple2 for each pair of crossed elements. - * + * * @see org.apache.flink.api.java.operators.CrossOperator.DefaultCross * @see org.apache.flink.api.common.functions.CrossFunction * @see DataSet @@ -1094,13 +1129,13 @@ public CrossOperator.DefaultCross crossWithHuge(DataSet other) { * given to the {@code closeWith(DataSet)} method is the data set that will be fed back and used as the input * to the next iteration. The return value of the {@code closeWith(DataSet)} method is the resulting * data set after the iteration has terminated. - *

- * An example of an iterative computation is as follows: + * + *

An example of an iterative computation is as follows: * *

 	 * {@code
 	 * DataSet input = ...;
-	 * 
+	 *
 	 * DataSet startOfIteration = input.iterate(10);
 	 * DataSet toBeFedBack = startOfIteration
 	 *                               .map(new MyMapper())
@@ -1108,20 +1143,20 @@ public  CrossOperator.DefaultCross crossWithHuge(DataSet other) {
 	 * DataSet result = startOfIteration.closeWith(toBeFedBack);
 	 * }
 	 * 
- *

- * The iteration has a maximum number of times that it executes. A dynamic termination can be realized by using a + * + *

The iteration has a maximum number of times that it executes. A dynamic termination can be realized by using a * termination criterion (see {@link org.apache.flink.api.java.operators.IterativeDataSet#closeWith(DataSet, DataSet)}). - * + * * @param maxIterations The maximum number of times that the iteration is executed. * @return An IterativeDataSet that marks the start of the iterative part and needs to be closed by * {@link org.apache.flink.api.java.operators.IterativeDataSet#closeWith(DataSet)}. - * + * * @see org.apache.flink.api.java.operators.IterativeDataSet */ public IterativeDataSet iterate(int maxIterations) { return new IterativeDataSet<>(getExecutionEnvironment(), getType(), this, maxIterations); } - + /** * Initiates a delta iteration. A delta iteration is similar to a regular iteration (as started by {@link #iterate(int)}, * but maintains state across the individual iteration steps. The Solution set, which represents the current state @@ -1130,48 +1165,48 @@ public IterativeDataSet iterate(int maxIterations) { * can be obtained via {@link org.apache.flink.api.java.operators.DeltaIteration#getWorkset()}. * The solution set is updated by producing a delta for it, which is merged into the solution set at the end of each * iteration step. - *

- * The delta iteration must be closed by calling {@link org.apache.flink.api.java.operators.DeltaIteration#closeWith(DataSet, DataSet)}. The two + * + *

The delta iteration must be closed by calling {@link org.apache.flink.api.java.operators.DeltaIteration#closeWith(DataSet, DataSet)}. The two * parameters are the delta for the solution set and the new workset (the data set that will be fed back). * The return value of the {@code closeWith(DataSet, DataSet)} method is the resulting * data set after the iteration has terminated. Delta iterations terminate when the feed back data set * (the workset) is empty. In addition, a maximum number of steps is given as a fall back termination guard. - *

- * Elements in the solution set are uniquely identified by a key. When merging the solution set delta, contained elements + * + *

Elements in the solution set are uniquely identified by a key. When merging the solution set delta, contained elements * with the same key are replaced. - *

- * NOTE: Delta iterations currently support only tuple valued data types. This restriction + * + *

NOTE: Delta iterations currently support only tuple valued data types. This restriction * will be removed in the future. The key is specified by the tuple position. - *

- * A code example for a delta iteration is as follows + * + *

A code example for a delta iteration is as follows *

 	 * {@code
 	 * DeltaIteration, Tuple2> iteration =
 	 *                                                  initialState.iterateDelta(initialFeedbackSet, 100, 0);
-	 * 
+	 *
 	 * DataSet> delta = iteration.groupBy(0).aggregate(Aggregations.AVG, 1)
 	 *                                              .join(iteration.getSolutionSet()).where(0).equalTo(0)
 	 *                                              .flatMap(new ProjectAndFilter());
-	 *                                              
+	 *
 	 * DataSet> feedBack = delta.join(someOtherSet).where(...).equalTo(...).with(...);
-	 * 
+	 *
 	 * // close the delta iteration (delta and new workset are identical)
 	 * DataSet> result = iteration.closeWith(delta, feedBack);
 	 * }
 	 * 
- * + * * @param workset The initial version of the data set that is fed back to the next iteration step (the workset). * @param maxIterations The maximum number of iteration steps, as a fall back safeguard. * @param keyPositions The position of the tuple fields that is used as the key of the solution set. - * + * * @return The DeltaIteration that marks the start of a delta iteration. - * + * * @see org.apache.flink.api.java.operators.DeltaIteration */ public DeltaIteration iterateDelta(DataSet workset, int maxIterations, int... keyPositions) { Preconditions.checkNotNull(workset); Preconditions.checkNotNull(keyPositions); - + Keys.ExpressionKeys keys = new Keys.ExpressionKeys<>(keyPositions, getType()); return new DeltaIteration<>(getExecutionEnvironment(), getType(), this, workset, keys, maxIterations); } @@ -1179,12 +1214,11 @@ public DeltaIteration iterateDelta(DataSet workset, int maxIteratio // -------------------------------------------------------------------------------------------- // Custom Operators // ------------------------------------------------------------------------------------------- - /** * Runs a {@link CustomUnaryOperation} on the data set. Custom operations are typically complex * operators that are composed of multiple steps. - * + * * @param operation The operation to run. * @return The data set produced by the operation. */ @@ -1193,14 +1227,14 @@ public DataSet runOperation(CustomUnaryOperation operation) { operation.setInput(this); return operation.createResult(); } - + // -------------------------------------------------------------------------------------------- // Union // -------------------------------------------------------------------------------------------- /** * Creates a union of this DataSet with an other DataSet. The other DataSet must be of the same data type. - * + * * @param other The other DataSet which is unioned with the current DataSet. * @return The resulting DataSet. 
*/ @@ -1211,39 +1245,39 @@ public UnionOperator union(DataSet other){ // -------------------------------------------------------------------------------------------- // Partitioning // -------------------------------------------------------------------------------------------- - + /** * Hash-partitions a DataSet on the specified key fields. - *

- * Important:This operation shuffles the whole DataSet over the network and can take significant amount of time. - * + * + *

Important:This operation shuffles the whole DataSet over the network and can take significant amount of time. + * * @param fields The field indexes on which the DataSet is hash-partitioned. * @return The partitioned DataSet. */ public PartitionOperator partitionByHash(int... fields) { return new PartitionOperator<>(this, PartitionMethod.HASH, new Keys.ExpressionKeys<>(fields, getType()), Utils.getCallLocationName()); } - + /** * Hash-partitions a DataSet on the specified key fields. - *

- * Important:This operation shuffles the whole DataSet over the network and can take significant amount of time. - * + * + *

Important:This operation shuffles the whole DataSet over the network and can take significant amount of time. + * * @param fields The field expressions on which the DataSet is hash-partitioned. * @return The partitioned DataSet. */ public PartitionOperator partitionByHash(String... fields) { return new PartitionOperator<>(this, PartitionMethod.HASH, new Keys.ExpressionKeys<>(fields, getType()), Utils.getCallLocationName()); } - + /** * Partitions a DataSet using the specified KeySelector. - *

- * Important:This operation shuffles the whole DataSet over the network and can take significant amount of time. - * + * + *

Important:This operation shuffles the whole DataSet over the network and can take significant amount of time. + * * @param keyExtractor The KeyExtractor with which the DataSet is hash-partitioned. * @return The partitioned DataSet. - * + * * @see KeySelector */ public > PartitionOperator partitionByHash(KeySelector keyExtractor) { @@ -1253,8 +1287,8 @@ public > PartitionOperator partitionByHash(KeySelecto /** * Range-partitions a DataSet on the specified key fields. - *

- * Important:This operation requires an extra pass over the DataSet to compute the range boundaries and + * + *

Important:This operation requires an extra pass over the DataSet to compute the range boundaries and * shuffles the whole DataSet over the network. This can take significant amount of time. * * @param fields The field indexes on which the DataSet is range-partitioned. @@ -1266,8 +1300,8 @@ public PartitionOperator partitionByRange(int... fields) { /** * Range-partitions a DataSet on the specified key fields. - *

- * Important:This operation requires an extra pass over the DataSet to compute the range boundaries and + * + *

Important:This operation requires an extra pass over the DataSet to compute the range boundaries and * shuffles the whole DataSet over the network. This can take significant amount of time. * * @param fields The field expressions on which the DataSet is range-partitioned. @@ -1279,8 +1313,8 @@ public PartitionOperator partitionByRange(String... fields) { /** * Range-partitions a DataSet using the specified KeySelector. - *

- * Important:This operation requires an extra pass over the DataSet to compute the range boundaries and + * + *

Important:This operation requires an extra pass over the DataSet to compute the range boundaries and * shuffles the whole DataSet over the network. This can take significant amount of time. * * @param keyExtractor The KeyExtractor with which the DataSet is range-partitioned. @@ -1296,9 +1330,9 @@ public > PartitionOperator partitionByRange(KeySelect /** * Partitions a tuple DataSet on the specified key fields using a custom partitioner. * This method takes the key position to partition on, and a partitioner that accepts the key type. - *

- * Note: This method works only on single field keys. - * + * + *

Note: This method works only on single field keys. + * * @param partitioner The partitioner to assign partitions to keys. * @param field The field index on which the DataSet is to partitioned. * @return The partitioned DataSet. @@ -1306,13 +1340,13 @@ public > PartitionOperator partitionByRange(KeySelect public PartitionOperator partitionCustom(Partitioner partitioner, int field) { return new PartitionOperator<>(this, new Keys.ExpressionKeys<>(new int[] {field}, getType()), clean(partitioner), Utils.getCallLocationName()); } - + /** * Partitions a POJO DataSet on the specified key fields using a custom partitioner. * This method takes the key expression to partition on, and a partitioner that accepts the key type. - *

- * Note: This method works only on single field keys. - * + * + *

Note: This method works only on single field keys. + * * @param partitioner The partitioner to assign partitions to keys. * @param field The field index on which the DataSet is to partitioned. * @return The partitioned DataSet. @@ -1320,32 +1354,32 @@ public PartitionOperator partitionCustom(Partitioner partitioner, int public PartitionOperator partitionCustom(Partitioner partitioner, String field) { return new PartitionOperator<>(this, new Keys.ExpressionKeys<>(new String[] {field}, getType()), clean(partitioner), Utils.getCallLocationName()); } - + /** * Partitions a DataSet on the key returned by the selector, using a custom partitioner. * This method takes the key selector to get the key to partition on, and a partitioner that * accepts the key type. - *

- * Note: This method works only on single field keys, i.e. the selector cannot return tuples + * + *

Note: This method works only on single field keys, i.e. the selector cannot return tuples * of fields. - * + * * @param partitioner The partitioner to assign partitions to keys. * @param keyExtractor The KeyExtractor with which the DataSet is partitioned. * @return The partitioned DataSet. - * + * * @see KeySelector */ public > PartitionOperator partitionCustom(Partitioner partitioner, KeySelector keyExtractor) { final TypeInformation keyType = TypeExtractor.getKeySelectorTypes(keyExtractor, getType()); return new PartitionOperator<>(this, new Keys.SelectorFunctionKeys<>(keyExtractor, getType(), keyType), clean(partitioner), Utils.getCallLocationName()); } - + /** - * Enforces a re-balancing of the DataSet, i.e., the DataSet is evenly distributed over all parallel instances of the + * Enforces a re-balancing of the DataSet, i.e., the DataSet is evenly distributed over all parallel instances of the * following task. This can help to improve performance in case of heavy data skew and compute intensive operations. - *

- * Important:This operation shuffles the whole DataSet over the network and can take significant amount of time. - * + * + *

Important:This operation shuffles the whole DataSet over the network and can take significant amount of time. + * * @return The re-balanced DataSet. */ public PartitionOperator rebalance() { @@ -1384,7 +1418,7 @@ public SortPartitionOperator sortPartition(String field, Order order) { * Locally sorts the partitions of the DataSet on the extracted key in the specified order. * The DataSet can be sorted on multiple values by returning a tuple from the KeySelector. * - * Note that no additional sort keys can be appended to a KeySelector sort keys. To sort + *

Note that no additional sort keys can be appended to a KeySelector sort keys. To sort * the partitions by multiple values using KeySelector, the KeySelector must return a tuple * consisting of the values. * @@ -1401,14 +1435,15 @@ public SortPartitionOperator sortPartition(KeySelector keyExtractor // -------------------------------------------------------------------------------------------- // Top-K // -------------------------------------------------------------------------------------------- - + // -------------------------------------------------------------------------------------------- // Result writing // -------------------------------------------------------------------------------------------- - + /** - * Writes a DataSet as text file(s) to the specified location.
- * For each element of the DataSet the result of {@link Object#toString()} is written.
+ * Writes a DataSet as text file(s) to the specified location. + * + *

For each element of the DataSet the result of {@link Object#toString()} is written.
*
* Output files and directories
* What output how writeAsText() method produces is depending on other circumstance @@ -1432,8 +1467,8 @@ public SortPartitionOperator sortPartition(KeySelector keyExtractor *

{@code // Parallelism is set to only this particular operation
 	 *dataset.writeAsText("file:///path1").setParallelism(1);
 	 *
-	 * // This will creates the same effect but note all operators' parallelism are set to one 
-	 *env.setParallelism(1); 
+	 * // This will create the same effect but note all operators' parallelism are set to one
+	 *env.setParallelism(1);
 	 *...
 	 *dataset.writeAsText("file:///path1"); }
* @@ -1448,10 +1483,10 @@ public SortPartitionOperator sortPartition(KeySelector keyExtractor *dataset.writeAsText("file:///path1").setParallelism(1); } * * - * + * * @param filePath The path pointing to the location the text file or files under the directory is written to. * @return The DataSink that writes the DataSet. - * + * * @see TextOutputFormat */ public DataSink writeAsText(String filePath) { @@ -1459,13 +1494,14 @@ public DataSink writeAsText(String filePath) { } /** - * Writes a DataSet as text file(s) to the specified location.
- * For each element of the DataSet the result of {@link Object#toString()} is written. - * + * Writes a DataSet as text file(s) to the specified location. + * + *

For each element of the DataSet the result of {@link Object#toString()} is written. + * * @param filePath The path pointing to the location the text file is written to. * @param writeMode Control the behavior for existing files. Options are NO_OVERWRITE and OVERWRITE. * @return The DataSink that writes the DataSet. - * + * * @see TextOutputFormat * @see DataSet#writeAsText(String) Output files and directories */ @@ -1476,8 +1512,9 @@ public DataSink writeAsText(String filePath, WriteMode writeMode) { } /** - * Writes a DataSet as text file(s) to the specified location.
- * For each element of the DataSet the result of {@link TextFormatter#format(Object)} is written. + * Writes a DataSet as text file(s) to the specified location. + * + *

For each element of the DataSet the result of {@link TextFormatter#format(Object)} is written. * * @param filePath The path pointing to the location the text file is written to. * @param formatter formatter that is applied on every element of the DataSet. @@ -1491,8 +1528,9 @@ public DataSink writeAsFormattedText(String filePath, TextFormatter f } /** - * Writes a DataSet as text file(s) to the specified location.
- * For each element of the DataSet the result of {@link TextFormatter#format(Object)} is written. + * Writes a DataSet as text file(s) to the specified location. + * + *

For each element of the DataSet the result of {@link TextFormatter#format(Object)} is written. * * @param filePath The path pointing to the location the text file is written to. * @param writeMode Control the behavior for existing files. Options are NO_OVERWRITE and OVERWRITE. @@ -1507,51 +1545,59 @@ public DataSink writeAsFormattedText(String filePath, WriteMode writeMod } /** - * Writes a {@link Tuple} DataSet as CSV file(s) to the specified location.
- * Note: Only a Tuple DataSet can written as a CSV file.
- * For each Tuple field the result of {@link Object#toString()} is written. - * Tuple fields are separated by the default field delimiter {@code "comma" (,)}.
- * Tuples are are separated by the newline character ({@code \n}). - * + * Writes a {@link Tuple} DataSet as CSV file(s) to the specified location. + * + *

Note: Only a Tuple DataSet can written as a CSV file. + * + *

For each Tuple field the result of {@link Object#toString()} is written. + * Tuple fields are separated by the default field delimiter {@code "comma" (,)}. + * + *

Tuples are are separated by the newline character ({@code \n}). + * * @param filePath The path pointing to the location the CSV file is written to. * @return The DataSink that writes the DataSet. - * + * * @see Tuple * @see CsvOutputFormat - * @see DataSet#writeAsText(String) Output files and directories + * @see DataSet#writeAsText(String) Output files and directories */ public DataSink writeAsCsv(String filePath) { return writeAsCsv(filePath, CsvOutputFormat.DEFAULT_LINE_DELIMITER, CsvOutputFormat.DEFAULT_FIELD_DELIMITER); } /** - * Writes a {@link Tuple} DataSet as CSV file(s) to the specified location.
- * Note: Only a Tuple DataSet can written as a CSV file.
- * For each Tuple field the result of {@link Object#toString()} is written. - * Tuple fields are separated by the default field delimiter {@code "comma" (,)}.
- * Tuples are are separated by the newline character ({@code \n}). - * + * Writes a {@link Tuple} DataSet as CSV file(s) to the specified location. + * + *

Note: Only a Tuple DataSet can written as a CSV file. + * + *

For each Tuple field the result of {@link Object#toString()} is written. + * Tuple fields are separated by the default field delimiter {@code "comma" (,)}. + * + *

Tuples are are separated by the newline character ({@code \n}). + * * @param filePath The path pointing to the location the CSV file is written to. * @param writeMode The behavior regarding existing files. Options are NO_OVERWRITE and OVERWRITE. * @return The DataSink that writes the DataSet. - * + * * @see Tuple * @see CsvOutputFormat * @see DataSet#writeAsText(String) Output files and directories */ public DataSink writeAsCsv(String filePath, WriteMode writeMode) { - return internalWriteAsCsv(new Path(filePath),CsvOutputFormat.DEFAULT_LINE_DELIMITER, CsvOutputFormat.DEFAULT_FIELD_DELIMITER, writeMode); + return internalWriteAsCsv(new Path(filePath), CsvOutputFormat.DEFAULT_LINE_DELIMITER, CsvOutputFormat.DEFAULT_FIELD_DELIMITER, writeMode); } /** - * Writes a {@link Tuple} DataSet as CSV file(s) to the specified location with the specified field and line delimiters.
- * Note: Only a Tuple DataSet can written as a CSV file.
- * For each Tuple field the result of {@link Object#toString()} is written. - * + * Writes a {@link Tuple} DataSet as CSV file(s) to the specified location with the specified field and line delimiters. + * + *

Note: Only a Tuple DataSet can written as a CSV file. + * + *

For each Tuple field the result of {@link Object#toString()} is written. + * * @param filePath The path pointing to the location the CSV file is written to. * @param rowDelimiter The row delimiter to separate Tuples. * @param fieldDelimiter The field delimiter to separate Tuple fields. - * + * * @see Tuple * @see CsvOutputFormat * @see DataSet#writeAsText(String) Output files and directories @@ -1561,15 +1607,16 @@ public DataSink writeAsCsv(String filePath, String rowDelimiter, String field } /** - * Writes a {@link Tuple} DataSet as CSV file(s) to the specified location with the specified field and line delimiters.
- * Note: Only a Tuple DataSet can written as a CSV file.
-§ * For each Tuple field the result of {@link Object#toString()} is written. - * + * Writes a {@link Tuple} DataSet as CSV file(s) to the specified location with the specified field and line delimiters. + * + *

Note: Only a Tuple DataSet can written as a CSV file. + * For each Tuple field the result of {@link Object#toString()} is written. + * * @param filePath The path pointing to the location the CSV file is written to. * @param rowDelimiter The row delimiter to separate Tuples. * @param fieldDelimiter The field delimiter to separate Tuple fields. * @param writeMode The behavior regarding existing files. Options are NO_OVERWRITE and OVERWRITE. - * + * * @see Tuple * @see CsvOutputFormat * @see DataSet#writeAsText(String) Output files and directories @@ -1577,27 +1624,27 @@ public DataSink writeAsCsv(String filePath, String rowDelimiter, String field public DataSink writeAsCsv(String filePath, String rowDelimiter, String fieldDelimiter, WriteMode writeMode) { return internalWriteAsCsv(new Path(filePath), rowDelimiter, fieldDelimiter, writeMode); } - + @SuppressWarnings("unchecked") private DataSink internalWriteAsCsv(Path filePath, String rowDelimiter, String fieldDelimiter, WriteMode wm) { Preconditions.checkArgument(getType().isTupleType(), "The writeAsCsv() method can only be used on data sets of tuples."); CsvOutputFormat of = new CsvOutputFormat<>(filePath, rowDelimiter, fieldDelimiter); - if(wm != null) { + if (wm != null) { of.setWriteMode(wm); } return output((OutputFormat) of); } - + /** * Prints the elements in a DataSet to the standard output stream {@link System#out} of the JVM that calls * the print() method. For programs that are executed in a cluster, this method needs * to gather the contents of the DataSet back to the client, to print it there. - * - *

The string written for each element is defined by the {@link Object#toString()} method.

- * + * + *

The string written for each element is defined by the {@link Object#toString()} method. + * *

This method immediately triggers the program execution, similar to the - * {@link #collect()} and {@link #count()} methods.

- * + * {@link #collect()} and {@link #count()} methods. + * * @see #printToErr() * @see #printOnTaskManager(String) */ @@ -1613,11 +1660,11 @@ public void print() throws Exception { * the print() method. For programs that are executed in a cluster, this method needs * to gather the contents of the DataSet back to the client, to print it there. * - *

The string written for each element is defined by the {@link Object#toString()} method.

+ *

The string written for each element is defined by the {@link Object#toString()} method. * *

This method immediately triggers the program execution, similar to the - * {@link #collect()} and {@link #count()} methods.

- * + * {@link #collect()} and {@link #count()} methods. + * * @see #print() * @see #printOnTaskManager(String) */ @@ -1632,30 +1679,30 @@ public void printToErr() throws Exception { * Writes a DataSet to the standard output streams (stdout) of the TaskManagers that execute * the program (or more specifically, the data sink operators). On a typical cluster setup, the * data will appear in the TaskManagers' .out files. - * + * *

To print the data to the console or stdout stream of the client process instead, use the - * {@link #print()} method.

- * - *

For each element of the DataSet the result of {@link Object#toString()} is written.

+ * {@link #print()} method. + * + *

For each element of the DataSet the result of {@link Object#toString()} is written. * * @param prefix The string to prefix each line of the output with. This helps identifying outputs - * from different printing sinks. + * from different printing sinks. * @return The DataSink operator that writes the DataSet. - * + * * @see #print() */ public DataSink printOnTaskManager(String prefix) { return output(new PrintingOutputFormat(prefix, false)); } - + /** * Writes a DataSet to the standard output stream (stdout). - * - *

For each element of the DataSet the result of {@link Object#toString()} is written.

+ * + *

For each element of the DataSet the result of {@link Object#toString()} is written. * * @param sinkIdentifier The string to prefix the output with. * @return The DataSink that writes the DataSet. - * + * * @deprecated Use {@link #printOnTaskManager(String)} instead. */ @Deprecated @@ -1666,29 +1713,28 @@ public DataSink print(String sinkIdentifier) { /** * Writes a DataSet to the standard error stream (stderr). - * - *

For each element of the DataSet the result of {@link Object#toString()} is written.

+ * + *

For each element of the DataSet the result of {@link Object#toString()} is written. * * @param sinkIdentifier The string to prefix the output with. * @return The DataSink that writes the DataSet. - * - * @deprecated Use {@link #printOnTaskManager(String)} instead, othe - * {@link PrintingOutputFormat} instead. + * + * @deprecated Use {@link #printOnTaskManager(String)} instead, or the {@link PrintingOutputFormat}. */ @Deprecated @PublicEvolving public DataSink printToErr(String sinkIdentifier) { return output(new PrintingOutputFormat(sinkIdentifier, true)); } - + /** * Writes a DataSet using a {@link FileOutputFormat} to a specified location. * This method adds a data sink to the program. - * + * * @param outputFormat The FileOutputFormat to write the DataSet. * @param filePath The path to the location where the DataSet is written. * @return The DataSink that writes the DataSet. - * + * * @see FileOutputFormat */ public DataSink write(FileOutputFormat outputFormat, String filePath) { @@ -1698,16 +1744,16 @@ public DataSink write(FileOutputFormat outputFormat, String filePath) { outputFormat.setOutputFilePath(new Path(filePath)); return output(outputFormat); } - + /** * Writes a DataSet using a {@link FileOutputFormat} to a specified location. * This method adds a data sink to the program. - * + * * @param outputFormat The FileOutputFormat to write the DataSet. * @param filePath The path to the location where the DataSet is written. * @param writeMode The mode of writing, indicating whether to overwrite existing files. * @return The DataSink that writes the DataSet. - * + * * @see FileOutputFormat */ public DataSink write(FileOutputFormat outputFormat, String filePath, WriteMode writeMode) { @@ -1719,26 +1765,26 @@ public DataSink write(FileOutputFormat outputFormat, String filePath, Writ outputFormat.setWriteMode(writeMode); return output(outputFormat); } - + /** * Emits a DataSet using an {@link OutputFormat}. This method adds a data sink to the program. 
* Programs may have multiple data sinks. A DataSet may also have multiple consumers (data sinks * or transformations) at the same time. - * + * * @param outputFormat The OutputFormat to process the DataSet. * @return The DataSink that processes the DataSet. - * + * * @see OutputFormat * @see DataSink */ public DataSink output(OutputFormat outputFormat) { Preconditions.checkNotNull(outputFormat); - + // configure the type if needed if (outputFormat instanceof InputTypeConfigurable) { - ((InputTypeConfigurable) outputFormat).setInputType(getType(), context.getConfig() ); + ((InputTypeConfigurable) outputFormat).setInputType(getType(), context.getConfig()); } - + DataSink sink = new DataSink<>(this, outputFormat, getType()); this.context.registerDataSink(sink); return sink; @@ -1747,12 +1793,11 @@ public DataSink output(OutputFormat outputFormat) { // -------------------------------------------------------------------------------------------- // Utilities // -------------------------------------------------------------------------------------------- - + protected static void checkSameExecutionContext(DataSet set1, DataSet set2) { if (set1.getExecutionEnvironment() != set2.getExecutionEnvironment()) { throw new IllegalArgumentException("The two inputs have different execution contexts."); } } - } diff --git a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java index 3d8a38471c990..c09916815a891 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java @@ -18,11 +18,9 @@ package org.apache.flink.api.java; -import com.esotericsoftware.kryo.Serializer; - -import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.annotation.Internal; import org.apache.flink.annotation.Public; +import org.apache.flink.annotation.PublicEvolving; import 
org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.InvalidProgramException; import org.apache.flink.api.common.JobExecutionResult; @@ -62,9 +60,9 @@ import org.apache.flink.util.SplittableIterator; import org.apache.flink.util.Visitor; +import com.esotericsoftware.kryo.Serializer; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapreduce.Job; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -84,11 +82,11 @@ * The ExecutionEnvironment is the context in which a program is executed. A * {@link LocalEnvironment} will cause execution in the current JVM, a * {@link RemoteEnvironment} will cause execution on a remote setup. - *

- * The environment provides methods to control the job execution (such as setting the parallelism) + * + *

The environment provides methods to control the job execution (such as setting the parallelism) * and to interact with the outside world (data access). - *

- * Please note that the execution environment needs strong type information for the input and return types + * + *

Please note that the execution environment needs strong type information for the input and return types * of all operations that are executed. This means that the environments needs to know that the return * value of an operation is for example a Tuple of String and Integer. * Because the Java compiler throws much of the generic type information away, most methods attempt to re- @@ -101,13 +99,13 @@ @Public public abstract class ExecutionEnvironment { - /** The logger used by the environment and its subclasses */ + /** The logger used by the environment and its subclasses. */ protected static final Logger LOG = LoggerFactory.getLogger(ExecutionEnvironment.class); - /** The environment of the context (local by default, cluster if invoked through command line) */ + /** The environment of the context (local by default, cluster if invoked through command line). */ private static ExecutionEnvironmentFactory contextEnvironmentFactory; - /** The default parallelism used by local environments */ + /** The default parallelism used by local environments. */ private static int defaultLocalDop = Runtime.getRuntime().availableProcessors(); // -------------------------------------------------------------------------------------------- @@ -118,20 +116,19 @@ public abstract class ExecutionEnvironment { private final ExecutionConfig config = new ExecutionConfig(); - /** Result from the latest execution, to make it retrievable when using eager execution methods */ + /** Result from the latest execution, to make it retrievable when using eager execution methods. */ protected JobExecutionResult lastJobExecutionResult; /** The ID of the session, defined by this execution environment. Sessions and Jobs are same in - * Flink, as Jobs can consist of multiple parts that are attached to the growing dataflow graph */ + * Flink, as Jobs can consist of multiple parts that are attached to the growing dataflow graph. 
*/ protected JobID jobID; - /** The session timeout in seconds */ + /** The session timeout in seconds. */ protected long sessionTimeout; - /** Flag to indicate whether sinks have been cleared in previous executions */ + /** Flag to indicate whether sinks have been cleared in previous executions. */ private boolean wasExecuted = false; - /** * Creates a new Execution Environment. */ @@ -171,8 +168,8 @@ public int getParallelism() { * Sets the parallelism for operations executed through this environment. * Setting a parallelism of x here will cause all operators (such as join, map, reduce) to run with * x parallel instances. - *

- * This method overrides the default parallelism for this environment. + * + *

This method overrides the default parallelism for this environment. * The {@link LocalEnvironment} uses by default a value equal to the number of hardware * contexts (CPU cores / threads). When executing the program via the command line client * from a JAR file, the default parallelism is the one configured for that setup. @@ -316,7 +313,7 @@ public long getSessionTimeout() { /** * Adds a new Kryo default serializer to the Runtime. * - * Note that the serializer instance must be serializable (as defined by java.io.Serializable), + *

Note that the serializer instance must be serializable (as defined by java.io.Serializable), * because it may be distributed to the worker nodes by java serialization. * * @param type The class of the types serialized with the given serializer. @@ -339,7 +336,7 @@ public void addDefaultKryoSerializer(Class type, ClassNote that the serializer instance must be serializable (as defined by java.io.Serializable), * because it may be distributed to the worker nodes by java serialization. * * @param type The class of the types serialized with the given serializer. @@ -350,7 +347,7 @@ public & Serializable>void registerTypeWithKryoSerializ } /** - * Registers the given Serializer via its class as a serializer for the given type at the KryoSerializer + * Registers the given Serializer via its class as a serializer for the given type at the KryoSerializer. * * @param type The class of the types serialized with the given serializer. * @param serializerClass The class of the serializer to use. @@ -423,8 +420,8 @@ public DataSource readTextFile(String filePath, String charsetName) { * This method is similar to {@link #readTextFile(String)}, but it produces a DataSet with mutable * {@link StringValue} objects, rather than Java Strings. StringValues can be used to tune implementations * to be less object and garbage collection heavy. - *

- * The file will be read with the system's default character set. + * + *

The file will be read with the system's default character set. * * @param filePath The path of the file, as a URI (e.g., "file:///some/local/file" or "hdfs://host:port/file/path"). * @return A {@link DataSet} that represents the data read from the given file as text lines. @@ -440,8 +437,8 @@ public DataSource readTextFileWithValue(String filePath) { * This method is similar to {@link #readTextFile(String, String)}, but it produces a DataSet with mutable * {@link StringValue} objects, rather than Java Strings. StringValues can be used to tune implementations * to be less object and garbage collection heavy. - *

- * The {@link java.nio.charset.Charset} with the given name will be used to read the files. + * + *

The {@link java.nio.charset.Charset} with the given name will be used to read the files. * * @param filePath The path of the file, as a URI (e.g., "file:///some/local/file" or "hdfs://host:port/file/path"). * @param charsetName The name of the character set used to read the file. @@ -532,8 +529,8 @@ public DataSource readFile(FileInputFormat inputFormat, String filePat * Generic method to create an input {@link DataSet} with in {@link InputFormat}. The DataSet will not be * immediately created - instead, this method returns a DataSet that will be lazily created from * the input format once the program is executed. - *

- * Since all data sets need specific information about their types, this method needs to determine + * + *

Since all data sets need specific information about their types, this method needs to determine * the type of the data produced by the input format. It will attempt to determine the data type * by reflection, unless the input format implements the {@link ResultTypeQueryable} interface. * In the latter case, this method will invoke the {@link ResultTypeQueryable#getProducedType()} @@ -563,8 +560,8 @@ public DataSource createInput(InputFormat inputFormat) { * Generic method to create an input DataSet with in {@link InputFormat}. The {@link DataSet} will not be * immediately created - instead, this method returns a {@link DataSet} that will be lazily created from * the input format once the program is executed. - *

- * The {@link DataSet} is typed to the given TypeInformation. This method is intended for input formats that + * + *

The {@link DataSet} is typed to the given TypeInformation. This method is intended for input formats that * where the return type cannot be determined by reflection analysis, and that do not implement the * {@link ResultTypeQueryable} interface. * @@ -595,7 +592,7 @@ public DataSource createInput(InputFormat inputFormat, TypeInformat */ @Deprecated @PublicEvolving - public DataSource> readHadoopFile(org.apache.hadoop.mapred.FileInputFormat mapredInputFormat, Class key, Class value, String inputPath, JobConf job) { + public DataSource> readHadoopFile(org.apache.hadoop.mapred.FileInputFormat mapredInputFormat, Class key, Class value, String inputPath, JobConf job) { DataSource> result = createHadoopInput(mapredInputFormat, key, value, job); org.apache.hadoop.mapred.FileInputFormat.addInputPath(job, new org.apache.hadoop.fs.Path(inputPath)); @@ -612,7 +609,7 @@ public DataSource> readHadoopFile(org.apache.hadoop.mapred.Fi */ @Deprecated @PublicEvolving - public DataSource> readSequenceFile(Class key, Class value, String inputPath) throws IOException { + public DataSource> readSequenceFile(Class key, Class value, String inputPath) throws IOException { return readHadoopFile(new org.apache.hadoop.mapred.SequenceFileInputFormat(), key, value, inputPath); } @@ -625,7 +622,7 @@ public DataSource> readSequenceFile(Class key, Class va */ @Deprecated @PublicEvolving - public DataSource> readHadoopFile(org.apache.hadoop.mapred.FileInputFormat mapredInputFormat, Class key, Class value, String inputPath) { + public DataSource> readHadoopFile(org.apache.hadoop.mapred.FileInputFormat mapredInputFormat, Class key, Class value, String inputPath) { return readHadoopFile(mapredInputFormat, key, value, inputPath, new JobConf()); } @@ -637,7 +634,7 @@ public DataSource> readHadoopFile(org.apache.hadoop.mapred.Fi */ @Deprecated @PublicEvolving - public DataSource> createHadoopInput(org.apache.hadoop.mapred.InputFormat mapredInputFormat, Class key, Class value, JobConf job) { + public 
DataSource> createHadoopInput(org.apache.hadoop.mapred.InputFormat mapredInputFormat, Class key, Class value, JobConf job) { HadoopInputFormat hadoopInputFormat = new HadoopInputFormat<>(mapredInputFormat, key, value, job); return this.createInput(hadoopInputFormat); @@ -652,7 +649,7 @@ public DataSource> createHadoopInput(org.apache.hadoop.mapred */ @Deprecated @PublicEvolving - public DataSource> readHadoopFile(org.apache.hadoop.mapreduce.lib.input.FileInputFormat mapreduceInputFormat, Class key, Class value, String inputPath, Job job) throws IOException { + public DataSource> readHadoopFile(org.apache.hadoop.mapreduce.lib.input.FileInputFormat mapreduceInputFormat, Class key, Class value, String inputPath, Job job) throws IOException { DataSource> result = createHadoopInput(mapreduceInputFormat, key, value, job); org.apache.hadoop.mapreduce.lib.input.FileInputFormat.addInputPath(job, new org.apache @@ -670,7 +667,7 @@ public DataSource> readHadoopFile(org.apache.hadoop.mapreduce */ @Deprecated @PublicEvolving - public DataSource> readHadoopFile(org.apache.hadoop.mapreduce.lib.input.FileInputFormat mapreduceInputFormat, Class key, Class value, String inputPath) throws IOException { + public DataSource> readHadoopFile(org.apache.hadoop.mapreduce.lib.input.FileInputFormat mapreduceInputFormat, Class key, Class value, String inputPath) throws IOException { return readHadoopFile(mapreduceInputFormat, key, value, inputPath, Job.getInstance()); } @@ -682,28 +679,28 @@ public DataSource> readHadoopFile(org.apache.hadoop.mapreduce */ @Deprecated @PublicEvolving - public DataSource> createHadoopInput(org.apache.hadoop.mapreduce.InputFormat mapreduceInputFormat, Class key, Class value, Job job) { + public DataSource> createHadoopInput(org.apache.hadoop.mapreduce.InputFormat mapreduceInputFormat, Class key, Class value, Job job) { org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat hadoopInputFormat = new 
org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat<>(mapreduceInputFormat, key, value, job); return this.createInput(hadoopInputFormat); } - + // ----------------------------------- Collection --------------------------------------- - + /** * Creates a DataSet from the given non-empty collection. The type of the data set is that * of the elements in the collection. - *

- * The framework will try and determine the exact type from the collection elements. + * + *

The framework will try and determine the exact type from the collection elements. * In case of generic elements, it may be necessary to manually supply the type information * via {@link #fromCollection(Collection, TypeInformation)}. - *

- * Note that this operation will result in a non-parallel data source, i.e. a data source with + * + *

Note that this operation will result in a non-parallel data source, i.e. a data source with * a parallelism of one. - * + * * @param data The collection of elements to create the data set from. * @return A DataSet representing the given collection. - * + * * @see #fromCollection(Collection, TypeInformation) */ public DataSource fromCollection(Collection data) { @@ -713,86 +710,85 @@ public DataSource fromCollection(Collection data) { if (data.size() == 0) { throw new IllegalArgumentException("The size of the collection must not be empty."); } - + X firstValue = data.iterator().next(); - + TypeInformation type = TypeExtractor.getForObject(firstValue); CollectionInputFormat.checkCollection(data, type.getTypeClass()); return new DataSource<>(this, new CollectionInputFormat<>(data, type.createSerializer(config)), type, Utils.getCallLocationName()); } - + /** * Creates a DataSet from the given non-empty collection. Note that this operation will result * in a non-parallel data source, i.e. a data source with a parallelism of one. - *

- * The returned DataSet is typed to the given TypeInformation. - * + * + *

The returned DataSet is typed to the given TypeInformation. + * * @param data The collection of elements to create the data set from. * @param type The TypeInformation for the produced data set. * @return A DataSet representing the given collection. - * + * * @see #fromCollection(Collection) */ public DataSource fromCollection(Collection data, TypeInformation type) { return fromCollection(data, type, Utils.getCallLocationName()); } - + private DataSource fromCollection(Collection data, TypeInformation type, String callLocationName) { CollectionInputFormat.checkCollection(data, type.getTypeClass()); return new DataSource<>(this, new CollectionInputFormat<>(data, type.createSerializer(config)), type, callLocationName); } - + /** * Creates a DataSet from the given iterator. Because the iterator will remain unmodified until * the actual execution happens, the type of data returned by the iterator must be given * explicitly in the form of the type class (this is due to the fact that the Java compiler * erases the generic type information). - *

- * Note that this operation will result in a non-parallel data source, i.e. a data source with + * + *

Note that this operation will result in a non-parallel data source, i.e. a data source with * a parallelism of one. - * + * * @param data The collection of elements to create the data set from. * @param type The class of the data produced by the iterator. Must not be a generic class. * @return A DataSet representing the elements in the iterator. - * + * * @see #fromCollection(Iterator, TypeInformation) */ public DataSource fromCollection(Iterator data, Class type) { return fromCollection(data, TypeExtractor.getForClass(type)); } - + /** * Creates a DataSet from the given iterator. Because the iterator will remain unmodified until * the actual execution happens, the type of data returned by the iterator must be given * explicitly in the form of the type information. This method is useful for cases where the type * is generic. In that case, the type class (as given in {@link #fromCollection(Iterator, Class)} * does not supply all type information. - *

- * Note that this operation will result in a non-parallel data source, i.e. a data source with + * + *

Note that this operation will result in a non-parallel data source, i.e. a data source with * a parallelism of one. - * + * * @param data The collection of elements to create the data set from. * @param type The TypeInformation for the produced data set. * @return A DataSet representing the elements in the iterator. - * + * * @see #fromCollection(Iterator, Class) */ public DataSource fromCollection(Iterator data, TypeInformation type) { return new DataSource<>(this, new IteratorInputFormat<>(data), type, Utils.getCallLocationName()); } - - + /** * Creates a new data set that contains the given elements. The elements must all be of the same type, * for example, all of the {@link String} or {@link Integer}. The sequence of elements must not be empty. - *

- * The framework will try and determine the exact type from the collection elements. + * + *

The framework will try and determine the exact type from the collection elements. * In case of generic elements, it may be necessary to manually supply the type information * via {@link #fromCollection(Collection, TypeInformation)}. - *

- * Note that this operation will result in a non-parallel data source, i.e. a data source with + * + *

Note that this operation will result in a non-parallel data source, i.e. a data source with * a parallelism of one. - * + * * @param data The elements to make up the data set. * @return A DataSet representing the given list of elements. */ @@ -804,7 +800,7 @@ public final DataSource fromElements(X... data) { if (data.length == 0) { throw new IllegalArgumentException("The number of elements must not be zero."); } - + TypeInformation typeInfo; try { typeInfo = TypeExtractor.getForObject(data[0]); @@ -817,10 +813,10 @@ public final DataSource fromElements(X... data) { return fromCollection(Arrays.asList(data), typeInfo, Utils.getCallLocationName()); } - + /** - * Creates a new data set that contains the given elements. The framework will determine the type according to the - * based type user supplied. The elements should be the same or be the subclass to the based type. + * Creates a new data set that contains the given elements. The framework will determine the type according to the + * based type user supplied. The elements should be the same or be the subclass to the based type. * The sequence of elements must not be empty. * Note that this operation will result in a non-parallel data source, i.e. a data source with * a parallelism of one. @@ -837,7 +833,7 @@ public final DataSource fromElements(Class type, X... data) { if (data.length == 0) { throw new IllegalArgumentException("The number of elements must not be zero."); } - + TypeInformation typeInfo; try { typeInfo = TypeExtractor.getForClass(type); @@ -850,136 +846,135 @@ public final DataSource fromElements(Class type, X... data) { return fromCollection(Arrays.asList(data), typeInfo, Utils.getCallLocationName()); } - - + /** * Creates a new data set that contains elements in the iterator. The iterator is splittable, allowing the * framework to create a parallel data source that returns the elements in the iterator. - *

- * Because the iterator will remain unmodified until the actual execution happens, the type of data + * + *

Because the iterator will remain unmodified until the actual execution happens, the type of data * returned by the iterator must be given explicitly in the form of the type class (this is due to the * fact that the Java compiler erases the generic type information). - * + * * @param iterator The iterator that produces the elements of the data set. * @param type The class of the data produced by the iterator. Must not be a generic class. * @return A DataSet representing the elements in the iterator. - * + * * @see #fromParallelCollection(SplittableIterator, TypeInformation) */ public DataSource fromParallelCollection(SplittableIterator iterator, Class type) { return fromParallelCollection(iterator, TypeExtractor.getForClass(type)); } - + /** * Creates a new data set that contains elements in the iterator. The iterator is splittable, allowing the * framework to create a parallel data source that returns the elements in the iterator. - *

- * Because the iterator will remain unmodified until the actual execution happens, the type of data + * + *

Because the iterator will remain unmodified until the actual execution happens, the type of data * returned by the iterator must be given explicitly in the form of the type information. * This method is useful for cases where the type is generic. In that case, the type class * (as given in {@link #fromParallelCollection(SplittableIterator, Class)} does not supply all type information. - * + * * @param iterator The iterator that produces the elements of the data set. * @param type The TypeInformation for the produced data set. * @return A DataSet representing the elements in the iterator. - * + * * @see #fromParallelCollection(SplittableIterator, Class) */ public DataSource fromParallelCollection(SplittableIterator iterator, TypeInformation type) { return fromParallelCollection(iterator, type, Utils.getCallLocationName()); } - + // private helper for passing different call location names private DataSource fromParallelCollection(SplittableIterator iterator, TypeInformation type, String callLocationName) { return new DataSource<>(this, new ParallelIteratorInputFormat<>(iterator), type, callLocationName); } - + /** * Creates a new data set that contains a sequence of numbers. The data set will be created in parallel, * so there is no guarantee about the order of the elements. - * + * * @param from The number to start at (inclusive). * @param to The number to stop at (inclusive). * @return A DataSet, containing all number in the {@code [from, to]} interval. */ public DataSource generateSequence(long from, long to) { return fromParallelCollection(new NumberSequenceIterator(from, to), BasicTypeInfo.LONG_TYPE_INFO, Utils.getCallLocationName()); - } - + } + // -------------------------------------------------------------------------------------------- // Executing // -------------------------------------------------------------------------------------------- - + /** * Triggers the program execution. 
The environment will execute all parts of the program that have * resulted in a "sink" operation. Sink operations are for example printing results ({@link DataSet#print()}, * writing results (e.g. {@link DataSet#writeAsText(String)}, * {@link DataSet#write(org.apache.flink.api.common.io.FileOutputFormat, String)}, or other generic * data sinks created with {@link DataSet#output(org.apache.flink.api.common.io.OutputFormat)}. - *

- * The program execution will be logged and displayed with a generated default name. - * + * + *

The program execution will be logged and displayed with a generated default name. + * * @return The result of the job execution, containing elapsed time and accumulators. * @throws Exception Thrown, if the program executions fails. */ public JobExecutionResult execute() throws Exception { return execute(getDefaultName()); } - + /** * Triggers the program execution. The environment will execute all parts of the program that have * resulted in a "sink" operation. Sink operations are for example printing results ({@link DataSet#print()}, * writing results (e.g. {@link DataSet#writeAsText(String)}, * {@link DataSet#write(org.apache.flink.api.common.io.FileOutputFormat, String)}, or other generic * data sinks created with {@link DataSet#output(org.apache.flink.api.common.io.OutputFormat)}. - *

- * The program execution will be logged and displayed with the given job name. - * + * + *

The program execution will be logged and displayed with the given job name. + * * @return The result of the job execution, containing elapsed time and accumulators. * @throws Exception Thrown, if the program executions fails. */ public abstract JobExecutionResult execute(String jobName) throws Exception; /** - * Creates the plan with which the system will execute the program, and returns it as + * Creates the plan with which the system will execute the program, and returns it as * a String using a JSON representation of the execution data flow graph. * Note that this needs to be called, before the plan is executed. - * + * * @return The execution plan of the program, as a JSON String. * @throws Exception Thrown, if the compiler could not be instantiated, or the master could not * be contacted to retrieve information relevant to the execution planning. */ public abstract String getExecutionPlan() throws Exception; - + /** * Registers a file at the distributed cache under the given name. The file will be accessible * from any user-defined function in the (distributed) runtime under a local path. Files * may be local files (as long as all relevant workers have access to it), or files in a distributed file system. * The runtime will copy the files temporarily to a local cache, if needed. - *

- * The {@link org.apache.flink.api.common.functions.RuntimeContext} can be obtained inside UDFs via + * + *

The {@link org.apache.flink.api.common.functions.RuntimeContext} can be obtained inside UDFs via * {@link org.apache.flink.api.common.functions.RichFunction#getRuntimeContext()} and provides access - * {@link org.apache.flink.api.common.cache.DistributedCache} via + * {@link org.apache.flink.api.common.cache.DistributedCache} via * {@link org.apache.flink.api.common.functions.RuntimeContext#getDistributedCache()}. - * + * * @param filePath The path of the file, as a URI (e.g. "file:///some/path" or "hdfs://host:port/and/path") * @param name The name under which the file is registered. */ public void registerCachedFile(String filePath, String name){ registerCachedFile(filePath, name, false); } - + /** * Registers a file at the distributed cache under the given name. The file will be accessible * from any user-defined function in the (distributed) runtime under a local path. Files - * may be local files (as long as all relevant workers have access to it), or files in a distributed file system. + * may be local files (as long as all relevant workers have access to it), or files in a distributed file system. * The runtime will copy the files temporarily to a local cache, if needed. - *

- * The {@link org.apache.flink.api.common.functions.RuntimeContext} can be obtained inside UDFs via + * + *

The {@link org.apache.flink.api.common.functions.RuntimeContext} can be obtained inside UDFs via * {@link org.apache.flink.api.common.functions.RichFunction#getRuntimeContext()} and provides access - * {@link org.apache.flink.api.common.cache.DistributedCache} via + * {@link org.apache.flink.api.common.cache.DistributedCache} via * {@link org.apache.flink.api.common.functions.RuntimeContext#getDistributedCache()}. - * + * * @param filePath The path of the file, as a URI (e.g. "file:///some/path" or "hdfs://host:port/and/path") * @param name The name under which the file is registered. * @param executable flag indicating whether the file should be executable @@ -987,11 +982,11 @@ public void registerCachedFile(String filePath, String name){ public void registerCachedFile(String filePath, String name, boolean executable){ this.cacheFile.add(new Tuple2<>(name, new DistributedCacheEntry(filePath, executable))); } - + /** * Registers all files that were registered at this execution environment's cache registry of the * given plan's cache registry. - * + * * @param p The plan to register files at. * @throws IOException Thrown if checks for existence and sanity fail. */ @@ -1000,7 +995,7 @@ protected void registerCachedFilesWithPlan(Plan p) throws IOException { p.registerCachedFile(entry.f0, entry.f1); } } - + /** * Creates the program's {@link Plan}. The plan is a description of all data sources, data sinks, * and operations and how they interact, as an isolated unit that can be executed with a @@ -1008,14 +1003,14 @@ protected void registerCachedFilesWithPlan(Plan p) throws IOException { * executor is an alternative way to run a program and is only possible if the program consists * only of distributed operations. * This automatically starts a new stage of execution. - * + * * @return The program's plan. */ @Internal public Plan createProgramPlan() { return createProgramPlan(null); } - + /** * Creates the program's {@link Plan}. 
The plan is a description of all data sources, data sinks, * and operations and how they interact, as an isolated unit that can be executed with a @@ -1023,7 +1018,7 @@ public Plan createProgramPlan() { * executor is an alternative way to run a program and is only possible if the program consists * only of distributed operations. * This automatically starts a new stage of execution. - * + * * @param jobName The name attached to the plan (displayed in logs and monitoring). * @return The program's plan. */ @@ -1056,11 +1051,11 @@ public Plan createProgramPlan(String jobName, boolean clearSinks) { "Examples are writing the data set or printing it."); } } - + if (jobName == null) { jobName = getDefaultName(); } - + OperatorTranslation translator = new OperatorTranslation(); Plan plan = translator.translateToPlan(this.sinks, jobName); @@ -1068,19 +1063,20 @@ public Plan createProgramPlan(String jobName, boolean clearSinks) { plan.setDefaultParallelism(getParallelism()); } plan.setExecutionConfig(getConfig()); - + // Check plan for GenericTypeInfo's and register the types at the serializers. 
if (!config.isAutoTypeRegistrationDisabled()) { plan.accept(new Visitor>() { - + private final HashSet> deduplicator = new HashSet<>(); - + @Override public boolean preVisit(org.apache.flink.api.common.operators.Operator visitable) { OperatorInformation opInfo = visitable.getOperatorInfo(); Serializers.recursivelyRegisterType(opInfo.getOutputType(), config, deduplicator); return true; } + @Override public void postVisit(org.apache.flink.api.common.operators.Operator visitable) {} }); @@ -1091,7 +1087,7 @@ public void postVisit(org.apache.flink.api.common.operators.Operator visitabl } catch (Exception e) { throw new RuntimeException("Error while registering cached files: " + e.getMessage(), e); } - + // clear all the sinks such that the next execution does not redo everything if (clearSinks) { this.sinks.clear(); @@ -1107,17 +1103,17 @@ public void postVisit(org.apache.flink.api.common.operators.Operator visitabl config.getDefaultKryoSerializerClasses().size(); LOG.info("The job has {} registered types and {} default Kryo serializers", registeredTypes, defaultKryoSerializers); - if(config.isForceKryoEnabled() && config.isForceAvroEnabled()) { + if (config.isForceKryoEnabled() && config.isForceAvroEnabled()) { LOG.warn("In the ExecutionConfig, both Avro and Kryo are enforced. 
Using Kryo serializer"); } - if(config.isForceKryoEnabled()) { + if (config.isForceKryoEnabled()) { LOG.info("Using KryoSerializer for serializing POJOs"); } - if(config.isForceAvroEnabled()) { + if (config.isForceAvroEnabled()) { LOG.info("Using AvroSerializer for serializing POJOs"); } - if(LOG.isDebugEnabled()) { + if (LOG.isDebugEnabled()) { LOG.debug("Registered Kryo types: {}", config.getRegisteredKryoTypes().toString()); LOG.debug("Registered Kryo with Serializers types: {}", config.getRegisteredTypesWithKryoSerializers().entrySet().toString()); LOG.debug("Registered Kryo with Serializer Classes types: {}", config.getRegisteredTypesWithKryoSerializerClasses().entrySet().toString()); @@ -1131,27 +1127,27 @@ public void postVisit(org.apache.flink.api.common.operators.Operator visitabl return plan; } - + /** * Adds the given sink to this environment. Only sinks that have been added will be executed once * the {@link #execute()} or {@link #execute(String)} method is called. - * + * * @param sink The sink to add for execution. */ @Internal void registerDataSink(DataSink sink) { this.sinks.add(sink); } - + /** * Gets a default job name, based on the timestamp when this method is invoked. - * + * * @return A default job name. */ private static String getDefaultName() { return "Flink Java Job at " + Calendar.getInstance().getTime(); } - + // -------------------------------------------------------------------------------------------- // Instantiation of Execution Contexts // -------------------------------------------------------------------------------------------- @@ -1161,11 +1157,11 @@ private static String getDefaultName() { * If the program is invoked standalone, this method returns a local execution environment, as returned by * {@link #createLocalEnvironment()}. If the program is invoked from within the command line client to be * submitted to a cluster, this method returns the execution environment of this cluster. 
- * + * * @return The execution environment of the context in which the program is executed. */ public static ExecutionEnvironment getExecutionEnvironment() { - return contextEnvironmentFactory == null ? + return contextEnvironmentFactory == null ? createLocalEnvironment() : contextEnvironmentFactory.createExecutionEnvironment(); } @@ -1187,18 +1183,18 @@ public static CollectionEnvironment createCollectionsEnvironment(){ * multi-threaded fashion in the same JVM as the environment was created in. The default * parallelism of the local environment is the number of hardware contexts (CPU cores / threads), * unless it was specified differently by {@link #setDefaultLocalParallelism(int)}. - * + * * @return A local execution environment. */ public static LocalEnvironment createLocalEnvironment() { return createLocalEnvironment(defaultLocalDop); } - + /** * Creates a {@link LocalEnvironment}. The local execution environment will run the program in a * multi-threaded fashion in the same JVM as the environment was created in. It will use the * parallelism specified in the parameter. - * + * * @param parallelism The parallelism for the local environment. * @return A local execution environment with the specified parallelism. */ @@ -1244,13 +1240,13 @@ public static ExecutionEnvironment createLocalEnvironmentWithWebUI(Configuration } /** - * Creates a {@link RemoteEnvironment}. The remote environment sends (parts of) the program + * Creates a {@link RemoteEnvironment}. The remote environment sends (parts of) the program * to a cluster for execution. Note that all file paths used in the program must be accessible from the * cluster. The execution will use the cluster's default parallelism, unless the parallelism is * set explicitly via {@link ExecutionEnvironment#setParallelism(int)}. - * + * * @param host The host name or address of the master (JobManager), where the program should be executed. 
- * @param port The port of the master (JobManager), where the program should be executed. + * @param port The port of the master (JobManager), where the program should be executed. * @param jarFiles The JAR files with code that needs to be shipped to the cluster. If the program uses * user-defined functions, user-defined input formats, or any libraries, those must be * provided in the JAR files. @@ -1266,7 +1262,7 @@ public static ExecutionEnvironment createRemoteEnvironment(String host, int port * cluster. The custom configuration file is used to configure Akka specific configuration parameters * for the Client only; Program parallelism can be set via {@link ExecutionEnvironment#setParallelism(int)}. * - * Cluster configuration has to be done in the remotely running Flink instance. + *

Cluster configuration has to be done in the remotely running Flink instance. * * @param host The host name or address of the master (JobManager), where the program should be executed. * @param port The port of the master (JobManager), where the program should be executed. @@ -1282,12 +1278,12 @@ public static ExecutionEnvironment createRemoteEnvironment( } /** - * Creates a {@link RemoteEnvironment}. The remote environment sends (parts of) the program + * Creates a {@link RemoteEnvironment}. The remote environment sends (parts of) the program * to a cluster for execution. Note that all file paths used in the program must be accessible from the * cluster. The execution will use the specified parallelism. - * + * * @param host The host name or address of the master (JobManager), where the program should be executed. - * @param port The port of the master (JobManager), where the program should be executed. + * @param port The port of the master (JobManager), where the program should be executed. * @param parallelism The parallelism to use during the execution. * @param jarFiles The JAR files with code that needs to be shipped to the cluster. If the program uses * user-defined functions, user-defined input formats, or any libraries, those must be @@ -1307,7 +1303,7 @@ public static ExecutionEnvironment createRemoteEnvironment(String host, int port /** * Gets the default parallelism that will be used for the local execution environment created by * {@link #createLocalEnvironment()}. - * + * * @return The default local parallelism */ public static int getDefaultLocalParallelism() { @@ -1317,7 +1313,7 @@ public static int getDefaultLocalParallelism() { /** * Sets the default parallelism that will be used for the local execution environment created by * {@link #createLocalEnvironment()}. - * + * * @param parallelism The parallelism to use as the default local parallelism. 
*/ public static void setDefaultLocalParallelism(int parallelism) { @@ -1333,9 +1329,9 @@ public static void setDefaultLocalParallelism(int parallelism) { * Sets a context environment factory, that creates the context environment for running programs * with pre-configured environments. Examples are running programs from the command line, and * running programs in the Scala shell. - * + * *

When the context environment factory is set, no other environments can be explicitly used. - * + * * @param ctx The context environment factory. */ protected static void initializeContextEnvironment(ExecutionEnvironmentFactory ctx) { @@ -1354,7 +1350,7 @@ protected static void resetContextEnvironment() { /** * Checks whether it is currently permitted to explicitly instantiate a LocalEnvironment * or a RemoteEnvironment. - * + * * @return True, if it is possible to explicitly instantiate a LocalEnvironment or a * RemoteEnvironment, false otherwise. */ diff --git a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironmentFactory.java b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironmentFactory.java index b75835f6fe586..829049ebce338 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironmentFactory.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironmentFactory.java @@ -25,10 +25,10 @@ */ @Public public interface ExecutionEnvironmentFactory { - + /** * Creates an ExecutionEnvironment from this factory. - * + * * @return An ExecutionEnvironment. 
*/ ExecutionEnvironment createExecutionEnvironment(); diff --git a/flink-java/src/main/java/org/apache/flink/api/java/LocalEnvironment.java b/flink-java/src/main/java/org/apache/flink/api/java/LocalEnvironment.java index 0b2567a9f7171..dcb71cb848699 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/LocalEnvironment.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/LocalEnvironment.java @@ -18,8 +18,8 @@ package org.apache.flink.api.java; -import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.annotation.Public; +import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.InvalidProgramException; import org.apache.flink.api.common.JobExecutionResult; @@ -31,28 +31,28 @@ /** * An {@link ExecutionEnvironment} that runs the program locally, multi-threaded, in the JVM where the * environment is instantiated. - * + * *

When this environment is instantiated, it uses a default parallelism of {@code 1}. The default * parallelism can be set via {@link #setParallelism(int)}. - * + * *

Local environments can also be instantiated through {@link ExecutionEnvironment#createLocalEnvironment()} * and {@link ExecutionEnvironment#createLocalEnvironment(int)}. The former version will pick a * default parallelism equal to the number of hardware contexts in the local machine. */ @Public public class LocalEnvironment extends ExecutionEnvironment { - - /** The user-defined configuration for the local execution */ + + /** The user-defined configuration for the local execution. */ private final Configuration configuration; - /** Create lazily upon first use */ + /** Create lazily upon first use. */ private PlanExecutor executor; /** In case we keep the executor alive for sessions, this reaper shuts it down eventually. * The reaper's finalize method triggers the executor shutdown. */ @SuppressWarnings("all") private ExecutorReaper executorReaper; - + /** * Creates a new local environment. */ @@ -62,7 +62,7 @@ public LocalEnvironment() { /** * Creates a new local environment that configures its local executor with the given configuration. - * + * * @param config The configuration used to configure the local executor. */ public LocalEnvironment(Configuration config) { @@ -73,9 +73,9 @@ public LocalEnvironment(Configuration config) { } this.configuration = config == null ? new Configuration() : config; } - + // -------------------------------------------------------------------------------------------- - + @Override public JobExecutionResult execute(String jobName) throws Exception { if (executor == null) { @@ -93,11 +93,11 @@ public JobExecutionResult execute(String jobName) throws Exception { this.lastJobExecutionResult = result; return result; } - + @Override public String getExecutionPlan() throws Exception { Plan p = createProgramPlan(null, false); - + // make sure that we do not start an executor in any case here. 
// if one runs, fine, of not, we only create the class but disregard immediately afterwards if (executor != null) { @@ -118,15 +118,15 @@ public void startNewSession() throws Exception { // create also a new JobID jobID = JobID.generate(); } - + // create a new local executor executor = PlanExecutor.createLocalExecutor(configuration); executor.setPrintStatusDuringExecution(getConfig().isSysoutLoggingEnabled()); - + // if we have a session, start the mini cluster eagerly to have it available across sessions if (getSessionTimeout() > 0) { executor.start(); - + // also install the reaper that will shut it down eventually executorReaper = new ExecutorReaper(executor); } @@ -135,7 +135,7 @@ public void startNewSession() throws Exception { // ------------------------------------------------------------------------ // Utilities // ------------------------------------------------------------------------ - + @Override public String toString() { return "Local Environment (parallelism = " + (getParallelism() == ExecutionConfig.PARALLELISM_DEFAULT ? "default" : getParallelism()) @@ -150,7 +150,7 @@ public String toString() { * This thread shuts down the local executor. * *

IMPORTANT: This must be a static inner class to hold no reference to the outer class. - * Otherwise, the outer class could never become garbage collectible while this thread runs.

+ * Otherwise, the outer class could never become garbage collectible while this thread runs. */ private static class ShutdownThread extends Thread { diff --git a/flink-java/src/main/java/org/apache/flink/api/java/RemoteEnvironment.java b/flink-java/src/main/java/org/apache/flink/api/java/RemoteEnvironment.java index 34a54babbb545..fa223bd406367 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/RemoteEnvironment.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/RemoteEnvironment.java @@ -18,8 +18,8 @@ package org.apache.flink.api.java; -import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.annotation.Public; +import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.api.common.InvalidProgramException; import org.apache.flink.api.common.JobExecutionResult; import org.apache.flink.api.common.JobID; @@ -39,7 +39,7 @@ * An {@link ExecutionEnvironment} that sends programs to a cluster for execution. The environment * needs to be created with the address and port of the JobManager of the Flink cluster that * should execute the programs. - * + * *

Many programs executed via the remote environment depend on additional classes. Such classes * may be the classes of functions (transformation, aggregation, ...) or libraries. Those classes * must be attached to the remote environment as JAR files, to allow the environment to ship the @@ -47,26 +47,26 @@ */ @Public public class RemoteEnvironment extends ExecutionEnvironment { - - /** The hostname of the JobManager */ + + /** The hostname of the JobManager. */ protected final String host; - /** The port of the JobManager main actor system */ + /** The port of the JobManager main actor system. */ protected final int port; - /** The jar files that need to be attached to each job */ + /** The jar files that need to be attached to each job. */ protected final List jarFiles; - /** The configuration used by the client that connects to the cluster */ + /** The configuration used by the client that connects to the cluster. */ protected Configuration clientConfiguration; - - /** The remote executor lazily created upon first use */ + + /** The remote executor lazily created upon first use. */ protected PlanExecutor executor; - - /** Optional shutdown hook, used in session mode to eagerly terminate the last session */ + + /** Optional shutdown hook, used in session mode to eagerly terminate the last session. */ private Thread shutdownHook; - /** The classpaths that need to be attached to each job */ + /** The classpaths that need to be attached to each job. */ protected final List globalClasspaths; /** @@ -105,7 +105,7 @@ public RemoteEnvironment(String host, int port, Configuration clientConfig, Stri /** * Creates a new RemoteEnvironment that points to the master (JobManager) described by the * given host name and port. - * + * *

Each program execution will have all the given JAR files in its classpath. * * @param host The host name or address of the master (JobManager), where the program should be executed. @@ -114,8 +114,8 @@ public RemoteEnvironment(String host, int port, Configuration clientConfig, Stri * @param jarFiles The JAR files with code that needs to be shipped to the cluster. If the program uses * user-defined functions, user-defined input formats, or any libraries, those must be * provided in the JAR files. - * @param globalClasspaths The paths of directories and JAR files that are added to each user code - * classloader on all nodes in the cluster. Note that the paths must specify a + * @param globalClasspaths The paths of directories and JAR files that are added to each user code + * classloader on all nodes in the cluster. Note that the paths must specify a * protocol (e.g. file://) and be accessible on all nodes (e.g. by means of a NFS share). * The protocol must be supported by the {@link java.net.URLClassLoader}. 
*/ @@ -202,14 +202,14 @@ public void startNewSession() throws Exception { jobID = JobID.generate(); installShutdownHook(); } - + protected PlanExecutor getExecutor() throws Exception { if (executor == null) { executor = PlanExecutor.createRemoteExecutor(host, port, clientConfiguration, jarFiles, globalClasspaths); executor.setPrintStatusDuringExecution(getConfig().isSysoutLoggingEnabled()); } - + // if we are using sessions, we keep the executor running if (getSessionTimeout() > 0 && !executor.isRunning()) { executor.start(); @@ -237,7 +237,7 @@ protected void dispose() { LOG.warn("Exception while unregistering the cleanup shutdown hook."); } } - + try { PlanExecutor executor = this.executor; if (executor != null) { @@ -249,13 +249,13 @@ protected void dispose() { throw new RuntimeException("Failed to dispose the session shutdown hook."); } } - + @Override public String toString() { return "Remote Environment (" + this.host + ":" + this.port + " - parallelism = " + (getParallelism() == -1 ? 
"default" : getParallelism()) + ") : " + getIdString(); } - + // ------------------------------------------------------------------------ // Shutdown hooks and reapers // ------------------------------------------------------------------------ @@ -273,7 +273,7 @@ public void run() { } } }); - + try { // Add JVM shutdown hook to call shutdown of service Runtime.getRuntime().addShutdownHook(shutdownHook); diff --git a/flink-java/src/main/java/org/apache/flink/api/java/Utils.java b/flink-java/src/main/java/org/apache/flink/api/java/Utils.java index 36ccb23b265d1..44e176c4746bc 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/Utils.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/Utils.java @@ -18,8 +18,6 @@ package org.apache.flink.api.java; -import org.apache.commons.lang3.StringUtils; - import org.apache.flink.annotation.Internal; import org.apache.flink.api.common.accumulators.Accumulator; import org.apache.flink.api.common.accumulators.SerializedListAccumulator; @@ -31,6 +29,8 @@ import org.apache.flink.api.java.typeutils.GenericTypeInfo; import org.apache.flink.configuration.Configuration; +import org.apache.commons.lang3.StringUtils; + import java.io.IOException; import java.lang.reflect.Field; import java.lang.reflect.Modifier; @@ -43,7 +43,7 @@ */ @Internal public final class Utils { - + public static final Random RNG = new Random(); public static String getCallLocationName() { @@ -63,12 +63,12 @@ public static String getCallLocationName(int depth) { } // -------------------------------------------------------------------------------------------- - + /** * Utility sink function that counts elements and writes the count into an accumulator, * from which it can be retrieved by the client. This sink is used by the * {@link DataSet#count()} function. - * + * * @param Type of elements to count. 
*/ @SkipCodeAnalysis @@ -115,7 +115,7 @@ public static class CollectHelper extends RichOutputFormat { private final String id; private final TypeSerializer serializer; - + private SerializedListAccumulator accumulator; public CollectHelper(String id, TypeSerializer serializer) { @@ -143,6 +143,9 @@ public void close() { } } + /** + * Accumulator of {@link ChecksumHashCode}. + */ public static class ChecksumHashCode implements SimpleAccumulator { private static final long serialVersionUID = 1L; @@ -213,6 +216,10 @@ public String toString() { } } + /** + * {@link RichOutputFormat} for {@link ChecksumHashCode}. + * @param + */ @SkipCodeAnalysis public static class ChecksumHashCodeHelper extends RichOutputFormat { @@ -248,7 +255,6 @@ public void close() throws IOException { } } - // -------------------------------------------------------------------------------------------- /** @@ -262,19 +268,19 @@ public static String getSerializerTree(TypeInformation ti) { private static String getSerializerTree(TypeInformation ti, int indent) { String ret = ""; if (ti instanceof CompositeType) { - ret += StringUtils.repeat(' ', indent) + ti.getClass().getSimpleName()+"\n"; + ret += StringUtils.repeat(' ', indent) + ti.getClass().getSimpleName() + "\n"; CompositeType cti = (CompositeType) ti; String[] fieldNames = cti.getFieldNames(); for (int i = 0; i < cti.getArity(); i++) { TypeInformation fieldType = cti.getTypeAt(i); - ret += StringUtils.repeat(' ', indent + 2) + fieldNames[i]+":"+getSerializerTree(fieldType, indent); + ret += StringUtils.repeat(' ', indent + 2) + fieldNames[i] + ":" + getSerializerTree(fieldType, indent); } } else { if (ti instanceof GenericTypeInfo) { - ret += StringUtils.repeat(' ', indent) + "GenericTypeInfo ("+ti.getTypeClass().getSimpleName()+")\n"; + ret += StringUtils.repeat(' ', indent) + "GenericTypeInfo (" + ti.getTypeClass().getSimpleName() + ")\n"; ret += getGenericTypeTree(ti.getTypeClass(), indent + 4); } else { - ret += StringUtils.repeat(' ', 
indent) + ti.toString()+"\n"; + ret += StringUtils.repeat(' ', indent) + ti.toString() + "\n"; } } return ret; @@ -286,7 +292,7 @@ private static String getGenericTypeTree(Class type, int indent) { if (Modifier.isStatic(field.getModifiers()) || Modifier.isTransient(field.getModifiers())) { continue; } - ret += StringUtils.repeat(' ', indent) + field.getName() + ":" + field.getType().getName() + + ret += StringUtils.repeat(' ', indent) + field.getName() + ":" + field.getType().getName() + (field.getType().isEnum() ? " (is enum)" : "") + "\n"; if (!field.getType().isPrimitive()) { ret += getGenericTypeTree(field.getType(), indent + 4); diff --git a/flink-java/src/test/java/org/apache/flink/api/common/io/SequentialFormatTestBase.java b/flink-java/src/test/java/org/apache/flink/api/common/io/SequentialFormatTestBase.java index 039f4de89028e..60884538514ca 100644 --- a/flink-java/src/test/java/org/apache/flink/api/common/io/SequentialFormatTestBase.java +++ b/flink-java/src/test/java/org/apache/flink/api/common/io/SequentialFormatTestBase.java @@ -26,6 +26,7 @@ import org.apache.flink.core.memory.DataOutputView; import org.apache.flink.core.memory.DataOutputViewStreamWrapper; import org.apache.flink.util.TestLogger; + import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -45,7 +46,7 @@ */ public abstract class SequentialFormatTestBase extends TestLogger { - public class InputSplitSorter implements Comparator { + private class InputSplitSorter implements Comparator { @Override public int compare(FileInputSplit o1, FileInputSplit o2) { int pathOrder = o1.getPath().getName().compareTo(o2.getPath().getName()); @@ -74,7 +75,7 @@ public SequentialFormatTestBase(int numberOfTuples, long blockSize, int parallel } /** - * Count how many bytes would be written if all records were directly serialized + * Count how many bytes would be written if all records were directly serialized. 
*/ @Before public void calcRawDataSize() throws IOException { @@ -83,7 +84,7 @@ public void calcRawDataSize() throws IOException { ByteCounter byteCounter = new ByteCounter(); for (int fileCount = 0; fileCount < this.getNumberOfTuplesPerFile(fileIndex); fileCount++, recordIndex++) { - writeRecord(this.getRecord(recordIndex), + writeRecord(this.getRecord(recordIndex), new DataOutputViewStreamWrapper(byteCounter)); } this.rawDataSizes[fileIndex] = byteCounter.getLength(); @@ -91,7 +92,7 @@ public void calcRawDataSize() throws IOException { } /** - * Checks if the expected input splits were created + * Checks if the expected input splits were created. */ @Test public void checkInputSplits() throws IOException { @@ -124,7 +125,7 @@ public void checkInputSplits() throws IOException { } /** - * Tests if the expected sequence and amount of data can be read + * Tests if the expected sequence and amount of data can be read. */ @Test public void checkRead() throws Exception { @@ -204,7 +205,7 @@ public void writeTuples() throws IOException { int recordIndex = 0; for (int fileIndex = 0; fileIndex < this.parallelism; fileIndex++) { BinaryOutputFormat output = createOutputFormat(this.tempFile.toURI() + "/" + - (fileIndex+1), configuration); + (fileIndex + 1), configuration); for (int fileCount = 0; fileCount < this.getNumberOfTuplesPerFile(fileIndex); fileCount++, recordIndex++) { output.writeRecord(this.getRecord(recordIndex)); } @@ -233,27 +234,26 @@ public void checkLength() { } } - abstract protected BinaryInputFormat createInputFormat(); + protected abstract BinaryInputFormat createInputFormat(); - abstract protected BinaryOutputFormat createOutputFormat(final String path, final - Configuration configuration) + protected abstract BinaryOutputFormat createOutputFormat(String path, Configuration configuration) throws IOException; - abstract protected int getInfoSize(); + protected abstract int getInfoSize(); /** - * Returns the record to write at the given position + * 
Returns the record to write at the given position. */ - abstract protected T getRecord(int index); + protected abstract T getRecord(int index); - abstract protected T createInstance(); + protected abstract T createInstance(); - abstract protected void writeRecord(T record, DataOutputView outputView) throws IOException; + protected abstract void writeRecord(T record, DataOutputView outputView) throws IOException; /** - * Checks if both records are equal + * Checks if both records are equal. */ - abstract protected void checkEquals(T expected, T actual); + protected abstract void checkEquals(T expected, T actual); private int getExpectedBlockCount(int fileIndex) { int expectedBlockCount = @@ -278,14 +278,14 @@ public static List getParameters() { /** * Counts the bytes that would be written. - * + * */ private static final class ByteCounter extends OutputStream { int length = 0; /** * Returns the length. - * + * * @return the length */ public int getLength() { diff --git a/flink-java/src/test/java/org/apache/flink/api/common/io/SerializedFormatTest.java b/flink-java/src/test/java/org/apache/flink/api/common/io/SerializedFormatTest.java index ac1e19cb57923..cfcfdfd3c2731 100644 --- a/flink-java/src/test/java/org/apache/flink/api/common/io/SerializedFormatTest.java +++ b/flink-java/src/test/java/org/apache/flink/api/common/io/SerializedFormatTest.java @@ -25,6 +25,7 @@ import org.apache.flink.types.IntValue; import org.apache.flink.types.Record; import org.apache.flink.types.StringValue; + import org.junit.Assert; import org.junit.Before; import org.junit.runner.RunWith; @@ -32,6 +33,9 @@ import java.io.IOException; +/** + * Tests for serialized formats. 
+ */ @RunWith(Parameterized.class) public class SerializedFormatTest extends SequentialFormatTestBase { @@ -58,11 +62,9 @@ protected BinaryInputFormat createInputFormat() { return inputFormat; } - @Override protected BinaryOutputFormat createOutputFormat(String path, Configuration configuration) - throws IOException - { + throws IOException { final SerializedOutputFormat outputFormat = new SerializedOutputFormat(); outputFormat.setOutputFilePath(new Path(path)); outputFormat.setWriteMode(FileSystem.WriteMode.OVERWRITE); diff --git a/flink-java/src/test/java/org/apache/flink/api/common/operators/CollectionExecutionAccumulatorsTest.java b/flink-java/src/test/java/org/apache/flink/api/common/operators/CollectionExecutionAccumulatorsTest.java index dce9713d0d14f..821d9564b25b5 100644 --- a/flink-java/src/test/java/org/apache/flink/api/common/operators/CollectionExecutionAccumulatorsTest.java +++ b/flink-java/src/test/java/org/apache/flink/api/common/operators/CollectionExecutionAccumulatorsTest.java @@ -18,53 +18,59 @@ package org.apache.flink.api.common.operators; -import static org.junit.Assert.*; - import org.apache.flink.api.common.JobExecutionResult; import org.apache.flink.api.common.accumulators.IntCounter; import org.apache.flink.api.common.functions.RichMapFunction; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.io.DiscardingOutputFormat; import org.apache.flink.configuration.Configuration; + import org.junit.Test; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +/** + * Tests for {@link CollectionExecutor} with accumulators. 
+ */ public class CollectionExecutionAccumulatorsTest { private static final String ACCUMULATOR_NAME = "TEST ACC"; - + @Test public void testAccumulator() { try { - final int NUM_ELEMENTS = 100; - + final int numElements = 100; + ExecutionEnvironment env = ExecutionEnvironment.createCollectionsEnvironment(); - - env.generateSequence(1, NUM_ELEMENTS) + + env.generateSequence(1, numElements) .map(new CountingMapper()) .output(new DiscardingOutputFormat()); - + JobExecutionResult result = env.execute(); - + assertTrue(result.getNetRuntime() >= 0); - assertEquals(NUM_ELEMENTS, (int) result.getAccumulatorResult(ACCUMULATOR_NAME)); + assertEquals(numElements, (int) result.getAccumulatorResult(ACCUMULATOR_NAME)); } catch (Exception e) { e.printStackTrace(); fail(e.getMessage()); } } - + @SuppressWarnings("serial") - public static class CountingMapper extends RichMapFunction { - + private static class CountingMapper extends RichMapFunction { + private IntCounter accumulator; - + @Override public void open(Configuration parameters) { accumulator = getRuntimeContext().getIntCounter(ACCUMULATOR_NAME); } - + @Override public Long map(Long value) { accumulator.add(1); diff --git a/flink-java/src/test/java/org/apache/flink/api/common/operators/CollectionExecutionIterationTest.java b/flink-java/src/test/java/org/apache/flink/api/common/operators/CollectionExecutionIterationTest.java index 6ca0eb181005f..ddd0894ecfa6b 100644 --- a/flink-java/src/test/java/org/apache/flink/api/common/operators/CollectionExecutionIterationTest.java +++ b/flink-java/src/test/java/org/apache/flink/api/common/operators/CollectionExecutionIterationTest.java @@ -18,12 +18,6 @@ package org.apache.flink.api.common.operators; -//CHECKSTYLE.OFF: AvoidStarImport - Needed for TupleGenerator -import static org.junit.Assert.*; - -import java.util.ArrayList; -import java.util.List; - import org.apache.flink.api.common.functions.FilterFunction; import org.apache.flink.api.common.functions.FlatMapFunction; import 
org.apache.flink.api.common.functions.JoinFunction; @@ -36,8 +30,18 @@ import org.apache.flink.api.java.tuple.Tuple1; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.util.Collector; + import org.junit.Test; +import java.util.ArrayList; +import java.util.List; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; + +/** + * Tests for {@link CollectionExecutor} with iterations. + */ @SuppressWarnings("serial") public class CollectionExecutionIterationTest implements java.io.Serializable { @@ -45,16 +49,16 @@ public class CollectionExecutionIterationTest implements java.io.Serializable { public void testBulkIteration() { try { ExecutionEnvironment env = ExecutionEnvironment.createCollectionsEnvironment(); - + IterativeDataSet iteration = env.fromElements(1).iterate(10); - + DataSet result = iteration.closeWith(iteration.map(new AddSuperstepNumberMapper())); - + List collected = new ArrayList(); result.output(new LocalCollectionOutputFormat(collected)); - + env.execute(); - + assertEquals(1, collected.size()); assertEquals(56, collected.get(0).intValue()); } @@ -63,14 +67,14 @@ public void testBulkIteration() { fail(e.getMessage()); } } - + @Test public void testBulkIterationWithTerminationCriterion() { try { ExecutionEnvironment env = ExecutionEnvironment.createCollectionsEnvironment(); - + IterativeDataSet iteration = env.fromElements(1).iterate(100); - + DataSet iterationResult = iteration.map(new AddSuperstepNumberMapper()); DataSet terminationCriterion = iterationResult.filter(new FilterFunction() { @@ -78,14 +82,14 @@ public boolean filter(Integer value) { return value < 50; } }); - + List collected = new ArrayList(); - + iteration.closeWith(iterationResult, terminationCriterion) .output(new LocalCollectionOutputFormat(collected)); - + env.execute(); - + assertEquals(1, collected.size()); assertEquals(56, collected.get(0).intValue()); } @@ -106,7 +110,7 @@ public void testDeltaIteration() { new Tuple2(2, 0), 
new Tuple2(3, 0), new Tuple2(4, 0)); - + @SuppressWarnings("unchecked") DataSet> workInput = env.fromElements( new Tuple1(1), @@ -114,7 +118,6 @@ public void testDeltaIteration() { new Tuple1(3), new Tuple1(4)); - // Perform a delta iteration where we add those values to the workset where // the second tuple field is smaller than the first tuple field. // At the end both tuple fields must be the same. @@ -144,7 +147,6 @@ public void flatMap(Tuple2 in, Collector> } }); - List> collected = new ArrayList>(); iteration.closeWith(solDelta, nextWorkset) @@ -162,9 +164,9 @@ public void flatMap(Tuple2 in, Collector> fail(e.getMessage()); } } - - public static class AddSuperstepNumberMapper extends RichMapFunction { - + + private static class AddSuperstepNumberMapper extends RichMapFunction { + @Override public Integer map(Integer value) { int superstep = getIterationRuntimeContext().getSuperstepNumber(); diff --git a/flink-java/src/test/java/org/apache/flink/api/common/operators/CollectionExecutionWithBroadcastVariableTest.java b/flink-java/src/test/java/org/apache/flink/api/common/operators/CollectionExecutionWithBroadcastVariableTest.java index 2cdd68faedf12..096e309516a0d 100644 --- a/flink-java/src/test/java/org/apache/flink/api/common/operators/CollectionExecutionWithBroadcastVariableTest.java +++ b/flink-java/src/test/java/org/apache/flink/api/common/operators/CollectionExecutionWithBroadcastVariableTest.java @@ -18,42 +18,48 @@ package org.apache.flink.api.common.operators; -import static org.junit.Assert.*; - -import java.util.ArrayList; -import java.util.List; - import org.apache.flink.api.common.functions.RichCrossFunction; import org.apache.flink.api.common.functions.RichMapFunction; import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.io.LocalCollectionOutputFormat; import org.apache.flink.configuration.Configuration; + import org.junit.Test; +import java.util.ArrayList; +import 
java.util.List; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +/** + * Tests for {@link CollectionExecutor} with broadcasted variables. + */ @SuppressWarnings("serial") public class CollectionExecutionWithBroadcastVariableTest { private static final String BC_VAR_NAME = "BC"; - - private final String[] TEST_DATA = { "A", "B", "C", "D" }; - private final String SUFFIX = "-suffixed"; - + + private static final String[] TEST_DATA = { "A", "B", "C", "D" }; + private static final String SUFFIX = "-suffixed"; + @Test public void testUnaryOp() { try { ExecutionEnvironment env = ExecutionEnvironment.createCollectionsEnvironment(); - + DataSet bcData = env.fromElements(SUFFIX); - + List result = new ArrayList(); - + env.fromElements(TEST_DATA) .map(new SuffixAppender()).withBroadcastSet(bcData, BC_VAR_NAME) .output(new LocalCollectionOutputFormat(result)); - + env.execute(); - + assertEquals(TEST_DATA.length, result.size()); for (String s : result) { assertTrue(s.indexOf(SUFFIX) > 0); @@ -64,22 +70,22 @@ public void testUnaryOp() { fail(e.getMessage()); } } - + @Test public void testBinaryOp() { try { ExecutionEnvironment env = ExecutionEnvironment.createCollectionsEnvironment(); - + DataSet bcData = env.fromElements(SUFFIX); DataSet inData = env.fromElements(TEST_DATA); - + List result = new ArrayList(); - + inData.cross(inData).with(new SuffixCross()).withBroadcastSet(bcData, BC_VAR_NAME) .output(new LocalCollectionOutputFormat(result)); - + env.execute(); - + assertEquals(TEST_DATA.length * TEST_DATA.length, result.size()); for (String s : result) { assertTrue(s.indexOf(SUFFIX) == 2); @@ -90,31 +96,31 @@ public void testBinaryOp() { fail(e.getMessage()); } } - - public static final class SuffixAppender extends RichMapFunction { - + + private static final class SuffixAppender extends RichMapFunction { + private String suffix; - + @Override public void open(Configuration parameters) { 
suffix = getRuntimeContext().getBroadcastVariable(BC_VAR_NAME).get(0); } - + @Override public String map(String value) { return value + suffix; } } - - public static final class SuffixCross extends RichCrossFunction { - + + private static final class SuffixCross extends RichCrossFunction { + private String suffix; - + @Override public void open(Configuration parameters) { suffix = getRuntimeContext().getBroadcastVariable(BC_VAR_NAME).get(0); } - + @Override public String cross(String s1, String s2) { return s1 + s2 + suffix; diff --git a/flink-java/src/test/java/org/apache/flink/api/common/operators/base/CoGroupOperatorCollectionTest.java b/flink-java/src/test/java/org/apache/flink/api/common/operators/base/CoGroupOperatorCollectionTest.java index a4426e02bf2a7..e1e393b08f973 100644 --- a/flink-java/src/test/java/org/apache/flink/api/common/operators/base/CoGroupOperatorCollectionTest.java +++ b/flink-java/src/test/java/org/apache/flink/api/common/operators/base/CoGroupOperatorCollectionTest.java @@ -46,6 +46,9 @@ import java.util.Set; import java.util.concurrent.Future; +/** + * Tests for {@link CoGroupOperatorBase} on collections. 
+ */ @SuppressWarnings("serial") public class CoGroupOperatorCollectionTest implements Serializable { @@ -94,7 +97,7 @@ public void testExecuteOnCollection() { Assert.assertTrue(udf1.isClosed); Assert.assertTrue(udf2.isClosed); - + Set> expected = new HashSet>( Arrays.asList(new Tuple2Builder() .add("foo", 8) diff --git a/flink-java/src/test/java/org/apache/flink/api/common/operators/base/GroupReduceOperatorTest.java b/flink-java/src/test/java/org/apache/flink/api/common/operators/base/GroupReduceOperatorTest.java index d0784a8af1f6c..d788efda4a42d 100644 --- a/flink-java/src/test/java/org/apache/flink/api/common/operators/base/GroupReduceOperatorTest.java +++ b/flink-java/src/test/java/org/apache/flink/api/common/operators/base/GroupReduceOperatorTest.java @@ -32,6 +32,7 @@ import org.apache.flink.core.fs.Path; import org.apache.flink.metrics.groups.UnregisteredMetricsGroup; import org.apache.flink.util.Collector; + import org.junit.Test; import java.util.ArrayList; @@ -44,8 +45,13 @@ import java.util.concurrent.atomic.AtomicBoolean; import static java.util.Arrays.asList; -import static org.junit.Assert.*; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; +/** + * Tests for {@link GroupReduceFunction}. 
+ */ @SuppressWarnings({"serial", "unchecked"}) public class GroupReduceOperatorTest implements java.io.Serializable { @@ -58,7 +64,7 @@ public void testGroupReduceCollection() { @Override public void reduce(Iterable> values, - Collector> out) throws Exception { + Collector> out) throws Exception { Iterator> input = values.iterator(); Tuple2 result = input.next(); @@ -86,13 +92,12 @@ public void reduce(Iterable> values, Integer>("foo", 3), new Tuple2("bar", 2), new Tuple2("bar", 4))); - ExecutionConfig executionConfig = new ExecutionConfig(); executionConfig.disableObjectReuse(); List> resultMutableSafe = op.executeOnCollections(input, null, executionConfig); executionConfig.enableObjectReuse(); List> resultRegular = op.executeOnCollections(input, null, executionConfig); - + Set> resultSetMutableSafe = new HashSet>(resultMutableSafe); Set> resultSetRegular = new HashSet>(resultRegular); @@ -120,10 +125,9 @@ public void testGroupReduceCollectionWithRuntimeContext() { Integer>> reducer = new RichGroupReduceFunction, Tuple2>() { - @Override public void reduce(Iterable> values, - Collector> out) throws Exception { + Collector> out) throws Exception { Iterator> input = values.iterator(); Tuple2 result = input.next(); @@ -175,7 +179,7 @@ public void close() throws Exception { new HashMap>(), new UnregisteredMetricsGroup()), executionConfig); - + executionConfig.enableObjectReuse(); List> resultRegular = op.executeOnCollections(input, new RuntimeUDFContext(taskInfo, null, executionConfig, @@ -183,8 +187,7 @@ public void close() throws Exception { new HashMap>(), new UnregisteredMetricsGroup()), executionConfig); - - + Set> resultSetMutableSafe = new HashSet>(resultMutableSafe); Set> resultSetRegular = new HashSet>(resultRegular); diff --git a/flink-java/src/test/java/org/apache/flink/api/common/operators/base/InnerJoinOperatorBaseTest.java b/flink-java/src/test/java/org/apache/flink/api/common/operators/base/InnerJoinOperatorBaseTest.java index 
ef33ac0ca580f..23d0e653c199e 100644 --- a/flink-java/src/test/java/org/apache/flink/api/common/operators/base/InnerJoinOperatorBaseTest.java +++ b/flink-java/src/test/java/org/apache/flink/api/common/operators/base/InnerJoinOperatorBaseTest.java @@ -45,15 +45,16 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.fail; +/** + * Tests for {@link InnerJoinOperatorBase}. + */ @SuppressWarnings({ "unchecked", "serial" }) public class InnerJoinOperatorBaseTest implements Serializable { - @Test public void testTupleBaseJoiner(){ final FlatJoinFunction, Tuple2, Tuple2> joiner = - new FlatJoinFunction, Tuple2, Tuple2>() - { + new FlatJoinFunction, Tuple2, Tuple2>() { @Override public void join(Tuple3 first, Tuple2 second, Collector> out) { @@ -71,8 +72,8 @@ public void join(Tuple3 first, Tuple2 final TupleTypeInfo> outTypeInfo = TupleTypeInfo.getBasicTupleTypeInfo(Double.class, String.class); - final int[] leftKeys = new int[]{0,2}; - final int[] rightKeys = new int[]{1,0}; + final int[] leftKeys = new int[]{0, 2}; + final int[] rightKeys = new int[]{1, 0}; final String taskName = "Collection based tuple joiner"; @@ -109,7 +110,7 @@ public void join(Tuple3 first, Tuple2 try { final TaskInfo taskInfo = new TaskInfo("op", 1, 0, 1, 0); ExecutionConfig executionConfig = new ExecutionConfig(); - + executionConfig.disableObjectReuse(); List> resultSafe = base.executeOnCollections(inputData1, inputData2, new RuntimeUDFContext(taskInfo, null, executionConfig, @@ -117,7 +118,7 @@ public void join(Tuple3 first, Tuple2 new HashMap>(), new UnregisteredMetricsGroup()), executionConfig); - + executionConfig.enableObjectReuse(); List> resultRegular = base.executeOnCollections(inputData1, inputData2, new RuntimeUDFContext(taskInfo, null, executionConfig, diff --git a/flink-java/src/test/java/org/apache/flink/api/common/operators/base/ReduceOperatorTest.java b/flink-java/src/test/java/org/apache/flink/api/common/operators/base/ReduceOperatorTest.java index 
9427d6fb5f5ec..396b78f6e6f9a 100644 --- a/flink-java/src/test/java/org/apache/flink/api/common/operators/base/ReduceOperatorTest.java +++ b/flink-java/src/test/java/org/apache/flink/api/common/operators/base/ReduceOperatorTest.java @@ -26,6 +26,7 @@ import org.apache.flink.api.common.functions.RuntimeContext; import org.apache.flink.api.common.functions.util.RuntimeUDFContext; import org.apache.flink.api.common.operators.UnaryOperatorInformation; +import org.apache.flink.api.java.operators.ReduceOperator; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.typeutils.TypeInfoParser; import org.apache.flink.configuration.Configuration; @@ -47,6 +48,9 @@ import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; +/** + * Tests for {@link ReduceOperator}. + */ @SuppressWarnings({"serial", "unchecked"}) public class ReduceOperatorTest implements java.io.Serializable { @@ -58,7 +62,7 @@ public void testReduceCollection() { @Override public Tuple2 reduce(Tuple2 value1, - Tuple2 value2) throws + Tuple2 value2) throws Exception { return new Tuple2(value1.f0, value1.f1 + value2.f1); } @@ -83,7 +87,7 @@ public Tuple2 reduce(Tuple2 value1, List> resultMutableSafe = op.executeOnCollections(input, null, executionConfig); executionConfig.enableObjectReuse(); List> resultRegular = op.executeOnCollections(input, null, executionConfig); - + Set> resultSetMutableSafe = new HashSet>(resultMutableSafe); Set> resultSetRegular = new HashSet>(resultRegular); @@ -111,7 +115,7 @@ public void testReduceCollectionWithRuntimeContext() { RichReduceFunction>() { @Override public Tuple2 reduce(Tuple2 value1, - Tuple2 value2) throws + Tuple2 value2) throws Exception { return new Tuple2(value1.f0, value1.f1 + value2.f1); } @@ -148,7 +152,7 @@ public void close() throws Exception { final TaskInfo taskInfo = new TaskInfo(taskName, 1, 0, 1, 0); ExecutionConfig executionConfig = new ExecutionConfig(); - + executionConfig.disableObjectReuse(); List> 
resultMutableSafe = op.executeOnCollections(input, new RuntimeUDFContext(taskInfo, null, executionConfig, @@ -156,7 +160,7 @@ public void close() throws Exception { new HashMap>(), new UnregisteredMetricsGroup()), executionConfig); - + executionConfig.enableObjectReuse(); List> resultRegular = op.executeOnCollections(input, new RuntimeUDFContext(taskInfo, null, executionConfig, diff --git a/flink-java/src/test/java/org/apache/flink/api/java/MultipleInvokationsTest.java b/flink-java/src/test/java/org/apache/flink/api/java/MultipleInvokationsTest.java index 4fc51bb7e870f..d5ec4faa6927c 100644 --- a/flink-java/src/test/java/org/apache/flink/api/java/MultipleInvokationsTest.java +++ b/flink-java/src/test/java/org/apache/flink/api/java/MultipleInvokationsTest.java @@ -16,47 +16,52 @@ * limitations under the License. */ - package org.apache.flink.api.java; -import static org.junit.Assert.*; - import org.apache.flink.api.common.Plan; import org.apache.flink.api.common.operators.GenericDataSinkBase; import org.apache.flink.api.java.io.DiscardingOutputFormat; + import org.junit.Test; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +/** + * Tests for multiple invocations of a plan. 
+ */ public class MultipleInvokationsTest { @Test public void testMultipleInvocationsGetPlan() { try { ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - + // ----------- Execution 1 --------------- - + DataSet data = env.fromElements("Some", "test", "data").name("source1"); //data.print(); data.output(new DiscardingOutputFormat()).name("print1"); data.output(new DiscardingOutputFormat()).name("output1"); - + { Plan p = env.createProgramPlan(); - + assertEquals(2, p.getDataSinks().size()); for (GenericDataSinkBase sink : p.getDataSinks()) { assertTrue(sink.getName().equals("print1") || sink.getName().equals("output1")); assertEquals("source1", sink.getInput().getName()); } } - + // ----------- Execution 2 --------------- - + data.writeAsText("/some/file/path").name("textsink"); - + { Plan p = env.createProgramPlan(); - + assertEquals(1, p.getDataSinks().size()); GenericDataSinkBase sink = p.getDataSinks().iterator().next(); assertEquals("textsink", sink.getName()); diff --git a/flink-java/src/test/java/org/apache/flink/api/java/TypeExtractionTest.java b/flink-java/src/test/java/org/apache/flink/api/java/TypeExtractionTest.java index fb20d7808b20c..22620c21626fc 100644 --- a/flink-java/src/test/java/org/apache/flink/api/java/TypeExtractionTest.java +++ b/flink-java/src/test/java/org/apache/flink/api/java/TypeExtractionTest.java @@ -31,11 +31,11 @@ */ @SuppressWarnings("serial") public class TypeExtractionTest { - + @SuppressWarnings({"unchecked", "rawtypes"}) @Test public void testFunctionWithMissingGenericsAndReturns() { - + RichMapFunction function = new RichMapFunction() { private static final long serialVersionUID = 1L; @@ -57,11 +57,16 @@ public void testGetterSetterWithVertex() { ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); env.fromElements(new VertexTyped(0L, 3.0), new VertexTyped(1L, 1.0)); } - + // ------------------------------------------------------------------------ // Test types // 
------------------------------------------------------------------------ + /** + * Representation of Vertex with maximum of 2 keys and a value. + * @param keys type + * @param value type + */ public static class Vertex { private K key1; @@ -107,10 +112,14 @@ public V getValue() { } } + /** + * A {@link Vertex} with {@link Long} as key and {@link Double} as value. + */ public static class VertexTyped extends Vertex{ public VertexTyped(Long l, Double d) { super(l, d); } + public VertexTyped() { } } diff --git a/flink-java/src/test/java/org/apache/flink/api/java/tuple/TupleGenerator.java b/flink-java/src/test/java/org/apache/flink/api/java/tuple/TupleGenerator.java index 4420e99161fb7..dbc76c015db8d 100644 --- a/flink-java/src/test/java/org/apache/flink/api/java/tuple/TupleGenerator.java +++ b/flink-java/src/test/java/org/apache/flink/api/java/tuple/TupleGenerator.java @@ -18,13 +18,13 @@ package org.apache.flink.api.java.tuple; +import com.google.common.io.Files; + import java.io.File; import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.Scanner; -import com.google.common.io.Files; - /** * Source code generator for tuple classes and classes which depend on the arity of tuples. 
*/ @@ -67,12 +67,12 @@ class TupleGenerator { private static final int LAST = 25; public static void main(String[] args) throws Exception { - System.err.println("Current directory "+System.getProperty("user.dir")); + System.err.println("Current directory " + System.getProperty("user.dir")); String rootDir = ROOT_DIRECTORY; - if(args.length > 0) { + if (args.length > 0) { rootDir = args[0] + "/" + ROOT_DIRECTORY; } - System.err.println("Using root directory: "+rootDir); + System.err.println("Using root directory: " + rootDir); File root = new File(rootDir); modifyCsvReader(root); @@ -96,13 +96,13 @@ private static File getPackage(File root, String packageString) { private static void insertCodeIntoFile(String code, File file) throws IOException { String fileContent = Files.toString(file, StandardCharsets.UTF_8); - + try (Scanner s = new Scanner(fileContent)) { StringBuilder sb = new StringBuilder(); String line; - + boolean indicatorFound = false; - + // add file beginning while (s.hasNextLine() && (line = s.nextLine()) != null) { sb.append(line).append("\n"); @@ -111,19 +111,19 @@ private static void insertCodeIntoFile(String code, File file) throws IOExceptio break; } } - - if(!indicatorFound) { + + if (!indicatorFound) { System.out.println("No indicator found in '" + file + "'. 
Will skip code generation."); s.close(); return; } - + // add generator signature sb.append("\t// GENERATED FROM ").append(TupleGenerator.class.getName()).append(".\n"); - + // add tuple dependent code sb.append(code).append("\n"); - + // skip generated code while (s.hasNextLine() && (line = s.nextLine()) != null) { if (line.contains(END_INDICATOR)) { @@ -131,7 +131,7 @@ private static void insertCodeIntoFile(String code, File file) throws IOExceptio break; } } - + // add file ending while (s.hasNextLine() && (line = s.nextLine()) != null) { sb.append(line).append("\n"); @@ -162,7 +162,7 @@ private static void modifyCrossProjectOperator(File root) throws IOException { sb.append("\t\t\tProjectCross projectionCross = null;\n\n"); sb.append("\t\t\tswitch (fieldIndexes.length) {\n"); for (int numFields = FIRST; numFields <= LAST; numFields++) { - sb.append("\t\t\tcase " + numFields +":" + " projectionCross = (ProjectCross) projectTuple"+numFields+"(); break;\n"); + sb.append("\t\t\tcase " + numFields + ":" + " projectionCross = (ProjectCross) projectTuple" + numFields + "(); break;\n"); } sb.append("\t\t\tdefault: throw new IllegalStateException(\"Excessive arity in tuple.\");\n"); sb.append("\t\t\t}\n\n"); @@ -189,23 +189,23 @@ private static void modifyCrossProjectOperator(File root) throws IOException { // method signature sb.append("\t\tpublic <"); appendTupleTypeGenerics(sb, numFields); - sb.append("> ProjectCross ProjectCross> projectTuple"+numFields+"("); + sb.append(">> projectTuple" + numFields + "("); sb.append(") {\n"); // extract field types sb.append("\t\t\tTypeInformation[] fTypes = extractFieldTypes(fieldIndexes);\n"); // create new tuple type info - sb.append("\t\t\tTupleTypeInfo> tType = new TupleTypeInfo> tType = new TupleTypeInfo>(fTypes);\n\n"); // create and return new project operator - sb.append("\t\t\treturn new ProjectCross>(this.ds1, this.ds2, this.fieldIndexes, this.isFieldInFirst, tType, this, hint);\n"); @@ -243,7 +243,7 @@ private static 
void modifyProjectOperator(File root) throws IOException { sb.append("\t\t\tProjectOperator projOperator;\n\n"); sb.append("\t\t\tswitch (fieldIndexes.length) {\n"); for (int numFields = FIRST; numFields <= LAST; numFields++) { - sb.append("\t\t\tcase " + numFields +":" + " projOperator = (ProjectOperator) projectTuple"+numFields+"(); break;\n"); + sb.append("\t\t\tcase " + numFields + ":" + " projOperator = (ProjectOperator) projectTuple" + numFields + "(); break;\n"); } sb.append("\t\t\tdefault: throw new IllegalStateException(\"Excessive arity in tuple.\");\n"); sb.append("\t\t\t}\n\n"); @@ -270,23 +270,23 @@ private static void modifyProjectOperator(File root) throws IOException { // method signature sb.append("\t\tpublic <"); appendTupleTypeGenerics(sb, numFields); - sb.append("> ProjectOperator ProjectOperator> projectTuple"+numFields+"("); + sb.append(">> projectTuple" + numFields + "("); sb.append(") {\n"); // extract field types sb.append("\t\t\tTypeInformation[] fTypes = extractFieldTypes(fieldIndexes, ds.getType());\n"); // create new tuple type info - sb.append("\t\t\tTupleTypeInfo> tType = new TupleTypeInfo> tType = new TupleTypeInfo>(fTypes);\n\n"); // create and return new project operator - sb.append("\t\t\treturn new ProjectOperator>(this.ds, this.fieldIndexes, tType);\n"); @@ -324,7 +324,7 @@ private static void modifyJoinProjectOperator(File root) throws IOException { sb.append("\t\t\tProjectJoin projectJoin = null;\n\n"); sb.append("\t\t\tswitch (fieldIndexes.length) {\n"); for (int numFields = FIRST; numFields <= LAST; numFields++) { - sb.append("\t\t\tcase " + numFields +":" + " projectJoin = (ProjectJoin) projectTuple"+numFields+"(); break;\n"); + sb.append("\t\t\tcase " + numFields + ":" + " projectJoin = (ProjectJoin) projectTuple" + numFields + "(); break;\n"); } sb.append("\t\t\tdefault: throw new IllegalStateException(\"Excessive arity in tuple.\");\n"); sb.append("\t\t\t}\n\n"); @@ -352,23 +352,23 @@ private static void 
modifyJoinProjectOperator(File root) throws IOException { // method signature sb.append("\t\tpublic <"); appendTupleTypeGenerics(sb, numFields); - sb.append("> ProjectJoin ProjectJoin> projectTuple"+numFields+"("); + sb.append(">> projectTuple" + numFields + "("); sb.append(") {\n"); // extract field types sb.append("\t\t\tTypeInformation[] fTypes = extractFieldTypes(fieldIndexes);\n"); // create new tuple type info - sb.append("\t\t\tTupleTypeInfo> tType = new TupleTypeInfo> tType = new TupleTypeInfo>(fTypes);\n\n"); // create and return new project operator - sb.append("\t\t\treturn new ProjectJoin>(this.ds1, this.ds2, this.keys1, this.keys2, this.hint, this.fieldIndexes, this.isFieldInFirst, tType, this);\n"); @@ -468,9 +468,8 @@ private static void appendTupleTypeGenerics(StringBuilder sb, int numFields) { sb.append(GEN_TYPE_PREFIX + i); } } - - - private static String HEADER = + + private static final String HEADER = "/*\n" + " * Licensed to the Apache Software Foundation (ASF) under one\n" + " * or more contributor license agreements. See the NOTICE file\n" From 1c8c4414780eb3e3168de4b34676a667be6854f8 Mon Sep 17 00:00:00 2001 From: Dawid Wysakowicz Date: Sat, 15 Jul 2017 10:36:30 +0200 Subject: [PATCH 2/3] Removed suppression for ExecutionEnvironment --- .../flink/api/java/ExecutionEnvironment.java | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java index c09916815a891..089c90b458853 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java @@ -587,7 +587,7 @@ public DataSource createInput(InputFormat inputFormat, TypeInformat /** * Creates a {@link DataSet} from the given {@link org.apache.hadoop.mapred.FileInputFormat}. 
* - * @deprecated Please use {@link org.apache.flink.hadoopcompatibility.HadoopInputs#readHadoopFile(org.apache.hadoop.mapred.FileInputFormat, Class, Class, String, JobConf)} + * @deprecated Please use {@code org.apache.flink.hadoopcompatibility.HadoopInputs#readHadoopFile(org.apache.hadoop.mapred.FileInputFormat, Class, Class, String, JobConf)} * from the flink-hadoop-compatibility module. */ @Deprecated @@ -604,7 +604,7 @@ public DataSource> readHadoopFile(org.apache.hadoop.mapred.F * Creates a {@link DataSet} from {@link org.apache.hadoop.mapred.SequenceFileInputFormat} * A {@link org.apache.hadoop.mapred.JobConf} with the given inputPath is created. * - * @deprecated Please use {@link org.apache.flink.hadoopcompatibility.HadoopInputs#readSequenceFile(Class, Class, String)} + * @deprecated Please use {@code org.apache.flink.hadoopcompatibility.HadoopInputs#readSequenceFile(Class, Class, String)} * from the flink-hadoop-compatibility module. */ @Deprecated @@ -617,7 +617,7 @@ public DataSource> readSequenceFile(Class key, Class v * Creates a {@link DataSet} from the given {@link org.apache.hadoop.mapred.FileInputFormat}. A * {@link org.apache.hadoop.mapred.JobConf} with the given inputPath is created. * - * @deprecated Please use {@link org.apache.flink.hadoopcompatibility.HadoopInputs#readHadoopFile(org.apache.hadoop.mapred.FileInputFormat, Class, Class, String)} + * @deprecated Please use {@code org.apache.flink.hadoopcompatibility.HadoopInputs#readHadoopFile(org.apache.hadoop.mapred.FileInputFormat, Class, Class, String)} * from the flink-hadoop-compatibility module. */ @Deprecated @@ -629,7 +629,7 @@ public DataSource> readHadoopFile(org.apache.hadoop.mapred.F /** * Creates a {@link DataSet} from the given {@link org.apache.hadoop.mapred.InputFormat}. 
* - * @deprecated Please use {@link org.apache.flink.hadoopcompatibility.HadoopInputs#createHadoopInput(org.apache.hadoop.mapred.InputFormat, Class, Class, JobConf)} + * @deprecated Please use {@code org.apache.flink.hadoopcompatibility.HadoopInputs#createHadoopInput(org.apache.hadoop.mapred.InputFormat, Class, Class, JobConf)} * from the flink-hadoop-compatibility module. */ @Deprecated @@ -644,7 +644,7 @@ public DataSource> createHadoopInput(org.apache.hadoop.mapre * Creates a {@link DataSet} from the given {@link org.apache.hadoop.mapreduce.lib.input.FileInputFormat}. The * given inputName is set on the given job. * - * @deprecated Please use {@link org.apache.flink.hadoopcompatibility.HadoopInputs#readHadoopFile(org.apache.hadoop.mapreduce.lib.input.FileInputFormat, Class, Class, String, Job)} + * @deprecated Please use {@code org.apache.flink.hadoopcompatibility.HadoopInputs#readHadoopFile(org.apache.hadoop.mapreduce.lib.input.FileInputFormat, Class, Class, String, Job)} * from the flink-hadoop-compatibility module. */ @Deprecated @@ -662,7 +662,7 @@ public DataSource> readHadoopFile(org.apache.hadoop.mapreduc * Creates a {@link DataSet} from the given {@link org.apache.hadoop.mapreduce.lib.input.FileInputFormat}. A * {@link org.apache.hadoop.mapreduce.Job} with the given inputPath is created. * - * @deprecated Please use {@link org.apache.flink.hadoopcompatibility.HadoopInputs#readHadoopFile(org.apache.hadoop.mapreduce.lib.input.FileInputFormat, Class, Class, String)} + * @deprecated Please use {@code org.apache.flink.hadoopcompatibility.HadoopInputs#readHadoopFile(org.apache.hadoop.mapreduce.lib.input.FileInputFormat, Class, Class, String)} * from the flink-hadoop-compatibility module. */ @Deprecated @@ -674,7 +674,7 @@ public DataSource> readHadoopFile(org.apache.hadoop.mapreduc /** * Creates a {@link DataSet} from the given {@link org.apache.hadoop.mapreduce.InputFormat}. 
* - * @deprecated Please use {@link org.apache.flink.hadoopcompatibility.HadoopInputs#createHadoopInput(org.apache.hadoop.mapreduce.InputFormat, Class, Class, Job)} + * @deprecated Please use {@code org.apache.flink.hadoopcompatibility.HadoopInputs#createHadoopInput(org.apache.hadoop.mapreduce.InputFormat, Class, Class, Job)} * from the flink-hadoop-compatibility module. */ @Deprecated From b1f5abdd678ae3e0af0292b29c3d1f10d283022d Mon Sep 17 00:00:00 2001 From: Dawid Wysakowicz Date: Tue, 25 Jul 2017 09:52:58 +0200 Subject: [PATCH 3/3] removed appropriate suppressions --- tools/maven/suppressions-java.xml | 8 -------- 1 file changed, 8 deletions(-) diff --git a/tools/maven/suppressions-java.xml b/tools/maven/suppressions-java.xml index 3b7d60ba697bb..d6aeb2d19b38b 100644 --- a/tools/maven/suppressions-java.xml +++ b/tools/maven/suppressions-java.xml @@ -67,14 +67,6 @@ under the License. files="(.*)api[/\\]java[/\\]operator[/\\]([^/\\]*\.java)" checks="NewlineAtEndOfFile|RegexpSingleline|TodoComment|RedundantImport|ImportOrder|RedundantModifier|JavadocMethod|JavadocParagraph|JavadocType|JavadocStyle|PackageName|TypeNameCheck|ConstantNameCheck|StaticVariableNameCheck|MemberNameCheck|MethodNameCheck|ParameterName|LocalFinalVariableName|LocalVariableName|LeftCurly|UpperEll|FallThrough|reliefPattern|SimplifyBooleanExpression|EmptyStatement|ModifierOrder|EmptyLineSeparator|WhitespaceAround|WhitespaceAfter|NoWhitespaceAfter|NoWhitespaceBefore|OperatorWrap|ParenPad"/> - - - -