Skip to content

Commit

Permalink
[Refactor] [DataSet] Refactor key selector translation in DataSet API.
Browse files Browse the repository at this point in the history
Clean up several compiler warnings.

This closes #1509
  • Loading branch information
fhueske committed Jan 19, 2016
1 parent 153a678 commit 544abb9
Show file tree
Hide file tree
Showing 9 changed files with 422 additions and 460 deletions.

Large diffs are not rendered by default.

Expand Up @@ -19,31 +19,28 @@
package org.apache.flink.api.java.operators; package org.apache.flink.api.java.operators;


import org.apache.flink.api.common.functions.GroupReduceFunction; import org.apache.flink.api.common.functions.GroupReduceFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.operators.Operator; import org.apache.flink.api.common.operators.Operator;
import org.apache.flink.api.common.operators.SingleInputSemanticProperties; import org.apache.flink.api.common.operators.SingleInputSemanticProperties;
import org.apache.flink.api.common.operators.UnaryOperatorInformation; import org.apache.flink.api.common.operators.UnaryOperatorInformation;
import org.apache.flink.api.common.operators.base.GroupReduceOperatorBase; import org.apache.flink.api.common.operators.base.GroupReduceOperatorBase;
import org.apache.flink.api.common.operators.base.MapOperatorBase;
import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.functions.RichGroupReduceFunction; import org.apache.flink.api.common.functions.RichGroupReduceFunction;
import org.apache.flink.api.java.operators.translation.KeyExtractingMapper; import org.apache.flink.api.java.operators.Keys.SelectorFunctionKeys;
import org.apache.flink.api.java.operators.translation.PlanUnwrappingReduceGroupOperator; import org.apache.flink.api.java.operators.translation.PlanUnwrappingReduceGroupOperator;
import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.util.Collector; import org.apache.flink.util.Collector;
import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.DataSet;


/** /**
* This operator represents the application of a "distinct" function on a data set, and the * This operator represents the application of a "distinct" function on a data set, and the
* result data set produced by the function. * result data set produced by the function.
* *
* @param <T> The type of the data set made distinct by the operator. * @param <T> The type of the data set made distinct by the operator.
*/ */
public class DistinctOperator<T> extends SingleInputOperator<T, T, DistinctOperator<T>> { public class DistinctOperator<T> extends SingleInputOperator<T, T, DistinctOperator<T>> {

private final Keys<T> keys; private final Keys<T> keys;

private final String distinctLocationName; private final String distinctLocationName;


public DistinctOperator(DataSet<T> input, Keys<T> keys, String distinctLocationName) { public DistinctOperator(DataSet<T> input, Keys<T> keys, String distinctLocationName) {
Expand All @@ -53,87 +50,79 @@ public DistinctOperator(DataSet<T> input, Keys<T> keys, String distinctLocationN


// if keys is null distinction is done on all fields // if keys is null distinction is done on all fields
if (keys == null) { if (keys == null) {
keys = new Keys.ExpressionKeys<T>(new String[] {Keys.ExpressionKeys.SELECT_ALL_CHAR }, input.getType()); keys = new Keys.ExpressionKeys<>(new String[] {Keys.ExpressionKeys.SELECT_ALL_CHAR }, input.getType());
} }


this.keys = keys; this.keys = keys;
} }


@Override @Override
protected org.apache.flink.api.common.operators.base.GroupReduceOperatorBase<?, T, ?> translateToDataFlow(Operator<T> input) { protected org.apache.flink.api.common.operators.base.GroupReduceOperatorBase<?, T, ?> translateToDataFlow(Operator<T> input) {

final RichGroupReduceFunction<T, T> function = new DistinctFunction<T>(); final RichGroupReduceFunction<T, T> function = new DistinctFunction<>();


String name = getName() != null ? getName() : "Distinct at " + distinctLocationName; String name = getName() != null ? getName() : "Distinct at " + distinctLocationName;

if (keys instanceof Keys.ExpressionKeys) { if (keys instanceof Keys.ExpressionKeys) {


int[] logicalKeyPositions = keys.computeLogicalKeyPositions(); int[] logicalKeyPositions = keys.computeLogicalKeyPositions();
UnaryOperatorInformation<T, T> operatorInfo = new UnaryOperatorInformation<T, T>(getInputType(), getResultType()); UnaryOperatorInformation<T, T> operatorInfo = new UnaryOperatorInformation<>(getInputType(), getResultType());
GroupReduceOperatorBase<T, T, GroupReduceFunction<T, T>> po = GroupReduceOperatorBase<T, T, GroupReduceFunction<T, T>> po =
new GroupReduceOperatorBase<T, T, GroupReduceFunction<T, T>>(function, operatorInfo, logicalKeyPositions, name); new GroupReduceOperatorBase<T, T, GroupReduceFunction<T, T>>(function, operatorInfo, logicalKeyPositions, name);


po.setCombinable(true); po.setCombinable(true);
po.setInput(input); po.setInput(input);
po.setParallelism(getParallelism()); po.setParallelism(getParallelism());

// make sure that distinct preserves the partitioning for the fields on which they operate // make sure that distinct preserves the partitioning for the fields on which they operate
if (getType().isTupleType()) { if (getType().isTupleType()) {
SingleInputSemanticProperties sProps = new SingleInputSemanticProperties(); SingleInputSemanticProperties sProps = new SingleInputSemanticProperties();

for (int field : keys.computeLogicalKeyPositions()) { for (int field : keys.computeLogicalKeyPositions()) {
sProps.addForwardedField(field, field); sProps.addForwardedField(field, field);
} }

po.setSemanticProperties(sProps); po.setSemanticProperties(sProps);
} }



return po; return po;
} }
else if (keys instanceof Keys.SelectorFunctionKeys) { else if (keys instanceof SelectorFunctionKeys) {

@SuppressWarnings("unchecked")
Keys.SelectorFunctionKeys<T, ?> selectorKeys = (Keys.SelectorFunctionKeys<T, ?>) keys;


@SuppressWarnings("unchecked")
SelectorFunctionKeys<T, ?> selectorKeys = (SelectorFunctionKeys<T, ?>) keys;


PlanUnwrappingReduceGroupOperator<T, T, ?> po = translateSelectorFunctionDistinct( PlanUnwrappingReduceGroupOperator<T, T, ?> po = translateSelectorFunctionDistinct(
selectorKeys, function, getInputType(), getResultType(), name, input); selectorKeys, function, getResultType(), name, input);

po.setParallelism(this.getParallelism()); po.setParallelism(this.getParallelism());

return po; return po;
} }
else { else {
throw new UnsupportedOperationException("Unrecognized key type."); throw new UnsupportedOperationException("Unrecognized key type.");
} }
} }

// -------------------------------------------------------------------------------------------- // --------------------------------------------------------------------------------------------

private static <IN, OUT, K> PlanUnwrappingReduceGroupOperator<IN, OUT, K> translateSelectorFunctionDistinct( private static <IN, OUT, K> PlanUnwrappingReduceGroupOperator<IN, OUT, K> translateSelectorFunctionDistinct(
Keys.SelectorFunctionKeys<IN, ?> rawKeys, RichGroupReduceFunction<IN, OUT> function, SelectorFunctionKeys<IN, ?> rawKeys,
TypeInformation<IN> inputType, TypeInformation<OUT> outputType, String name, Operator<IN> input) RichGroupReduceFunction<IN, OUT> function,
TypeInformation<OUT> outputType,
String name,
Operator<IN> input)
{ {
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
final Keys.SelectorFunctionKeys<IN, K> keys = (Keys.SelectorFunctionKeys<IN, K>) rawKeys; final SelectorFunctionKeys<IN, K> keys = (SelectorFunctionKeys<IN, K>) rawKeys;


TypeInformation<Tuple2<K, IN>> typeInfoWithKey = new TupleTypeInfo<Tuple2<K, IN>>(keys.getKeyType(), inputType); TypeInformation<Tuple2<K, IN>> typeInfoWithKey = SelectorFunctionKeys.createTypeWithKey(keys);
Operator<Tuple2<K, IN>> keyedInput = SelectorFunctionKeys.appendKeyExtractor(input, keys);


KeyExtractingMapper<IN, K> extractor = new KeyExtractingMapper<IN, K>(keys.getKeyExtractor());


PlanUnwrappingReduceGroupOperator<IN, OUT, K> reducer = PlanUnwrappingReduceGroupOperator<IN, OUT, K> reducer =
new PlanUnwrappingReduceGroupOperator<IN, OUT, K>(function, keys, name, outputType, typeInfoWithKey, true); new PlanUnwrappingReduceGroupOperator<>(function, keys, name, outputType, typeInfoWithKey, true);

reducer.setInput(keyedInput);
MapOperatorBase<IN, Tuple2<K, IN>, MapFunction<IN, Tuple2<K, IN>>> mapper = new MapOperatorBase<IN, Tuple2<K, IN>, MapFunction<IN, Tuple2<K, IN>>>(extractor, new UnaryOperatorInformation<IN, Tuple2<K, IN>>(inputType, typeInfoWithKey), "Key Extractor");


reducer.setInput(mapper);
mapper.setInput(input);

// set the mapper's parallelism to the input parallelism to make sure it is chained
mapper.setParallelism(input.getParallelism());

return reducer; return reducer;
} }


Expand Down
Expand Up @@ -19,24 +19,20 @@
package org.apache.flink.api.java.operators; package org.apache.flink.api.java.operators;


import org.apache.flink.api.common.functions.GroupCombineFunction; import org.apache.flink.api.common.functions.GroupCombineFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.operators.Operator; import org.apache.flink.api.common.operators.Operator;
import org.apache.flink.api.common.operators.Order; import org.apache.flink.api.common.operators.Order;
import org.apache.flink.api.common.operators.Ordering; import org.apache.flink.api.common.operators.Ordering;
import org.apache.flink.api.common.operators.SingleInputSemanticProperties; import org.apache.flink.api.common.operators.SingleInputSemanticProperties;
import org.apache.flink.api.common.operators.UnaryOperatorInformation; import org.apache.flink.api.common.operators.UnaryOperatorInformation;
import org.apache.flink.api.common.operators.base.GroupCombineOperatorBase; import org.apache.flink.api.common.operators.base.GroupCombineOperatorBase;
import org.apache.flink.api.common.operators.base.MapOperatorBase;
import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.functions.SemanticPropUtil; import org.apache.flink.api.java.functions.SemanticPropUtil;
import org.apache.flink.api.java.operators.translation.KeyExtractingMapper;
import org.apache.flink.api.java.operators.translation.PlanUnwrappingGroupCombineOperator; import org.apache.flink.api.java.operators.translation.PlanUnwrappingGroupCombineOperator;
import org.apache.flink.api.java.operators.translation.PlanUnwrappingSortedGroupCombineOperator; import org.apache.flink.api.java.operators.translation.PlanUnwrappingSortedGroupCombineOperator;
import org.apache.flink.api.java.operators.translation.TwoKeyExtractingMapper; import org.apache.flink.api.java.operators.Keys.SelectorFunctionKeys;
import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;


/** /**
* This operator behaves like the GroupReduceOperator with Combine but only runs the Combine part which reduces all data * This operator behaves like the GroupReduceOperator with Combine but only runs the Combine part which reduces all data
Expand Down Expand Up @@ -96,9 +92,9 @@ public SingleInputSemanticProperties getSemanticProperties() {
// offset semantic information by extracted key fields // offset semantic information by extracted key fields
if(props != null && if(props != null &&
this.grouper != null && this.grouper != null &&
this.grouper.keys instanceof Keys.SelectorFunctionKeys) { this.grouper.keys instanceof SelectorFunctionKeys) {


int offset = ((Keys.SelectorFunctionKeys<?,?>) this.grouper.keys).getKeyType().getTotalFields(); int offset = ((SelectorFunctionKeys<?,?>) this.grouper.keys).getKeyType().getTotalFields();
if(this.grouper instanceof SortedGrouping) { if(this.grouper instanceof SortedGrouping) {
offset += ((SortedGrouping<?>) this.grouper).getSortSelectionFunctionKey().getKeyType().getTotalFields(); offset += ((SortedGrouping<?>) this.grouper).getSortSelectionFunctionKey().getKeyType().getTotalFields();
} }
Expand All @@ -121,43 +117,35 @@ public SingleInputSemanticProperties getSemanticProperties() {
// distinguish between grouped reduce and non-grouped reduce // distinguish between grouped reduce and non-grouped reduce
if (grouper == null) { if (grouper == null) {
// non grouped reduce // non grouped reduce
UnaryOperatorInformation<IN, OUT> operatorInfo = new UnaryOperatorInformation<IN, OUT>(getInputType(), getResultType()); UnaryOperatorInformation<IN, OUT> operatorInfo = new UnaryOperatorInformation<>(getInputType(), getResultType());
GroupCombineOperatorBase<IN, OUT, GroupCombineFunction<IN, OUT>> po = GroupCombineOperatorBase<IN, OUT, GroupCombineFunction<IN, OUT>> po =
new GroupCombineOperatorBase<IN, OUT, GroupCombineFunction<IN, OUT>>(function, operatorInfo, new int[0], name); new GroupCombineOperatorBase<>(function, operatorInfo, new int[0], name);


po.setInput(input); po.setInput(input);
// the parallelism for a non grouped reduce can only be 1 // the parallelism for a non grouped reduce can only be 1
po.setParallelism(1); po.setParallelism(1);
return po; return po;
} }


if (grouper.getKeys() instanceof Keys.SelectorFunctionKeys) { if (grouper.getKeys() instanceof SelectorFunctionKeys) {


@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
Keys.SelectorFunctionKeys<IN, ?> selectorKeys = (Keys.SelectorFunctionKeys<IN, ?>) grouper.getKeys(); SelectorFunctionKeys<IN, ?> selectorKeys = (SelectorFunctionKeys<IN, ?>) grouper.getKeys();


if (grouper instanceof SortedGrouping) { if (grouper instanceof SortedGrouping) {
SortedGrouping<IN> sortedGrouper = (SortedGrouping<IN>) grouper;
Keys.SelectorFunctionKeys<IN, ?> sortKeys = sortedGrouper.getSortSelectionFunctionKey();


PlanUnwrappingSortedGroupCombineOperator<IN, OUT, ?, ?> po = translateSelectorFunctionSortedReducer( SortedGrouping<IN> sortedGrouping = (SortedGrouping<IN>) grouper;
selectorKeys, sortKeys, function, getInputType(), getResultType(), name, input); SelectorFunctionKeys<IN, ?> sortKeys = sortedGrouping.getSortSelectionFunctionKey();
Ordering groupOrder = sortedGrouping.getGroupOrdering();


// set group order PlanUnwrappingSortedGroupCombineOperator<IN, OUT, ?, ?> po =
int[] sortKeyPositions = sortedGrouper.getGroupSortKeyPositions(); translateSelectorFunctionSortedReducer(selectorKeys, sortKeys, groupOrder, function, getResultType(), name, input);
Order[] sortOrders = sortedGrouper.getGroupSortOrders();

Ordering o = new Ordering();
for(int i=0; i < sortKeyPositions.length; i++) {
o.appendOrdering(sortKeyPositions[i], null, sortOrders[i]);
}
po.setGroupOrder(o);


po.setParallelism(this.getParallelism()); po.setParallelism(this.getParallelism());
return po; return po;
} else { } else {
PlanUnwrappingGroupCombineOperator<IN, OUT, ?> po = translateSelectorFunctionReducer( PlanUnwrappingGroupCombineOperator<IN, OUT, ?> po = translateSelectorFunctionReducer(
selectorKeys, function, getInputType(), getResultType(), name, input); selectorKeys, function, getResultType(), name, input);


po.setParallelism(this.getParallelism()); po.setParallelism(this.getParallelism());
return po; return po;
Expand All @@ -166,9 +154,9 @@ public SingleInputSemanticProperties getSemanticProperties() {
else if (grouper.getKeys() instanceof Keys.ExpressionKeys) { else if (grouper.getKeys() instanceof Keys.ExpressionKeys) {


int[] logicalKeyPositions = grouper.getKeys().computeLogicalKeyPositions(); int[] logicalKeyPositions = grouper.getKeys().computeLogicalKeyPositions();
UnaryOperatorInformation<IN, OUT> operatorInfo = new UnaryOperatorInformation<IN, OUT>(getInputType(), getResultType()); UnaryOperatorInformation<IN, OUT> operatorInfo = new UnaryOperatorInformation<>(getInputType(), getResultType());
GroupCombineOperatorBase<IN, OUT, GroupCombineFunction<IN, OUT>> po = GroupCombineOperatorBase<IN, OUT, GroupCombineFunction<IN, OUT>> po =
new GroupCombineOperatorBase<IN, OUT, GroupCombineFunction<IN, OUT>>(function, operatorInfo, logicalKeyPositions, name); new GroupCombineOperatorBase<>(function, operatorInfo, logicalKeyPositions, name);


po.setInput(input); po.setInput(input);
po.setParallelism(getParallelism()); po.setParallelism(getParallelism());
Expand Down Expand Up @@ -197,53 +185,46 @@ else if (grouper.getKeys() instanceof Keys.ExpressionKeys) {


// -------------------------------------------------------------------------------------------- // --------------------------------------------------------------------------------------------


@SuppressWarnings("unchecked")
private static <IN, OUT, K> PlanUnwrappingGroupCombineOperator<IN, OUT, K> translateSelectorFunctionReducer( private static <IN, OUT, K> PlanUnwrappingGroupCombineOperator<IN, OUT, K> translateSelectorFunctionReducer(
Keys.SelectorFunctionKeys<IN, ?> rawKeys, GroupCombineFunction<IN, OUT> function, SelectorFunctionKeys<IN, ?> rawKeys,
TypeInformation<IN> inputType, TypeInformation<OUT> outputType, String name, Operator<IN> input) GroupCombineFunction<IN, OUT> function,
TypeInformation<OUT> outputType,
String name,
Operator<IN> input)
{ {
@SuppressWarnings("unchecked") final SelectorFunctionKeys<IN, K> keys = (SelectorFunctionKeys<IN, K>) rawKeys;
final Keys.SelectorFunctionKeys<IN, K> keys = (Keys.SelectorFunctionKeys<IN, K>) rawKeys;


TypeInformation<Tuple2<K, IN>> typeInfoWithKey = new TupleTypeInfo<Tuple2<K, IN>>(keys.getKeyType(), inputType); TypeInformation<Tuple2<K, IN>> typeInfoWithKey = SelectorFunctionKeys.createTypeWithKey(keys);
Operator<Tuple2<K, IN>> keyedInput = SelectorFunctionKeys.appendKeyExtractor(input, keys);


KeyExtractingMapper<IN, K> extractor = new KeyExtractingMapper<IN, K>(keys.getKeyExtractor()); PlanUnwrappingGroupCombineOperator<IN, OUT, K> reducer =

new PlanUnwrappingGroupCombineOperator<>(function, keys, name, outputType, typeInfoWithKey);
PlanUnwrappingGroupCombineOperator<IN, OUT, K> reducer = new PlanUnwrappingGroupCombineOperator<IN, OUT, K>(function, keys, name, outputType, typeInfoWithKey); reducer.setInput(keyedInput);

MapOperatorBase<IN, Tuple2<K, IN>, MapFunction<IN, Tuple2<K, IN>>> mapper = new MapOperatorBase<IN, Tuple2<K, IN>, MapFunction<IN, Tuple2<K, IN>>>(extractor, new UnaryOperatorInformation<IN, Tuple2<K, IN>>(inputType, typeInfoWithKey), "Key Extractor");

reducer.setInput(mapper);
mapper.setInput(input);

// set the mapper's parallelism to the input parallelism to make sure it is chained
mapper.setParallelism(input.getParallelism());


return reducer; return reducer;
} }


@SuppressWarnings("unchecked")
private static <IN, OUT, K1, K2> PlanUnwrappingSortedGroupCombineOperator<IN, OUT, K1, K2> translateSelectorFunctionSortedReducer( private static <IN, OUT, K1, K2> PlanUnwrappingSortedGroupCombineOperator<IN, OUT, K1, K2> translateSelectorFunctionSortedReducer(
Keys.SelectorFunctionKeys<IN, ?> rawGroupingKey, Keys.SelectorFunctionKeys<IN, ?> rawSortingKey, GroupCombineFunction<IN, OUT> function, SelectorFunctionKeys<IN, ?> rawGroupingKey,
TypeInformation<IN> inputType, TypeInformation<OUT> outputType, String name, Operator<IN> input) SelectorFunctionKeys<IN, ?> rawSortingKeys,
Ordering groupOrder,
GroupCombineFunction<IN, OUT> function,
TypeInformation<OUT> outputType,
String name,
Operator<IN> input)
{ {
@SuppressWarnings("unchecked") final SelectorFunctionKeys<IN, K1> groupingKey = (SelectorFunctionKeys<IN, K1>) rawGroupingKey;
final Keys.SelectorFunctionKeys<IN, K1> groupingKey = (Keys.SelectorFunctionKeys<IN, K1>) rawGroupingKey; final SelectorFunctionKeys<IN, K2> sortingKey = (SelectorFunctionKeys<IN, K2>)rawSortingKeys;

TypeInformation<Tuple3<K1, K2, IN>> typeInfoWithKey = SelectorFunctionKeys.createTypeWithKey(groupingKey, sortingKey);
@SuppressWarnings("unchecked")
final Keys.SelectorFunctionKeys<IN, K2> sortingKey = (Keys.SelectorFunctionKeys<IN, K2>) rawSortingKey;

TypeInformation<Tuple3<K1, K2, IN>> typeInfoWithKey = new TupleTypeInfo<Tuple3<K1, K2, IN>>(groupingKey.getKeyType(), sortingKey.getKeyType(), inputType);

TwoKeyExtractingMapper<IN, K1, K2> extractor = new TwoKeyExtractingMapper<IN, K1, K2>(groupingKey.getKeyExtractor(), sortingKey.getKeyExtractor());

PlanUnwrappingSortedGroupCombineOperator<IN, OUT, K1, K2> reducer = new PlanUnwrappingSortedGroupCombineOperator<IN, OUT, K1, K2>(function, groupingKey, sortingKey, name, outputType, typeInfoWithKey);

MapOperatorBase<IN, Tuple3<K1, K2, IN>, MapFunction<IN, Tuple3<K1, K2, IN>>> mapper = new MapOperatorBase<IN, Tuple3<K1, K2, IN>, MapFunction<IN, Tuple3<K1, K2, IN>>>(extractor, new UnaryOperatorInformation<IN, Tuple3<K1, K2, IN>>(inputType, typeInfoWithKey), "Key Extractor");


reducer.setInput(mapper); Operator<Tuple3<K1, K2, IN>> inputWithKey = SelectorFunctionKeys.appendKeyExtractor(input, groupingKey, sortingKey);
mapper.setInput(input);


// set the mapper's parallelism to the input parallelism to make sure it is chained PlanUnwrappingSortedGroupCombineOperator<IN, OUT, K1, K2> reducer =
mapper.setParallelism(input.getParallelism()); new PlanUnwrappingSortedGroupCombineOperator<>(function, groupingKey, sortingKey, name, outputType, typeInfoWithKey);
reducer.setInput(inputWithKey);
reducer.setGroupOrder(groupOrder);


return reducer; return reducer;
} }
Expand Down

0 comments on commit 544abb9

Please sign in to comment.