From f2852188f4c4c86b17d0668595e9640de3beb737 Mon Sep 17 00:00:00 2001 From: Dongjoon Hyun Date: Sun, 7 Aug 2016 15:57:05 -0700 Subject: [PATCH 1/3] [SPARK-16938][SQL] `dropDuplicate` should not raise exception on qualified column names. --- sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala | 2 +- .../test/scala/org/apache/spark/sql/DataFrameSuite.scala | 7 +++++++ 2 files changed, 8 insertions(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala index 3b3cb820788a2..ed9f0032f9c26 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala @@ -1879,7 +1879,7 @@ class Dataset[T] private[sql]( val resolver = sparkSession.sessionState.analyzer.resolver val allColumns = queryExecution.analyzed.output val groupCols = colNames.map { colName => - allColumns.find(col => resolver(col.name, colName)).getOrElse( + allColumns.find(col => resolver(col.qualifiedName, colName)).getOrElse( throw new AnalysisException( s"""Cannot resolve column name "$colName" among (${schema.fieldNames.mkString(", ")})""")) } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala index c2d256bdd335b..2cf3d24788b2a 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala @@ -1562,6 +1562,13 @@ class DataFrameSuite extends QueryTest with SharedSQLContext { checkAnswer(df.distinct(), Row(1) :: Row(2) :: Nil) } + test("SPARK-16938: `dropDuplicate` should not raise exception on qualified column names") { + val dfa = Seq((1, 2), (2, 3)).toDF("id", "a").alias("dfa") + val dfb = Seq((1, 0), (1, 1)).toDF("id", "b").alias("dfb") + checkAnswer(dfa.join(dfb, dfa("id") === dfb("id")).dropDuplicates(Array("dfa.id", "dfb.id")), + Row(1, 2, 1, 1) :: Nil) + } + test("SPARK-16181: outer join with isNull filter") { val left = Seq("x").toDF("col") val right = Seq("y").toDF("col").withColumn("new", lit(true)) From 1172fae10c9ad0338aff06569d0ab9e57041ec87 Mon Sep 17 00:00:00 2001 From: Dongjoon Hyun Date: Tue, 9 Aug 2016 23:30:24 -0700 Subject: [PATCH 2/3] Address comments. --- .../main/scala/org/apache/spark/sql/Dataset.scala | 6 +++--- .../scala/org/apache/spark/sql/DataFrameSuite.scala | 13 ++++++++++++- 2 files changed, 15 insertions(+), 4 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala index ed9f0032f9c26..de4a7ba0da04f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala @@ -1827,7 +1827,7 @@ class Dataset[T] private[sql]( val resolver = sparkSession.sessionState.analyzer.resolver val allColumns = queryExecution.analyzed.output val remainingCols = allColumns.filter { attribute => - colNames.forall(n => !resolver(attribute.name, n)) + colNames.forall(n => !(resolver(attribute.name, n) || resolver(attribute.qualifiedName, n))) }.map(attribute => Column(attribute)) if (remainingCols.size == allColumns.size) { toDF() @@ -1879,8 +1879,8 @@ class Dataset[T] private[sql]( val resolver = sparkSession.sessionState.analyzer.resolver val allColumns = queryExecution.analyzed.output val groupCols = colNames.map { colName => - allColumns.find(col => resolver(col.qualifiedName, colName)).getOrElse( - throw new AnalysisException( + allColumns.find(col => resolver(col.qualifiedName, colName) || resolver(col.name, colName)) + .getOrElse(throw new AnalysisException( s"""Cannot resolve column name "$colName" among (${schema.fieldNames.mkString(", ")})""")) } val groupColExprIds = groupCols.map(_.exprId) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala index 2cf3d24788b2a..c15151cb3d1ee 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala @@ -1562,9 +1562,20 @@ class DataFrameSuite extends QueryTest with SharedSQLContext { checkAnswer(df.distinct(), Row(1) :: Row(2) :: Nil) } - test("SPARK-16938: `dropDuplicate` should not raise exception on qualified column names") { + test("SPARK-16938: `drop/dropDuplicate` should handle qualified column names") { val dfa = Seq((1, 2), (2, 3)).toDF("id", "a").alias("dfa") val dfb = Seq((1, 0), (1, 1)).toDF("id", "b").alias("dfb") + + checkAnswer(dfa.drop("dfa.id"), Row(2) :: Row(3) :: Nil) + checkAnswer(dfa.drop("id"), Row(2) :: Row(3) :: Nil) + checkAnswer(dfa.drop($"dfa.id"), Row(2) :: Row(3) :: Nil) + checkAnswer(dfa.drop($"id"), Row(2) :: Row(3) :: Nil) + + checkAnswer(dfb.dropDuplicates(Array("dfb.id")), Row(1, 0) :: Nil) + checkAnswer(dfb.dropDuplicates(Array("dfb.b")), Row(1, 0) :: Row(1, 1) :: Nil) + checkAnswer(dfb.dropDuplicates(Array("id")), Row(1, 0) :: Nil) + checkAnswer(dfb.dropDuplicates(Array("b")), Row(1, 0) :: Row(1, 1) :: Nil) + checkAnswer(dfa.join(dfb, dfa("id") === dfb("id")).dropDuplicates(Array("dfa.id", "dfb.id")), Row(1, 2, 1, 1) :: Nil) } From 67ea92448463c559ef9650eead719f69b5f3b51b Mon Sep 17 00:00:00 2001 From: Dongjoon Hyun Date: Mon, 22 Aug 2016 14:02:04 -0700 Subject: [PATCH 3/3] Add more testcases. --- .../src/test/scala/org/apache/spark/sql/DataFrameSuite.scala | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala index c15151cb3d1ee..10b63d1cd5f1e 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala @@ -1571,6 +1571,11 @@ class DataFrameSuite extends QueryTest with SharedSQLContext { checkAnswer(dfa.drop($"dfa.id"), Row(2) :: Row(3) :: Nil) checkAnswer(dfa.drop($"id"), Row(2) :: Row(3) :: Nil) + checkAnswer(dfa.join(dfb, dfa("id") === dfb("id")).drop("dfa.id"), + Row(2, 1, 0) :: Row(2, 1, 1) :: Nil) + checkAnswer(dfa.join(dfb, dfa("id") === dfb("id")).drop("dfa.id", "dfb.id"), + Row(2, 0) :: Row(2, 1) :: Nil) + checkAnswer(dfb.dropDuplicates(Array("dfb.id")), Row(1, 0) :: Nil) checkAnswer(dfb.dropDuplicates(Array("dfb.b")), Row(1, 0) :: Row(1, 1) :: Nil) checkAnswer(dfb.dropDuplicates(Array("id")), Row(1, 0) :: Nil)