[SPARK-7078] [SPARK-7079] Binary processing sort for Spark SQL #6444

Closed
Wants to merge 67 commits.
d5d3106
WIP towards external sorter for Spark SQL.
JoshRosen May 15, 2015
2bd8c9a
Import my original tests and get them to pass.
JoshRosen May 15, 2015
58f36d0
Merge in a sketch of a unit test for the new sorter (now failing).
JoshRosen May 26, 2015
dda6752
Commit some missing code from an old git stash.
JoshRosen May 27, 2015
c8792de
Remove some debug logging
JoshRosen May 27, 2015
9cc98f5
Move more code to Java; fix bugs in UnsafeRowConverter length type.
JoshRosen May 27, 2015
73cc761
Fix whitespace
JoshRosen Jun 5, 2015
dfdb93f
SparkFunSuite change
JoshRosen Jun 5, 2015
b420a71
Move most of the existing SMJ code into Java.
JoshRosen Jun 11, 2015
1b841ca
WIP towards copying
JoshRosen Jun 11, 2015
269cf86
Back out SMJ operator change; isolate changes to selection of sort op.
JoshRosen Jun 13, 2015
d468a88
Update for InternalRow refactoring
JoshRosen Jun 14, 2015
7eafecf
Port test to SparkPlanTest
JoshRosen Jun 18, 2015
21d7d93
Back out of BlockObjectWriter change
JoshRosen Jun 18, 2015
62f0bb8
Update to reflect SparkPlanTest changes
JoshRosen Jun 18, 2015
26c8931
Back out some Hive changes that aren't needed anymore
JoshRosen Jun 18, 2015
206bfa2
Add some missing newlines at the ends of files
JoshRosen Jun 18, 2015
ebf9eea
Harmonization with shuffle's unsafe sorter
JoshRosen Jun 18, 2015
1db845a
Many more changes to harmonize with shuffle sorter
JoshRosen Jun 18, 2015
82bb0ec
Fix IntelliJ complaint due to negated if condition
JoshRosen Jun 18, 2015
9869ec2
Clean up Exchange code a bit
JoshRosen Jun 18, 2015
6d6a1e6
Centralize logic for picking sort operator implementations
JoshRosen Jun 18, 2015
90c2b6a
Update test name
JoshRosen Jun 27, 2015
41b8881
Get UnsafeInMemorySorterSuite to pass (WIP)
JoshRosen Jun 28, 2015
7f875f9
Commit failing test demonstrating bug in handling objects in spills
JoshRosen Jun 30, 2015
6b156fb
Some WIP work on prefix comparison.
JoshRosen Jun 30, 2015
d246e29
Fix consideration of column types when choosing sort implementation.
JoshRosen Jul 1, 2015
6890863
Fix memory leak on empty inputs.
JoshRosen Jul 1, 2015
4c37ba6
Add tests for sorting on all primitive types.
JoshRosen Jul 2, 2015
95058d9
Add missing SortPrefixUtils file
JoshRosen Jul 2, 2015
b310c88
Integrate prefix comparators for Int and Long (others coming soon)
JoshRosen Jul 2, 2015
66a813e
Prefix comparators for float and double
JoshRosen Jul 2, 2015
0dfe919
Implement prefix sort for strings (albeit inefficiently).
JoshRosen Jul 2, 2015
939f824
Remove code gen experiment.
JoshRosen Jul 2, 2015
5822e6f
Fix test compilation issue
JoshRosen Jul 3, 2015
9969c14
Merge remote-tracking branch 'origin/master' into sql-external-sort
JoshRosen Jul 6, 2015
7c3c864
Undo part of a SparkPlanTest change in #7162 that broke my test.
JoshRosen Jul 6, 2015
0a79d39
Revert "Undo part of a SparkPlanTest change in #7162 that broke my te…
JoshRosen Jul 7, 2015
f27be09
Fix tests by binding attributes.
JoshRosen Jul 7, 2015
88b72db
Test ascending and descending sort orders.
JoshRosen Jul 7, 2015
82e21c1
Force spilling in UnsafeExternalSortSuite.
JoshRosen Jul 7, 2015
8d7fbe7
Fixes to multiple spilling-related bugs.
JoshRosen Jul 7, 2015
87b6ed9
Fix critical issues in test which led to false negatives.
JoshRosen Jul 7, 2015
5d6109d
Fix inconsistent handling / encoding of record lengths.
JoshRosen Jul 7, 2015
b81a920
Temporarily enable only the passing sort tests
JoshRosen Jul 7, 2015
1c7bad8
Make sorting of answers explicit in SparkPlanTest.checkAnswer().
JoshRosen Jul 7, 2015
b86e684
Set global = true in UnsafeExternalSortSuite.
JoshRosen Jul 7, 2015
08701e7
Fix prefix comparison of null primitives.
JoshRosen Jul 7, 2015
1d7ffaa
Somewhat hacky fix for descending sorts
JoshRosen Jul 7, 2015
613e16f
Test with larger data.
JoshRosen Jul 7, 2015
88aff18
NULL_PREFIX has to be negative infinity for floating point types
JoshRosen Jul 7, 2015
9d00afc
Clean up prefix comparators for integral types
JoshRosen Jul 7, 2015
f99a612
Fix bugs in string prefix comparison.
JoshRosen Jul 7, 2015
293f109
Add missing license header.
JoshRosen Jul 7, 2015
844f4ca
Merge remote-tracking branch 'origin/master' into sql-external-sort
JoshRosen Jul 8, 2015
d31f180
Re-enable NullType sorting test now that SPARK-8868 is fixed
JoshRosen Jul 8, 2015
c56ec18
Clean up final row copying code.
JoshRosen Jul 8, 2015
845bea3
Remove unnecessary zeroing of row conversion buffer
JoshRosen Jul 8, 2015
d13ac55
Hacky approach to copying of UnsafeRows for sort followed by limit.
JoshRosen Jul 8, 2015
3947fc1
Merge remote-tracking branch 'origin/master' into sql-external-sort
JoshRosen Jul 9, 2015
cd05866
Fix scalastyle
JoshRosen Jul 9, 2015
d1e28bc
Merge remote-tracking branch 'origin/master' into sql-external-sort
JoshRosen Jul 10, 2015
2f48777
Add test and fix bug for sorting empty arrays
JoshRosen Jul 10, 2015
5135200
Fix spill reading for large rows; add test
JoshRosen Jul 10, 2015
35dad9f
Make sortAnswers = false the default in SparkPlanTest
JoshRosen Jul 10, 2015
2bbac9c
Merge remote-tracking branch 'origin/master' into sql-external-sort
JoshRosen Jul 10, 2015
6beb467
Remove a bunch of overloaded methods to avoid default args. issue
JoshRosen Jul 10, 2015
20 changes: 10 additions & 10 deletions core/pom.xml
@@ -343,28 +343,28 @@
<scope>test</scope>
</dependency>
 <dependency>
-  <groupId>org.mockito</groupId>
-  <artifactId>mockito-core</artifactId>
+  <groupId>org.hamcrest</groupId>
+  <artifactId>hamcrest-core</artifactId>
   <scope>test</scope>
 </dependency>
 <dependency>
-  <groupId>org.scalacheck</groupId>
-  <artifactId>scalacheck_${scala.binary.version}</artifactId>
+  <groupId>org.hamcrest</groupId>
+  <artifactId>hamcrest-library</artifactId>
   <scope>test</scope>
 </dependency>
 <dependency>
-  <groupId>junit</groupId>
-  <artifactId>junit</artifactId>
+  <groupId>org.mockito</groupId>
+  <artifactId>mockito-core</artifactId>
   <scope>test</scope>
 </dependency>
 <dependency>
-  <groupId>org.hamcrest</groupId>
-  <artifactId>hamcrest-core</artifactId>
+  <groupId>org.scalacheck</groupId>
+  <artifactId>scalacheck_${scala.binary.version}</artifactId>
   <scope>test</scope>
 </dependency>
 <dependency>
-  <groupId>org.hamcrest</groupId>
-  <artifactId>hamcrest-library</artifactId>
+  <groupId>junit</groupId>
+  <artifactId>junit</artifactId>
   <scope>test</scope>
 </dependency>
<dependency>
@@ -15,7 +15,7 @@
* limitations under the License.
*/

-package org.apache.spark.shuffle.unsafe;
+package org.apache.spark.serializer;

import java.io.IOException;
import java.io.InputStream;
@@ -24,9 +24,7 @@

import scala.reflect.ClassTag;

-import org.apache.spark.serializer.DeserializationStream;
-import org.apache.spark.serializer.SerializationStream;
-import org.apache.spark.serializer.SerializerInstance;
+import org.apache.spark.annotation.Private;
import org.apache.spark.unsafe.PlatformDependent;

/**
@@ -35,7 +33,8 @@
`write()` OutputStream methods), but DiskBlockObjectWriter still calls some methods on it. To work
* around this, we pass a dummy no-op serializer.
*/
-final class DummySerializerInstance extends SerializerInstance {
+@Private
+public final class DummySerializerInstance extends SerializerInstance {

public static final DummySerializerInstance INSTANCE = new DummySerializerInstance();

@@ -30,6 +30,7 @@
import org.apache.spark.SparkConf;
import org.apache.spark.TaskContext;
import org.apache.spark.executor.ShuffleWriteMetrics;
+import org.apache.spark.serializer.DummySerializerInstance;
import org.apache.spark.serializer.SerializerInstance;
import org.apache.spark.shuffle.ShuffleMemoryManager;
import org.apache.spark.storage.*;
@@ -0,0 +1,29 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.util.collection.unsafe.sort;

import org.apache.spark.annotation.Private;

/**
* Compares 8-byte key prefixes in prefix sort. Subclasses may implement type-specific
* comparisons, such as lexicographic comparison for strings.
*/
@Private
public abstract class PrefixComparator {
public abstract int compare(long prefix1, long prefix2);
}
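The patch's concrete implementations of this contract live in PrefixComparators. As a minimal standalone illustration of the contract itself (hypothetical, not part of the patch), here is a comparator that orders 8-byte prefixes as unsigned 64-bit values, using the standard sign-bit-flip trick:

```java
// Hypothetical illustration of the PrefixComparator contract, written
// standalone so it does not depend on Spark classes: an unsigned 64-bit
// prefix comparator.
public class UnsignedPrefixComparatorDemo {
    /** Mirrors PrefixComparator.compare: negative/zero/positive result. */
    public static int compareUnsigned(long prefix1, long prefix2) {
        // XOR-ing with Long.MIN_VALUE flips the sign bit, which maps
        // unsigned ordering onto Java's signed long ordering.
        long a = prefix1 ^ Long.MIN_VALUE;
        long b = prefix2 ^ Long.MIN_VALUE;
        return (a < b) ? -1 : (a > b) ? 1 : 0;
    }
}
```

Under unsigned interpretation, -1L is the largest possible prefix, so it compares greater than every non-negative value.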
@@ -0,0 +1,109 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.util.collection.unsafe.sort;

import com.google.common.base.Charsets;
import com.google.common.primitives.Longs;
import com.google.common.primitives.UnsignedBytes;

import org.apache.spark.annotation.Private;
import org.apache.spark.unsafe.types.UTF8String;

@Private
public class PrefixComparators {
private PrefixComparators() {}

public static final StringPrefixComparator STRING = new StringPrefixComparator();
public static final IntegralPrefixComparator INTEGRAL = new IntegralPrefixComparator();
public static final FloatPrefixComparator FLOAT = new FloatPrefixComparator();
public static final DoublePrefixComparator DOUBLE = new DoublePrefixComparator();
[Review comment, Contributor] Should we have comparator for Byte/Short/Boolean/BinaryType?

[Reply, JoshRosen (Author)] Yes, it will be trivial to add. I'll do it now.


public static final class StringPrefixComparator extends PrefixComparator {
@Override
public int compare(long aPrefix, long bPrefix) {
// TODO: can be done more efficiently
byte[] a = Longs.toByteArray(aPrefix);
byte[] b = Longs.toByteArray(bPrefix);
for (int i = 0; i < 8; i++) {
int c = UnsignedBytes.compare(a[i], b[i]);
if (c != 0) return c;
}
return 0;
}

public long computePrefix(byte[] bytes) {
if (bytes == null) {
return 0L;
} else {
byte[] padded = new byte[8];
System.arraycopy(bytes, 0, padded, 0, Math.min(bytes.length, 8));
return Longs.fromByteArray(padded);
}
}

public long computePrefix(String value) {
return value == null ? 0L : computePrefix(value.getBytes(Charsets.UTF_8));
}

public long computePrefix(UTF8String value) {
return value == null ? 0L : computePrefix(value.getBytes());
}
}

/**
* Prefix comparator for all integral types (boolean, byte, short, int, long).
*/
public static final class IntegralPrefixComparator extends PrefixComparator {
@Override
public int compare(long a, long b) {
return (a < b) ? -1 : (a > b) ? 1 : 0;
}

public final long NULL_PREFIX = Long.MIN_VALUE;
}

public static final class FloatPrefixComparator extends PrefixComparator {
@Override
public int compare(long aPrefix, long bPrefix) {
float a = Float.intBitsToFloat((int) aPrefix);
float b = Float.intBitsToFloat((int) bPrefix);
return (a < b) ? -1 : (a > b) ? 1 : 0;
}

public long computePrefix(float value) {
return Float.floatToIntBits(value) & 0xffffffffL;
}

public final long NULL_PREFIX = computePrefix(Float.NEGATIVE_INFINITY);
}

public static final class DoublePrefixComparator extends PrefixComparator {
@Override
public int compare(long aPrefix, long bPrefix) {
double a = Double.longBitsToDouble(aPrefix);
double b = Double.longBitsToDouble(bPrefix);
return (a < b) ? -1 : (a > b) ? 1 : 0;
}

public long computePrefix(double value) {
return Double.doubleToLongBits(value);
}

public final long NULL_PREFIX = computePrefix(Double.NEGATIVE_INFINITY);
}
}
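Note that FloatPrefixComparator and DoublePrefixComparator decode the stored bits back into a floating-point value instead of comparing the raw longs: IEEE-754 bit patterns of negative values do not order the same way as signed integers. A standalone replica of the DoublePrefixComparator logic above demonstrates this (the class name here is illustrative, not Spark's):

```java
// Standalone replica of DoublePrefixComparator's logic, showing why the
// prefix must be decoded back to a double before comparing: the raw long
// bits of two negative doubles sort in the wrong direction.
public class DoublePrefixDemo {
    public static long computePrefix(double value) {
        return Double.doubleToLongBits(value);
    }

    public static int compare(long aPrefix, long bPrefix) {
        double a = Double.longBitsToDouble(aPrefix);
        double b = Double.longBitsToDouble(bPrefix);
        return (a < b) ? -1 : (a > b) ? 1 : 0;
    }
}
```

For example, decoding orders -2.0 before -1.0 correctly, whereas comparing the raw bit patterns as signed longs would reverse them. This is also why NULL_PREFIX is computePrefix(Double.NEGATIVE_INFINITY) rather than Long.MIN_VALUE for the floating-point comparators.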
@@ -0,0 +1,37 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.util.collection.unsafe.sort;

/**
* Compares records for ordering. In cases where the entire sorting key can fit in the 8-byte
* prefix, this may simply return 0.
*/
public abstract class RecordComparator {

/**
* Compare two records for order.
*
* @return a negative integer, zero, or a positive integer as the first record is less than,
* equal to, or greater than the second.
*/
public abstract int compare(
Object leftBaseObject,
long leftBaseOffset,
Object rightBaseObject,
long rightBaseOffset);
}
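Spark's real subclasses of RecordComparator read record bytes through PlatformDependent at (baseObject, baseOffset) addresses. The following hypothetical sketch shows the shape of a concrete comparator, with byte[] base objects and ByteBuffer reads standing in for Unsafe memory access, and an 8-byte big-endian long as the sort key:

```java
import java.nio.ByteBuffer;

// Hypothetical concrete comparator in the spirit of RecordComparator.
// Records live in byte[] "base objects"; the sort key is an 8-byte
// big-endian long stored at the given offset. ByteBuffer reads stand in
// for Spark's PlatformDependent/Unsafe access.
public class LongRecordComparatorDemo {
    public static int compare(
            byte[] leftBase, int leftOffset,
            byte[] rightBase, int rightOffset) {
        long left = ByteBuffer.wrap(leftBase).getLong(leftOffset);
        long right = ByteBuffer.wrap(rightBase).getLong(rightOffset);
        // Negative / zero / positive, per the RecordComparator contract.
        return Long.compare(left, right);
    }
}
```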
@@ -0,0 +1,31 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.util.collection.unsafe.sort;

final class RecordPointerAndKeyPrefix {
/**
* A pointer to a record; see {@link org.apache.spark.unsafe.memory.TaskMemoryManager} for a
* description of how these addresses are encoded.
*/
public long recordPointer;

/**
* A key prefix, for use in comparisons.
*/
public long keyPrefix;
}
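These (recordPointer, keyPrefix) pairs are what the in-memory sorter actually moves around: the cheap 8-byte prefix comparison runs first, and the expensive full-record comparison is consulted only to break prefix ties. A sketch of that two-level comparison (illustrative only; not Spark's actual sorter code, and the tie-breaking comparator here takes raw pointers as stand-in keys):

```java
import java.util.Arrays;
import java.util.Comparator;

// Sketch of the two-level comparison enabled by (recordPointer, keyPrefix)
// pairs: compare prefixes first, dereference records only on ties.
public class PrefixSortSketch {
    /** Stand-in for RecordPointerAndKeyPrefix. */
    public static final class Entry {
        public final long recordPointer;  // where the full record lives
        public final long keyPrefix;      // 8-byte prefix of the sort key
        public Entry(long recordPointer, long keyPrefix) {
            this.recordPointer = recordPointer;
            this.keyPrefix = keyPrefix;
        }
    }

    /** Sorts by prefix; uses recordComparator only when prefixes tie. */
    public static void sort(Entry[] entries, Comparator<Long> recordComparator) {
        Arrays.sort(entries, (x, y) -> {
            int byPrefix = Long.compare(x.keyPrefix, y.keyPrefix);
            // Only fall back to the full-record comparison when the
            // prefixes cannot decide the order.
            return byPrefix != 0
                ? byPrefix
                : recordComparator.compare(x.recordPointer, y.recordPointer);
        });
    }
}
```

The payoff is cache locality: prefix comparisons touch only the compact pointer array, while record dereferences (which chase recordPointer into the data pages) happen only for keys whose first 8 bytes collide.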