From e5cc6a4724ebfeac4dcef8faffe1cbe3c0dcacf0 Mon Sep 17 00:00:00 2001 From: gyfora Date: Wed, 16 Jul 2014 19:09:04 +0200 Subject: [PATCH 1/2] [FLINK-629] getFieldNotNull added to Tuple and updated Aggregators and Comparators to use that where appropriate --- .../flink/types/NullFieldException.java | 74 +++++++++++++++++++ .../api/java/operators/AggregateOperator.java | 11 ++- .../translation/TupleKeyExtractingMapper.java | 14 +++- .../apache/flink/api/java/tuple/Tuple.java | 19 +++++ .../typeutils/runtime/TupleComparator.java | 23 ++---- .../runtime/TupleLeadingFieldComparator.java | 10 +-- .../TupleLeadingFieldPairComparator.java | 6 +- .../runtime/TuplePairComparator.java | 6 +- .../flink/api/java/tuple/Tuple2Test.java | 16 +++- 9 files changed, 143 insertions(+), 36 deletions(-) create mode 100755 flink-core/src/main/java/org/apache/flink/types/NullFieldException.java diff --git a/flink-core/src/main/java/org/apache/flink/types/NullFieldException.java b/flink-core/src/main/java/org/apache/flink/types/NullFieldException.java new file mode 100755 index 0000000000000..085660d96b3ca --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/types/NullFieldException.java @@ -0,0 +1,74 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +package org.apache.flink.types; + + +/** + * An exception specifying that a required field was not set in a record, i.e. was null. + */ +public class NullFieldException extends RuntimeException +{ + /** + * UID for serialization interoperability. + */ + private static final long serialVersionUID = -8820467525772321173L; + + private final int fieldNumber; + + /** + * Constructs an {@code NullFieldException} with {@code null} + * as its error detail message. + */ + public NullFieldException() { + super(); + this.fieldNumber = -1; + } + + /** + * Constructs an {@code NullFieldException} with the specified detail message. + * + * @param message The detail message. + */ + public NullFieldException(String message) { + super(message); + this.fieldNumber = -1; + } + + /** + * Constructs an {@code NullFieldException} with a default message, referring to + * given field number as the null field. + * + * @param fieldNumber The index of the field that was null, bit expected to hold a value. + */ + public NullFieldException(int fieldNumber) { + super("Field " + fieldNumber + " is null, but expected to hold a value."); + this.fieldNumber = fieldNumber; + } + + /** + * Gets the field number that was attempted to access. If the number is not set, this method returns + * {@code -1}. + * + * @return The field number that was attempted to access, or {@code -1}, if not set. + */ + public int getFieldNumber() { + return this.fieldNumber; + } +} diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/AggregateOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/AggregateOperator.java index ca6ed940950c4..17e73b530eedb 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/operators/AggregateOperator.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/AggregateOperator.java @@ -37,8 +37,9 @@ import org.apache.flink.api.java.tuple.Tuple; import org.apache.flink.api.java.typeutils.TupleTypeInfo; import org.apache.flink.configuration.Configuration; +import org.apache.flink.types.NullFieldException; +import org.apache.flink.types.NullKeyFieldException; import org.apache.flink.util.Collector; - import org.apache.flink.api.java.DataSet; /** @@ -282,8 +283,12 @@ public void reduce(Iterator values, Collector out) { current = values.next(); for (int i = 0; i < fieldPositions.length; i++) { - Object val = current.getField(fieldPositions[i]); - aggFunctions[i].aggregate(val); + try { + Object val = current.getFieldNotNull(fieldPositions[i]); + aggFunctions[i].aggregate(val); + } catch (NullKeyFieldException e) { + throw new NullFieldException(fieldPositions[i]); + } } } diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/TupleKeyExtractingMapper.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/TupleKeyExtractingMapper.java index a915d1cecc544..1e76c69ffc40a 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/TupleKeyExtractingMapper.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/TupleKeyExtractingMapper.java @@ -21,6 +21,8 @@ import org.apache.flink.api.java.functions.MapFunction; import org.apache.flink.api.java.tuple.Tuple; import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.types.NullFieldException; +import org.apache.flink.types.NullKeyFieldException; public final class TupleKeyExtractingMapper extends MapFunction> { @@ -42,10 +44,14 @@ public Tuple2 map(T value) throws Exception { Tuple v = (Tuple) value; - K key = v.getField(pos); - tuple.f0 = key; - tuple.f1 = value; - + try { + K key = v.getFieldNotNull(pos); + tuple.f0 = key; + tuple.f1 = value; + } catch (NullFieldException nfex) { + throw new NullKeyFieldException(nfex); + } + return tuple; } } diff --git a/flink-java/src/main/java/org/apache/flink/api/java/tuple/Tuple.java b/flink-java/src/main/java/org/apache/flink/api/java/tuple/Tuple.java index 4738b0278199d..29668300749b6 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/tuple/Tuple.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/tuple/Tuple.java @@ -18,6 +18,8 @@ package org.apache.flink.api.java.tuple; +import org.apache.flink.types.NullKeyFieldException; + /** * The base class of all tuples. Tuples have a fix length and contain a set of fields, * which may all be of different types. Because Tuples are strongly typed, each distinct @@ -46,6 +48,23 @@ public abstract class Tuple implements java.io.Serializable { */ public abstract T getField(int pos); + /** + * Gets the field at the specified position, throws NullKeyFieldException if the field is null. Used for comparing key fields. + * + * @param pos The position of the field, zero indexed. + * @returnThe field at the specified position. + * @throws IndexOutOfBoundsException Thrown, if the position is negative, or equal to, or larger than the number of fields. + * @throws NullKeyFieldException Thrown, if the field at pos is null. + */ + public T getFieldNotNull(int pos){ + T field = getField(pos); + if (field != null) { + return field; + } else { + throw new NullKeyFieldException(pos); + } + } + /** * Sets the field at the specified position. * diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleComparator.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleComparator.java index 8e0abcb64447d..b7fc15a885a78 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleComparator.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleComparator.java @@ -156,17 +156,14 @@ protected TypeComparator[] getComparators() { public int hash(T value) { int i = 0; try { - int code = this.comparators[0].hash(value.getField(keyPositions[0])); + int code = this.comparators[0].hash(value.getFieldNotNull(keyPositions[0])); for (i = 1; i < this.keyPositions.length; i++) { code *= HASH_SALT[i & 0x1F]; // salt code with (i % HASH_SALT.length)-th salt component - code += this.comparators[i].hash(value.getField(keyPositions[i])); + code += this.comparators[i].hash(value.getFieldNotNull(keyPositions[i])); } return code; } - catch (NullPointerException npex) { - throw new NullKeyFieldException(keyPositions[i]); - } catch (IndexOutOfBoundsException iobex) { throw new KeyFieldOutOfBoundsException(keyPositions[i]); } @@ -177,12 +174,9 @@ public void setReference(T toCompare) { int i = 0; try { for (; i < this.keyPositions.length; i++) { - this.comparators[i].setReference(toCompare.getField(this.keyPositions[i])); + this.comparators[i].setReference(toCompare.getFieldNotNull(this.keyPositions[i])); } } - catch (NullPointerException npex) { - throw new NullKeyFieldException(keyPositions[i]); - } catch (IndexOutOfBoundsException iobex) { throw new KeyFieldOutOfBoundsException(keyPositions[i]); } @@ -193,15 +187,12 @@ public boolean equalToReference(T candidate) { int i = 0; try { for (; i < this.keyPositions.length; i++) { - if (!this.comparators[i].equalToReference(candidate.getField(this.keyPositions[i]))) { + if (!this.comparators[i].equalToReference(candidate.getFieldNotNull(this.keyPositions[i]))) { return false; } } return true; } - catch (NullPointerException npex) { - throw new NullKeyFieldException(keyPositions[i]); - } catch (IndexOutOfBoundsException iobex) { throw new KeyFieldOutOfBoundsException(keyPositions[i]); } @@ -235,15 +226,13 @@ public int compare(T first, T second) { try { for (; i < keyPositions.length; i++) { int keyPos = keyPositions[i]; - int cmp = comparators[i].compare(first.getField(keyPos), second.getField(keyPos)); + int cmp = comparators[i].compare(first.getFieldNotNull(keyPos), second.getFieldNotNull(keyPos)); if (cmp != 0) { return cmp; } } return 0; - } catch (NullPointerException npex) { - throw new NullKeyFieldException(keyPositions[i]); } catch (IndexOutOfBoundsException iobex) { throw new KeyFieldOutOfBoundsException(keyPositions[i]); } @@ -303,7 +292,7 @@ public void putNormalizedKey(T value, MemorySegment target, int offset, int numB { int len = this.normalizedKeyLengths[i]; len = numBytes >= len ? len : numBytes; - this.comparators[i].putNormalizedKey(value.getField(this.keyPositions[i]), target, offset, len); + this.comparators[i].putNormalizedKey(value.getFieldNotNull(this.keyPositions[i]), target, offset, len); numBytes -= len; offset += len; } diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleLeadingFieldComparator.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleLeadingFieldComparator.java index 43906f86b0b27..7a9aa43ddefe8 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleLeadingFieldComparator.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleLeadingFieldComparator.java @@ -46,18 +46,18 @@ public TypeComparator getComparator() { @Override public int hash(T value) { - return comparator.hash(value.getField(0)); + return comparator.hash(value.getFieldNotNull(0)); } @Override public void setReference(T toCompare) { - this.comparator.setReference(toCompare.getField(0)); + this.comparator.setReference(toCompare.getFieldNotNull(0)); } @Override public boolean equalToReference(T candidate) { - return this.comparator.equalToReference(candidate.getField(0)); + return this.comparator.equalToReference(candidate.getFieldNotNull(0)); } @SuppressWarnings("unchecked") @@ -68,7 +68,7 @@ public int compareToReference(TypeComparator referencedComparator) { @Override public int compare(T first, T second) { - return this.comparator.compare(first.getField(0), second.getField(0)); + return this.comparator.compare(first.getFieldNotNull(0), second.getFieldNotNull(0)); } @Override @@ -98,7 +98,7 @@ public boolean isNormalizedKeyPrefixOnly(int keyBytes) { @Override public void putNormalizedKey(T record, MemorySegment target, int offset, int numBytes) { - this.comparator.putNormalizedKey(record.getField(0), target, offset, numBytes); + this.comparator.putNormalizedKey(record.getFieldNotNull(0), target, offset, numBytes); } @Override diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleLeadingFieldPairComparator.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleLeadingFieldPairComparator.java index cb7eef5a6acac..749c38d458878 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleLeadingFieldPairComparator.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleLeadingFieldPairComparator.java @@ -39,17 +39,17 @@ public TupleLeadingFieldPairComparator(TypeComparator comparator1, TypeCompar @Override public void setReference(T1 reference) { - this.comparator1.setReference(reference.getField(0)); + this.comparator1.setReference(reference.getFieldNotNull(0)); } @Override public boolean equalToReference(T2 candidate) { - return this.comparator1.equalToReference(candidate.getField(0)); + return this.comparator1.equalToReference(candidate.getFieldNotNull(0)); } @Override public int compareToReference(T2 candidate) { - this.comparator2.setReference(candidate.getField(0)); + this.comparator2.setReference(candidate.getFieldNotNull(0)); return this.comparator1.compareToReference(this.comparator2); } } diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TuplePairComparator.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TuplePairComparator.java index 796799b0a621f..43f46e44fb2aa 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TuplePairComparator.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TuplePairComparator.java @@ -59,14 +59,14 @@ public TuplePairComparator(int[] keyFields1, int[] keyFields2, TypeComparator tuple = new Tuple2(new String("Test case"), null); + + Assert.assertEquals("Test case", tuple.getFieldNotNull(0)); + + try { + tuple.getFieldNotNull(1); + Assert.fail(); + } catch (NullKeyFieldException e) { + // right + } + } } From 337dd967528d6cf65d317561d4aa0cd38edb70b7 Mon Sep 17 00:00:00 2001 From: mbalassi Date: Thu, 17 Jul 2014 09:49:53 +0200 Subject: [PATCH 2/2] [FLINK-629] Updated getFieldNotNull usage and added it to TupleSerializers --- .../flink/types/NullKeyFieldException.java | 11 ++++++ .../api/java/operators/AggregateOperator.java | 8 +--- .../apache/flink/api/java/tuple/Tuple.java | 9 +++-- .../typeutils/runtime/TupleComparator.java | 20 +++++++++- .../runtime/TupleLeadingFieldComparator.java | 38 ++++++++++++++++--- .../TupleLeadingFieldPairComparator.java | 23 +++++++++-- .../runtime/TuplePairComparator.java | 32 ++++++++++++---- .../typeutils/runtime/TupleSerializer.java | 5 +++ .../flink/api/java/tuple/Tuple2Test.java | 4 +- 9 files changed, 118 insertions(+), 32 deletions(-) diff --git a/flink-core/src/main/java/org/apache/flink/types/NullKeyFieldException.java b/flink-core/src/main/java/org/apache/flink/types/NullKeyFieldException.java index 8f6d5c661755e..83edd1770e593 100644 --- a/flink-core/src/main/java/org/apache/flink/types/NullKeyFieldException.java +++ b/flink-core/src/main/java/org/apache/flink/types/NullKeyFieldException.java @@ -41,6 +41,17 @@ public NullKeyFieldException() { this.fieldNumber = -1; } + /** + * Constructs an {@code NullKeyFieldException} with a default message, referring to + * the field number given in the {@code NullFieldException}. + * + * @param nfex The base exception. + */ + public NullKeyFieldException(NullFieldException nfex) { + super(); + this.fieldNumber = nfex.getFieldNumber(); + } + /** * Constructs an {@code NullKeyFieldException} with the specified detail message. * diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/AggregateOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/AggregateOperator.java index 17e73b530eedb..bb830e277b8d4 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/operators/AggregateOperator.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/AggregateOperator.java @@ -29,6 +29,7 @@ import org.apache.flink.api.common.operators.SingleInputSemanticProperties; import org.apache.flink.api.common.operators.UnaryOperatorInformation; import org.apache.flink.api.common.operators.base.GroupReduceOperatorBase; +import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.aggregation.AggregationFunction; import org.apache.flink.api.java.aggregation.AggregationFunctionFactory; import org.apache.flink.api.java.aggregation.Aggregations; @@ -37,10 +38,7 @@ import org.apache.flink.api.java.tuple.Tuple; import org.apache.flink.api.java.typeutils.TupleTypeInfo; import org.apache.flink.configuration.Configuration; -import org.apache.flink.types.NullFieldException; -import org.apache.flink.types.NullKeyFieldException; import org.apache.flink.util.Collector; -import org.apache.flink.api.java.DataSet; /** * This operator represents the application of a "aggregate" operation on a data set, and the @@ -283,12 +281,8 @@ public void reduce(Iterator values, Collector out) { current = values.next(); for (int i = 0; i < fieldPositions.length; i++) { - try { Object val = current.getFieldNotNull(fieldPositions[i]); aggFunctions[i].aggregate(val); - } catch (NullKeyFieldException e) { - throw new NullFieldException(fieldPositions[i]); - } } } diff --git a/flink-java/src/main/java/org/apache/flink/api/java/tuple/Tuple.java b/flink-java/src/main/java/org/apache/flink/api/java/tuple/Tuple.java index 29668300749b6..bc913f1198331 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/tuple/Tuple.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/tuple/Tuple.java @@ -18,7 +18,8 @@ package org.apache.flink.api.java.tuple; -import org.apache.flink.types.NullKeyFieldException; +import org.apache.flink.types.NullFieldException; + /** * The base class of all tuples. Tuples have a fix length and contain a set of fields, @@ -49,19 +50,19 @@ public abstract class Tuple implements java.io.Serializable { public abstract T getField(int pos); /** - * Gets the field at the specified position, throws NullKeyFieldException if the field is null. Used for comparing key fields. + * Gets the field at the specified position, throws NullFieldException if the field is null. Used for comparing key fields. * * @param pos The position of the field, zero indexed. * @returnThe field at the specified position. * @throws IndexOutOfBoundsException Thrown, if the position is negative, or equal to, or larger than the number of fields. - * @throws NullKeyFieldException Thrown, if the field at pos is null. + * @throws NullFieldException Thrown, if the field at pos is null. */ public T getFieldNotNull(int pos){ T field = getField(pos); if (field != null) { return field; } else { - throw new NullKeyFieldException(pos); + throw new NullFieldException(pos); } } diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleComparator.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleComparator.java index b7fc15a885a78..fc4b0224eebc0 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleComparator.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleComparator.java @@ -28,6 +28,7 @@ import org.apache.flink.core.memory.DataOutputView; import org.apache.flink.core.memory.MemorySegment; import org.apache.flink.types.KeyFieldOutOfBoundsException; +import org.apache.flink.types.NullFieldException; import org.apache.flink.types.NullKeyFieldException; @@ -164,6 +165,9 @@ public int hash(T value) { } return code; } + catch (NullFieldException nfex) { + throw new NullKeyFieldException(nfex); + } catch (IndexOutOfBoundsException iobex) { throw new KeyFieldOutOfBoundsException(keyPositions[i]); } @@ -177,6 +181,9 @@ public void setReference(T toCompare) { this.comparators[i].setReference(toCompare.getFieldNotNull(this.keyPositions[i])); } } + catch (NullFieldException nfex) { + throw new NullKeyFieldException(nfex); + } catch (IndexOutOfBoundsException iobex) { throw new KeyFieldOutOfBoundsException(keyPositions[i]); } @@ -193,6 +200,9 @@ public boolean equalToReference(T candidate) { } return true; } + catch (NullFieldException nfex) { + throw new NullKeyFieldException(nfex); + } catch (IndexOutOfBoundsException iobex) { throw new KeyFieldOutOfBoundsException(keyPositions[i]); } @@ -231,9 +241,12 @@ public int compare(T first, T second) { return cmp; } } - return 0; - } catch (IndexOutOfBoundsException iobex) { + } + catch (NullFieldException nfex) { + throw new NullKeyFieldException(nfex); + } + catch (IndexOutOfBoundsException iobex) { throw new KeyFieldOutOfBoundsException(keyPositions[i]); } } @@ -297,6 +310,9 @@ public void putNormalizedKey(T value, MemorySegment target, int offset, int numB offset += len; } } + catch (NullFieldException nfex) { + throw new NullKeyFieldException(nfex); + } catch (NullPointerException npex) { throw new NullKeyFieldException(this.keyPositions[i]); } diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleLeadingFieldComparator.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleLeadingFieldComparator.java index 7a9aa43ddefe8..d63fccb7ca762 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleLeadingFieldComparator.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleLeadingFieldComparator.java @@ -25,6 +25,8 @@ import org.apache.flink.core.memory.DataInputView; import org.apache.flink.core.memory.DataOutputView; import org.apache.flink.core.memory.MemorySegment; +import org.apache.flink.types.NullFieldException; +import org.apache.flink.types.NullKeyFieldException; public final class TupleLeadingFieldComparator extends TypeComparator @@ -46,18 +48,32 @@ public TypeComparator getComparator() { @Override public int hash(T value) { - return comparator.hash(value.getFieldNotNull(0)); - + try { + return comparator.hash(value. getFieldNotNull(0)); + } catch (NullFieldException nfex) { + throw new NullKeyFieldException(nfex); + } + } @Override public void setReference(T toCompare) { - this.comparator.setReference(toCompare.getFieldNotNull(0)); + try { + this.comparator.setReference(toCompare. getFieldNotNull(0)); + } catch (NullFieldException nfex) { + throw new NullKeyFieldException(nfex); + } } + @Override public boolean equalToReference(T candidate) { - return this.comparator.equalToReference(candidate.getFieldNotNull(0)); + try { + return this.comparator.equalToReference(candidate + . getFieldNotNull(0)); + } catch (NullFieldException nfex) { + throw new NullKeyFieldException(nfex); + } } @SuppressWarnings("unchecked") @@ -68,7 +84,12 @@ public int compareToReference(TypeComparator referencedComparator) { @Override public int compare(T first, T second) { - return this.comparator.compare(first.getFieldNotNull(0), second.getFieldNotNull(0)); + try { + return this.comparator.compare(first. getFieldNotNull(0), + second. getFieldNotNull(0)); + } catch (NullFieldException nfex) { + throw new NullKeyFieldException(nfex); + } } @Override @@ -98,7 +119,12 @@ public boolean isNormalizedKeyPrefixOnly(int keyBytes) { @Override public void putNormalizedKey(T record, MemorySegment target, int offset, int numBytes) { - this.comparator.putNormalizedKey(record.getFieldNotNull(0), target, offset, numBytes); + try { + this.comparator.putNormalizedKey(record. getFieldNotNull(0), + target, offset, numBytes); + } catch (NullFieldException nfex) { + throw new NullKeyFieldException(nfex); + } } @Override diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleLeadingFieldPairComparator.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleLeadingFieldPairComparator.java index 749c38d458878..3611f7023002e 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleLeadingFieldPairComparator.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleLeadingFieldPairComparator.java @@ -23,6 +23,8 @@ import org.apache.flink.api.common.typeutils.TypeComparator; import org.apache.flink.api.common.typeutils.TypePairComparator; import org.apache.flink.api.java.tuple.Tuple; +import org.apache.flink.types.NullFieldException; +import org.apache.flink.types.NullKeyFieldException; public class TupleLeadingFieldPairComparator extends TypePairComparator implements Serializable { @@ -39,17 +41,30 @@ public TupleLeadingFieldPairComparator(TypeComparator comparator1, TypeCompar @Override public void setReference(T1 reference) { - this.comparator1.setReference(reference.getFieldNotNull(0)); + try { + this.comparator1.setReference(reference. getFieldNotNull(0)); + } catch (NullFieldException nfex) { + throw new NullKeyFieldException(nfex); + } } @Override public boolean equalToReference(T2 candidate) { - return this.comparator1.equalToReference(candidate.getFieldNotNull(0)); + try { + return this.comparator1.equalToReference(candidate + . getFieldNotNull(0)); + } catch (NullFieldException nfex) { + throw new NullKeyFieldException(nfex); + } } @Override public int compareToReference(T2 candidate) { - this.comparator2.setReference(candidate.getFieldNotNull(0)); - return this.comparator1.compareToReference(this.comparator2); + try { + this.comparator2.setReference(candidate. getFieldNotNull(0)); + return this.comparator1.compareToReference(this.comparator2); + } catch (NullFieldException nfex) { + throw new NullKeyFieldException(nfex); + } } } diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TuplePairComparator.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TuplePairComparator.java index 43f46e44fb2aa..4c4094d7dfe20 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TuplePairComparator.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TuplePairComparator.java @@ -23,6 +23,8 @@ import org.apache.flink.api.common.typeutils.TypeComparator; import org.apache.flink.api.common.typeutils.TypePairComparator; import org.apache.flink.api.java.tuple.Tuple; +import org.apache.flink.types.NullFieldException; +import org.apache.flink.types.NullKeyFieldException; public class TuplePairComparator extends TypePairComparator implements Serializable { @@ -59,15 +61,25 @@ public TuplePairComparator(int[] keyFields1, int[] keyFields2, TypeComparator extends TypeSerializer { @@ -105,7 +106,11 @@ public int getLength() { public void serialize(T value, DataOutputView target) throws IOException { for (int i = 0; i < arity; i++) { Object o = value.getField(i); + try { fieldSerializers[i].serialize(o, target); + } catch (NullPointerException npex) { + throw new NullFieldException(i); + } } } diff --git a/flink-java/src/test/java/org/apache/flink/api/java/tuple/Tuple2Test.java b/flink-java/src/test/java/org/apache/flink/api/java/tuple/Tuple2Test.java index 7a28ed21c92a3..6f0f9d371dce9 100644 --- a/flink-java/src/test/java/org/apache/flink/api/java/tuple/Tuple2Test.java +++ b/flink-java/src/test/java/org/apache/flink/api/java/tuple/Tuple2Test.java @@ -21,7 +21,7 @@ import junit.framework.Assert; -import org.apache.flink.types.NullKeyFieldException; +import org.apache.flink.types.NullFieldException; import org.junit.Test; public class Tuple2Test { @@ -45,7 +45,7 @@ public void testGetFieldNotNull() { try { tuple.getFieldNotNull(1); Assert.fail(); - } catch (NullKeyFieldException e) { + } catch (NullFieldException e) { // right } }