From 0807eec0cb1acf8052a77b6133387e25399fce08 Mon Sep 17 00:00:00 2001 From: zentol Date: Sun, 23 Aug 2015 15:36:47 +0200 Subject: [PATCH] [FLINK-2565] Support primitive arrays as keys This closes #1043 --- .../typeinfo/PrimitiveArrayTypeInfo.java | 47 +++++-- .../BooleanPrimitiveArrayComparator.java | 56 ++++++++ .../array/BytePrimitiveArrayComparator.java | 56 ++++++++ .../array/CharPrimitiveArrayComparator.java | 56 ++++++++ .../array/DoublePrimitiveArrayComparator.java | 57 +++++++++ .../array/FloatPrimitiveArrayComparator.java | 56 ++++++++ .../array/IntPrimitiveArrayComparator.java | 56 ++++++++ .../array/LongPrimitiveArrayComparator.java | 56 ++++++++ .../base/array/PrimitiveArrayComparator.java | 121 ++++++++++++++++++ .../array/ShortPrimitiveArrayComparator.java | 56 ++++++++ .../BooleanPrimitiveArrayComparatorTest.java | 45 +++++++ .../BytePrimitiveArrayComparatorTest.java | 44 +++++++ .../CharPrimitiveArrayComparatorTest.java | 42 ++++++ .../DoublePrimitiveArrayComparatorTest.java | 44 +++++++ .../FloatPrimitiveArrayComparatorTest.java | 44 +++++++ .../IntPrimitiveArrayComparatorTest.java | 44 +++++++ .../LongPrimitiveArrayComparatorTest.java | 44 +++++++ .../PrimitiveArrayComparatorTestBase.java | 41 ++++++ .../ShortPrimitiveArrayComparatorTest.java | 44 +++++++ .../flink/api/java/operator/GroupingTest.java | 14 +- .../javaApiOperators/GroupReduceITCase.java | 31 +++++ .../util/CollectionDataSets.java | 18 +++ 22 files changed, 1057 insertions(+), 15 deletions(-) create mode 100644 flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/BooleanPrimitiveArrayComparator.java create mode 100644 flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/BytePrimitiveArrayComparator.java create mode 100644 flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/CharPrimitiveArrayComparator.java create mode 100644 flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/DoublePrimitiveArrayComparator.java create mode 100644 flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/FloatPrimitiveArrayComparator.java create mode 100644 flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/IntPrimitiveArrayComparator.java create mode 100644 flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/LongPrimitiveArrayComparator.java create mode 100644 flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/PrimitiveArrayComparator.java create mode 100644 flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/ShortPrimitiveArrayComparator.java create mode 100644 flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/array/BooleanPrimitiveArrayComparatorTest.java create mode 100644 flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/array/BytePrimitiveArrayComparatorTest.java create mode 100644 flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/array/CharPrimitiveArrayComparatorTest.java create mode 100644 flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/array/DoublePrimitiveArrayComparatorTest.java create mode 100644 flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/array/FloatPrimitiveArrayComparatorTest.java create mode 100644 flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/array/IntPrimitiveArrayComparatorTest.java create mode 100644 flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/array/LongPrimitiveArrayComparatorTest.java create mode 100644 flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/array/PrimitiveArrayComparatorTestBase.java create mode 100644 flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/array/ShortPrimitiveArrayComparatorTest.java diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/PrimitiveArrayTypeInfo.java b/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/PrimitiveArrayTypeInfo.java index 83126abf3f2c5..3843f28a9753e 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/PrimitiveArrayTypeInfo.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/PrimitiveArrayTypeInfo.java @@ -24,13 +24,22 @@ import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.functions.InvalidTypesException; import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.common.typeutils.base.array.BooleanPrimitiveArrayComparator; import org.apache.flink.api.common.typeutils.base.array.BooleanPrimitiveArraySerializer; +import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArrayComparator; import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer; +import org.apache.flink.api.common.typeutils.base.array.CharPrimitiveArrayComparator; import org.apache.flink.api.common.typeutils.base.array.CharPrimitiveArraySerializer; +import org.apache.flink.api.common.typeutils.base.array.DoublePrimitiveArrayComparator; import org.apache.flink.api.common.typeutils.base.array.DoublePrimitiveArraySerializer; +import org.apache.flink.api.common.typeutils.base.array.FloatPrimitiveArrayComparator; import org.apache.flink.api.common.typeutils.base.array.FloatPrimitiveArraySerializer; +import org.apache.flink.api.common.typeutils.base.array.IntPrimitiveArrayComparator; import org.apache.flink.api.common.typeutils.base.array.IntPrimitiveArraySerializer; +import org.apache.flink.api.common.typeutils.base.array.LongPrimitiveArrayComparator; import org.apache.flink.api.common.typeutils.base.array.LongPrimitiveArraySerializer; +import org.apache.flink.api.common.typeutils.base.array.PrimitiveArrayComparator; +import org.apache.flink.api.common.typeutils.base.array.ShortPrimitiveArrayComparator; import org.apache.flink.api.common.typeutils.base.array.ShortPrimitiveArraySerializer; /** @@ -39,19 +48,18 @@ * * @param The type represented by this type information, e.g., int[], double[], long[] */ -public class PrimitiveArrayTypeInfo extends TypeInformation { - +public class PrimitiveArrayTypeInfo extends TypeInformation implements AtomicType { + private static final long serialVersionUID = 1L; - public static final PrimitiveArrayTypeInfo BOOLEAN_PRIMITIVE_ARRAY_TYPE_INFO = new PrimitiveArrayTypeInfo(boolean[].class, BooleanPrimitiveArraySerializer.INSTANCE); - public static final PrimitiveArrayTypeInfo BYTE_PRIMITIVE_ARRAY_TYPE_INFO = new PrimitiveArrayTypeInfo(byte[].class, BytePrimitiveArraySerializer.INSTANCE); - public static final PrimitiveArrayTypeInfo SHORT_PRIMITIVE_ARRAY_TYPE_INFO = new PrimitiveArrayTypeInfo(short[].class, ShortPrimitiveArraySerializer.INSTANCE); - public static final PrimitiveArrayTypeInfo INT_PRIMITIVE_ARRAY_TYPE_INFO = new PrimitiveArrayTypeInfo(int[].class, IntPrimitiveArraySerializer.INSTANCE); - public static final PrimitiveArrayTypeInfo LONG_PRIMITIVE_ARRAY_TYPE_INFO = new PrimitiveArrayTypeInfo(long[].class, LongPrimitiveArraySerializer.INSTANCE); - public static final PrimitiveArrayTypeInfo FLOAT_PRIMITIVE_ARRAY_TYPE_INFO = new PrimitiveArrayTypeInfo(float[].class, FloatPrimitiveArraySerializer.INSTANCE); - public static final PrimitiveArrayTypeInfo DOUBLE_PRIMITIVE_ARRAY_TYPE_INFO = new PrimitiveArrayTypeInfo(double[].class, DoublePrimitiveArraySerializer.INSTANCE); - public static final PrimitiveArrayTypeInfo CHAR_PRIMITIVE_ARRAY_TYPE_INFO = new PrimitiveArrayTypeInfo(char[].class, CharPrimitiveArraySerializer.INSTANCE); - + public static final PrimitiveArrayTypeInfo BOOLEAN_PRIMITIVE_ARRAY_TYPE_INFO = new PrimitiveArrayTypeInfo(boolean[].class, BooleanPrimitiveArraySerializer.INSTANCE, BooleanPrimitiveArrayComparator.class); + public static final PrimitiveArrayTypeInfo BYTE_PRIMITIVE_ARRAY_TYPE_INFO = new PrimitiveArrayTypeInfo(byte[].class, BytePrimitiveArraySerializer.INSTANCE, BytePrimitiveArrayComparator.class); + public static final PrimitiveArrayTypeInfo SHORT_PRIMITIVE_ARRAY_TYPE_INFO = new PrimitiveArrayTypeInfo(short[].class, ShortPrimitiveArraySerializer.INSTANCE, ShortPrimitiveArrayComparator.class); + public static final PrimitiveArrayTypeInfo INT_PRIMITIVE_ARRAY_TYPE_INFO = new PrimitiveArrayTypeInfo(int[].class, IntPrimitiveArraySerializer.INSTANCE, IntPrimitiveArrayComparator.class); + public static final PrimitiveArrayTypeInfo LONG_PRIMITIVE_ARRAY_TYPE_INFO = new PrimitiveArrayTypeInfo(long[].class, LongPrimitiveArraySerializer.INSTANCE, LongPrimitiveArrayComparator.class); + public static final PrimitiveArrayTypeInfo FLOAT_PRIMITIVE_ARRAY_TYPE_INFO = new PrimitiveArrayTypeInfo(float[].class, FloatPrimitiveArraySerializer.INSTANCE, FloatPrimitiveArrayComparator.class); + public static final PrimitiveArrayTypeInfo DOUBLE_PRIMITIVE_ARRAY_TYPE_INFO = new PrimitiveArrayTypeInfo(double[].class, DoublePrimitiveArraySerializer.INSTANCE, DoublePrimitiveArrayComparator.class); + public static final PrimitiveArrayTypeInfo CHAR_PRIMITIVE_ARRAY_TYPE_INFO = new PrimitiveArrayTypeInfo(char[].class, CharPrimitiveArraySerializer.INSTANCE, CharPrimitiveArrayComparator.class); // -------------------------------------------------------------------------------------------- /** The class of the array (such as int[].class) */ @@ -60,12 +68,15 @@ public class PrimitiveArrayTypeInfo extends TypeInformation { /** The serializer for the array */ private final TypeSerializer serializer; + /** The class of the comparator for the array */ + private Class> comparatorClass; + /** * Creates a new type info for a * @param arrayClass The class of the array (such as int[].class) * @param serializer The serializer for the array. */ - private PrimitiveArrayTypeInfo(Class arrayClass, TypeSerializer serializer) { + private PrimitiveArrayTypeInfo(Class arrayClass, TypeSerializer serializer, Class> comparatorClass) { if (arrayClass == null || serializer == null) { throw new NullPointerException(); } @@ -74,6 +85,7 @@ private PrimitiveArrayTypeInfo(Class arrayClass, TypeSerializer serializer } this.arrayClass = arrayClass; this.serializer = serializer; + this.comparatorClass = comparatorClass; } // -------------------------------------------------------------------------------------------- @@ -105,7 +117,7 @@ public Class getTypeClass() { @Override public boolean isKeyType() { - return false; + return true; } @Override @@ -161,4 +173,13 @@ public static PrimitiveArrayTypeInfo getInfoFor(Class type) { TYPES.put(double[].class, DOUBLE_PRIMITIVE_ARRAY_TYPE_INFO); TYPES.put(char[].class, CHAR_PRIMITIVE_ARRAY_TYPE_INFO); } + + @Override + public PrimitiveArrayComparator createComparator(boolean sortOrderAscending, ExecutionConfig executionConfig) { + try { + return comparatorClass.getConstructor(boolean.class).newInstance(sortOrderAscending); + } catch (Exception e) { + throw new RuntimeException("Could not initialize primitive " + comparatorClass.getName() + " array comparator.", e); + } + } } diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/BooleanPrimitiveArrayComparator.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/BooleanPrimitiveArrayComparator.java new file mode 100644 index 0000000000000..b7487b896a03e --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/BooleanPrimitiveArrayComparator.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.api.common.typeutils.base.array; + +import static java.lang.Math.min; +import org.apache.flink.api.common.typeutils.TypeComparator; +import org.apache.flink.api.common.typeutils.base.BooleanComparator; + +public class BooleanPrimitiveArrayComparator extends PrimitiveArrayComparator { + public BooleanPrimitiveArrayComparator(boolean ascending) { + super(ascending, new BooleanComparator(ascending)); + } + + @Override + public int hash(boolean[] record) { + int result = 0; + for (boolean field : record) { + result += field ? 1231 : 1237; + } + return result; + } + + @Override + public int compare(boolean[] first, boolean[] second) { + for (int x = 0; x < min(first.length, second.length); x++) { + int cmp = (second[x] == first[x] ? 0 : (first[x] ? 1 : -1)); + if (cmp != 0) { + return ascending ? cmp : -cmp; + } + } + int cmp = first.length - second.length; + return ascending ? cmp : -cmp; + } + + @Override + public TypeComparator duplicate() { + BooleanPrimitiveArrayComparator dupe = new BooleanPrimitiveArrayComparator(this.ascending); + dupe.setReference(this.reference); + return dupe; + } +} diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/BytePrimitiveArrayComparator.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/BytePrimitiveArrayComparator.java new file mode 100644 index 0000000000000..d914c3e07883c --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/BytePrimitiveArrayComparator.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.api.common.typeutils.base.array; + +import static java.lang.Math.min; +import org.apache.flink.api.common.typeutils.TypeComparator; +import org.apache.flink.api.common.typeutils.base.ByteComparator; + +public class BytePrimitiveArrayComparator extends PrimitiveArrayComparator { + public BytePrimitiveArrayComparator(boolean ascending) { + super(ascending, new ByteComparator(ascending)); + } + + @Override + public int hash(byte[] record) { + int result = 0; + for (byte field : record) { + result += (int) field; + } + return result; + } + + @Override + public int compare(byte[] first, byte[] second) { + for (int x = 0; x < min(first.length, second.length); x++) { + int cmp = first[x] - second[x]; + if (cmp != 0) { + return ascending ? cmp : -cmp; + } + } + int cmp = first.length - second.length; + return ascending ? cmp : -cmp; + } + + @Override + public TypeComparator duplicate() { + BytePrimitiveArrayComparator dupe = new BytePrimitiveArrayComparator(this.ascending); + dupe.setReference(this.reference); + return dupe; + } +} diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/CharPrimitiveArrayComparator.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/CharPrimitiveArrayComparator.java new file mode 100644 index 0000000000000..d734152bbf3a2 --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/CharPrimitiveArrayComparator.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.api.common.typeutils.base.array; + +import static java.lang.Math.min; +import org.apache.flink.api.common.typeutils.TypeComparator; +import org.apache.flink.api.common.typeutils.base.CharComparator; + +public class CharPrimitiveArrayComparator extends PrimitiveArrayComparator { + public CharPrimitiveArrayComparator(boolean ascending) { + super(ascending, new CharComparator(ascending)); + } + + @Override + public int hash(char[] record) { + int result = 0; + for (char field : record) { + result += (int) field; + } + return result; + } + + @Override + public int compare(char[] first, char[] second) { + for (int x = 0; x < min(first.length, second.length); x++) { + int cmp = first[x] - second[x]; + if (cmp != 0) { + return ascending ? cmp : -cmp; + } + } + int cmp = first.length - second.length; + return ascending ? cmp : -cmp; + } + + @Override + public TypeComparator duplicate() { + CharPrimitiveArrayComparator dupe = new CharPrimitiveArrayComparator(this.ascending); + dupe.setReference(this.reference); + return dupe; + } +} diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/DoublePrimitiveArrayComparator.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/DoublePrimitiveArrayComparator.java new file mode 100644 index 0000000000000..5153fa526f396 --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/DoublePrimitiveArrayComparator.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.api.common.typeutils.base.array; + +import static java.lang.Math.min; +import org.apache.flink.api.common.typeutils.TypeComparator; +import org.apache.flink.api.common.typeutils.base.DoubleComparator; + +public class DoublePrimitiveArrayComparator extends PrimitiveArrayComparator { + public DoublePrimitiveArrayComparator(boolean ascending) { + super(ascending, new DoubleComparator(ascending)); + } + + @Override + public int hash(double[] record) { + int result = 0; + for (double field : record) { + long bits = Double.doubleToLongBits(field); + result += (int) (bits ^ (bits >>> 32)); + } + return result; + } + + @Override + public int compare(double[] first, double[] second) { + for (int x = 0; x < min(first.length, second.length); x++) { + int cmp = Double.compare(first[x], second[x]); + if (cmp != 0) { + return ascending ? cmp : -cmp; + } + } + int cmp = first.length - second.length; + return ascending ? cmp : -cmp; + } + + @Override + public TypeComparator duplicate() { + DoublePrimitiveArrayComparator dupe = new DoublePrimitiveArrayComparator(this.ascending); + dupe.setReference(this.reference); + return dupe; + } +} diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/FloatPrimitiveArrayComparator.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/FloatPrimitiveArrayComparator.java new file mode 100644 index 0000000000000..5a5986e18db59 --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/FloatPrimitiveArrayComparator.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.api.common.typeutils.base.array; + +import static java.lang.Math.min; +import org.apache.flink.api.common.typeutils.TypeComparator; +import org.apache.flink.api.common.typeutils.base.FloatComparator; + +public class FloatPrimitiveArrayComparator extends PrimitiveArrayComparator { + public FloatPrimitiveArrayComparator(boolean ascending) { + super(ascending, new FloatComparator(ascending)); + } + + @Override + public int hash(float[] record) { + int result = 0; + for (float field : record) { + result += Float.floatToIntBits(field); + } + return result; + } + + @Override + public int compare(float[] first, float[] second) { + for (int x = 0; x < min(first.length, second.length); x++) { + int cmp = Float.compare(first[x], second[x]); + if (cmp != 0) { + return ascending ? cmp : -cmp; + } + } + int cmp = first.length - second.length; + return ascending ? cmp : -cmp; + } + + @Override + public TypeComparator duplicate() { + FloatPrimitiveArrayComparator dupe = new FloatPrimitiveArrayComparator(this.ascending); + dupe.setReference(this.reference); + return dupe; + } +} diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/IntPrimitiveArrayComparator.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/IntPrimitiveArrayComparator.java new file mode 100644 index 0000000000000..78cb2ae434a20 --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/IntPrimitiveArrayComparator.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.api.common.typeutils.base.array; + +import static java.lang.Math.min; +import org.apache.flink.api.common.typeutils.TypeComparator; +import org.apache.flink.api.common.typeutils.base.IntComparator; + +public class IntPrimitiveArrayComparator extends PrimitiveArrayComparator { + public IntPrimitiveArrayComparator(boolean ascending) { + super(ascending, new IntComparator(ascending)); + } + + @Override + public int hash(int[] record) { + int result = 0; + for (int field : record) { + result += field; + } + return result; + } + + @Override + public int compare(int[] first, int[] second) { + for (int x = 0; x < min(first.length, second.length); x++) { + int cmp = first[x] - second[x]; + if (cmp != 0) { + return ascending ? cmp : -cmp; + } + } + int cmp = first.length - second.length; + return ascending ? cmp : -cmp; + } + + @Override + public TypeComparator duplicate() { + IntPrimitiveArrayComparator dupe = new IntPrimitiveArrayComparator(this.ascending); + dupe.setReference(this.reference); + return dupe; + } +} diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/LongPrimitiveArrayComparator.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/LongPrimitiveArrayComparator.java new file mode 100644 index 0000000000000..c0a69bc056a32 --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/LongPrimitiveArrayComparator.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.api.common.typeutils.base.array; + +import static java.lang.Math.min; +import org.apache.flink.api.common.typeutils.TypeComparator; +import org.apache.flink.api.common.typeutils.base.LongComparator; + +public class LongPrimitiveArrayComparator extends PrimitiveArrayComparator { + public LongPrimitiveArrayComparator(boolean ascending) { + super(ascending, new LongComparator(ascending)); + } + + @Override + public int hash(long[] record) { + int result = 0; + for (long field : record) { + result += (int) (field ^ (field >>> 32)); + } + return result; + } + + @Override + public int compare(long[] first, long[] second) { + for (int x = 0; x < min(first.length, second.length); x++) { + int cmp = first[x] < second[x] ? -1 : (first[x] == second[x] ? 0 : 1); + if (cmp != 0) { + return ascending ? cmp : -cmp; + } + } + int cmp = first.length - second.length; + return ascending ? cmp : -cmp; + } + + @Override + public TypeComparator duplicate() { + LongPrimitiveArrayComparator dupe = new LongPrimitiveArrayComparator(this.ascending); + dupe.setReference(this.reference); + return dupe; + } +} diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/PrimitiveArrayComparator.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/PrimitiveArrayComparator.java new file mode 100644 index 0000000000000..ba53afff7e38a --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/PrimitiveArrayComparator.java @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.api.common.typeutils.base.array; + +import java.io.IOException; +import static java.lang.Math.min; +import org.apache.flink.api.common.typeutils.TypeComparator; +import org.apache.flink.api.common.typeutils.base.BasicTypeComparator; +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.DataOutputView; +import org.apache.flink.core.memory.MemorySegment; + +public abstract class PrimitiveArrayComparator extends TypeComparator { + // For use by getComparators + @SuppressWarnings("rawtypes") + private final TypeComparator[] comparators = new TypeComparator[]{this}; + + protected final boolean ascending; + protected transient T reference; + protected final C comparator; + + public PrimitiveArrayComparator(boolean ascending, C comparator) { + this.ascending = ascending; + this.comparator = comparator; + } + + @Override + public void setReference(T toCompare) { + this.reference = toCompare; + } + + @Override + public boolean equalToReference(T candidate) { + return compare(this.reference, candidate) == 0; + } + + @Override + public int compareToReference(TypeComparator referencedComparator) { + return compare(((PrimitiveArrayComparator) referencedComparator).reference, this.reference); + } + + @Override + public int compareSerialized(DataInputView firstSource, DataInputView secondSource) throws IOException { + int firstCount = firstSource.readInt(); + int secondCount = secondSource.readInt(); + for (int x = 0; x < min(firstCount, secondCount); x++) { + int cmp = comparator.compareSerialized(firstSource, secondSource); + if (cmp != 0) { + return cmp; + } + } + int cmp = firstCount - secondCount; + return ascending ? cmp : -cmp; + } + + @Override + public int extractKeys(Object record, Object[] target, int index) { + target[index] = record; + return 1; + } + + @Override + public TypeComparator[] getFlatComparators() { + return comparators; + } + + @Override + public boolean supportsNormalizedKey() { + return false; + } + + @Override + public boolean supportsSerializationWithKeyNormalization() { + return false; + } + + @Override + public int getNormalizeKeyLen() { + return 0; + } + + @Override + public boolean isNormalizedKeyPrefixOnly(int keyBytes) { + throw new UnsupportedOperationException(); + } + + @Override + public void putNormalizedKey(T record, MemorySegment target, int offset, int numBytes) { + throw new UnsupportedOperationException(); + } + + @Override + public void writeWithKeyNormalization(T record, DataOutputView target) throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public T readWithKeyDenormalization(T reuse, DataInputView source) throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public boolean invertNormalizedKey() { + return !ascending; + } +} diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/ShortPrimitiveArrayComparator.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/ShortPrimitiveArrayComparator.java new file mode 100644 index 0000000000000..59436942e6ead --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/ShortPrimitiveArrayComparator.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.api.common.typeutils.base.array; + +import static java.lang.Math.min; +import org.apache.flink.api.common.typeutils.TypeComparator; +import org.apache.flink.api.common.typeutils.base.ShortComparator; + +public class ShortPrimitiveArrayComparator extends PrimitiveArrayComparator { + public ShortPrimitiveArrayComparator(boolean ascending) { + super(ascending, new ShortComparator(ascending)); + } + + @Override + public int hash(short[] record) { + int result = 0; + for (short field : record) { + result += (int) field; + } + return result; + } + + @Override + public int compare(short[] first, short[] second) { + for (int x = 0; x < min(first.length, second.length); x++) { + int cmp = first[x] - second[x]; + if (cmp != 0) { + return ascending ? cmp : -cmp; + } + } + int cmp = first.length - second.length; + return ascending ? cmp : -cmp; + } + + @Override + public TypeComparator duplicate() { + ShortPrimitiveArrayComparator dupe = new ShortPrimitiveArrayComparator(this.ascending); + dupe.setReference(this.reference); + return dupe; + } +} diff --git a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/array/BooleanPrimitiveArrayComparatorTest.java b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/array/BooleanPrimitiveArrayComparatorTest.java new file mode 100644 index 0000000000000..4db71bfa62624 --- /dev/null +++ b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/array/BooleanPrimitiveArrayComparatorTest.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.api.common.typeutils.base.array; + +import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo; +import org.junit.Assert; + +public class BooleanPrimitiveArrayComparatorTest extends PrimitiveArrayComparatorTestBase { + public BooleanPrimitiveArrayComparatorTest() { + super(PrimitiveArrayTypeInfo.BOOLEAN_PRIMITIVE_ARRAY_TYPE_INFO); + } + + @Override + protected void deepEquals(String message, boolean[] should, boolean[] is) { + Assert.assertTrue(should.length == is.length); + for(int x=0; x< should.length; x++) { + Assert.assertEquals(should[x], is[x]); + } + } + + @Override + protected boolean[][] getSortedTestData() { + return new boolean[][]{ + new boolean[]{false, false}, + new boolean[]{false, true}, + new boolean[]{false, true, true}, + new boolean[]{true}, + }; + } +} diff --git a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/array/BytePrimitiveArrayComparatorTest.java b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/array/BytePrimitiveArrayComparatorTest.java new file mode 100644 index 0000000000000..4c57702f17ea0 --- /dev/null +++ b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/array/BytePrimitiveArrayComparatorTest.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.api.common.typeutils.base.array; + +import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo; +import org.junit.Assert; + +public class BytePrimitiveArrayComparatorTest extends PrimitiveArrayComparatorTestBase { + public BytePrimitiveArrayComparatorTest() { + super(PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO); + } + + @Override + protected void deepEquals(String message, byte[] should, byte[] is) { + Assert.assertArrayEquals(message, should, is); + } + + @Override + protected byte[][] getSortedTestData() { + return new byte[][]{ + new byte[]{-1, 0}, + new byte[]{0, -1}, + new byte[]{0, 0}, + new byte[]{0, 1}, + new byte[]{0, 1, 2}, + new byte[]{2} + }; + } +} diff --git a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/array/CharPrimitiveArrayComparatorTest.java b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/array/CharPrimitiveArrayComparatorTest.java new file mode 100644 index 0000000000000..b318168c5368a --- /dev/null +++ b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/array/CharPrimitiveArrayComparatorTest.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.api.common.typeutils.base.array; + +import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo; +import org.junit.Assert; + +public class CharPrimitiveArrayComparatorTest extends PrimitiveArrayComparatorTestBase { + public CharPrimitiveArrayComparatorTest() { + super(PrimitiveArrayTypeInfo.CHAR_PRIMITIVE_ARRAY_TYPE_INFO); + } + + @Override + protected void deepEquals(String message, char[] should, char[] is) { + Assert.assertArrayEquals(message, should, is); + } + + @Override + protected char[][] getSortedTestData() { + return new char[][]{ + new char[]{0, 0}, + new char[]{0, 1}, + new char[]{0, 1, 2}, + new char[]{2} + }; + } +} diff --git a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/array/DoublePrimitiveArrayComparatorTest.java b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/array/DoublePrimitiveArrayComparatorTest.java new file mode 100644 index 0000000000000..b5d7e1db5c3a6 --- /dev/null +++ b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/array/DoublePrimitiveArrayComparatorTest.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.api.common.typeutils.base.array; + +import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo; +import org.junit.Assert; + +public class DoublePrimitiveArrayComparatorTest extends PrimitiveArrayComparatorTestBase { + public DoublePrimitiveArrayComparatorTest() { + super(PrimitiveArrayTypeInfo.DOUBLE_PRIMITIVE_ARRAY_TYPE_INFO); + } + + @Override + protected void deepEquals(String message, double[] should, double[] is) { + Assert.assertArrayEquals(message, should, is, 0.00001); + } + + @Override + protected double[][] getSortedTestData() { + return new double[][]{ + new double[]{-1, 0}, + new double[]{0, -1}, + new double[]{0, 0}, + new double[]{0, 1}, + new double[]{0, 1, 2}, + new double[]{2} + }; + } +} diff --git a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/array/FloatPrimitiveArrayComparatorTest.java b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/array/FloatPrimitiveArrayComparatorTest.java new file mode 100644 index 0000000000000..830049e24b7d2 --- /dev/null +++ b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/array/FloatPrimitiveArrayComparatorTest.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.api.common.typeutils.base.array; + +import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo; +import org.junit.Assert; + +public class FloatPrimitiveArrayComparatorTest extends PrimitiveArrayComparatorTestBase { + public FloatPrimitiveArrayComparatorTest() { + super(PrimitiveArrayTypeInfo.FLOAT_PRIMITIVE_ARRAY_TYPE_INFO); + } + + @Override + protected void deepEquals(String message, float[] should, float[] is) { + Assert.assertArrayEquals(message, should, is, (float) 0.00001); + } + + @Override + protected float[][] getSortedTestData() { + return new float[][]{ + new float[]{-1, 0}, + new float[]{0, -1}, + new float[]{0, 0}, + new float[]{0, 1}, + new float[]{0, 1, 2}, + new float[]{2} + }; + } +} diff --git a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/array/IntPrimitiveArrayComparatorTest.java b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/array/IntPrimitiveArrayComparatorTest.java new file mode 100644 index 0000000000000..6c05f236a2467 --- /dev/null +++ b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/array/IntPrimitiveArrayComparatorTest.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.api.common.typeutils.base.array; + +import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo; +import org.junit.Assert; + +public class IntPrimitiveArrayComparatorTest extends PrimitiveArrayComparatorTestBase { + public IntPrimitiveArrayComparatorTest() { + super(PrimitiveArrayTypeInfo.INT_PRIMITIVE_ARRAY_TYPE_INFO); + } + + @Override + protected void deepEquals(String message, int[] should, int[] is) { + Assert.assertArrayEquals(message, should, is); + } + + @Override + protected int[][] getSortedTestData() { + return new int[][]{ + new int[]{-1, 0}, + new int[]{0, -1}, + new int[]{0, 0}, + new int[]{0, 1}, + new int[]{0, 1, 2}, + new int[]{2} + }; + } +} diff --git a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/array/LongPrimitiveArrayComparatorTest.java b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/array/LongPrimitiveArrayComparatorTest.java new file mode 100644 index 0000000000000..0ae573eb0c277 --- /dev/null +++ b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/array/LongPrimitiveArrayComparatorTest.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.api.common.typeutils.base.array; + +import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo; +import org.junit.Assert; + +public class LongPrimitiveArrayComparatorTest extends PrimitiveArrayComparatorTestBase { + public LongPrimitiveArrayComparatorTest() { + super(PrimitiveArrayTypeInfo.LONG_PRIMITIVE_ARRAY_TYPE_INFO); + } + + @Override + protected void deepEquals(String message, long[] should, long[] is) { + Assert.assertArrayEquals(message, should, is); + } + + @Override + protected long[][] getSortedTestData() { + return new long[][]{ + new long[]{-1, 0}, + new long[]{0, -1}, + new long[]{0, 0}, + new long[]{0, 1}, + new long[]{0, 1, 2}, + new long[]{2} + }; + } +} diff --git a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/array/PrimitiveArrayComparatorTestBase.java b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/array/PrimitiveArrayComparatorTestBase.java new file mode 100644 index 0000000000000..ff620dd6a0940 --- /dev/null +++ b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/array/PrimitiveArrayComparatorTestBase.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.api.common.typeutils.base.array; + +import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo; +import org.apache.flink.api.common.typeutils.ComparatorTestBase; +import org.apache.flink.api.common.typeutils.TypeComparator; +import org.apache.flink.api.common.typeutils.TypeSerializer; + +public abstract class PrimitiveArrayComparatorTestBase extends ComparatorTestBase { + private PrimitiveArrayTypeInfo info; + + public PrimitiveArrayComparatorTestBase(PrimitiveArrayTypeInfo info) { + this.info = info; + } + + @Override + protected TypeComparator createComparator(boolean ascending) { + return info.createComparator(ascending, null).duplicate(); + } + + @Override + protected TypeSerializer createSerializer() { + return info.createSerializer(null); + } +} diff --git a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/array/ShortPrimitiveArrayComparatorTest.java b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/array/ShortPrimitiveArrayComparatorTest.java new file mode 100644 index 0000000000000..5b48dc294f43a --- /dev/null +++ b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/array/ShortPrimitiveArrayComparatorTest.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.api.common.typeutils.base.array; + +import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo; +import org.junit.Assert; + +public class ShortPrimitiveArrayComparatorTest extends PrimitiveArrayComparatorTestBase { + public ShortPrimitiveArrayComparatorTest() { + super(PrimitiveArrayTypeInfo.SHORT_PRIMITIVE_ARRAY_TYPE_INFO); + } + + @Override + protected void deepEquals(String message, short[] should, short[] is) { + Assert.assertArrayEquals(message, should, is); + } + + @Override + protected short[][] getSortedTestData() { + return new short[][]{ + new short[]{-1, 0}, + new short[]{0, -1}, + new short[]{0, 0}, + new short[]{0, 1}, + new short[]{0, 1, 2}, + new short[]{2} + }; + } +} diff --git a/flink-java/src/test/java/org/apache/flink/api/java/operator/GroupingTest.java b/flink-java/src/test/java/org/apache/flink/api/java/operator/GroupingTest.java index b3922b3974439..bdad3dbb62bde 100644 --- a/flink-java/src/test/java/org/apache/flink/api/java/operator/GroupingTest.java +++ b/flink-java/src/test/java/org/apache/flink/api/java/operator/GroupingTest.java @@ -67,8 +67,9 @@ public class GroupingTest { private final List> tupleWithCustomData = new ArrayList>(); - + private final List> byteArrayData = new ArrayList>(); + @Test public void testGroupByKeyFields1() { @@ -126,6 +127,15 @@ public void testGroupByKeyFields5() { tupleDs.groupBy(-1); } + @Test + public void testGroupByKeyFieldsOnPrimitiveArray() { + this.byteArrayData.add(new Tuple2(new byte[]{0}, new byte[]{1})); + + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + DataSet> tupleDs = env.fromCollection(byteArrayData); + tupleDs.groupBy(0); + } + @Test public void testGroupByKeyExpressions1() { @@ -613,7 +623,7 @@ public String toString() { public static class CustomType2 implements Serializable { public int myInt; - public int[] myIntArray; + public Integer[] myIntArray; } } diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/GroupReduceITCase.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/GroupReduceITCase.java index 260de1c384fe4..95a8cb012543b 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/GroupReduceITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/GroupReduceITCase.java @@ -61,6 +61,37 @@ public GroupReduceITCase(TestExecutionMode mode){ super(mode); } + @Test + public void testCorrectnessofGroupReduceOnTupleContainingPrimitiveByteArrayWithKeyFieldSelectors() throws Exception { + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + DataSet> ds = CollectionDataSets.getTuple2WithByteArrayDataSet(env); + DataSet reduceDs = ds. + groupBy(0).reduceGroup(new ByteArrayGroupReduce()); + + List result = reduceDs.collect(); + + String expected = "0\n" + + "1\n" + + "2\n" + + "3\n" + + "4\n"; + + compareResultAsText(result, expected); + + } + + public static class ByteArrayGroupReduce implements GroupReduceFunction, Integer> { + @Override + public void reduce(Iterable> values, Collector out) throws Exception { + int sum = 0; + for (Tuple2 value : values) { + sum += value.f1; + } + out.collect(sum); + } + } + @Test public void testCorrectnessOfGroupReduceOnTuplesWithKeyFieldSelector() throws Exception{ /* diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/util/CollectionDataSets.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/util/CollectionDataSets.java index 1faf4c12fec55..9fb275f4b086b 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/util/CollectionDataSets.java +++ b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/util/CollectionDataSets.java @@ -32,6 +32,7 @@ import java.util.Map; import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.api.java.tuple.Tuple5; @@ -205,6 +206,23 @@ public static DataSet, String, Integer>> getGrou return env.fromCollection(data, type); } + + public static DataSet> getTuple2WithByteArrayDataSet(ExecutionEnvironment env) { + List> data = new ArrayList>(); + data.add(new Tuple2(new byte[]{0, 4}, 1)); + data.add(new Tuple2(new byte[]{2, 0}, 1)); + data.add(new Tuple2(new byte[]{2, 0, 4}, 4)); + data.add(new Tuple2(new byte[]{2, 1}, 3)); + data.add(new Tuple2(new byte[]{0}, 0)); + data.add(new Tuple2(new byte[]{2, 0}, 1)); + + TupleTypeInfo> type = new TupleTypeInfo>( + PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO, + BasicTypeInfo.INT_TYPE_INFO + ); + + return env.fromCollection(data, type); + } public static DataSet getStringDataSet(ExecutionEnvironment env) {