From f1ae973bca00fc86fb9e77073b1949811842d21d Mon Sep 17 00:00:00 2001 From: gatorsmile Date: Thu, 30 Jun 2016 20:06:06 -0700 Subject: [PATCH 1/4] fix --- .../sql/catalyst/analysis/unresolved.scala | 2 ++ .../org/apache/spark/sql/SQLQuerySuite.scala | 24 +++++++++++++++++++ 2 files changed, 26 insertions(+) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala index b883546135f07..cd568263c1aae 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala @@ -215,6 +215,8 @@ abstract class Star extends LeafExpression with NamedExpression { case class UnresolvedStar(target: Option[Seq[String]]) extends Star with Unevaluable { override def expand(input: LogicalPlan, resolver: Resolver): Seq[NamedExpression] = { + // When the table does not have any column, the input LogicalPlan does not have any column + if (input.output.isEmpty) return Seq.empty[NamedExpression] // First try to expand assuming it is table.*. val expandedAttributes: Seq[Attribute] = target match { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala index 084ba9b78ec50..e63277eb53c0b 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala @@ -2115,6 +2115,30 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext { } } + test("Star Expansion - table with zero column") { + withTempTable("temp_table_no_cols") { + val rddNoCols = sqlContext.sparkContext.parallelize(1 to 10).map(_ => Row.empty) + val dfNoCols = sqlContext.createDataFrame(rddNoCols, StructType(Seq.empty)) + dfNoCols.createTempView("temp_table_no_cols") + + // ResolvedStar + checkAnswer( + dfNoCols, + dfNoCols.select(dfNoCols.col("*"))) + + // UnresolvedStar + checkAnswer( + dfNoCols, + sql("SELECT * FROM temp_table_no_cols")) + checkAnswer( + dfNoCols, + sql("SELECT a.* FROM temp_table_no_cols a")) + checkAnswer( + dfNoCols, + dfNoCols.select($"*")) + } + } + test("Common subexpression elimination") { // TODO: support subexpression elimination in whole stage codegen withSQLConf("spark.sql.codegen.wholeStage" -> "false") { From e3862262eaea9bef9be7283d1e7dcb105e8fa136 Mon Sep 17 00:00:00 2001 From: gatorsmile Date: Thu, 30 Jun 2016 21:26:29 -0700 Subject: [PATCH 2/4] updated the test case --- .../src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala index e63277eb53c0b..b1d9f7a54aaa2 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala @@ -2117,8 +2117,8 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext { test("Star Expansion - table with zero column") { withTempTable("temp_table_no_cols") { - val rddNoCols = sqlContext.sparkContext.parallelize(1 to 10).map(_ => Row.empty) - val dfNoCols = sqlContext.createDataFrame(rddNoCols, StructType(Seq.empty)) + val rddNoCols = sparkContext.parallelize(1 to 10).map(_ => Row.empty) + val dfNoCols = spark.createDataFrame(rddNoCols, StructType(Seq.empty)) dfNoCols.createTempView("temp_table_no_cols") // ResolvedStar From c84d0c51e93fbcbdf5f8777c4cb31f478584aa4d Mon Sep 17 00:00:00 2001 From: gatorsmile Date: Fri, 1 Jul 2016 08:51:37 -0700 Subject: [PATCH 3/4] address comments --- .../sql/catalyst/analysis/unresolved.scala | 58 ++++++++----------- 1 file changed, 25 insertions(+), 33 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala index cd568263c1aae..b2a15ee5b94d0 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala @@ -215,43 +215,35 @@ abstract class Star extends LeafExpression with NamedExpression { case class UnresolvedStar(target: Option[Seq[String]]) extends Star with Unevaluable { override def expand(input: LogicalPlan, resolver: Resolver): Seq[NamedExpression] = { - // When the table does not have any column, the input LogicalPlan does not have any column - if (input.output.isEmpty) return Seq.empty[NamedExpression] - - // First try to expand assuming it is table.*. - val expandedAttributes: Seq[Attribute] = target match { + if (target.isEmpty) { // If there is no table specified, use all input attributes. - case None => input.output + input.output + } else if (target.get.size == 1) { // If there is a table, pick out attributes that are part of this table. - case Some(t) => if (t.size == 1) { - input.output.filter(_.qualifier.exists(resolver(_, t.head))) - } else { - List() - } - } - if (expandedAttributes.nonEmpty) return expandedAttributes - - // Try to resolve it as a struct expansion. If there is a conflict and both are possible, - // (i.e. [name].* is both a table and a struct), the struct path can always be qualified. - require(target.isDefined) - val attribute = input.resolve(target.get, resolver) - if (attribute.isDefined) { - // This target resolved to an attribute in child. It must be a struct. Expand it. - attribute.get.dataType match { - case s: StructType => s.zipWithIndex.map { - case (f, i) => - val extract = GetStructField(attribute.get, i) - Alias(extract, f.name)() + input.output.filter(_.qualifier.exists(resolver(_, target.get.head))) + } else { + // Try to resolve it as a struct expansion. If there is a conflict and both are possible, + // (i.e. [name].* is both a table and a struct), the struct path can always be qualified. + require(target.isDefined) + val attribute = input.resolve(target.get, resolver) + if (attribute.isDefined) { + // This target resolved to an attribute in child. It must be a struct. Expand it. + attribute.get.dataType match { + case s: StructType => s.zipWithIndex.map { + case (f, i) => + val extract = GetStructField(attribute.get, i) + Alias(extract, f.name)() + } + + case _ => + throw new AnalysisException("Can only star expand struct data types. Attribute: `" + + target.get + "`") } - - case _ => - throw new AnalysisException("Can only star expand struct data types. Attribute: `" + - target.get + "`") + } else { + val from = input.inputSet.map(_.name).mkString(", ") + val targetString = target.get.mkString(".") + throw new AnalysisException(s"cannot resolve '$targetString.*' give input columns '$from'") } - } else { - val from = input.inputSet.map(_.name).mkString(", ") - val targetString = target.get.mkString(".") - throw new AnalysisException(s"cannot resolve '$targetString.*' give input columns '$from'") } } From bd83643fb1b7adf8eef4694c45fb4b061c03a012 Mon Sep 17 00:00:00 2001 From: gatorsmile Date: Fri, 1 Jul 2016 10:27:52 -0700 Subject: [PATCH 4/4] fix the testcases --- .../sql/catalyst/analysis/unresolved.scala | 57 ++++++++++--------- .../org/apache/spark/sql/SQLQuerySuite.scala | 13 ++++- 2 files changed, 40 insertions(+), 30 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala index b2a15ee5b94d0..609089a302c88 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala @@ -215,35 +215,38 @@ abstract class Star extends LeafExpression with NamedExpression { case class UnresolvedStar(target: Option[Seq[String]]) extends Star with Unevaluable { override def expand(input: LogicalPlan, resolver: Resolver): Seq[NamedExpression] = { - if (target.isEmpty) { - // If there is no table specified, use all input attributes. - input.output - } else if (target.get.size == 1) { - // If there is a table, pick out attributes that are part of this table. - input.output.filter(_.qualifier.exists(resolver(_, target.get.head))) - } else { - // Try to resolve it as a struct expansion. If there is a conflict and both are possible, - // (i.e. [name].* is both a table and a struct), the struct path can always be qualified. - require(target.isDefined) - val attribute = input.resolve(target.get, resolver) - if (attribute.isDefined) { - // This target resolved to an attribute in child. It must be a struct. Expand it. - attribute.get.dataType match { - case s: StructType => s.zipWithIndex.map { - case (f, i) => - val extract = GetStructField(attribute.get, i) - Alias(extract, f.name)() - } - - case _ => - throw new AnalysisException("Can only star expand struct data types. Attribute: `" + - target.get + "`") - } + // If there is no table specified, use all input attributes. + if (target.isEmpty) return input.output + + val expandedAttributes = + if (target.get.size == 1) { + // If there is a table, pick out attributes that are part of this table. + input.output.filter(_.qualifier.exists(resolver(_, target.get.head))) } else { - val from = input.inputSet.map(_.name).mkString(", ") - val targetString = target.get.mkString(".") - throw new AnalysisException(s"cannot resolve '$targetString.*' give input columns '$from'") + List() } + if (expandedAttributes.nonEmpty) return expandedAttributes + + // Try to resolve it as a struct expansion. If there is a conflict and both are possible, + // (i.e. [name].* is both a table and a struct), the struct path can always be qualified. + val attribute = input.resolve(target.get, resolver) + if (attribute.isDefined) { + // This target resolved to an attribute in child. It must be a struct. Expand it. + attribute.get.dataType match { + case s: StructType => s.zipWithIndex.map { + case (f, i) => + val extract = GetStructField(attribute.get, i) + Alias(extract, f.name)() + } + + case _ => + throw new AnalysisException("Can only star expand struct data types. Attribute: `" + + target.get + "`") + } + } else { + val from = input.inputSet.map(_.name).mkString(", ") + val targetString = target.get.mkString(".") + throw new AnalysisException(s"cannot resolve '$targetString.*' give input columns '$from'") } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala index b1d9f7a54aaa2..dca9e5e503c72 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala @@ -2130,12 +2130,19 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext { checkAnswer( dfNoCols, sql("SELECT * FROM temp_table_no_cols")) - checkAnswer( - dfNoCols, - sql("SELECT a.* FROM temp_table_no_cols a")) checkAnswer( dfNoCols, dfNoCols.select($"*")) + + var e = intercept[AnalysisException] { + sql("SELECT a.* FROM temp_table_no_cols a") + }.getMessage + assert(e.contains("cannot resolve 'a.*' give input columns ''")) + + e = intercept[AnalysisException] { + dfNoCols.select($"b.*") + }.getMessage + assert(e.contains("cannot resolve 'b.*' give input columns ''")) } }