From 7a57b6ef2ecdc7adaf770f8585cf8f974c684705 Mon Sep 17 00:00:00 2001 From: zentol Date: Sun, 23 Aug 2015 15:57:34 +0200 Subject: [PATCH] [FLINK-2556] Refactor/Fix pre-flight Key validation --- .../api/java/operators/DistinctOperator.java | 7 ------- .../org/apache/flink/api/java/operators/Keys.java | 15 +++++++++++++++ .../api/java/sca/UdfAnalyzerExamplesTest.java | 4 ++-- 3 files changed, 17 insertions(+), 9 deletions(-) diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/DistinctOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/DistinctOperator.java index a6eb43e5141b1..ad2335b211f87 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/operators/DistinctOperator.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/DistinctOperator.java @@ -18,7 +18,6 @@ package org.apache.flink.api.java.operators; -import org.apache.flink.api.common.InvalidProgramException; import org.apache.flink.api.common.functions.GroupReduceFunction; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.operators.Operator; @@ -26,9 +25,7 @@ import org.apache.flink.api.common.operators.UnaryOperatorInformation; import org.apache.flink.api.common.operators.base.GroupReduceOperatorBase; import org.apache.flink.api.common.operators.base.MapOperatorBase; -import org.apache.flink.api.common.typeinfo.AtomicType; import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.api.common.typeutils.CompositeType; import org.apache.flink.api.common.functions.RichGroupReduceFunction; import org.apache.flink.api.java.operators.translation.KeyExtractingMapper; import org.apache.flink.api.java.operators.translation.PlanUnwrappingReduceGroupOperator; @@ -54,10 +51,6 @@ public DistinctOperator(DataSet input, Keys keys, String distinctLocationN this.distinctLocationName = distinctLocationName; - if (!(input.getType() instanceof CompositeType) && - !(input.getType() instanceof AtomicType && input.getType().isKeyType())){ - throw new InvalidProgramException("Distinct only possible on composite or atomic key types."); - } // if keys is null distinction is done on all fields if (keys == null) { keys = new Keys.ExpressionKeys(new String[] {Keys.ExpressionKeys.SELECT_ALL_CHAR }, input.getType()); diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/Keys.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/Keys.java index 09874e5d5e10f..47c66f4ef7fd4 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/operators/Keys.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/Keys.java @@ -209,6 +209,9 @@ public ExpressionKeys(int[] groupingFields, TypeInformation type, boolean all throw new InvalidProgramException("Specifying keys via field positions is only valid " + "for tuple data types. Type: " + type); } + if (type.getArity() == 0) { + throw new InvalidProgramException("Tuple size must be greater than 0. Size: " + type.getArity()); + } if (!allowEmpty && (groupingFields == null || groupingFields.length == 0)) { throw new IllegalArgumentException("The grouping fields must not be empty."); @@ -240,6 +243,9 @@ public ExpressionKeys(int[] groupingFields, TypeInformation type, boolean all } else { // arrived at key position + if (!fieldType.isKeyType()) { + throw new InvalidProgramException("This type (" + fieldType + ") cannot be used as key."); + } if(fieldType instanceof CompositeType) { // add all nested fields of composite type ((CompositeType) fieldType).getFlatFields("*", offset, keyFields); @@ -296,6 +302,15 @@ public ExpressionKeys(String[] expressionsIn, TypeInformation type) { keyFields = new ArrayList(expressions.length); for (int i = 0; i < expressions.length; i++) { List keys = cType.getFlatFields(expressions[i]); // use separate list to do a size check + for (FlatFieldDescriptor key : keys) { + TypeInformation keyType = key.getType(); + if (!keyType.isKeyType()) { + throw new InvalidProgramException("This type (" + key.getType() + ") cannot be used as key."); + } + if (!(keyType instanceof AtomicType || keyType instanceof CompositeType)) { + throw new InvalidProgramException("Field type is neither CompositeType nor AtomicType: " + keyType); + } + } if(keys.size() == 0) { throw new InvalidProgramException("Unable to extract key from expression '"+expressions[i]+"' on key "+cType); } diff --git a/flink-java/src/test/java/org/apache/flink/api/java/sca/UdfAnalyzerExamplesTest.java b/flink-java/src/test/java/org/apache/flink/api/java/sca/UdfAnalyzerExamplesTest.java index a1d2b9779b195..5254b68d5f946 100644 --- a/flink-java/src/test/java/org/apache/flink/api/java/sca/UdfAnalyzerExamplesTest.java +++ b/flink-java/src/test/java/org/apache/flink/api/java/sca/UdfAnalyzerExamplesTest.java @@ -529,8 +529,8 @@ public Gradient reduce(Gradient gradient1, Gradient gradient2) throws Exception @Test public void testLogisticRegressionExamplesSumGradient() { compareAnalyzerResultWithAnnotationsSingleInputWithKeys(ReduceFunction.class, SumGradient.class, - "Tuple1", - "Tuple1", + "Tuple1", + "Tuple1", new String[] { "0" }); }