Skip to content

Commit

Permalink
Fix consideration of column types when choosing sort implementation.
Browse files Browse the repository at this point in the history
  • Loading branch information
JoshRosen committed Jul 6, 2015
1 parent 6b156fb commit d246e29
Show file tree
Hide file tree
Showing 4 changed files with 26 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,12 @@
import org.apache.spark.TaskContext;
import org.apache.spark.sql.AbstractScalaRowIterator;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.catalyst.expressions.ObjectUnsafeColumnWriter;
import org.apache.spark.sql.catalyst.expressions.UnsafeColumnWriter;
import org.apache.spark.sql.catalyst.expressions.UnsafeRow;
import org.apache.spark.sql.catalyst.expressions.UnsafeRowConverter;
import org.apache.spark.sql.catalyst.util.ObjectPool;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.unsafe.PlatformDependent;
import org.apache.spark.util.collection.unsafe.sort.PrefixComparator;
Expand Down Expand Up @@ -164,6 +167,18 @@ public Iterator<InternalRow> sort(Iterator<InternalRow> inputIterator) throws IO
return sort();
}

/**
* Return true if UnsafeExternalRowSorter can sort rows with the given schema, false otherwise.
*/
public static boolean supportsSchema(StructType schema) {
for (StructField field : schema.fields()) {
if (UnsafeColumnWriter.forType(field.dataType()) instanceof ObjectUnsafeColumnWriter) {
return false;
}
}
return true;
}

private static final class RowComparator extends RecordComparator {
private final Ordering<InternalRow> ordering;
private final int numFields;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,19 +103,6 @@ class UnsafeRowConverter(fieldTypes: Array[DataType]) {

}

object UnsafeRowConverter {
def supportsSchema(schema: StructType): Boolean = {
schema.forall { field =>
try {
UnsafeColumnWriter.forType(field.dataType)
true
} catch {
case e: UnsupportedOperationException => false
}
}
}
}

/**
* Function for writing a column into an UnsafeRow.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -298,7 +298,7 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
* if necessary.
*/
def getSortOperator(sortExprs: Seq[SortOrder], global: Boolean, child: SparkPlan): SparkPlan = {
if (sqlContext.conf.unsafeEnabled && UnsafeRowConverter.supportsSchema(child.schema)) {
if (sqlContext.conf.unsafeEnabled && UnsafeExternalSort.supportsSchema(child.schema)) {
execution.UnsafeExternalSort(sortExprs, global, child)
} else if (sqlContext.conf.externalSortEnabled) {
execution.ExternalSort(sortExprs, global, child)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -286,6 +286,16 @@ case class UnsafeExternalSort(
override def outputOrdering: Seq[SortOrder] = sortOrder
}

@DeveloperApi
object UnsafeExternalSort {
/**
* Return true if UnsafeExternalSort can sort rows with the given schema, false otherwise.
*/
def supportsSchema(schema: StructType): Boolean = {
UnsafeExternalRowSorter.supportsSchema(schema)
}
}


/**
* :: DeveloperApi ::
Expand Down

0 comments on commit d246e29

Please sign in to comment.