Skip to content

Commit

Permalink
[Flink-1780] Rename FlatCombineFunction to GroupCombineFunction
Browse files Browse the repository at this point in the history
This closes #530
  • Loading branch information
smarthi authored and mxm committed Mar 25, 2015
1 parent ae04025 commit 033c69f
Show file tree
Hide file tree
Showing 32 changed files with 157 additions and 156 deletions.
2 changes: 1 addition & 1 deletion docs/dataset_transformations.md
Expand Up @@ -554,7 +554,7 @@ an alternative WordCount implementation. In the implementation,
DataSet<String> input = [..] // The words received as input
DataSet<String> groupedInput = input.groupBy(0); // group identical words

DataSet<Tuple2<String, Integer>> combinedWords = groupedInput.combineGroup(new FlatCombineFunction<String, Tuple2<String, Integer>() {
DataSet<Tuple2<String, Integer>> combinedWords = groupedInput.combineGroup(new GroupCombineFunction<String, Tuple2<String, Integer>() {

public void combine(Iterable<String> words, Collector<Tuple2<String, Integer>>) { // combine
int count = 0;
Expand Down
Expand Up @@ -29,7 +29,7 @@
* reduce the data volume earlier, before the entire groups have been collected.
* <p>
* This special variant of the combine function reduces the group of elements into a single element. A variant
* that can return multiple values per group is defined in {@link FlatCombineFunction}.
* that can return multiple values per group is defined in {@link GroupCombineFunction}.
*
* @param <IN> The data type processed by the combine function.
* @param <OUT> The data type emitted by the combine function.
Expand Down
Expand Up @@ -36,14 +36,14 @@
* @param <IN> The data type processed by the combine function.
* @param <OUT> The data type emitted by the combine function.
*/
public interface FlatCombineFunction<IN, OUT> extends Function, Serializable {
public interface GroupCombineFunction<IN, OUT> extends Function, Serializable {

/**
* The combine method, called (potentially multiple timed) with subgroups of elements.
*
*
* @param values The elements to be combined.
* @param out The collector to use to return values from the function.
*
*
* @throws Exception The function may throw Exceptions, which will cause the program to cancel,
* and may trigger the recovery logic.
*/
Expand Down
Expand Up @@ -19,21 +19,18 @@
package org.apache.flink.api.common.functions;


import org.apache.flink.api.common.functions.AbstractRichFunction;
import org.apache.flink.api.common.functions.FlatCombineFunction;
import org.apache.flink.api.common.functions.RichFunction;
import org.apache.flink.util.Collector;

/**
* Rich variant of the {@link FlatCombineFunction}. As a {@link RichFunction}, it gives access to the
* Rich variant of the {@link GroupCombineFunction}. As a {@link RichFunction}, it gives access to the
* {@link org.apache.flink.api.common.functions.RuntimeContext} and provides setup and teardown methods:
* {@link RichFunction#open(org.apache.flink.configuration.Configuration)} and
* {@link RichFunction#close()}.
*
* @param <IN> The data type of the elements to be combined.
* @param <OUT> The resulting data type of the elements to be combined.
*/
public abstract class RichFlatCombineFunction<IN, OUT> extends AbstractRichFunction implements FlatCombineFunction<IN, OUT> {
public abstract class RichGroupCombineFunction<IN, OUT> extends AbstractRichFunction implements GroupCombineFunction<IN, OUT> {

private static final long serialVersionUID = 1L;

Expand Down
Expand Up @@ -34,7 +34,7 @@
* @param <IN> Type of the elements that this function processes.
* @param <OUT> The type of the elements returned by the user-defined function.
*/
public abstract class RichGroupReduceFunction<IN, OUT> extends AbstractRichFunction implements GroupReduceFunction<IN, OUT>, FlatCombineFunction<IN, IN> {
public abstract class RichGroupReduceFunction<IN, OUT> extends AbstractRichFunction implements GroupReduceFunction<IN, OUT>, GroupCombineFunction<IN, IN> {

private static final long serialVersionUID = 1L;

Expand Down Expand Up @@ -83,5 +83,5 @@ public void combine(Iterable<IN> values, Collector<IN> out) throws Exception {
*/
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
public static @interface Combinable {};
public static @interface Combinable {}
}
Expand Up @@ -22,7 +22,7 @@
import org.apache.commons.lang3.ArrayUtils;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.InvalidProgramException;
import org.apache.flink.api.common.functions.FlatCombineFunction;
import org.apache.flink.api.common.functions.GroupCombineFunction;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.functions.util.CopyingListCollector;
import org.apache.flink.api.common.functions.util.FunctionUtils;
Expand All @@ -46,7 +46,7 @@
* This class is later processed by the compiler to generate the plan.
* @see org.apache.flink.api.common.functions.CombineFunction
*/
public class GroupCombineOperatorBase<IN, OUT, FT extends FlatCombineFunction<IN, OUT>> extends SingleInputOperator<IN, OUT, FT> {
public class GroupCombineOperatorBase<IN, OUT, FT extends GroupCombineFunction<IN, OUT>> extends SingleInputOperator<IN, OUT, FT> {


/** The ordering for the order inside a reduce group. */
Expand Down Expand Up @@ -81,7 +81,7 @@ public Ordering getGroupOrder() {

@Override
protected List<OUT> executeOnCollections(List<IN> inputData, RuntimeContext ctx, ExecutionConfig executionConfig) throws Exception {
FlatCombineFunction<IN, OUT> function = this.userFunction.getUserCodeObject();
GroupCombineFunction<IN, OUT> function = this.userFunction.getUserCodeObject();

UnaryOperatorInformation<IN, OUT> operatorInfo = getOperatorInfo();
TypeInformation<IN> inputType = operatorInfo.getInputType();
Expand Down
Expand Up @@ -21,7 +21,7 @@
import org.apache.commons.lang3.ArrayUtils;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.InvalidProgramException;
import org.apache.flink.api.common.functions.FlatCombineFunction;
import org.apache.flink.api.common.functions.GroupCombineFunction;
import org.apache.flink.api.common.functions.GroupReduceFunction;
import org.apache.flink.api.common.functions.Partitioner;
import org.apache.flink.api.common.functions.RuntimeContext;
Expand Down Expand Up @@ -107,15 +107,15 @@ public Ordering getGroupOrder() {
/**
* Marks the group reduce operation as combinable. Combinable operations may pre-reduce the
* data before the actual group reduce operations. Combinable user-defined functions
* must implement the interface {@link org.apache.flink.api.common.functions.FlatCombineFunction}.
* must implement the interface {@link GroupCombineFunction}.
*
* @param combinable Flag to mark the group reduce operation as combinable.
*/
public void setCombinable(boolean combinable) {
// sanity check
if (combinable && !FlatCombineFunction.class.isAssignableFrom(this.userFunction.getUserCodeClass())) {
if (combinable && !GroupCombineFunction.class.isAssignableFrom(this.userFunction.getUserCodeClass())) {
throw new IllegalArgumentException("Cannot set a UDF as combinable if it does not implement the interface " +
FlatCombineFunction.class.getName());
GroupCombineFunction.class.getName());
} else {
this.combinable = combinable;
}
Expand Down
Expand Up @@ -24,7 +24,7 @@
import org.apache.flink.api.common.InvalidProgramException;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.FlatCombineFunction;
import org.apache.flink.api.common.functions.GroupCombineFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.GroupReduceFunction;
import org.apache.flink.api.common.functions.InvalidTypesException;
Expand Down Expand Up @@ -473,7 +473,7 @@ public <R> GroupReduceOperator<T, R> reduceGroup(GroupReduceFunction<T, R> reduc
* @param combiner The CombineFunction that is applied on the DataSet.
* @return A GroupCombineOperator which represents the combined DataSet.
*/
public <R> GroupCombineOperator<T, R> combineGroup(FlatCombineFunction<T, R> combiner) {
public <R> GroupCombineOperator<T, R> combineGroup(GroupCombineFunction<T, R> combiner) {
if (combiner == null) {
throw new NullPointerException("GroupReduce function must not be null.");
}
Expand Down
Expand Up @@ -17,13 +17,13 @@
*/

package org.apache.flink.api.java.functions;
import org.apache.flink.api.common.functions.FlatCombineFunction;
import org.apache.flink.api.common.functions.GroupCombineFunction;
import org.apache.flink.api.common.functions.GroupReduceFunction;
import org.apache.flink.api.common.functions.RichGroupReduceFunction.Combinable;
import org.apache.flink.util.Collector;

@Combinable
public class FirstReducer<T> implements GroupReduceFunction<T, T>, FlatCombineFunction<T, T> {
public class FirstReducer<T> implements GroupReduceFunction<T, T>, GroupCombineFunction<T, T> {
private static final long serialVersionUID = 1L;

private final int count;
Expand Down
Expand Up @@ -18,7 +18,7 @@

package org.apache.flink.api.java.operators;

import org.apache.flink.api.common.functions.FlatCombineFunction;
import org.apache.flink.api.common.functions.GroupCombineFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.operators.Operator;
import org.apache.flink.api.common.operators.Order;
Expand Down Expand Up @@ -46,7 +46,7 @@
*/
public class GroupCombineOperator<IN, OUT> extends SingleInputUdfOperator<IN, OUT, GroupCombineOperator<IN, OUT>> {

private final FlatCombineFunction<IN, OUT> function;
private final GroupCombineFunction<IN, OUT> function;

private final Grouping<IN> grouper;

Expand All @@ -60,7 +60,7 @@ public class GroupCombineOperator<IN, OUT> extends SingleInputUdfOperator<IN, OU
* @param function The user-defined GroupReduce function.
* @param defaultName The operator's name.
*/
public GroupCombineOperator(DataSet<IN> input, TypeInformation<OUT> resultType, FlatCombineFunction<IN, OUT> function, String defaultName) {
public GroupCombineOperator(DataSet<IN> input, TypeInformation<OUT> resultType, GroupCombineFunction<IN, OUT> function, String defaultName) {
super(input, resultType);
this.function = function;
this.grouper = null;
Expand All @@ -73,7 +73,7 @@ public GroupCombineOperator(DataSet<IN> input, TypeInformation<OUT> resultType,
* @param input The grouped input to be processed group-wise by the groupReduce function.
* @param function The user-defined GroupReduce function.
*/
public GroupCombineOperator(Grouping<IN> input, TypeInformation<OUT> resultType, FlatCombineFunction<IN, OUT> function, String defaultName) {
public GroupCombineOperator(Grouping<IN> input, TypeInformation<OUT> resultType, GroupCombineFunction<IN, OUT> function, String defaultName) {
super(input != null ? input.getDataSet() : null, resultType);

this.function = function;
Expand All @@ -82,7 +82,7 @@ public GroupCombineOperator(Grouping<IN> input, TypeInformation<OUT> resultType,
}

@Override
protected FlatCombineFunction<IN, OUT> getFunction() {
protected GroupCombineFunction<IN, OUT> getFunction() {
return function;
}

Expand All @@ -99,8 +99,8 @@ protected FlatCombineFunction<IN, OUT> getFunction() {
if (grouper == null) {
// non grouped reduce
UnaryOperatorInformation<IN, OUT> operatorInfo = new UnaryOperatorInformation<IN, OUT>(getInputType(), getResultType());
GroupCombineOperatorBase<IN, OUT, FlatCombineFunction<IN, OUT>> po =
new GroupCombineOperatorBase<IN, OUT, FlatCombineFunction<IN, OUT>>(function, operatorInfo, new int[0], name);
GroupCombineOperatorBase<IN, OUT, GroupCombineFunction<IN, OUT>> po =
new GroupCombineOperatorBase<IN, OUT, GroupCombineFunction<IN, OUT>>(function, operatorInfo, new int[0], name);

po.setInput(input);
// the parallelism for a non grouped reduce can only be 1
Expand Down Expand Up @@ -144,8 +144,8 @@ else if (grouper.getKeys() instanceof Keys.ExpressionKeys) {

int[] logicalKeyPositions = grouper.getKeys().computeLogicalKeyPositions();
UnaryOperatorInformation<IN, OUT> operatorInfo = new UnaryOperatorInformation<IN, OUT>(getInputType(), getResultType());
GroupCombineOperatorBase<IN, OUT, FlatCombineFunction<IN, OUT>> po =
new GroupCombineOperatorBase<IN, OUT, FlatCombineFunction<IN, OUT>>(function, operatorInfo, logicalKeyPositions, name);
GroupCombineOperatorBase<IN, OUT, GroupCombineFunction<IN, OUT>> po =
new GroupCombineOperatorBase<IN, OUT, GroupCombineFunction<IN, OUT>>(function, operatorInfo, logicalKeyPositions, name);

po.setInput(input);
po.setParallelism(getParallelism());
Expand Down Expand Up @@ -175,7 +175,7 @@ else if (grouper.getKeys() instanceof Keys.ExpressionKeys) {
// --------------------------------------------------------------------------------------------

private static <IN, OUT, K> PlanUnwrappingGroupCombineOperator<IN, OUT, K> translateSelectorFunctionReducer(
Keys.SelectorFunctionKeys<IN, ?> rawKeys, FlatCombineFunction<IN, OUT> function,
Keys.SelectorFunctionKeys<IN, ?> rawKeys, GroupCombineFunction<IN, OUT> function,
TypeInformation<IN> inputType, TypeInformation<OUT> outputType, String name, Operator<IN> input)
{
@SuppressWarnings("unchecked")
Expand All @@ -199,7 +199,7 @@ private static <IN, OUT, K> PlanUnwrappingGroupCombineOperator<IN, OUT, K> trans
}

private static <IN, OUT, K1, K2> PlanUnwrappingSortedGroupCombineOperator<IN, OUT, K1, K2> translateSelectorFunctionSortedReducer(
Keys.SelectorFunctionKeys<IN, ?> rawGroupingKey, Keys.SelectorFunctionKeys<IN, ?> rawSortingKey, FlatCombineFunction<IN, OUT> function,
Keys.SelectorFunctionKeys<IN, ?> rawGroupingKey, Keys.SelectorFunctionKeys<IN, ?> rawSortingKey, GroupCombineFunction<IN, OUT> function,
TypeInformation<IN> inputType, TypeInformation<OUT> outputType, String name, Operator<IN> input)
{
@SuppressWarnings("unchecked")
Expand Down
Expand Up @@ -18,7 +18,7 @@

package org.apache.flink.api.java.operators;

import org.apache.flink.api.common.functions.FlatCombineFunction;
import org.apache.flink.api.common.functions.GroupCombineFunction;
import org.apache.flink.api.common.functions.GroupReduceFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.operators.Operator;
Expand Down Expand Up @@ -88,7 +88,7 @@ public GroupReduceOperator(Grouping<IN> input, TypeInformation<OUT> resultType,
}

private void checkCombinability() {
if (function instanceof FlatCombineFunction &&
if (function instanceof GroupCombineFunction &&
function.getClass().getAnnotation(RichGroupReduceFunction.Combinable.class) != null) {
this.combinable = true;
}
Expand All @@ -111,7 +111,7 @@ public boolean isCombinable() {

public GroupReduceOperator<IN, OUT> setCombinable(boolean combinable) {
// sanity check that the function is a subclass of the combine interface
if (combinable && !(function instanceof FlatCombineFunction)) {
if (combinable && !(function instanceof GroupCombineFunction)) {
throw new IllegalArgumentException("The function does not implement the combine interface.");
}

Expand Down
Expand Up @@ -18,7 +18,7 @@

package org.apache.flink.api.java.operators;

import org.apache.flink.api.common.functions.FlatCombineFunction;
import org.apache.flink.api.common.functions.GroupCombineFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.CompositeType;
import org.apache.flink.api.java.Utils;
Expand Down Expand Up @@ -169,7 +169,7 @@ public <R> GroupReduceOperator<T, R> reduceGroup(GroupReduceFunction<T, R> reduc
* @param combiner The CombineFunction that is applied on the DataSet.
* @return A GroupCombineOperator which represents the combined DataSet.
*/
public <R> GroupCombineOperator<T, R> combineGroup(FlatCombineFunction<T, R> combiner) {
public <R> GroupCombineOperator<T, R> combineGroup(GroupCombineFunction<T, R> combiner) {
if (combiner == null) {
throw new NullPointerException("GroupReduce function must not be null.");
}
Expand Down
Expand Up @@ -19,7 +19,7 @@
package org.apache.flink.api.java.operators;

import org.apache.flink.api.common.InvalidProgramException;
import org.apache.flink.api.common.functions.FlatCombineFunction;
import org.apache.flink.api.common.functions.GroupCombineFunction;
import org.apache.flink.api.common.functions.GroupReduceFunction;
import org.apache.flink.api.common.functions.Partitioner;
import org.apache.flink.api.common.functions.ReduceFunction;
Expand Down Expand Up @@ -173,7 +173,7 @@ public <R> GroupReduceOperator<T, R> reduceGroup(GroupReduceFunction<T, R> reduc
* @param combiner The CombineFunction that is applied on the DataSet.
* @return A GroupCombineOperator which represents the combined DataSet.
*/
public <R> GroupCombineOperator<T, R> combineGroup(FlatCombineFunction<T, R> combiner) {
public <R> GroupCombineOperator<T, R> combineGroup(GroupCombineFunction<T, R> combiner) {
if (combiner == null) {
throw new NullPointerException("GroupReduce function must not be null.");
}
Expand Down
Expand Up @@ -18,7 +18,7 @@

package org.apache.flink.api.java.operators.translation;

import org.apache.flink.api.common.functions.FlatCombineFunction;
import org.apache.flink.api.common.functions.GroupCombineFunction;
import org.apache.flink.api.common.operators.UnaryOperatorInformation;
import org.apache.flink.api.common.operators.base.GroupCombineOperatorBase;
import org.apache.flink.api.common.typeinfo.TypeInformation;
Expand All @@ -30,9 +30,9 @@
* A group combine operator that takes 2-tuples (key-value pairs), and applies the group combine operation only
* on the unwrapped values.
*/
public class PlanUnwrappingGroupCombineOperator<IN, OUT, K> extends GroupCombineOperatorBase<Tuple2<K, IN>, OUT, FlatCombineFunction<Tuple2<K, IN>, OUT>> {
public class PlanUnwrappingGroupCombineOperator<IN, OUT, K> extends GroupCombineOperatorBase<Tuple2<K, IN>, OUT, GroupCombineFunction<Tuple2<K, IN>, OUT>> {

public PlanUnwrappingGroupCombineOperator(FlatCombineFunction<IN, OUT> udf, Keys.SelectorFunctionKeys<IN, K> key, String name,
public PlanUnwrappingGroupCombineOperator(GroupCombineFunction<IN, OUT> udf, Keys.SelectorFunctionKeys<IN, K> key, String name,
TypeInformation<OUT> outType, TypeInformation<Tuple2<K, IN>> typeInfoWithKey)
{
super(new TupleUnwrappingGroupCombiner<IN, OUT, K>(udf),
Expand All @@ -42,15 +42,15 @@ public PlanUnwrappingGroupCombineOperator(FlatCombineFunction<IN, OUT> udf, Keys

// --------------------------------------------------------------------------------------------

public static final class TupleUnwrappingGroupCombiner<IN, OUT, K> extends WrappingFunction<FlatCombineFunction<IN, OUT>>
implements FlatCombineFunction<Tuple2<K, IN>, OUT>
public static final class TupleUnwrappingGroupCombiner<IN, OUT, K> extends WrappingFunction<GroupCombineFunction<IN, OUT>>
implements GroupCombineFunction<Tuple2<K, IN>, OUT>
{

private static final long serialVersionUID = 1L;

private final TupleUnwrappingIterator<IN, K> iter;

private TupleUnwrappingGroupCombiner(FlatCombineFunction<IN, OUT> wrapped) {
private TupleUnwrappingGroupCombiner(GroupCombineFunction<IN, OUT> wrapped) {
super(wrapped);
this.iter = new TupleUnwrappingIterator<IN, K>();
}
Expand Down

0 comments on commit 033c69f

Please sign in to comment.