Skip to content

Commit

Permalink
Some WIP work on prefix comparison.
Browse files Browse the repository at this point in the history
  • Loading branch information
JoshRosen committed Jul 6, 2015
1 parent 7f875f9 commit 6b156fb
Show file tree
Hide file tree
Showing 4 changed files with 96 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,13 @@

package org.apache.spark.util.collection.unsafe.sort;

import org.apache.spark.annotation.Private;

/**
* Compares 8-byte key prefixes in prefix sort. Subclasses may implement type-specific
* comparisons, such as lexicographic comparison for strings.
*/
@Private
public abstract class PrefixComparator {
public abstract int compare(long prefix1, long prefix2);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.util.collection.unsafe.sort;

import org.apache.spark.annotation.Private;

@Private
public class PrefixComparators {
private PrefixComparators() {}

public static final IntPrefixComparator INTEGER = new IntPrefixComparator();

static final class IntPrefixComparator extends PrefixComparator {
@Override
public int compare(long aPrefix, long bPrefix) {
int a = (int) aPrefix;
int b = (int) bPrefix;
return (a < b) ? -1 : (a > b) ? 1 : 0;
}

public long computePrefix(int value) {
return value & 0xffffffffL;
}
}

static final class LongPrefixComparator extends PrefixComparator {
@Override
public int compare(long a, long b) {
return (a < b) ? -1 : (a > b) ? 1 : 0;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.util.collection.unsafe.sort;

import org.junit.Test;
import static org.junit.Assert.*;

public class PrefixComparatorsSuite {

private static int genericComparison(Comparable a, Comparable b) {
return a.compareTo(b);
}

@Test
public void intPrefixComparator() {
int[] testData = new int[] { 0, Integer.MIN_VALUE, Integer.MAX_VALUE, 0, 1, 2, -1, -2, 1024};
for (int a : testData) {
for (int b : testData) {
long aPrefix = PrefixComparators.INTEGER.computePrefix(a);
long bPrefix = PrefixComparators.INTEGER.computePrefix(b);
assertEquals(
"Wrong prefix comparison results for a=" + a + " b=" + b,
genericComparison(a, b),
PrefixComparators.INTEGER.compare(aPrefix, bPrefix));

}
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
package org.apache.spark.sql.execution

import org.apache.spark.sql.types.StructType
import org.apache.spark.util.collection.unsafe.sort.PrefixComparator
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.rdd.{RDD, ShuffledRDD}
import org.apache.spark.shuffle.sort.SortShuffleManager
Expand All @@ -28,6 +27,7 @@ import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.plans.physical._
import org.apache.spark.util.collection.ExternalSorter
import org.apache.spark.util.collection.unsafe.sort.PrefixComparator
import org.apache.spark.util.{CompletionIterator, MutablePair}
import org.apache.spark.{HashPartitioner, SparkEnv}

Expand Down

0 comments on commit 6b156fb

Please sign in to comment.