Skip to content

Commit

Permalink
[FLINK-2556] [core] Refactor/fix pre-flight key validation
Browse files Browse the repository at this point in the history
This closes #1044
  • Loading branch information
supermegaciaccount authored and StephanEwen committed Aug 27, 2015
1 parent bc63ef2 commit 1e38d6f
Show file tree
Hide file tree
Showing 3 changed files with 17 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,14 @@

package org.apache.flink.api.java.operators;

import org.apache.flink.api.common.InvalidProgramException;
import org.apache.flink.api.common.functions.GroupReduceFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.operators.Operator;
import org.apache.flink.api.common.operators.SingleInputSemanticProperties;
import org.apache.flink.api.common.operators.UnaryOperatorInformation;
import org.apache.flink.api.common.operators.base.GroupReduceOperatorBase;
import org.apache.flink.api.common.operators.base.MapOperatorBase;
import org.apache.flink.api.common.typeinfo.AtomicType;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.CompositeType;
import org.apache.flink.api.common.functions.RichGroupReduceFunction;
import org.apache.flink.api.java.operators.translation.KeyExtractingMapper;
import org.apache.flink.api.java.operators.translation.PlanUnwrappingReduceGroupOperator;
Expand All @@ -54,10 +51,6 @@ public DistinctOperator(DataSet<T> input, Keys<T> keys, String distinctLocationN

this.distinctLocationName = distinctLocationName;

if (!(input.getType() instanceof CompositeType) &&
!(input.getType() instanceof AtomicType && input.getType().isKeyType())){
throw new InvalidProgramException("Distinct only possible on composite or atomic key types.");
}
// if keys is null distinction is done on all fields
if (keys == null) {
keys = new Keys.ExpressionKeys<T>(new String[] {Keys.ExpressionKeys.SELECT_ALL_CHAR }, input.getType());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,9 @@ public ExpressionKeys(int[] groupingFields, TypeInformation<T> type, boolean all
throw new InvalidProgramException("Specifying keys via field positions is only valid " +
"for tuple data types. Type: " + type);
}
if (type.getArity() == 0) {
throw new InvalidProgramException("Tuple size must be greater than 0. Size: " + type.getArity());
}

if (!allowEmpty && (groupingFields == null || groupingFields.length == 0)) {
throw new IllegalArgumentException("The grouping fields must not be empty.");
Expand Down Expand Up @@ -240,6 +243,9 @@ public ExpressionKeys(int[] groupingFields, TypeInformation<T> type, boolean all
}
else {
// arrived at key position
if (!fieldType.isKeyType()) {
throw new InvalidProgramException("This type (" + fieldType + ") cannot be used as key.");
}
if(fieldType instanceof CompositeType) {
// add all nested fields of composite type
((CompositeType) fieldType).getFlatFields("*", offset, keyFields);
Expand Down Expand Up @@ -296,6 +302,15 @@ public ExpressionKeys(String[] expressionsIn, TypeInformation<T> type) {
keyFields = new ArrayList<FlatFieldDescriptor>(expressions.length);
for (int i = 0; i < expressions.length; i++) {
List<FlatFieldDescriptor> keys = cType.getFlatFields(expressions[i]); // use separate list to do a size check
for (FlatFieldDescriptor key : keys) {
TypeInformation<?> keyType = key.getType();
if (!keyType.isKeyType()) {
throw new InvalidProgramException("This type (" + key.getType() + ") cannot be used as key.");
}
if (!(keyType instanceof AtomicType || keyType instanceof CompositeType)) {
throw new InvalidProgramException("Field type is neither CompositeType nor AtomicType: " + keyType);
}
}
if(keys.size() == 0) {
throw new InvalidProgramException("Unable to extract key from expression '"+expressions[i]+"' on key "+cType);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -529,8 +529,8 @@ public Gradient reduce(Gradient gradient1, Gradient gradient2) throws Exception
@Test
public void testLogisticRegressionExamplesSumGradient() {
compareAnalyzerResultWithAnnotationsSingleInputWithKeys(ReduceFunction.class, SumGradient.class,
"Tuple1<double[]>",
"Tuple1<double[]>",
"Tuple1<double>",
"Tuple1<double>",
new String[] { "0" });
}

Expand Down

0 comments on commit 1e38d6f

Please sign in to comment.