Skip to content

Commit

Permalink
Throw AnalysisException when using BinaryType on Join and Aggregate.
Browse files Browse the repository at this point in the history
  • Loading branch information
viirya committed Jul 30, 2015
1 parent 1221849 commit 4f76cac
Show file tree
Hide file tree
Showing 3 changed files with 39 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,18 @@ trait CheckAnalysis {
s"join condition '${condition.prettyString}' " +
s"of type ${condition.dataType.simpleString} is not a boolean.")

case Join(_, _, _, Some(condition)) =>
  // Rejects join conditions that operate on binary-typed values.
  // Predicates are descended into so that each of their operands is inspected;
  // any other expression is judged by its own data type and is not descended into.
  def checkValidJoinConditionExprs(expr: Expression): Unit = expr match {
    case _: Predicate =>
      // `expr` is already typed as Expression, so no downcast is required here.
      expr.children.foreach(checkValidJoinConditionExprs)
    case e if e.dataType.isInstanceOf[BinaryType] =>
      failAnalysis(s"expression ${e.prettyString} in join condition " +
        s"'${condition.prettyString}' can't be binary type.")
    case _ => // OK
  }

  checkValidJoinConditionExprs(condition)

case Aggregate(groupingExprs, aggregateExprs, child) =>
def checkValidAggregateExpression(expr: Expression): Unit = expr match {
case _: AggregateExpression => // OK
Expand All @@ -100,7 +112,15 @@ trait CheckAnalysis {
case e => e.children.foreach(checkValidAggregateExpression)
}

// Fails analysis when a grouping key is binary-typed; every other type is accepted.
def checkValidGroupingExprs(expr: Expression): Unit = expr.dataType match {
  case BinaryType =>
    failAnalysis(s"grouping expression '${expr.prettyString}' in aggregate can " +
      "not be binary type.")
  case _ => // OK
}

aggregateExprs.foreach(checkValidAggregateExpression)
// The grouping-key check has to run over the grouping expressions. Running it
// over the aggregate output list would miss binary keys that are not projected,
// and would reject binary-typed outputs with a misleading "grouping" message.
groupingExprs.foreach(checkValidGroupingExprs)

case Sort(orders, _, _) =>
orders.foreach { order =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ package org.apache.spark.sql

import org.apache.spark.sql.TestData._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.DecimalType
import org.apache.spark.sql.types.{BinaryType, DecimalType}


class DataFrameAggregateSuite extends QueryTest {
Expand Down Expand Up @@ -191,4 +191,13 @@ class DataFrameAggregateSuite extends QueryTest {
Row(null))
}

test("aggregation can't work on binary type") {
  // Alias the cast explicitly so the column is guaranteed to be named "c".
  // Otherwise a failure to resolve "c" in groupBy could satisfy the intercept
  // on its own and hide whether the binary-type check actually ran.
  val df = Seq(1, 1, 2, 2).map(i => Tuple1(i.toString)).toDF("c")
    .select($"c".cast(BinaryType).as("c"))

  // Check the message as well as the exception type, so that an unrelated
  // analysis error cannot make this test pass.
  val groupByError = intercept[AnalysisException] {
    df.groupBy("c").agg(count("*"))
  }
  assert(groupByError.getMessage.contains("binary type"))

  val distinctError = intercept[AnalysisException] {
    df.distinct
  }
  assert(distinctError.getMessage.contains("binary type"))
}
}
9 changes: 9 additions & 0 deletions sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import org.scalatest.BeforeAndAfterEach
import org.apache.spark.sql.TestData._
import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
import org.apache.spark.sql.execution.joins._
import org.apache.spark.sql.types.BinaryType


class JoinSuite extends QueryTest with BeforeAndAfterEach {
Expand Down Expand Up @@ -489,4 +490,12 @@ class JoinSuite extends QueryTest with BeforeAndAfterEach {
Row(3, 2) :: Nil)

}

test("Join can't work on binary type") {
  // Explicit aliases keep the column names "c" and "d" after the cast, so the
  // join condition below is guaranteed to resolve.
  val left = Seq(1, 1, 2, 2).map(i => Tuple1(i.toString)).toDF("c")
    .select($"c".cast(BinaryType).as("c"))
  val right = Seq(1, 1, 2, 2).map(i => Tuple1(i.toString)).toDF("d")
    .select($"d".cast(BinaryType).as("d"))

  // Join on the columns that really exist in each frame. The previous condition
  // named `left.N` / `right.N`, which neither frame defines, so the intercept
  // could be satisfied by an unresolved-column error instead of the binary check.
  val joinError = intercept[AnalysisException] {
    left.join(right, left("c") === right("d"), "full")
  }
  assert(joinError.getMessage.contains("binary type"))
}
}

0 comments on commit 4f76cac

Please sign in to comment.