diff --git a/docs/dataset_transformations.md b/docs/dataset_transformations.md index 2bec61b6c0422..3bb3cec1fb58a 100644 --- a/docs/dataset_transformations.md +++ b/docs/dataset_transformations.md @@ -554,7 +554,7 @@ an alternative WordCount implementation. In the implementation, DataSet input = [..] // The words received as input DataSet groupedInput = input.groupBy(0); // group identical words -DataSet> combinedWords = groupedInput.combineGroup(new FlatCombineFunction() { +DataSet> combinedWords = groupedInput.combineGroup(new GroupCombineFunction() { public void combine(Iterable words, Collector>) { // combine int count = 0; diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/CombineFunction.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/CombineFunction.java index ef52b32d9de0b..af115b032b6a4 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/functions/CombineFunction.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/CombineFunction.java @@ -29,7 +29,7 @@ * reduce the data volume earlier, before the entire groups have been collected. *

* This special variant of the combine function reduces the group of elements into a single element. A variant - * that can return multiple values per group is defined in {@link FlatCombineFunction}. + * that can return multiple values per group is defined in {@link GroupCombineFunction}. * * @param The data type processed by the combine function. * @param The data type emitted by the combine function. diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/FlatCombineFunction.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/GroupCombineFunction.java similarity index 96% rename from flink-core/src/main/java/org/apache/flink/api/common/functions/FlatCombineFunction.java rename to flink-core/src/main/java/org/apache/flink/api/common/functions/GroupCombineFunction.java index b90b3cec32bd9..c0b153b3acaf6 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/functions/FlatCombineFunction.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/GroupCombineFunction.java @@ -36,14 +36,14 @@ * @param The data type processed by the combine function. * @param The data type emitted by the combine function. */ -public interface FlatCombineFunction extends Function, Serializable { +public interface GroupCombineFunction extends Function, Serializable { /** * The combine method, called (potentially multiple timed) with subgroups of elements. - * + * * @param values The elements to be combined. * @param out The collector to use to return values from the function. - * + * * @throws Exception The function may throw Exceptions, which will cause the program to cancel, * and may trigger the recovery logic. */ diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/RichFlatCombineFunction.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/RichGroupCombineFunction.java similarity index 77% rename from flink-core/src/main/java/org/apache/flink/api/common/functions/RichFlatCombineFunction.java rename to flink-core/src/main/java/org/apache/flink/api/common/functions/RichGroupCombineFunction.java index 17aca886e2841..55df232ab9b9c 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/functions/RichFlatCombineFunction.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/RichGroupCombineFunction.java @@ -19,13 +19,10 @@ package org.apache.flink.api.common.functions; -import org.apache.flink.api.common.functions.AbstractRichFunction; -import org.apache.flink.api.common.functions.FlatCombineFunction; -import org.apache.flink.api.common.functions.RichFunction; import org.apache.flink.util.Collector; /** - * Rich variant of the {@link FlatCombineFunction}. As a {@link RichFunction}, it gives access to the + * Rich variant of the {@link GroupCombineFunction}. As a {@link RichFunction}, it gives access to the * {@link org.apache.flink.api.common.functions.RuntimeContext} and provides setup and teardown methods: * {@link RichFunction#open(org.apache.flink.configuration.Configuration)} and * {@link RichFunction#close()}. @@ -33,7 +30,7 @@ * @param The data type of the elements to be combined. * @param The resulting data type of the elements to be combined. */ -public abstract class RichFlatCombineFunction extends AbstractRichFunction implements FlatCombineFunction { +public abstract class RichGroupCombineFunction extends AbstractRichFunction implements GroupCombineFunction { private static final long serialVersionUID = 1L; diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/RichGroupReduceFunction.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/RichGroupReduceFunction.java index b6c92c29996bc..48e27d370f9d5 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/functions/RichGroupReduceFunction.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/RichGroupReduceFunction.java @@ -34,7 +34,7 @@ * @param Type of the elements that this function processes. * @param The type of the elements returned by the user-defined function. */ -public abstract class RichGroupReduceFunction extends AbstractRichFunction implements GroupReduceFunction, FlatCombineFunction { +public abstract class RichGroupReduceFunction extends AbstractRichFunction implements GroupReduceFunction, GroupCombineFunction { private static final long serialVersionUID = 1L; @@ -83,5 +83,5 @@ public void combine(Iterable values, Collector out) throws Exception { */ @Retention(RetentionPolicy.RUNTIME) @Target(ElementType.TYPE) - public static @interface Combinable {}; + public static @interface Combinable {} } diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/GroupCombineOperatorBase.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/GroupCombineOperatorBase.java index 2a47c4527dd3b..27fbc1c3eca42 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/GroupCombineOperatorBase.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/GroupCombineOperatorBase.java @@ -22,7 +22,7 @@ import org.apache.commons.lang3.ArrayUtils; import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.InvalidProgramException; -import org.apache.flink.api.common.functions.FlatCombineFunction; +import org.apache.flink.api.common.functions.GroupCombineFunction; import org.apache.flink.api.common.functions.RuntimeContext; import org.apache.flink.api.common.functions.util.CopyingListCollector; import org.apache.flink.api.common.functions.util.FunctionUtils; @@ -46,7 +46,7 @@ * This class is later processed by the compiler to generate the plan. * @see org.apache.flink.api.common.functions.CombineFunction */ -public class GroupCombineOperatorBase> extends SingleInputOperator { +public class GroupCombineOperatorBase> extends SingleInputOperator { /** The ordering for the order inside a reduce group. */ @@ -81,7 +81,7 @@ public Ordering getGroupOrder() { @Override protected List executeOnCollections(List inputData, RuntimeContext ctx, ExecutionConfig executionConfig) throws Exception { - FlatCombineFunction function = this.userFunction.getUserCodeObject(); + GroupCombineFunction function = this.userFunction.getUserCodeObject(); UnaryOperatorInformation operatorInfo = getOperatorInfo(); TypeInformation inputType = operatorInfo.getInputType(); diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/GroupReduceOperatorBase.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/GroupReduceOperatorBase.java index 6d7db89f819de..57f07f38be91a 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/GroupReduceOperatorBase.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/GroupReduceOperatorBase.java @@ -21,7 +21,7 @@ import org.apache.commons.lang3.ArrayUtils; import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.InvalidProgramException; -import org.apache.flink.api.common.functions.FlatCombineFunction; +import org.apache.flink.api.common.functions.GroupCombineFunction; import org.apache.flink.api.common.functions.GroupReduceFunction; import org.apache.flink.api.common.functions.Partitioner; import org.apache.flink.api.common.functions.RuntimeContext; @@ -107,15 +107,15 @@ public Ordering getGroupOrder() { /** * Marks the group reduce operation as combinable. Combinable operations may pre-reduce the * data before the actual group reduce operations. Combinable user-defined functions - * must implement the interface {@link org.apache.flink.api.common.functions.FlatCombineFunction}. + * must implement the interface {@link GroupCombineFunction}. * * @param combinable Flag to mark the group reduce operation as combinable. */ public void setCombinable(boolean combinable) { // sanity check - if (combinable && !FlatCombineFunction.class.isAssignableFrom(this.userFunction.getUserCodeClass())) { + if (combinable && !GroupCombineFunction.class.isAssignableFrom(this.userFunction.getUserCodeClass())) { throw new IllegalArgumentException("Cannot set a UDF as combinable if it does not implement the interface " + - FlatCombineFunction.class.getName()); + GroupCombineFunction.class.getName()); } else { this.combinable = combinable; } diff --git a/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java b/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java index ed8d1caab8305..1e91eeb0b22dd 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java @@ -24,7 +24,7 @@ import org.apache.flink.api.common.InvalidProgramException; import org.apache.flink.api.common.JobExecutionResult; import org.apache.flink.api.common.functions.FilterFunction; -import org.apache.flink.api.common.functions.FlatCombineFunction; +import org.apache.flink.api.common.functions.GroupCombineFunction; import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.common.functions.GroupReduceFunction; import org.apache.flink.api.common.functions.InvalidTypesException; @@ -473,7 +473,7 @@ public GroupReduceOperator reduceGroup(GroupReduceFunction reduc * @param combiner The CombineFunction that is applied on the DataSet. * @return A GroupCombineOperator which represents the combined DataSet. */ - public GroupCombineOperator combineGroup(FlatCombineFunction combiner) { + public GroupCombineOperator combineGroup(GroupCombineFunction combiner) { if (combiner == null) { throw new NullPointerException("GroupReduce function must not be null."); } diff --git a/flink-java/src/main/java/org/apache/flink/api/java/functions/FirstReducer.java b/flink-java/src/main/java/org/apache/flink/api/java/functions/FirstReducer.java index fbb7029b4093a..a604cc02d8448 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/functions/FirstReducer.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/functions/FirstReducer.java @@ -17,13 +17,13 @@ */ package org.apache.flink.api.java.functions; -import org.apache.flink.api.common.functions.FlatCombineFunction; +import org.apache.flink.api.common.functions.GroupCombineFunction; import org.apache.flink.api.common.functions.GroupReduceFunction; import org.apache.flink.api.common.functions.RichGroupReduceFunction.Combinable; import org.apache.flink.util.Collector; @Combinable -public class FirstReducer implements GroupReduceFunction, FlatCombineFunction { +public class FirstReducer implements GroupReduceFunction, GroupCombineFunction { private static final long serialVersionUID = 1L; private final int count; diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupCombineOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupCombineOperator.java index 3c1d47cdf3d29..911c608e3e866 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupCombineOperator.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupCombineOperator.java @@ -18,7 +18,7 @@ package org.apache.flink.api.java.operators; -import org.apache.flink.api.common.functions.FlatCombineFunction; +import org.apache.flink.api.common.functions.GroupCombineFunction; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.operators.Operator; import org.apache.flink.api.common.operators.Order; @@ -46,7 +46,7 @@ */ public class GroupCombineOperator extends SingleInputUdfOperator> { - private final FlatCombineFunction function; + private final GroupCombineFunction function; private final Grouping grouper; @@ -60,7 +60,7 @@ public class GroupCombineOperator extends SingleInputUdfOperator input, TypeInformation resultType, FlatCombineFunction function, String defaultName) { + public GroupCombineOperator(DataSet input, TypeInformation resultType, GroupCombineFunction function, String defaultName) { super(input, resultType); this.function = function; this.grouper = null; @@ -73,7 +73,7 @@ public GroupCombineOperator(DataSet input, TypeInformation resultType, * @param input The grouped input to be processed group-wise by the groupReduce function. * @param function The user-defined GroupReduce function. */ - public GroupCombineOperator(Grouping input, TypeInformation resultType, FlatCombineFunction function, String defaultName) { + public GroupCombineOperator(Grouping input, TypeInformation resultType, GroupCombineFunction function, String defaultName) { super(input != null ? input.getDataSet() : null, resultType); this.function = function; @@ -82,7 +82,7 @@ public GroupCombineOperator(Grouping input, TypeInformation resultType, } @Override - protected FlatCombineFunction getFunction() { + protected GroupCombineFunction getFunction() { return function; } @@ -99,8 +99,8 @@ protected FlatCombineFunction getFunction() { if (grouper == null) { // non grouped reduce UnaryOperatorInformation operatorInfo = new UnaryOperatorInformation(getInputType(), getResultType()); - GroupCombineOperatorBase> po = - new GroupCombineOperatorBase>(function, operatorInfo, new int[0], name); + GroupCombineOperatorBase> po = + new GroupCombineOperatorBase>(function, operatorInfo, new int[0], name); po.setInput(input); // the parallelism for a non grouped reduce can only be 1 @@ -144,8 +144,8 @@ else if (grouper.getKeys() instanceof Keys.ExpressionKeys) { int[] logicalKeyPositions = grouper.getKeys().computeLogicalKeyPositions(); UnaryOperatorInformation operatorInfo = new UnaryOperatorInformation(getInputType(), getResultType()); - GroupCombineOperatorBase> po = - new GroupCombineOperatorBase>(function, operatorInfo, logicalKeyPositions, name); + GroupCombineOperatorBase> po = + new GroupCombineOperatorBase>(function, operatorInfo, logicalKeyPositions, name); po.setInput(input); po.setParallelism(getParallelism()); @@ -175,7 +175,7 @@ else if (grouper.getKeys() instanceof Keys.ExpressionKeys) { // -------------------------------------------------------------------------------------------- private static PlanUnwrappingGroupCombineOperator translateSelectorFunctionReducer( - Keys.SelectorFunctionKeys rawKeys, FlatCombineFunction function, + Keys.SelectorFunctionKeys rawKeys, GroupCombineFunction function, TypeInformation inputType, TypeInformation outputType, String name, Operator input) { @SuppressWarnings("unchecked") @@ -199,7 +199,7 @@ private static PlanUnwrappingGroupCombineOperator trans } private static PlanUnwrappingSortedGroupCombineOperator translateSelectorFunctionSortedReducer( - Keys.SelectorFunctionKeys rawGroupingKey, Keys.SelectorFunctionKeys rawSortingKey, FlatCombineFunction function, + Keys.SelectorFunctionKeys rawGroupingKey, Keys.SelectorFunctionKeys rawSortingKey, GroupCombineFunction function, TypeInformation inputType, TypeInformation outputType, String name, Operator input) { @SuppressWarnings("unchecked") diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupReduceOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupReduceOperator.java index e8096231305c7..30f2cc401cec8 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupReduceOperator.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupReduceOperator.java @@ -18,7 +18,7 @@ package org.apache.flink.api.java.operators; -import org.apache.flink.api.common.functions.FlatCombineFunction; +import org.apache.flink.api.common.functions.GroupCombineFunction; import org.apache.flink.api.common.functions.GroupReduceFunction; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.operators.Operator; @@ -88,7 +88,7 @@ public GroupReduceOperator(Grouping input, TypeInformation resultType, } private void checkCombinability() { - if (function instanceof FlatCombineFunction && + if (function instanceof GroupCombineFunction && function.getClass().getAnnotation(RichGroupReduceFunction.Combinable.class) != null) { this.combinable = true; } @@ -111,7 +111,7 @@ public boolean isCombinable() { public GroupReduceOperator setCombinable(boolean combinable) { // sanity check that the function is a subclass of the combine interface - if (combinable && !(function instanceof FlatCombineFunction)) { + if (combinable && !(function instanceof GroupCombineFunction)) { throw new IllegalArgumentException("The function does not implement the combine interface."); } diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/SortedGrouping.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/SortedGrouping.java index b2054bf143290..287bf8250af98 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/operators/SortedGrouping.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/SortedGrouping.java @@ -18,7 +18,7 @@ package org.apache.flink.api.java.operators; -import org.apache.flink.api.common.functions.FlatCombineFunction; +import org.apache.flink.api.common.functions.GroupCombineFunction; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeutils.CompositeType; import org.apache.flink.api.java.Utils; @@ -169,7 +169,7 @@ public GroupReduceOperator reduceGroup(GroupReduceFunction reduc * @param combiner The CombineFunction that is applied on the DataSet. * @return A GroupCombineOperator which represents the combined DataSet. */ - public GroupCombineOperator combineGroup(FlatCombineFunction combiner) { + public GroupCombineOperator combineGroup(GroupCombineFunction combiner) { if (combiner == null) { throw new NullPointerException("GroupReduce function must not be null."); } diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/UnsortedGrouping.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/UnsortedGrouping.java index 0f3faa060df20..319a599f000b6 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/operators/UnsortedGrouping.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/UnsortedGrouping.java @@ -19,7 +19,7 @@ package org.apache.flink.api.java.operators; import org.apache.flink.api.common.InvalidProgramException; -import org.apache.flink.api.common.functions.FlatCombineFunction; +import org.apache.flink.api.common.functions.GroupCombineFunction; import org.apache.flink.api.common.functions.GroupReduceFunction; import org.apache.flink.api.common.functions.Partitioner; import org.apache.flink.api.common.functions.ReduceFunction; @@ -173,7 +173,7 @@ public GroupReduceOperator reduceGroup(GroupReduceFunction reduc * @param combiner The CombineFunction that is applied on the DataSet. * @return A GroupCombineOperator which represents the combined DataSet. */ - public GroupCombineOperator combineGroup(FlatCombineFunction combiner) { + public GroupCombineOperator combineGroup(GroupCombineFunction combiner) { if (combiner == null) { throw new NullPointerException("GroupReduce function must not be null."); } diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingGroupCombineOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingGroupCombineOperator.java index ae4ba115fd0ba..c8e40ceb53d26 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingGroupCombineOperator.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingGroupCombineOperator.java @@ -18,7 +18,7 @@ package org.apache.flink.api.java.operators.translation; -import org.apache.flink.api.common.functions.FlatCombineFunction; +import org.apache.flink.api.common.functions.GroupCombineFunction; import org.apache.flink.api.common.operators.UnaryOperatorInformation; import org.apache.flink.api.common.operators.base.GroupCombineOperatorBase; import org.apache.flink.api.common.typeinfo.TypeInformation; @@ -30,9 +30,9 @@ * A group combine operator that takes 2-tuples (key-value pairs), and applies the group combine operation only * on the unwrapped values. */ -public class PlanUnwrappingGroupCombineOperator extends GroupCombineOperatorBase, OUT, FlatCombineFunction, OUT>> { +public class PlanUnwrappingGroupCombineOperator extends GroupCombineOperatorBase, OUT, GroupCombineFunction, OUT>> { - public PlanUnwrappingGroupCombineOperator(FlatCombineFunction udf, Keys.SelectorFunctionKeys key, String name, + public PlanUnwrappingGroupCombineOperator(GroupCombineFunction udf, Keys.SelectorFunctionKeys key, String name, TypeInformation outType, TypeInformation> typeInfoWithKey) { super(new TupleUnwrappingGroupCombiner(udf), @@ -42,15 +42,15 @@ public PlanUnwrappingGroupCombineOperator(FlatCombineFunction udf, Keys // -------------------------------------------------------------------------------------------- - public static final class TupleUnwrappingGroupCombiner extends WrappingFunction> - implements FlatCombineFunction, OUT> + public static final class TupleUnwrappingGroupCombiner extends WrappingFunction> + implements GroupCombineFunction, OUT> { private static final long serialVersionUID = 1L; private final TupleUnwrappingIterator iter; - private TupleUnwrappingGroupCombiner(FlatCombineFunction wrapped) { + private TupleUnwrappingGroupCombiner(GroupCombineFunction wrapped) { super(wrapped); this.iter = new TupleUnwrappingIterator(); } diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingReduceGroupOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingReduceGroupOperator.java index 1d59a211252c8..e01af501c0341 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingReduceGroupOperator.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingReduceGroupOperator.java @@ -18,7 +18,7 @@ package org.apache.flink.api.java.operators.translation; -import org.apache.flink.api.common.functions.FlatCombineFunction; +import org.apache.flink.api.common.functions.GroupCombineFunction; import org.apache.flink.api.common.functions.GroupReduceFunction; import org.apache.flink.api.common.operators.UnaryOperatorInformation; import org.apache.flink.api.common.operators.base.GroupReduceOperatorBase; @@ -37,7 +37,7 @@ public class PlanUnwrappingReduceGroupOperator extends GroupReduceOp public PlanUnwrappingReduceGroupOperator(GroupReduceFunction udf, Keys.SelectorFunctionKeys key, String name, TypeInformation outType, TypeInformation> typeInfoWithKey, boolean combinable) { - super(combinable ? new TupleUnwrappingFlatCombinableGroupReducer((RichGroupReduceFunction) udf) : new TupleUnwrappingNonCombinableGroupReducer(udf), + super(combinable ? new TupleUnwrappingGroupCombinableGroupReducer((RichGroupReduceFunction) udf) : new TupleUnwrappingNonCombinableGroupReducer(udf), new UnaryOperatorInformation, OUT>(typeInfoWithKey, outType), key.computeLogicalKeyPositions(), name); super.setCombinable(combinable); @@ -46,8 +46,8 @@ public PlanUnwrappingReduceGroupOperator(GroupReduceFunction udf, Keys. // -------------------------------------------------------------------------------------------- @RichGroupReduceFunction.Combinable - public static final class TupleUnwrappingFlatCombinableGroupReducer extends WrappingFunction> - implements GroupReduceFunction, OUT>, FlatCombineFunction, Tuple2> + public static final class TupleUnwrappingGroupCombinableGroupReducer extends WrappingFunction> + implements GroupReduceFunction, OUT>, GroupCombineFunction, Tuple2> { private static final long serialVersionUID = 1L; @@ -55,7 +55,7 @@ public static final class TupleUnwrappingFlatCombinableGroupReducer private TupleUnwrappingIterator iter; private TupleWrappingCollector coll; - private TupleUnwrappingFlatCombinableGroupReducer(RichGroupReduceFunction wrapped) { + private TupleUnwrappingGroupCombinableGroupReducer(RichGroupReduceFunction wrapped) { super(wrapped); this.iter = new TupleUnwrappingIterator(); this.coll = new TupleWrappingCollector(this.iter); diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingSortedGroupCombineOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingSortedGroupCombineOperator.java index b3d847054978b..e52a5c42bbc1a 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingSortedGroupCombineOperator.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingSortedGroupCombineOperator.java @@ -18,7 +18,7 @@ package org.apache.flink.api.java.operators.translation; -import org.apache.flink.api.common.functions.FlatCombineFunction; +import org.apache.flink.api.common.functions.GroupCombineFunction; import org.apache.flink.api.common.operators.UnaryOperatorInformation; import org.apache.flink.api.common.operators.base.GroupCombineOperatorBase; import org.apache.flink.api.common.typeinfo.TypeInformation; @@ -30,9 +30,9 @@ * A reduce operator that takes 3-tuples (groupKey, sortKey, value), and applies the sorted partial group reduce * operation only on the unwrapped values. */ -public class PlanUnwrappingSortedGroupCombineOperator extends GroupCombineOperatorBase, OUT, FlatCombineFunction,OUT>> { +public class PlanUnwrappingSortedGroupCombineOperator extends GroupCombineOperatorBase, OUT, GroupCombineFunction,OUT>> { - public PlanUnwrappingSortedGroupCombineOperator(FlatCombineFunction udf, Keys.SelectorFunctionKeys groupingKey, Keys.SelectorFunctionKeys sortingKey, String name, + public PlanUnwrappingSortedGroupCombineOperator(GroupCombineFunction udf, Keys.SelectorFunctionKeys groupingKey, Keys.SelectorFunctionKeys sortingKey, String name, TypeInformation outType, TypeInformation> typeInfoWithKey) { super(new TupleUnwrappingGroupReducer(udf), @@ -42,15 +42,15 @@ public PlanUnwrappingSortedGroupCombineOperator(FlatCombineFunction udf } - public static final class TupleUnwrappingGroupReducer extends WrappingFunction> - implements FlatCombineFunction, OUT> + public static final class TupleUnwrappingGroupReducer extends WrappingFunction> + implements GroupCombineFunction, OUT> { private static final long serialVersionUID = 1L; private final Tuple3UnwrappingIterator iter; - private TupleUnwrappingGroupReducer(FlatCombineFunction wrapped) { + private TupleUnwrappingGroupReducer(GroupCombineFunction wrapped) { super(wrapped); this.iter = new Tuple3UnwrappingIterator(); } diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingSortedReduceGroupOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingSortedReduceGroupOperator.java index 757ff56548887..63ebfa4f6c4ea 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingSortedReduceGroupOperator.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingSortedReduceGroupOperator.java @@ -18,7 +18,7 @@ package org.apache.flink.api.java.operators.translation; -import org.apache.flink.api.common.functions.FlatCombineFunction; +import org.apache.flink.api.common.functions.GroupCombineFunction; import org.apache.flink.api.common.functions.GroupReduceFunction; import org.apache.flink.api.common.operators.UnaryOperatorInformation; import org.apache.flink.api.common.operators.base.GroupReduceOperatorBase; @@ -37,7 +37,7 @@ public class PlanUnwrappingSortedReduceGroupOperator extends Gr public PlanUnwrappingSortedReduceGroupOperator(GroupReduceFunction udf, Keys.SelectorFunctionKeys groupingKey, Keys.SelectorFunctionKeys sortingKey, String name, TypeInformation outType, TypeInformation> typeInfoWithKey, boolean combinable) { - super(combinable ? new TupleUnwrappingFlatCombinableGroupReducer((RichGroupReduceFunction) udf) : new TupleUnwrappingNonCombinableGroupReducer(udf), + super(combinable ? new TupleUnwrappingGroupCombinableGroupReducer((RichGroupReduceFunction) udf) : new TupleUnwrappingNonCombinableGroupReducer(udf), new UnaryOperatorInformation, OUT>(typeInfoWithKey, outType), groupingKey.computeLogicalKeyPositions(), name); super.setCombinable(combinable); @@ -46,8 +46,8 @@ public PlanUnwrappingSortedReduceGroupOperator(GroupReduceFunction udf, // -------------------------------------------------------------------------------------------- @RichGroupReduceFunction.Combinable - public static final class TupleUnwrappingFlatCombinableGroupReducer extends WrappingFunction> - implements GroupReduceFunction, OUT>, FlatCombineFunction, Tuple3> + public static final class TupleUnwrappingGroupCombinableGroupReducer extends WrappingFunction> + implements GroupReduceFunction, OUT>, GroupCombineFunction, Tuple3> { private static final long serialVersionUID = 1L; @@ -55,7 +55,7 @@ public static final class TupleUnwrappingFlatCombinableGroupReducer iter; private Tuple3WrappingCollector coll; - private TupleUnwrappingFlatCombinableGroupReducer(RichGroupReduceFunction wrapped) { + private TupleUnwrappingGroupCombinableGroupReducer(RichGroupReduceFunction wrapped) { super(wrapped); this.iter = new Tuple3UnwrappingIterator(); this.coll = new Tuple3WrappingCollector(this.iter); diff --git a/flink-java/src/main/java/org/apache/flink/api/java/record/operators/ReduceOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/record/operators/ReduceOperator.java index 875e9c141c809..1866feac218be 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/record/operators/ReduceOperator.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/record/operators/ReduceOperator.java @@ -32,7 +32,7 @@ import java.util.Map; import org.apache.commons.lang3.Validate; -import org.apache.flink.api.common.functions.FlatCombineFunction; +import org.apache.flink.api.common.functions.GroupCombineFunction; import org.apache.flink.api.common.functions.GroupReduceFunction; import org.apache.flink.api.common.operators.Operator; import org.apache.flink.api.common.operators.Ordering; @@ -367,7 +367,7 @@ public ReduceOperator build() { // ============================================================================================ - public static class WrappingReduceFunction extends WrappingFunction implements GroupReduceFunction, FlatCombineFunction { + public static class WrappingReduceFunction extends WrappingFunction implements GroupReduceFunction, GroupCombineFunction { private static final long serialVersionUID = 1L; diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java index 4527aa02f3b2c..ae6063a1ac592 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java @@ -34,7 +34,7 @@ import org.apache.commons.lang3.Validate; import org.apache.flink.api.common.functions.CoGroupFunction; import org.apache.flink.api.common.functions.CrossFunction; -import org.apache.flink.api.common.functions.FlatCombineFunction; +import org.apache.flink.api.common.functions.GroupCombineFunction; import org.apache.flink.api.common.functions.FlatJoinFunction; import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.common.functions.FoldFunction; @@ -135,14 +135,14 @@ public static TypeInformation getGroupReduceReturnTypes(GroupRedu return getUnaryOperatorReturnType((Function) groupReduceInterface, GroupReduceFunction.class, true, true, inType, functionName, allowMissing); } - public static TypeInformation getGroupCombineReturnTypes(FlatCombineFunction combineInterface, TypeInformation inType) { + public static TypeInformation getGroupCombineReturnTypes(GroupCombineFunction combineInterface, TypeInformation inType) { return getGroupCombineReturnTypes(combineInterface, inType, null, false); } - public static TypeInformation getGroupCombineReturnTypes(FlatCombineFunction combineInterface, TypeInformation inType, + public static TypeInformation getGroupCombineReturnTypes(GroupCombineFunction combineInterface, TypeInformation inType, String functionName, boolean allowMissing) { - return getUnaryOperatorReturnType((Function) combineInterface, FlatCombineFunction.class, true, true, inType, functionName, allowMissing); + return getUnaryOperatorReturnType((Function) combineInterface, GroupCombineFunction.class, true, true, inType, functionName, allowMissing); } @@ -600,7 +600,7 @@ else if (inTypeInfo instanceof ObjectArrayTypeInfo) { // the input is a tuple else if (inTypeInfo instanceof TupleTypeInfo && isClassType(inType) && Tuple.class.isAssignableFrom(typeToClass(inType))) { - ParameterizedType tupleBaseClass = null; + ParameterizedType tupleBaseClass; // get tuple from possible tuple subclass while (!(isClassType(inType) && typeToClass(inType).getSuperclass().equals(Tuple.class))) { @@ -737,7 +737,7 @@ private static void validateInfo(ArrayList typeHierarchy, Type type, TypeI // check for basic type if (typeInfo.isBasicType()) { - TypeInformation actual = null; + TypeInformation actual; // check if basic type at all if (!(type instanceof Class) || (actual = BasicTypeInfo.getInfoFor((Class) type)) == null) { throw new InvalidTypesException("Basic type expected."); @@ -792,7 +792,7 @@ else if (typeInfo instanceof WritableTypeInfo) { } // check writable type contents - Class clazz = null; + Class clazz; if (((WritableTypeInfo) typeInfo).getTypeClass() != (clazz = (Class) type)) { throw new InvalidTypesException("Writable type '" + ((WritableTypeInfo) typeInfo).getTypeClass().getCanonicalName() + "' expected but was '" @@ -801,7 +801,7 @@ else if (typeInfo instanceof WritableTypeInfo) { } // check for primitive array else if (typeInfo instanceof PrimitiveArrayTypeInfo) { - Type component = null; + Type component; // check if array at all if (!(type instanceof Class && ((Class) type).isArray() && (component = ((Class) type).getComponentType()) != null) && !(type instanceof GenericArrayType && (component = ((GenericArrayType) type).getGenericComponentType()) != null)) { @@ -819,7 +819,7 @@ else if (typeInfo instanceof PrimitiveArrayTypeInfo) { } // check for basic array else if (typeInfo instanceof BasicArrayTypeInfo) { - Type component = null; + Type component; // check if array at all if (!(type instanceof Class && ((Class) type).isArray() && (component = ((Class) type).getComponentType()) != null) && !(type instanceof GenericArrayType && (component = ((GenericArrayType) type).getGenericComponentType()) != null)) { @@ -844,7 +844,7 @@ else if (typeInfo instanceof ObjectArrayTypeInfo) { } // check component - Type component = null; + Type component; if (type instanceof Class) { component = ((Class) type).getComponentType(); } else { @@ -1428,8 +1428,8 @@ private static boolean sameTypeVars(Type t1, Type t2) { if (!(t1 instanceof TypeVariable) || !(t2 instanceof TypeVariable)) { return false; } - return ((TypeVariable) t1).getName().equals(((TypeVariable)t2).getName()) - && ((TypeVariable) t1).getGenericDeclaration().equals(((TypeVariable)t2).getGenericDeclaration()); + return ((TypeVariable) t1).getName().equals(((TypeVariable) t2).getName()) + && ((TypeVariable) t1).getGenericDeclaration().equals(((TypeVariable) t2).getGenericDeclaration()); } private static TypeInformation getTypeOfPojoField(TypeInformation pojoInfo, Field field) { diff --git a/flink-java/src/test/java/org/apache/flink/api/java/record/ReduceWrappingFunctionTest.java b/flink-java/src/test/java/org/apache/flink/api/java/record/ReduceWrappingFunctionTest.java index 2216217b3a73d..89baa98797a51 100644 --- a/flink-java/src/test/java/org/apache/flink/api/java/record/ReduceWrappingFunctionTest.java +++ b/flink-java/src/test/java/org/apache/flink/api/java/record/ReduceWrappingFunctionTest.java @@ -18,15 +18,12 @@ package org.apache.flink.api.java.record; -import static org.junit.Assert.*; - import java.util.ArrayList; import java.util.Iterator; import java.util.List; import java.util.concurrent.atomic.AtomicInteger; - import org.apache.commons.lang3.SerializationUtils; -import org.apache.flink.api.common.functions.FlatCombineFunction; +import org.apache.flink.api.common.functions.GroupCombineFunction; import org.apache.flink.api.common.functions.GroupReduceFunction; import org.apache.flink.api.common.functions.RichFunction; import org.apache.flink.api.common.operators.SingleInputSemanticProperties; @@ -43,6 +40,11 @@ import org.apache.flink.util.Collector; import org.junit.Test; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + @SuppressWarnings({ "serial", "deprecation" }) public class ReduceWrappingFunctionTest { @@ -86,7 +88,7 @@ public void close() {} target.clear(); // test combine - ((FlatCombineFunction) reducer).combine(source, collector); + ((GroupCombineFunction) reducer).combine(source, collector); assertEquals(2, target.size()); assertEquals(new IntValue(42), target.get(0).getField(0, IntValue.class)); assertEquals(new LongValue(11), target.get(0).getField(1, LongValue.class)); @@ -138,7 +140,7 @@ public void close() {} target.clear(); // test combine - ((FlatCombineFunction) reducer).combine(source, collector); + ((GroupCombineFunction) reducer).combine(source, collector); assertEquals(2, target.size()); assertEquals(new IntValue(42), target.get(0).getField(0, IntValue.class)); assertEquals(new LongValue(11), target.get(0).getField(1, LongValue.class)); @@ -227,5 +229,5 @@ public void open(Configuration parameters) throws Exception { methodCounter.incrementAndGet(); super.open(parameters); } - }; + } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AllGroupCombineDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AllGroupCombineDriver.java index 7d87a6b8d40d7..7b279ee649520 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AllGroupCombineDriver.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AllGroupCombineDriver.java @@ -20,7 +20,7 @@ package org.apache.flink.runtime.operators; import org.apache.flink.api.common.ExecutionConfig; -import org.apache.flink.api.common.functions.FlatCombineFunction; +import org.apache.flink.api.common.functions.GroupCombineFunction; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.common.typeutils.TypeSerializerFactory; import org.apache.flink.runtime.util.NonReusingMutableToRegularIteratorWrapper; @@ -35,20 +35,20 @@ * Like @org.apache.flink.runtime.operators.GroupCombineDriver but without grouping and sorting. May emit partially * reduced results. * -* @see org.apache.flink.api.common.functions.FlatCombineFunction +* @see GroupCombineFunction */ -public class AllGroupCombineDriver implements PactDriver, OUT> { +public class AllGroupCombineDriver implements PactDriver, OUT> { private static final Logger LOG = LoggerFactory.getLogger(AllGroupCombineDriver.class); - private PactTaskContext, OUT> taskContext; + private PactTaskContext, OUT> taskContext; private boolean objectReuseEnabled = false; // ------------------------------------------------------------------------ @Override - public void setup(PactTaskContext, OUT> context) { + public void setup(PactTaskContext, OUT> context) { this.taskContext = context; } @@ -58,9 +58,9 @@ public int getNumberOfInputs() { } @Override - public Class> getStubType() { + public Class> getStubType() { @SuppressWarnings("unchecked") - final Class> clazz = (Class>) (Class) FlatCombineFunction.class; + final Class> clazz = (Class>) (Class) GroupCombineFunction.class; return clazz; } @@ -95,7 +95,7 @@ public void run() throws Exception { TypeSerializer serializer = serializerFactory.getSerializer(); final MutableObjectIterator in = this.taskContext.getInput(0); - final FlatCombineFunction reducer = this.taskContext.getStub(); + final GroupCombineFunction reducer = this.taskContext.getStub(); final Collector output = this.taskContext.getOutputCollector(); if (objectReuseEnabled) { diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AllGroupReduceDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AllGroupReduceDriver.java index ad1afdbdd68d2..a20fddf02f811 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AllGroupReduceDriver.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AllGroupReduceDriver.java @@ -23,7 +23,7 @@ import org.apache.flink.runtime.util.NonReusingMutableToRegularIteratorWrapper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.flink.api.common.functions.FlatCombineFunction; +import org.apache.flink.api.common.functions.GroupCombineFunction; import org.apache.flink.api.common.functions.GroupReduceFunction; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.runtime.operators.util.TaskConfig; @@ -92,8 +92,8 @@ public void prepare() throws Exception { switch (this.strategy) { case ALL_GROUP_REDUCE_COMBINE: - if (!(this.taskContext.getStub() instanceof FlatCombineFunction)) { - throw new Exception("Using combiner on a UDF that does not implement the combiner interface " + FlatCombineFunction.class.getName()); + if (!(this.taskContext.getStub() instanceof GroupCombineFunction)) { + throw new Exception("Using combiner on a UDF that does not implement the combiner interface " + GroupCombineFunction.class.getName()); } case ALL_GROUP_REDUCE: case ALL_GROUP_COMBINE: @@ -129,7 +129,7 @@ public void run() throws Exception { final Collector output = this.taskContext.getOutputCollector(); reducer.reduce(inIter, output); } else if (strategy == DriverStrategy.ALL_GROUP_REDUCE_COMBINE || strategy == DriverStrategy.ALL_GROUP_COMBINE) { - @SuppressWarnings("unchecked") final FlatCombineFunction combiner = (FlatCombineFunction) this.taskContext.getStub(); + @SuppressWarnings("unchecked") final GroupCombineFunction combiner = (GroupCombineFunction) this.taskContext.getStub(); final Collector output = this.taskContext.getOutputCollector(); combiner.combine(inIter, output); } else { @@ -147,7 +147,7 @@ public void run() throws Exception { final Collector output = this.taskContext.getOutputCollector(); reducer.reduce(inIter, output); } else if (strategy == DriverStrategy.ALL_GROUP_REDUCE_COMBINE || strategy == DriverStrategy.ALL_GROUP_COMBINE) { - @SuppressWarnings("unchecked") final FlatCombineFunction combiner = (FlatCombineFunction) this.taskContext.getStub(); + @SuppressWarnings("unchecked") final GroupCombineFunction combiner = (GroupCombineFunction) this.taskContext.getStub(); final Collector output = this.taskContext.getOutputCollector(); combiner.combine(inIter, output); } else { diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/GroupReduceCombineDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/GroupReduceCombineDriver.java index dacd43667f479..493eb4f9acb74 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/GroupReduceCombineDriver.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/GroupReduceCombineDriver.java @@ -23,7 +23,7 @@ import org.apache.flink.runtime.util.NonReusingKeyGroupedIterator; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.flink.api.common.functions.FlatCombineFunction; +import org.apache.flink.api.common.functions.GroupCombineFunction; import org.apache.flink.api.common.typeutils.TypeComparator; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.common.typeutils.TypeSerializerFactory; @@ -45,7 +45,7 @@ * the user supplied a RichGroupReduceFunction with a combine method. The combining is performed in memory with a * lazy approach which only combines elements which currently fit in the sorter. This may lead to a partial solution. * In the case of the RichGroupReduceFunction this partial result is transformed into a proper deterministic result. - * The CombineGroup uses the FlatCombineFunction interface which allows to combine values of type to any type + * The CombineGroup uses the GroupCombineFunction interface which allows to combine values of type to any type * of type . In contrast, the RichGroupReduceFunction requires the combine method to have the same input and * output type to be able to reduce the elements after the combine from to . * @@ -54,18 +54,18 @@ * @param The data type consumed by the combiner. * @param The data type produced by the combiner. */ -public class GroupReduceCombineDriver implements PactDriver, OUT> { +public class GroupReduceCombineDriver implements PactDriver, OUT> { private static final Logger LOG = LoggerFactory.getLogger(GroupReduceCombineDriver.class); /** Fix length records with a length below this threshold will be in-place sorted, if possible. */ private static final int THRESHOLD_FOR_IN_PLACE_SORTING = 32; - private PactTaskContext, OUT> taskContext; + private PactTaskContext, OUT> taskContext; private InMemorySorter sorter; - private FlatCombineFunction combiner; + private GroupCombineFunction combiner; private TypeSerializer serializer; @@ -86,7 +86,7 @@ public class GroupReduceCombineDriver implements PactDriver, OUT> context) { + public void setup(PactTaskContext, OUT> context) { this.taskContext = context; this.running = true; } @@ -97,9 +97,9 @@ public int getNumberOfInputs() { } @Override - public Class> getStubType() { + public Class> getStubType() { @SuppressWarnings("unchecked") - final Class> clazz = (Class>) (Class) FlatCombineFunction.class; + final Class> clazz = (Class>) (Class) GroupCombineFunction.class; return clazz; } @@ -188,7 +188,7 @@ private void sortAndCombine() throws Exception { final ReusingKeyGroupedIterator keyIter = new ReusingKeyGroupedIterator(sorter.getIterator(), this.serializer, this.groupingComparator); - final FlatCombineFunction combiner = this.combiner; + final GroupCombineFunction combiner = this.combiner; final Collector output = this.output; // iterate over key groups @@ -203,7 +203,7 @@ private void sortAndCombine() throws Exception { final NonReusingKeyGroupedIterator keyIter = new NonReusingKeyGroupedIterator(sorter.getIterator(), this.groupingComparator); - final FlatCombineFunction combiner = this.combiner; + final GroupCombineFunction combiner = this.combiner; final Collector output = this.output; // iterate over key groups diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java index ca110c2d7eac5..71b8afc74f05a 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java @@ -23,7 +23,7 @@ import org.apache.flink.api.common.accumulators.Accumulator; import org.apache.flink.api.common.accumulators.AccumulatorHelper; import org.apache.flink.api.common.distributions.DataDistribution; -import org.apache.flink.api.common.functions.FlatCombineFunction; +import org.apache.flink.api.common.functions.GroupCombineFunction; import org.apache.flink.api.common.functions.Function; import org.apache.flink.api.common.functions.Partitioner; import org.apache.flink.api.common.functions.util.FunctionUtils; @@ -525,7 +525,9 @@ protected void run() throws Exception { try { FunctionUtils.closeFunction(this.stub); } - catch (Throwable t) {} + catch (Throwable t) { + // do nothing + } } // if resettable driver invoke teardown @@ -1006,13 +1008,13 @@ this.inputIterators[inputNum], this, this.inputSerializers[inputNum], getLocalSt (e.getMessage() == null ? "." : ": " + e.getMessage()), e); } - if (!(localStub instanceof FlatCombineFunction)) { + if (!(localStub instanceof GroupCombineFunction)) { throw new IllegalStateException("Performing combining sort outside a reduce task!"); } @SuppressWarnings({ "rawtypes", "unchecked" }) CombiningUnilateralSortMerger cSorter = new CombiningUnilateralSortMerger( - (FlatCombineFunction) localStub, getMemoryManager(), getIOManager(), this.inputIterators[inputNum], + (GroupCombineFunction) localStub, getMemoryManager(), getIOManager(), this.inputIterators[inputNum], this, this.inputSerializers[inputNum], getLocalStrategyComparator(inputNum), this.config.getRelativeMemoryInput(inputNum), this.config.getFilehandlesInput(inputNum), this.config.getSpillingThresholdInput(inputNum)); @@ -1467,7 +1469,9 @@ public static void cancelChainedTasks(List> tasks) { for (int i = 0; i < tasks.size(); i++) { try { tasks.get(i).cancelTask(); - } catch (Throwable t) {} + } catch (Throwable t) { + // do nothing + } } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/SynchronousChainedCombineDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/SynchronousChainedCombineDriver.java index 7e36b49eceac3..41161454dc4ae 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/SynchronousChainedCombineDriver.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/SynchronousChainedCombineDriver.java @@ -21,7 +21,7 @@ import java.io.IOException; import java.util.List; -import org.apache.flink.api.common.functions.FlatCombineFunction; +import org.apache.flink.api.common.functions.GroupCombineFunction; import org.apache.flink.api.common.functions.Function; import org.apache.flink.api.common.functions.util.FunctionUtils; import org.apache.flink.api.common.typeutils.TypeComparator; @@ -67,7 +67,7 @@ public class SynchronousChainedCombineDriver extends ChainedDriver sorter; - private FlatCombineFunction combiner; + private GroupCombineFunction combiner; private TypeSerializer serializer; @@ -90,8 +90,8 @@ public void setup(AbstractInvokable parent) { this.parent = parent; @SuppressWarnings("unchecked") - final FlatCombineFunction combiner = - RegularPactTask.instantiateUserCode(this.config, userCodeClassLoader, FlatCombineFunction.class); + final GroupCombineFunction combiner = + RegularPactTask.instantiateUserCode(this.config, userCodeClassLoader, GroupCombineFunction.class); this.combiner = combiner; FunctionUtils.setFunctionRuntimeContext(combiner, getUdfRuntimeContext()); } @@ -210,7 +210,7 @@ private void sortAndCombine() throws Exception { // cache references on the stack - final FlatCombineFunction stub = this.combiner; + final GroupCombineFunction stub = this.combiner; final Collector output = this.outputCollector; // run stub implementation @@ -226,7 +226,7 @@ private void sortAndCombine() throws Exception { // cache references on the stack - final FlatCombineFunction stub = this.combiner; + final GroupCombineFunction stub = this.combiner; final Collector output = this.outputCollector; // run stub implementation diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/CombiningUnilateralSortMerger.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/CombiningUnilateralSortMerger.java index 9282fd48cd39a..8da9413146d33 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/CombiningUnilateralSortMerger.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/CombiningUnilateralSortMerger.java @@ -18,7 +18,7 @@ package org.apache.flink.runtime.operators.sort; -import org.apache.flink.api.common.functions.FlatCombineFunction; +import org.apache.flink.api.common.functions.GroupCombineFunction; import org.apache.flink.api.common.functions.util.FunctionUtils; import org.apache.flink.api.common.typeutils.TypeComparator; import org.apache.flink.api.common.typeutils.TypeSerializer; @@ -70,7 +70,7 @@ public class CombiningUnilateralSortMerger extends UnilateralSortMerger { */ private static final Logger LOG = LoggerFactory.getLogger(CombiningUnilateralSortMerger.class); - private final FlatCombineFunction combineStub; // the user code stub that does the combining + private final GroupCombineFunction combineStub; // the user code stub that does the combining private Configuration udfConfig; @@ -100,7 +100,7 @@ public class CombiningUnilateralSortMerger extends UnilateralSortMerger { * @throws MemoryAllocationException Thrown, if not enough memory can be obtained from the memory manager to * perform the sort. */ - public CombiningUnilateralSortMerger(FlatCombineFunction combineStub, MemoryManager memoryManager, IOManager ioManager, + public CombiningUnilateralSortMerger(GroupCombineFunction combineStub, MemoryManager memoryManager, IOManager ioManager, MutableObjectIterator input, AbstractInvokable parentTask, TypeSerializerFactory serializerFactory, TypeComparator comparator, double memoryFraction, int maxNumFileHandles, float startSpillingFraction) @@ -132,7 +132,7 @@ public CombiningUnilateralSortMerger(FlatCombineFunction combineStub, Memo * @throws MemoryAllocationException Thrown, if not enough memory can be obtained from the memory manager to * perform the sort. */ - public CombiningUnilateralSortMerger(FlatCombineFunction combineStub, MemoryManager memoryManager, IOManager ioManager, + public CombiningUnilateralSortMerger(GroupCombineFunction combineStub, MemoryManager memoryManager, IOManager ioManager, MutableObjectIterator input, AbstractInvokable parentTask, TypeSerializerFactory serializerFactory, TypeComparator comparator, double memoryFraction, int numSortBuffers, int maxNumFileHandles, @@ -189,7 +189,7 @@ public void go() throws IOException { // ------------------- In-Memory Cache ------------------------ final Queue> cache = new ArrayDeque>(); - CircularElement element = null; + CircularElement element; boolean cacheOnly = false; // fill cache @@ -253,7 +253,7 @@ else if (element == endMarker()) { // ------------------- Spilling Phase ------------------------ - final FlatCombineFunction combineStub = CombiningUnilateralSortMerger.this.combineStub; + final GroupCombineFunction combineStub = CombiningUnilateralSortMerger.this.combineStub; // now that we are actually spilling, take the combiner, and open it try { @@ -463,7 +463,7 @@ protected ChannelWithBlockCount mergeChannels(List channe this.memManager.getPageSize()); final WriterCollector collector = new WriterCollector(output, this.serializer); - final FlatCombineFunction combineStub = CombiningUnilateralSortMerger.this.combineStub; + final GroupCombineFunction combineStub = CombiningUnilateralSortMerger.this.combineStub; // combine and write to disk try { @@ -573,7 +573,7 @@ public Iterator iterator() { throw new TraversableOnceException(); } } - }; + } // ------------------------------------------------------------------------ diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheSuccessTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheSuccessTest.java index 4b92b717352e2..5c3ecf38fb7e4 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheSuccessTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheSuccessTest.java @@ -18,11 +18,6 @@ package org.apache.flink.runtime.blob; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; - import java.io.File; import java.net.InetSocketAddress; import java.net.URISyntaxException; @@ -33,6 +28,11 @@ import org.apache.flink.configuration.Configuration; import org.junit.Test; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + /** * This class contains unit tests for the {@link BlobCache}. */ @@ -70,8 +70,8 @@ public void testBlobCache() { blobCache = new BlobCache(serverAddress, new Configuration()); - for(int i = 0; i < blobKeys.size(); i++){ - blobCache.getURL(blobKeys.get(i)); + for (BlobKey blobKey : blobKeys) { + blobCache.getURL(blobKey); } // Now, shut down the BLOB server, the BLOBs must still be accessible through the cache. @@ -87,9 +87,7 @@ public void testBlobCache() { // Verify the result assertEquals(blobKeys.size(), urls.length); - for (int i = 0; i < urls.length; ++i) { - - final URL url = urls[i]; + for (final URL url : urls) { assertNotNull(url); diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala index 12911813efe9f..2732112e4413c 100644 --- a/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala +++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala @@ -641,7 +641,7 @@ class DataSet[T: ClassTag](set: JavaDataSet[T]) { * arbitrary output type. */ def combineGroup[R: TypeInformation: ClassTag]( - combiner: FlatCombineFunction[T, R]): DataSet[R] = { + combiner: GroupCombineFunction[T, R]): DataSet[R] = { if (combiner == null) { throw new NullPointerException("Combine function must not be null.") } @@ -670,7 +670,7 @@ class DataSet[T: ClassTag](set: JavaDataSet[T]) { if (fun == null) { throw new NullPointerException("Combine function must not be null.") } - val combiner = new FlatCombineFunction[T, R] { + val combiner = new GroupCombineFunction[T, R] { val cleanFun = clean(fun) def combine(in: java.lang.Iterable[T], out: Collector[R]) { cleanFun(in.iterator().asScala, out) diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/GroupedDataSet.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/GroupedDataSet.scala index eca4563ad2d12..d547ea4b5ece1 100644 --- a/flink-scala/src/main/scala/org/apache/flink/api/scala/GroupedDataSet.scala +++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/GroupedDataSet.scala @@ -22,7 +22,7 @@ import org.apache.flink.api.java.functions.{KeySelector, FirstReducer} import org.apache.flink.api.scala.operators.ScalaAggregateOperator import scala.collection.JavaConverters._ import org.apache.commons.lang3.Validate -import org.apache.flink.api.common.functions.{FlatCombineFunction, GroupReduceFunction, ReduceFunction, Partitioner} +import org.apache.flink.api.common.functions.{GroupCombineFunction, GroupReduceFunction, ReduceFunction, Partitioner} import org.apache.flink.api.common.operators.Order import org.apache.flink.api.java.aggregation.Aggregations import org.apache.flink.api.java.operators._ @@ -370,7 +370,7 @@ class GroupedDataSet[T: ClassTag]( def combineGroup[R: TypeInformation: ClassTag]( fun: (Iterator[T], Collector[R]) => Unit): DataSet[R] = { Validate.notNull(fun, "GroupCombine function must not be null.") - val combiner = new FlatCombineFunction[T, R] { + val combiner = new GroupCombineFunction[T, R] { val cleanFun = set.clean(fun) def combine(in: java.lang.Iterable[T], out: Collector[R]) { cleanFun(in.iterator().asScala, out) @@ -396,7 +396,7 @@ class GroupedDataSet[T: ClassTag]( * arbitrary output type. */ def combineGroup[R: TypeInformation: ClassTag]( - combiner: FlatCombineFunction[T, R]): DataSet[R] = { + combiner: GroupCombineFunction[T, R]): DataSet[R] = { Validate.notNull(combiner, "GroupCombine function must not be null.") wrap( new GroupCombineOperator[T, R](maybeCreateSortedGrouping(), diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomRankCombiner.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomRankCombiner.java index 6631f070f5cc3..e2a160dfcf372 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomRankCombiner.java +++ b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomRankCombiner.java @@ -21,14 +21,14 @@ import java.util.Iterator; import org.apache.flink.api.common.functions.AbstractRichFunction; -import org.apache.flink.api.common.functions.FlatCombineFunction; +import org.apache.flink.api.common.functions.GroupCombineFunction; import org.apache.flink.api.common.functions.GroupReduceFunction; import org.apache.flink.test.iterative.nephele.customdanglingpagerank.types.VertexWithRank; import org.apache.flink.util.Collector; public class CustomRankCombiner extends AbstractRichFunction implements GroupReduceFunction, - FlatCombineFunction + GroupCombineFunction { private static final long serialVersionUID = 1L; diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/GroupCombineITCase.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/GroupCombineITCase.java index 2a97c607f8336..3e9fde750398a 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/GroupCombineITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/GroupCombineITCase.java @@ -18,7 +18,7 @@ package org.apache.flink.test.javaApiOperators; -import org.apache.flink.api.common.functions.FlatCombineFunction; +import org.apache.flink.api.common.functions.GroupCombineFunction; import org.apache.flink.api.common.functions.GroupReduceFunction; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.operators.Order; @@ -284,7 +284,7 @@ public void testCheckPartitionShuffleGroupBy() throws Exception { // partition and group data UnsortedGrouping> partitionedDS = ds.partitionByHash(0).groupBy(1); - partitionedDS.combineGroup(new FlatCombineFunction, Tuple2>() { + partitionedDS.combineGroup(new GroupCombineFunction, Tuple2>() { @Override public void combine(Iterable> values, Collector> out) throws Exception { int count = 0; @@ -334,7 +334,7 @@ public void testCheckPartitionShuffleDOP1() throws Exception { // partition and group data UnsortedGrouping> partitionedDS = ds.partitionByHash(0).groupBy(1); - partitionedDS.combineGroup(new FlatCombineFunction, Tuple2>() { + partitionedDS.combineGroup(new GroupCombineFunction, Tuple2>() { @Override public void combine(Iterable> values, Collector> out) throws Exception { int count = 0; @@ -372,21 +372,21 @@ public Tuple1 map(String value) throws Exception { }); // all methods on DataSet - ds.combineGroup(new FlatCombineFunctionExample()) + ds.combineGroup(new GroupCombineFunctionExample()) .output(new DiscardingOutputFormat>()); // all methods on UnsortedGrouping - ds.groupBy(0).combineGroup(new FlatCombineFunctionExample()) + ds.groupBy(0).combineGroup(new GroupCombineFunctionExample()) .output(new DiscardingOutputFormat>()); // all methods on SortedGrouping - ds.groupBy(0).sortGroup(0, Order.ASCENDING).combineGroup(new FlatCombineFunctionExample()) + ds.groupBy(0).sortGroup(0, Order.ASCENDING).combineGroup(new GroupCombineFunctionExample()) .output(new DiscardingOutputFormat>()); env.execute(); } - public static class FlatCombineFunctionExample implements FlatCombineFunction, Tuple1> { + public static class GroupCombineFunctionExample implements GroupCombineFunction, Tuple1> { @Override public void combine(Iterable> values, Collector> out) throws Exception { @@ -396,7 +396,7 @@ public void combine(Iterable> values, Collector> o } } - public static class ScalaFlatCombineFunctionExample implements FlatCombineFunction, scala.Tuple1> { + public static class ScalaGroupCombineFunctionExample implements GroupCombineFunction, scala.Tuple1> { @Override public void combine(Iterable> values, Collector> out) throws Exception { @@ -406,7 +406,7 @@ public void combine(Iterable> values, Collector, Tuple3>, + public static class IdentityFunction implements GroupCombineFunction, Tuple3>, GroupReduceFunction, Tuple3> { @Override @@ -510,7 +510,7 @@ public Tuple2> map(Tuple3 extends FlatCombineFunction, GroupReduceFunction { + public interface CombineAndReduceGroup extends GroupCombineFunction, GroupReduceFunction { } public interface KvGroupReduce extends CombineAndReduceGroup, Tuple2, Tuple2> { diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/GroupCombineITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/GroupCombineITCase.scala index ef484df289179..380b3bca0cf5a 100644 --- a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/GroupCombineITCase.scala +++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/GroupCombineITCase.scala @@ -43,7 +43,7 @@ class GroupCombineITCase(mode: TestExecutionMode) extends MultipleProgramsTestBa .map(str => Tuple1(str)) // all methods on DataSet - ds.combineGroup(new GroupCombineITCase.ScalaFlatCombineFunctionExample()) + ds.combineGroup(new GroupCombineITCase.ScalaGroupCombineFunctionExample()) .output(new DiscardingOutputFormat[Tuple1[String]]) ds.combineGroup((in, out: Collector[Tuple1[String]]) => in.toSet foreach (out.collect)) @@ -51,7 +51,7 @@ class GroupCombineITCase(mode: TestExecutionMode) extends MultipleProgramsTestBa // all methods on UnsortedGrouping ds.groupBy(0) - .combineGroup(new GroupCombineITCase.ScalaFlatCombineFunctionExample()) + .combineGroup(new GroupCombineITCase.ScalaGroupCombineFunctionExample()) .output(new DiscardingOutputFormat[Tuple1[String]]) ds.groupBy(0) @@ -60,7 +60,7 @@ class GroupCombineITCase(mode: TestExecutionMode) extends MultipleProgramsTestBa // all methods on SortedGrouping ds.groupBy(0).sortGroup(0, Order.ASCENDING) - .combineGroup(new GroupCombineITCase.ScalaFlatCombineFunctionExample()) + .combineGroup(new GroupCombineITCase.ScalaGroupCombineFunctionExample()) .output(new DiscardingOutputFormat[Tuple1[String]]) ds.groupBy(0).sortGroup(0, Order.ASCENDING)