[SPARK-9546][SQL] Centralize orderable data type checking.
This pull request creates two isOrderable functions in RowOrdering that can be used to check whether a data type or a sequence of expressions can be used in sorting.

Author: Reynold Xin <rxin@databricks.com>

Closes #7880 from rxin/SPARK-9546 and squashes the following commits:

f9e322d [Reynold Xin] Fixed tests.
0439b43 [Reynold Xin] [SPARK-9546][SQL] Centralize orderable data type checking.
rxin committed Aug 3, 2015
1 parent 536d2ad commit 30e8911
Showing 15 changed files with 173 additions and 144 deletions.
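
For illustration only (not part of the commit), the new predicate described above behaves as a recursive check: atomic types and NullType are orderable, a struct is orderable iff all of its fields are, and everything else is not. A hedged sketch against the catalyst internals added below:

    import org.apache.spark.sql.catalyst.expressions.RowOrdering
    import org.apache.spark.sql.types._

    RowOrdering.isOrderable(IntegerType)                                     // true: atomic
    RowOrdering.isOrderable(NullType)                                        // true
    RowOrdering.isOrderable(StructType(StructField("a", LongType) :: Nil))   // true: all fields orderable
    RowOrdering.isOrderable(MapType(StringType, IntegerType))                // false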
@@ -130,11 +130,9 @@ trait CheckAnalysis {

       case Sort(orders, _, _) =>
         orders.foreach { order =>
-          order.dataType match {
-            case t: AtomicType => // OK
-            case NullType => // OK
-            case t =>
-              failAnalysis(s"Sorting is not supported for columns of type ${t.simpleString}")
+          if (!RowOrdering.isOrderable(order.dataType)) {
+            failAnalysis(
+              s"sorting is not supported for columns of type ${order.dataType.simpleString}")
           }
         }
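
For example (an illustration, not from the diff), ordering by a non-orderable column such as a map now fails analysis with the message above:

    // Hypothetical table t with a column m of type map<string,int>.
    sqlContext.sql("SELECT * FROM t ORDER BY m")
    // org.apache.spark.sql.AnalysisException:
    // sorting is not supported for columns of type map<string,int>;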
@@ -44,8 +44,8 @@ trait ExpectsInputTypes extends Expression {
   override def checkInputDataTypes(): TypeCheckResult = {
     val mismatches = children.zip(inputTypes).zipWithIndex.collect {
       case ((child, expected), idx) if !expected.acceptsType(child.dataType) =>
-        s"argument ${idx + 1} is expected to be of type ${expected.simpleString}, " +
-          s"however, '${child.prettyString}' is of type ${child.dataType.simpleString}."
+        s"argument ${idx + 1} requires ${expected.simpleString} type, " +
+          s"however, '${child.prettyString}' is of ${child.dataType.simpleString} type."
     }

     if (mismatches.isEmpty) {
@@ -420,7 +420,7 @@ abstract class BinaryOperator extends BinaryExpression with ExpectsInputTypes {
       TypeCheckResult.TypeCheckFailure(s"differing types in '$prettyString' " +
         s"(${left.dataType.simpleString} and ${right.dataType.simpleString}).")
     } else if (!inputType.acceptsType(left.dataType)) {
-      TypeCheckResult.TypeCheckFailure(s"'$prettyString' accepts ${inputType.simpleString} type," +
+      TypeCheckResult.TypeCheckFailure(s"'$prettyString' requires ${inputType.simpleString} type," +
         s" not ${left.dataType.simpleString}")
     } else {
       TypeCheckResult.TypeCheckSuccess
@@ -0,0 +1,93 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.catalyst.expressions

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.types._


/**
 * An interpreted row ordering comparator.
 */
class RowOrdering(ordering: Seq[SortOrder]) extends Ordering[InternalRow] {

  def this(ordering: Seq[SortOrder], inputSchema: Seq[Attribute]) =
    this(ordering.map(BindReferences.bindReference(_, inputSchema)))

  def compare(a: InternalRow, b: InternalRow): Int = {
    var i = 0
    while (i < ordering.size) {
      val order = ordering(i)
      val left = order.child.eval(a)
      val right = order.child.eval(b)

      if (left == null && right == null) {
        // Both null, continue looking.
      } else if (left == null) {
        return if (order.direction == Ascending) -1 else 1
      } else if (right == null) {
        return if (order.direction == Ascending) 1 else -1
      } else {
        val comparison = order.dataType match {
          case dt: AtomicType if order.direction == Ascending =>
            dt.ordering.asInstanceOf[Ordering[Any]].compare(left, right)
          case dt: AtomicType if order.direction == Descending =>
            dt.ordering.asInstanceOf[Ordering[Any]].reverse.compare(left, right)
          case s: StructType if order.direction == Ascending =>
            s.ordering.asInstanceOf[Ordering[Any]].compare(left, right)
          case s: StructType if order.direction == Descending =>
            s.ordering.asInstanceOf[Ordering[Any]].reverse.compare(left, right)
          case other =>
            throw new IllegalArgumentException(s"Type $other does not support ordered operations")
        }
        if (comparison != 0) {
          return comparison
        }
      }
      i += 1
    }
    return 0
  }
}

object RowOrdering {

  /**
   * Returns true iff the data type can be ordered (i.e. can be sorted).
   */
  def isOrderable(dataType: DataType): Boolean = dataType match {
    case NullType => true
    case dt: AtomicType => true
    case struct: StructType => struct.fields.forall(f => isOrderable(f.dataType))
    case _ => false
  }

  /**
   * Returns true iff outputs from the expressions can be ordered.
   */
  def isOrderable(exprs: Seq[Expression]): Boolean = exprs.forall(e => isOrderable(e.dataType))

  /**
   * Creates a [[RowOrdering]] for the given schema, in natural ascending order.
   */
  def forSchema(dataTypes: Seq[DataType]): RowOrdering = {
    new RowOrdering(dataTypes.zipWithIndex.map {
      case (dt, index) => new SortOrder(BoundReference(index, dt, nullable = true), Ascending)
    })
  }
}
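
A hedged sketch (not part of the commit) of using the interpreted ordering; InternalRow.apply for building test rows is an assumption about this era of the internal API:

    import org.apache.spark.sql.catalyst.InternalRow
    import org.apache.spark.sql.types.{DoubleType, IntegerType}

    // Natural ascending order over an (int, double) schema.
    val ordering = RowOrdering.forSchema(Seq(IntegerType, DoubleType))
    ordering.compare(InternalRow(1, 2.0), InternalRow(1, 3.0))  // negative: first row sorts first
    ordering.compare(InternalRow(2, 0.0), InternalRow(1, 9.9))  // positive: first column dominates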
@@ -18,6 +18,7 @@
 package org.apache.spark.sql.catalyst.expressions

 import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.analysis.TypeCheckResult
 import org.apache.spark.sql.catalyst.expressions.codegen.{GeneratedExpressionCode, CodeGenContext}
 import org.apache.spark.sql.types._
 import org.apache.spark.util.collection.unsafe.sort.PrefixComparators.DoublePrefixComparator
@@ -36,6 +37,14 @@ case class SortOrder(child: Expression, direction: SortDirection)
   /** Sort order is not foldable because we don't have an eval for it. */
   override def foldable: Boolean = false

+  override def checkInputDataTypes(): TypeCheckResult = {
+    if (RowOrdering.isOrderable(dataType)) {
+      TypeCheckResult.TypeCheckSuccess
+    } else {
+      TypeCheckResult.TypeCheckFailure(s"cannot sort data type ${dataType.simpleString}")
+    }
+  }
+
   override def dataType: DataType = child.dataType
   override def nullable: Boolean = child.nullable
@@ -220,7 +220,11 @@ class CodeGenContext {
   }

   /**
-   * Generates code for compare expression in Java.
+   * Generates code for comparing two expressions.
+   *
+   * @param dataType data type of the expressions
+   * @param c1 name of the variable of expression 1's output
+   * @param c2 name of the variable of expression 2's output
    */
   def genComp(dataType: DataType, c1: String, c2: String): String = dataType match {
     // java boolean doesn't support > or < operator
@@ -231,7 +235,7 @@ class CodeGenContext {
     case dt: DataType if isPrimitiveType(dt) => s"($c1 > $c2 ? 1 : $c1 < $c2 ? -1 : 0)"
     case BinaryType => s"org.apache.spark.sql.catalyst.util.TypeUtils.compareBinary($c1, $c2)"
     case NullType => "0"
-    case schema: StructType if schema.supportOrdering(schema) =>
+    case schema: StructType =>
       val comparisons = GenerateOrdering.genComparisons(this, schema)
       val compareFunc = freshName("compareStruct")
       val funcCode: String =
@@ -245,8 +249,8 @@ class CodeGenContext {
       addNewFunction(compareFunc, funcCode)
       s"this.$compareFunc($c1, $c2)"
     case other if other.isInstanceOf[AtomicType] => s"$c1.compare($c2)"
-    case _ => throw new IllegalArgumentException(
-      "cannot generate compare code for un-comparable type")
+    case _ =>
+      throw new IllegalArgumentException("cannot generate compare code for un-comparable type")
   }
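
For illustration (a sketch derived from the cases shown above; ctx stands in for a CodeGenContext instance and is not named in the diff), the emitted Java fragments look like:

    ctx.genComp(IntegerType, "a", "b")  // "(a > b ? 1 : a < b ? -1 : 0)" -- primitive fast path
    ctx.genComp(NullType, "a", "b")     // "0"
    ctx.genComp(StringType, "a", "b")   // "a.compare(b)" -- non-primitive atomic type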
@@ -18,15 +18,13 @@
 package org.apache.spark.sql.catalyst.expressions.codegen

 import org.apache.spark.Logging
-import org.apache.spark.annotation.Private
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.types.StructType

 /**
  * Inherits some default implementation for Java from `Ordering[Row]`
  */
-@Private
 class BaseOrdering extends Ordering[InternalRow] {
   def compare(a: InternalRow, b: InternalRow): Int = {
     throw new UnsupportedOperationException
@@ -20,6 +20,7 @@ import java.util.Comparator

 import org.apache.spark.sql.catalyst.analysis.TypeCheckResult
 import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenFallback, CodeGenContext, GeneratedExpressionCode}
+import org.apache.spark.sql.catalyst.util.TypeUtils
 import org.apache.spark.sql.types._

 /**
@@ -54,15 +55,17 @@ case class SortArray(base: Expression, ascendingOrder: Expression)
   override def inputTypes: Seq[AbstractDataType] = Seq(ArrayType, BooleanType)

   override def checkInputDataTypes(): TypeCheckResult = base.dataType match {
-    case _ @ ArrayType(n: AtomicType, _) => TypeCheckResult.TypeCheckSuccess
-    case _ @ ArrayType(n, _) => TypeCheckResult.TypeCheckFailure(
-      s"Type $n is not the AtomicType, we can not perform the ordering operations")
-    case other =>
-      TypeCheckResult.TypeCheckFailure(s"ArrayType(AtomicType) is expected, but we got $other")
+    case ArrayType(dt, _) if RowOrdering.isOrderable(dt) =>
+      TypeCheckResult.TypeCheckSuccess
+    case ArrayType(dt, _) =>
+      TypeCheckResult.TypeCheckFailure(
+        s"$prettyName does not support sorting array of type ${dt.simpleString}")
+    case _ =>
+      TypeCheckResult.TypeCheckFailure(s"$prettyName only supports array input.")
   }

   @transient
-  private lazy val lt = {
+  private lazy val lt: Comparator[Any] = {
     val ordering = base.dataType match {
       case _ @ ArrayType(n: AtomicType, _) => n.ordering.asInstanceOf[Ordering[Any]]
     }
@@ -83,7 +86,7 @@ case class SortArray(base: Expression, ascendingOrder: Expression)
   }

   @transient
-  private lazy val gt = {
+  private lazy val gt: Comparator[Any] = {
     val ordering = base.dataType match {
       case _ @ ArrayType(n: AtomicType, _) => n.ordering.asInstanceOf[Ordering[Any]]
     }
@@ -106,9 +109,7 @@ case class SortArray(base: Expression, ascendingOrder: Expression)
   override def nullSafeEval(array: Any, ascending: Any): Any = {
     val elementType = base.dataType.asInstanceOf[ArrayType].elementType
     val data = array.asInstanceOf[ArrayData].toArray[AnyRef](elementType)
-    java.util.Arrays.sort(
-      data,
-      if (ascending.asInstanceOf[Boolean]) lt else gt)
+    java.util.Arrays.sort(data, if (ascending.asInstanceOf[Boolean]) lt else gt)
     new GenericArrayData(data.asInstanceOf[Array[Any]])
   }
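
A hedged sketch (not part of the commit) exercising the new checks directly; Literal.create is the usual catalyst way to build typed test inputs:

    import org.apache.spark.sql.catalyst.expressions.{Literal, SortArray}
    import org.apache.spark.sql.types.{ArrayType, IntegerType}

    val arr = Literal.create(Seq(3, 1, 2), ArrayType(IntegerType))
    SortArray(arr, Literal(true)).checkInputDataTypes()        // TypeCheckSuccess
    SortArray(Literal(1), Literal(true)).checkInputDataTypes() // TypeCheckFailure: only supports array input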
@@ -121,47 +121,3 @@ class GenericMutableRow(val values: Array[Any]) extends MutableRow {

   override def copy(): InternalRow = new GenericInternalRow(values.clone())
 }
-
-class RowOrdering(ordering: Seq[SortOrder]) extends Ordering[InternalRow] {
-  def this(ordering: Seq[SortOrder], inputSchema: Seq[Attribute]) =
-    this(ordering.map(BindReferences.bindReference(_, inputSchema)))
-
-  def compare(a: InternalRow, b: InternalRow): Int = {
-    var i = 0
-    while (i < ordering.size) {
-      val order = ordering(i)
-      val left = order.child.eval(a)
-      val right = order.child.eval(b)
-
-      if (left == null && right == null) {
-        // Both null, continue looking.
-      } else if (left == null) {
-        return if (order.direction == Ascending) -1 else 1
-      } else if (right == null) {
-        return if (order.direction == Ascending) 1 else -1
-      } else {
-        val comparison = order.dataType match {
-          case n: AtomicType if order.direction == Ascending =>
-            n.ordering.asInstanceOf[Ordering[Any]].compare(left, right)
-          case n: AtomicType if order.direction == Descending =>
-            n.ordering.asInstanceOf[Ordering[Any]].reverse.compare(left, right)
-          case s: StructType if order.direction == Ascending =>
-            s.ordering.asInstanceOf[Ordering[Any]].compare(left, right)
-          case s: StructType if order.direction == Descending =>
-            s.ordering.asInstanceOf[Ordering[Any]].reverse.compare(left, right)
-          case other => sys.error(s"Type $other does not support ordered operations")
-        }
-        if (comparison != 0) return comparison
-      }
-      i += 1
-    }
-    return 0
-  }
-}
-
-object RowOrdering {
-  def forSchema(dataTypes: Seq[DataType]): RowOrdering =
-    new RowOrdering(dataTypes.zipWithIndex.map {
-      case(dt, index) => new SortOrder(BoundReference(index, dt, nullable = true), Ascending)
-    })
-}
@@ -18,39 +18,34 @@
 package org.apache.spark.sql.catalyst.util

 import org.apache.spark.sql.catalyst.analysis.TypeCheckResult
+import org.apache.spark.sql.catalyst.expressions.RowOrdering
 import org.apache.spark.sql.types._

 /**
  * Helper functions to check for valid data types.
  */
 object TypeUtils {
-  def checkForNumericExpr(t: DataType, caller: String): TypeCheckResult = {
-    if (t.isInstanceOf[NumericType] || t == NullType) {
+  def checkForNumericExpr(dt: DataType, caller: String): TypeCheckResult = {
+    if (dt.isInstanceOf[NumericType] || dt == NullType) {
       TypeCheckResult.TypeCheckSuccess
     } else {
-      TypeCheckResult.TypeCheckFailure(s"$caller accepts numeric types, not $t")
+      TypeCheckResult.TypeCheckFailure(s"$caller requires numeric types, not $dt")
     }
   }

-  def checkForOrderingExpr(t: DataType, caller: String): TypeCheckResult = {
-    t match {
-      case i: AtomicType => TypeCheckResult.TypeCheckSuccess
-      case n: NullType => TypeCheckResult.TypeCheckSuccess
-      case s: StructType =>
-        if (s.supportOrdering(s)) {
-          TypeCheckResult.TypeCheckSuccess
-        } else {
-          TypeCheckResult.TypeCheckFailure(s"Fields in $s do not support ordering")
-        }
-      case other => TypeCheckResult.TypeCheckFailure(s"$t doesn't support ordering on $caller")
+  def checkForOrderingExpr(dt: DataType, caller: String): TypeCheckResult = {
+    if (RowOrdering.isOrderable(dt)) {
+      TypeCheckResult.TypeCheckSuccess
+    } else {
+      TypeCheckResult.TypeCheckFailure(s"$caller does not support ordering on type $dt")
     }
-
   }

   def checkForSameTypeInputExpr(types: Seq[DataType], caller: String): TypeCheckResult = {
     if (types.distinct.size > 1) {
       TypeCheckResult.TypeCheckFailure(
-        s"input to $caller should all be the same type, but it's ${types.mkString("[", ", ", "]")}")
+        s"input to $caller should all be the same type, but it's " +
+          types.map(_.simpleString).mkString("[", ", ", "]"))
     } else {
       TypeCheckResult.TypeCheckSuccess
     }
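
As a minimal illustration (not from the diff), checkForOrderingExpr now simply delegates to RowOrdering.isOrderable:

    import org.apache.spark.sql.catalyst.util.TypeUtils
    import org.apache.spark.sql.types.{IntegerType, MapType, StringType}

    TypeUtils.checkForOrderingExpr(IntegerType, "min")                       // TypeCheckSuccess
    TypeUtils.checkForOrderingExpr(MapType(StringType, IntegerType), "min")  // TypeCheckFailure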
@@ -302,18 +302,6 @@ case class StructType(fields: Array[StructField]) extends DataType with Seq[StructField] {
   }

   private[sql] val ordering = RowOrdering.forSchema(this.fields.map(_.dataType))
-
-  private[sql] def supportOrdering(s: StructType): Boolean = {
-    s.fields.forall { f =>
-      if (f.dataType.isInstanceOf[AtomicType]) {
-        true
-      } else if (f.dataType.isInstanceOf[StructType]) {
-        supportOrdering(f.dataType.asInstanceOf[StructType])
-      } else {
-        false
-      }
-    }
-  }
 }

 object StructType extends AbstractDataType {
