Skip to content

Commit

Permalink
[SPARK-18674][SQL] improve the error message of using join
Browse files Browse the repository at this point in the history
## What changes were proposed in this pull request?

The current error message for a USING join is quite confusing. For example:
```
scala> val df1 = List(1,2,3).toDS.withColumnRenamed("value", "c1")
df1: org.apache.spark.sql.DataFrame = [c1: int]

scala> val df2 = List(1,2,3).toDS.withColumnRenamed("value", "c2")
df2: org.apache.spark.sql.DataFrame = [c2: int]

scala> df1.join(df2, usingColumn = "c1")
org.apache.spark.sql.AnalysisException: using columns ['c1] can not be resolved given input columns: [c1, c2] ;;
'Join UsingJoin(Inner,List('c1))
:- Project [value#1 AS c1#3]
:  +- LocalRelation [value#1]
+- Project [value#7 AS c2#9]
   +- LocalRelation [value#7]
```

After this PR, it becomes:
```
scala> val df1 = List(1,2,3).toDS.withColumnRenamed("value", "c1")
df1: org.apache.spark.sql.DataFrame = [c1: int]

scala> val df2 = List(1,2,3).toDS.withColumnRenamed("value", "c2")
df2: org.apache.spark.sql.DataFrame = [c2: int]

scala> df1.join(df2, usingColumn = "c1")
org.apache.spark.sql.AnalysisException: USING column `c1` can not be resolved with the right join side, the right output is: [c2];
```

## How was this patch tested?

Updated the existing tests in `ResolveNaturalJoinSuite` and `PlanParserSuite`.

Author: Wenchen Fan <wenchen@databricks.com>

Closes #16100 from cloud-fan/natural.
  • Loading branch information
cloud-fan authored and hvanhovell committed Dec 1, 2016
1 parent 2ab8551 commit e653484
Show file tree
Hide file tree
Showing 7 changed files with 34 additions and 58 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1958,15 +1958,7 @@ class Analyzer(
override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
case j @ Join(left, right, UsingJoin(joinType, usingCols), condition)
if left.resolved && right.resolved && j.duplicateResolved =>
// Resolve the column names referenced in using clause from both the legs of join.
val lCols = usingCols.flatMap(col => left.resolveQuoted(col.name, resolver))
val rCols = usingCols.flatMap(col => right.resolveQuoted(col.name, resolver))
if ((lCols.length == usingCols.length) && (rCols.length == usingCols.length)) {
val joinNames = lCols.map(exp => exp.name)
commonNaturalJoinProcessing(left, right, joinType, joinNames, None)
} else {
j
}
commonNaturalJoinProcessing(left, right, joinType, usingCols, None)
case j @ Join(left, right, NaturalJoin(joinType), condition) if j.resolvedExceptNatural =>
// find common column names from both sides
val joinNames = left.output.map(_.name).intersect(right.output.map(_.name))
Expand All @@ -1981,18 +1973,16 @@ class Analyzer(
joinNames: Seq[String],
condition: Option[Expression]) = {
val leftKeys = joinNames.map { keyName =>
val joinColumn = left.output.find(attr => resolver(attr.name, keyName))
assert(
joinColumn.isDefined,
s"$keyName should exist in ${left.output.map(_.name).mkString(",")}")
joinColumn.get
left.output.find(attr => resolver(attr.name, keyName)).getOrElse {
throw new AnalysisException(s"USING column `$keyName` can not be resolved with the " +
s"left join side, the left output is: [${left.output.map(_.name).mkString(", ")}]")
}
}
val rightKeys = joinNames.map { keyName =>
val joinColumn = right.output.find(attr => resolver(attr.name, keyName))
assert(
joinColumn.isDefined,
s"$keyName should exist in ${right.output.map(_.name).mkString(",")}")
joinColumn.get
right.output.find(attr => resolver(attr.name, keyName)).getOrElse {
throw new AnalysisException(s"USING column `$keyName` can not be resolved with the " +
s"right join side, the right output is: [${right.output.map(_.name).mkString(", ")}]")
}
}
val joinPairs = leftKeys.zip(rightKeys)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -190,12 +190,6 @@ trait CheckAnalysis extends PredicateHelper {
case e =>
}

case j @ Join(_, _, UsingJoin(_, cols), _) =>
val from = operator.inputSet.map(_.name).mkString(", ")
failAnalysis(
s"using columns [${cols.mkString(",")}] " +
s"can not be resolved given input columns: [$from] ")

case j @ Join(_, _, _, Some(condition)) if condition.dataType != BooleanType =>
failAnalysis(
s"join condition '${condition.sql}' " +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -555,10 +555,7 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with Logging {
// Resolve the join type and join condition
val (joinType, condition) = Option(join.joinCriteria) match {
case Some(c) if c.USING != null =>
val columns = c.identifier.asScala.map { column =>
UnresolvedAttribute.quoted(column.getText)
}
(UsingJoin(baseJoinType, columns), None)
(UsingJoin(baseJoinType, c.identifier.asScala.map(_.getText)), None)
case Some(c) if c.booleanExpression != null =>
(baseJoinType, Option(expression(c.booleanExpression)))
case None if join.NATURAL != null =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ case class NaturalJoin(tpe: JoinType) extends JoinType {
override def sql: String = "NATURAL " + tpe.sql
}

case class UsingJoin(tpe: JoinType, usingColumns: Seq[UnresolvedAttribute]) extends JoinType {
case class UsingJoin(tpe: JoinType, usingColumns: Seq[String]) extends JoinType {
require(Seq(Inner, LeftOuter, LeftSemi, RightOuter, FullOuter, LeftAnti).contains(tpe),
"Unsupported using join type " + tpe)
override def sql: String = "USING " + tpe.sql
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,31 +38,31 @@ class ResolveNaturalJoinSuite extends AnalysisTest {

test("natural/using inner join") {
val naturalPlan = r1.join(r2, NaturalJoin(Inner), None)
val usingPlan = r1.join(r2, UsingJoin(Inner, Seq(UnresolvedAttribute("a"))), None)
val usingPlan = r1.join(r2, UsingJoin(Inner, Seq("a")), None)
val expected = r1.join(r2, Inner, Some(EqualTo(a, a))).select(a, b, c)
checkAnalysis(naturalPlan, expected)
checkAnalysis(usingPlan, expected)
}

test("natural/using left join") {
val naturalPlan = r1.join(r2, NaturalJoin(LeftOuter), None)
val usingPlan = r1.join(r2, UsingJoin(LeftOuter, Seq(UnresolvedAttribute("a"))), None)
val usingPlan = r1.join(r2, UsingJoin(LeftOuter, Seq("a")), None)
val expected = r1.join(r2, LeftOuter, Some(EqualTo(a, a))).select(a, b, c)
checkAnalysis(naturalPlan, expected)
checkAnalysis(usingPlan, expected)
}

test("natural/using right join") {
val naturalPlan = r1.join(r2, NaturalJoin(RightOuter), None)
val usingPlan = r1.join(r2, UsingJoin(RightOuter, Seq(UnresolvedAttribute("a"))), None)
val usingPlan = r1.join(r2, UsingJoin(RightOuter, Seq("a")), None)
val expected = r1.join(r2, RightOuter, Some(EqualTo(a, a))).select(a, b, c)
checkAnalysis(naturalPlan, expected)
checkAnalysis(usingPlan, expected)
}

test("natural/using full outer join") {
val naturalPlan = r1.join(r2, NaturalJoin(FullOuter), None)
val usingPlan = r1.join(r2, UsingJoin(FullOuter, Seq(UnresolvedAttribute("a"))), None)
val usingPlan = r1.join(r2, UsingJoin(FullOuter, Seq("a")), None)
val expected = r1.join(r2, FullOuter, Some(EqualTo(a, a))).select(
Alias(Coalesce(Seq(a, a)), "a")(), b, c)
checkAnalysis(naturalPlan, expected)
Expand All @@ -71,7 +71,7 @@ class ResolveNaturalJoinSuite extends AnalysisTest {

test("natural/using inner join with no nullability") {
val naturalPlan = r3.join(r4, NaturalJoin(Inner), None)
val usingPlan = r3.join(r4, UsingJoin(Inner, Seq(UnresolvedAttribute("b"))), None)
val usingPlan = r3.join(r4, UsingJoin(Inner, Seq("b")), None)
val expected = r3.join(r4, Inner, Some(EqualTo(bNotNull, bNotNull))).select(
bNotNull, aNotNull, cNotNull)
checkAnalysis(naturalPlan, expected)
Expand All @@ -80,7 +80,7 @@ class ResolveNaturalJoinSuite extends AnalysisTest {

test("natural/using left join with no nullability") {
val naturalPlan = r3.join(r4, NaturalJoin(LeftOuter), None)
val usingPlan = r3.join(r4, UsingJoin(LeftOuter, Seq(UnresolvedAttribute("b"))), None)
val usingPlan = r3.join(r4, UsingJoin(LeftOuter, Seq("b")), None)
val expected = r3.join(r4, LeftOuter, Some(EqualTo(bNotNull, bNotNull))).select(
bNotNull, aNotNull, c)
checkAnalysis(naturalPlan, expected)
Expand All @@ -89,7 +89,7 @@ class ResolveNaturalJoinSuite extends AnalysisTest {

test("natural/using right join with no nullability") {
val naturalPlan = r3.join(r4, NaturalJoin(RightOuter), None)
val usingPlan = r3.join(r4, UsingJoin(RightOuter, Seq(UnresolvedAttribute("b"))), None)
val usingPlan = r3.join(r4, UsingJoin(RightOuter, Seq("b")), None)
val expected = r3.join(r4, RightOuter, Some(EqualTo(bNotNull, bNotNull))).select(
bNotNull, a, cNotNull)
checkAnalysis(naturalPlan, expected)
Expand All @@ -98,48 +98,43 @@ class ResolveNaturalJoinSuite extends AnalysisTest {

test("natural/using full outer join with no nullability") {
val naturalPlan = r3.join(r4, NaturalJoin(FullOuter), None)
val usingPlan = r3.join(r4, UsingJoin(FullOuter, Seq(UnresolvedAttribute("b"))), None)
val usingPlan = r3.join(r4, UsingJoin(FullOuter, Seq("b")), None)
val expected = r3.join(r4, FullOuter, Some(EqualTo(bNotNull, bNotNull))).select(
Alias(Coalesce(Seq(b, b)), "b")(), a, c)
checkAnalysis(naturalPlan, expected)
checkAnalysis(usingPlan, expected)
}

test("using unresolved attribute") {
val usingPlan = r1.join(r2, UsingJoin(Inner, Seq(UnresolvedAttribute("d"))), None)
val error = intercept[AnalysisException] {
SimpleAnalyzer.checkAnalysis(usingPlan)
}
assert(error.message.contains(
"using columns ['d] can not be resolved given input columns: [b, a, c]"))
assertAnalysisError(
r1.join(r2, UsingJoin(Inner, Seq("d"))),
"USING column `d` can not be resolved with the left join side" :: Nil)
assertAnalysisError(
r1.join(r2, UsingJoin(Inner, Seq("b"))),
"USING column `b` can not be resolved with the right join side" :: Nil)
}

test("using join with a case sensitive analyzer") {
val expected = r1.join(r2, Inner, Some(EqualTo(a, a))).select(a, b, c)

{
val usingPlan = r1.join(r2, UsingJoin(Inner, Seq(UnresolvedAttribute("a"))), None)
checkAnalysis(usingPlan, expected, caseSensitive = true)
}
val usingPlan = r1.join(r2, UsingJoin(Inner, Seq("a")), None)
checkAnalysis(usingPlan, expected, caseSensitive = true)

{
val usingPlan = r1.join(r2, UsingJoin(Inner, Seq(UnresolvedAttribute("A"))), None)
assertAnalysisError(
usingPlan,
Seq("using columns ['A] can not be resolved given input columns: [b, a, c, a]"))
}
assertAnalysisError(
r1.join(r2, UsingJoin(Inner, Seq("A"))),
"USING column `A` can not be resolved with the left join side" :: Nil)
}

test("using join with a case insensitive analyzer") {
val expected = r1.join(r2, Inner, Some(EqualTo(a, a))).select(a, b, c)

{
val usingPlan = r1.join(r2, UsingJoin(Inner, Seq(UnresolvedAttribute("a"))), None)
val usingPlan = r1.join(r2, UsingJoin(Inner, Seq("a")), None)
checkAnalysis(usingPlan, expected, caseSensitive = false)
}

{
val usingPlan = r1.join(r2, UsingJoin(Inner, Seq(UnresolvedAttribute("A"))), None)
val usingPlan = r1.join(r2, UsingJoin(Inner, Seq("A")), None)
checkAnalysis(usingPlan, expected, caseSensitive = false)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -347,7 +347,7 @@ class PlanParserSuite extends PlanTest {
val testUsingJoin = (sql: String, jt: JoinType) => {
assertEqual(
s"select * from t $sql u using(a, b)",
table("t").join(table("u"), UsingJoin(jt, Seq('a.attr, 'b.attr)), None).select(star()))
table("t").join(table("u"), UsingJoin(jt, Seq("a", "b")), None).select(star()))
}
val testAll = Seq(testUnconditionalJoin, testConditionalJoin, testNaturalJoin, testUsingJoin)
val testExistence = Seq(testUnconditionalJoin, testConditionalJoin, testUsingJoin)
Expand Down
2 changes: 1 addition & 1 deletion sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
Original file line number Diff line number Diff line change
Expand Up @@ -776,7 +776,7 @@ class Dataset[T] private[sql](
Join(
joined.left,
joined.right,
UsingJoin(JoinType(joinType), usingColumns.map(UnresolvedAttribute(_))),
UsingJoin(JoinType(joinType), usingColumns),
None)
}
}
Expand Down

0 comments on commit e653484

Please sign in to comment.