Skip to content

Commit

Permalink
[FLINK-3254] [dataSet] Adding functionality to support the CombineFun…
Browse files Browse the repository at this point in the history
…ction contract.

This closes #1568
  • Loading branch information
kl0u authored and fhueske committed Feb 3, 2016
1 parent f7d1911 commit aabb268
Show file tree
Hide file tree
Showing 3 changed files with 399 additions and 21 deletions.
Expand Up @@ -18,9 +18,10 @@


package org.apache.flink.api.java.operators; package org.apache.flink.api.java.operators;


import org.apache.flink.api.common.functions.CombineFunction;
import org.apache.flink.api.common.functions.GroupCombineFunction; import org.apache.flink.api.common.functions.GroupCombineFunction;
import org.apache.flink.api.common.functions.GroupReduceFunction; import org.apache.flink.api.common.functions.GroupReduceFunction;
import org.apache.flink.api.common.operators.Keys; import org.apache.flink.api.java.operators.translation.CombineToGroupCombineWrapper;
import org.apache.flink.api.common.operators.Operator; import org.apache.flink.api.common.operators.Operator;
import org.apache.flink.api.common.operators.Order; import org.apache.flink.api.common.operators.Order;
import org.apache.flink.api.common.operators.Ordering; import org.apache.flink.api.common.operators.Ordering;
Expand All @@ -30,6 +31,7 @@
import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.functions.SemanticPropUtil; import org.apache.flink.api.java.functions.SemanticPropUtil;
import org.apache.flink.api.common.operators.Keys.SelectorFunctionKeys; import org.apache.flink.api.common.operators.Keys.SelectorFunctionKeys;
import org.apache.flink.api.common.operators.Keys.ExpressionKeys;
import org.apache.flink.api.java.operators.translation.PlanUnwrappingReduceGroupOperator; import org.apache.flink.api.java.operators.translation.PlanUnwrappingReduceGroupOperator;
import org.apache.flink.api.java.operators.translation.PlanUnwrappingSortedReduceGroupOperator; import org.apache.flink.api.java.operators.translation.PlanUnwrappingSortedReduceGroupOperator;
import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.tuple.Tuple2;
Expand All @@ -52,7 +54,7 @@ public class GroupReduceOperator<IN, OUT> extends SingleInputUdfOperator<IN, OUT


private static final Logger LOG = LoggerFactory.getLogger(GroupReduceOperator.class); private static final Logger LOG = LoggerFactory.getLogger(GroupReduceOperator.class);


private final GroupReduceFunction<IN, OUT> function; private GroupReduceFunction<IN, OUT> function;


private final Grouping<IN> grouper; private final Grouping<IN> grouper;


Expand All @@ -68,12 +70,12 @@ public class GroupReduceOperator<IN, OUT> extends SingleInputUdfOperator<IN, OUT
*/ */
public GroupReduceOperator(DataSet<IN> input, TypeInformation<OUT> resultType, GroupReduceFunction<IN, OUT> function, String defaultName) { public GroupReduceOperator(DataSet<IN> input, TypeInformation<OUT> resultType, GroupReduceFunction<IN, OUT> function, String defaultName) {
super(input, resultType); super(input, resultType);

this.function = function; this.function = function;
this.grouper = null; this.grouper = null;
this.defaultName = defaultName; this.defaultName = defaultName;


checkCombinability(); this.combinable = checkCombinability();
} }


/** /**
Expand All @@ -84,18 +86,18 @@ public GroupReduceOperator(DataSet<IN> input, TypeInformation<OUT> resultType, G
*/ */
public GroupReduceOperator(Grouping<IN> input, TypeInformation<OUT> resultType, GroupReduceFunction<IN, OUT> function, String defaultName) { public GroupReduceOperator(Grouping<IN> input, TypeInformation<OUT> resultType, GroupReduceFunction<IN, OUT> function, String defaultName) {
super(input != null ? input.getInputDataSet() : null, resultType); super(input != null ? input.getInputDataSet() : null, resultType);

this.function = function; this.function = function;
this.grouper = input; this.grouper = input;
this.defaultName = defaultName; this.defaultName = defaultName;


checkCombinability(); this.combinable = checkCombinability();


UdfOperatorUtils.analyzeSingleInputUdf(this, GroupReduceFunction.class, defaultName, function, grouper.keys); UdfOperatorUtils.analyzeSingleInputUdf(this, GroupReduceFunction.class, defaultName, function, grouper.keys);
} }


private void checkCombinability() { private boolean checkCombinability() {
if (function instanceof GroupCombineFunction) { if (function instanceof GroupCombineFunction || function instanceof CombineFunction) {


// check if the generic types of GroupCombineFunction and GroupReduceFunction match, i.e., // check if the generic types of GroupCombineFunction and GroupReduceFunction match, i.e.,
// GroupCombineFunction<IN, IN> and GroupReduceFunction<IN, OUT>. // GroupCombineFunction<IN, IN> and GroupReduceFunction<IN, OUT>.
Expand All @@ -110,7 +112,9 @@ private void checkCombinability() {
if (((ParameterizedType) genInterface).getRawType().equals(GroupReduceFunction.class)) { if (((ParameterizedType) genInterface).getRawType().equals(GroupReduceFunction.class)) {
reduceTypes = ((ParameterizedType) genInterface).getActualTypeArguments(); reduceTypes = ((ParameterizedType) genInterface).getActualTypeArguments();
// get parameters of GroupCombineFunction // get parameters of GroupCombineFunction
} else if (((ParameterizedType) genInterface).getRawType().equals(GroupCombineFunction.class)) { } else if ((((ParameterizedType) genInterface).getRawType().equals(GroupCombineFunction.class)) ||
(((ParameterizedType) genInterface).getRawType().equals(CombineFunction.class))) {

combineTypes = ((ParameterizedType) genInterface).getActualTypeArguments(); combineTypes = ((ParameterizedType) genInterface).getActualTypeArguments();
} }
} }
Expand All @@ -120,24 +124,25 @@ private void checkCombinability() {
combineTypes != null && combineTypes.length == 2) { combineTypes != null && combineTypes.length == 2) {


if (reduceTypes[0].equals(combineTypes[0]) && reduceTypes[0].equals(combineTypes[1])) { if (reduceTypes[0].equals(combineTypes[0]) && reduceTypes[0].equals(combineTypes[1])) {
this.combinable = true; return true;
} else { } else {
LOG.warn("GroupCombineFunction cannot be used as combiner for GroupReduceFunction. " + LOG.warn("GroupCombineFunction cannot be used as combiner for GroupReduceFunction. " +
"Generic types are incompatible."); "Generic types are incompatible.");
this.combinable = false; return false;
} }
} }
else if (reduceTypes == null || reduceTypes.length != 2) { else if (reduceTypes == null || reduceTypes.length != 2) {
LOG.warn("Cannot check generic types of GroupReduceFunction. " + LOG.warn("Cannot check generic types of GroupReduceFunction. " +
"Enabling combiner but combine function might fail at runtime."); "Enabling combiner but combine function might fail at runtime.");
this.combinable = true; return true;
} }
else { else {
LOG.warn("Cannot check generic types of GroupCombineFunction. " + LOG.warn("Cannot check generic types of GroupCombineFunction. " +
"Enabling combiner but combine function might fail at runtime."); "Enabling combiner but combine function might fail at runtime.");
this.combinable = true; return true;
} }
} }
return false;
} }




Expand All @@ -156,13 +161,18 @@ public boolean isCombinable() {
} }


public GroupReduceOperator<IN, OUT> setCombinable(boolean combinable) { public GroupReduceOperator<IN, OUT> setCombinable(boolean combinable) {
// sanity check that the function is a subclass of the combine interface
if (combinable && !(function instanceof GroupCombineFunction)) { if(combinable) {
throw new IllegalArgumentException("The function does not implement the combine interface."); // sanity check that the function is a subclass of the combine interface
if (!checkCombinability()) {
throw new IllegalArgumentException("Either the function does not implement a combine interface, " +
"or the types of the combine() and reduce() methods are not compatible.");
}
this.combinable = true;
}
else {
this.combinable = false;
} }

this.combinable = combinable;

return this; return this;
} }


Expand Down Expand Up @@ -191,10 +201,16 @@ public SingleInputSemanticProperties getSemanticProperties() {
// -------------------------------------------------------------------------------------------- // --------------------------------------------------------------------------------------------


@Override @Override
@SuppressWarnings("unchecked")
protected GroupReduceOperatorBase<?, OUT, ?> translateToDataFlow(Operator<IN> input) { protected GroupReduceOperatorBase<?, OUT, ?> translateToDataFlow(Operator<IN> input) {


String name = getName() != null ? getName() : "GroupReduce at " + defaultName; String name = getName() != null ? getName() : "GroupReduce at " + defaultName;


// wrap CombineFunction in GroupCombineFunction if combinable
this.function = (combinable && function instanceof CombineFunction<?,?>) ?
new CombineToGroupCombineWrapper((CombineFunction<?,?>) function) :
function;

// distinguish between grouped reduce and non-grouped reduce // distinguish between grouped reduce and non-grouped reduce
if (grouper == null) { if (grouper == null) {
// non grouped reduce // non grouped reduce
Expand Down Expand Up @@ -236,7 +252,7 @@ selectorKeys, sortKeys, groupOrder, function, getResultType(), name, input, isCo
return po; return po;
} }
} }
else if (grouper.getKeys() instanceof Keys.ExpressionKeys) { else if (grouper.getKeys() instanceof ExpressionKeys) {


int[] logicalKeyPositions = grouper.getKeys().computeLogicalKeyPositions(); int[] logicalKeyPositions = grouper.getKeys().computeLogicalKeyPositions();
UnaryOperatorInformation<IN, OUT> operatorInfo = new UnaryOperatorInformation<>(getInputType(), getResultType()); UnaryOperatorInformation<IN, OUT> operatorInfo = new UnaryOperatorInformation<>(getInputType(), getResultType());
Expand Down
@@ -0,0 +1,49 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.api.java.operators.translation;

import com.google.common.base.Preconditions;
import org.apache.flink.api.common.functions.CombineFunction;
import org.apache.flink.api.common.functions.GroupCombineFunction;
import org.apache.flink.api.common.functions.GroupReduceFunction;
import org.apache.flink.util.Collector;

/**
* A wrapper the wraps a function that implements both {@link CombineFunction} and {@link GroupReduceFunction} interfaces
* and makes it look like a function that implements {@link GroupCombineFunction} and {@link GroupReduceFunction} to the runtime.
*/
public class CombineToGroupCombineWrapper<IN, OUT, F extends CombineFunction<IN, IN> & GroupReduceFunction<IN, OUT>>
implements GroupCombineFunction<IN, IN>, GroupReduceFunction<IN, OUT> {

private final F wrappedFunction;

public CombineToGroupCombineWrapper(F wrappedFunction) {
this.wrappedFunction = Preconditions.checkNotNull(wrappedFunction);
}

@Override
public void combine(Iterable<IN> values, Collector<IN> out) throws Exception {
IN outValue = wrappedFunction.combine(values);
out.collect(outValue);
}

@Override
public void reduce(Iterable<IN> values, Collector<OUT> out) throws Exception {
wrappedFunction.reduce(values, out);
}
}

0 comments on commit aabb268

Please sign in to comment.