diff --git a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/PrefixComparator.java b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/PrefixComparator.java index c41332ad117cb..45b78829e4cf7 100644 --- a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/PrefixComparator.java +++ b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/PrefixComparator.java @@ -17,10 +17,13 @@ package org.apache.spark.util.collection.unsafe.sort; +import org.apache.spark.annotation.Private; + /** * Compares 8-byte key prefixes in prefix sort. Subclasses may implement type-specific * comparisons, such as lexicographic comparison for strings. */ +@Private public abstract class PrefixComparator { public abstract int compare(long prefix1, long prefix2); } diff --git a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/PrefixComparators.java b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/PrefixComparators.java new file mode 100644 index 0000000000000..475430fffe93d --- /dev/null +++ b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/PrefixComparators.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.util.collection.unsafe.sort; + +import org.apache.spark.annotation.Private; + +@Private +public class PrefixComparators { + private PrefixComparators() {} + + public static final IntPrefixComparator INTEGER = new IntPrefixComparator(); + + static final class IntPrefixComparator extends PrefixComparator { + @Override + public int compare(long aPrefix, long bPrefix) { + int a = (int) aPrefix; + int b = (int) bPrefix; + return (a < b) ? -1 : (a > b) ? 1 : 0; + } + + public long computePrefix(int value) { + return value & 0xffffffffL; + } + } + + static final class LongPrefixComparator extends PrefixComparator { + @Override + public int compare(long a, long b) { + return (a < b) ? -1 : (a > b) ? 1 : 0; + } + } +} diff --git a/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/PrefixComparatorsSuite.java b/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/PrefixComparatorsSuite.java new file mode 100644 index 0000000000000..4c4a7d5f5486a --- /dev/null +++ b/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/PrefixComparatorsSuite.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.util.collection.unsafe.sort; + +import org.junit.Test; +import static org.junit.Assert.*; + +public class PrefixComparatorsSuite { + + private static int genericComparison(Comparable a, Comparable b) { + return a.compareTo(b); + } + + @Test + public void intPrefixComparator() { + int[] testData = new int[] { 0, Integer.MIN_VALUE, Integer.MAX_VALUE, 0, 1, 2, -1, -2, 1024}; + for (int a : testData) { + for (int b : testData) { + long aPrefix = PrefixComparators.INTEGER.computePrefix(a); + long bPrefix = PrefixComparators.INTEGER.computePrefix(b); + assertEquals( + "Wrong prefix comparison results for a=" + a + " b=" + b, + genericComparison(a, b), + PrefixComparators.INTEGER.compare(aPrefix, bPrefix)); + + } + } + } + +} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala index 8d26d661d2b2c..4d58983d219da 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala @@ -18,7 +18,6 @@ package org.apache.spark.sql.execution import org.apache.spark.sql.types.StructType -import org.apache.spark.util.collection.unsafe.sort.PrefixComparator import org.apache.spark.annotation.DeveloperApi import org.apache.spark.rdd.{RDD, ShuffledRDD} import org.apache.spark.shuffle.sort.SortShuffleManager @@ -28,6 +27,7 @@ import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.Row import org.apache.spark.sql.catalyst.plans.physical._ import org.apache.spark.util.collection.ExternalSorter +import org.apache.spark.util.collection.unsafe.sort.PrefixComparator import org.apache.spark.util.{CompletionIterator, MutablePair} import org.apache.spark.{HashPartitioner, SparkEnv}