From 03a4281751e02acd2b97ceff6cf8e1621e83eb93 Mon Sep 17 00:00:00 2001 From: Bogdan Raducanu Date: Thu, 20 Apr 2017 12:59:49 +0200 Subject: [PATCH 1/9] fix + test --- .../parquet/ParquetQuerySuite.scala | 35 ++++++++++++++++++- .../apache/spark/sql/test/SQLTestUtils.scala | 19 ++++++++-- .../spark/sql/test/SharedSQLContext.scala | 13 ++++--- 3 files changed, 60 insertions(+), 7 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala index c36609586c807..32c73495a0f9e 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala @@ -23,7 +23,7 @@ import java.sql.Timestamp import org.apache.hadoop.fs.{FileSystem, Path} import org.apache.parquet.hadoop.ParquetOutputFormat -import org.apache.spark.SparkException +import org.apache.spark.{DebugFilesystem, SparkException} import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier} import org.apache.spark.sql.catalyst.expressions.SpecificInternalRow @@ -316,6 +316,39 @@ class ParquetQuerySuite extends QueryTest with ParquetTest with SharedSQLContext } } + /** + * this is part of test 'Enabling/disabling ignoreCorruptFiles' but run in a loop + * to increase the chance of failure + */ + test("SPARK-20407 ParquetQuerySuite 'Enabling/disabling ignoreCorruptFiles' flaky test") { + def testIgnoreCorruptFiles(): Unit = { + withTempDir { dir => + val basePath = dir.getCanonicalPath + spark.range(1).toDF("a").write.parquet(new Path(basePath, "first").toString) + spark.range(1, 2).toDF("a").write.parquet(new Path(basePath, "second").toString) + spark.range(2, 3).toDF("a").write.json(new Path(basePath, "third").toString) + val df = spark.read.parquet( + new Path(basePath, "first").toString, + new Path(basePath, "second").toString, + new Path(basePath, "third").toString) + checkAnswer( + df, + Seq(Row(0), Row(1))) + } + } + + for (i <- 1 to 100) { + DebugFilesystem.clearOpenStreams() + withSQLConf(SQLConf.IGNORE_CORRUPT_FILES.key -> "false") { + val exception = intercept[SparkException] { + testIgnoreCorruptFiles() + } + assert(exception.getMessage().contains("is not a Parquet file")) + } + DebugFilesystem.assertNoOpenStreams() + } + } + test("SPARK-8990 DataFrameReader.parquet() should respect user specified options") { withTempPath { dir => val basePath = dir.getCanonicalPath diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala index 6a4cc95d36bea..71ffd316ea8ce 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala @@ -22,11 +22,13 @@ import java.net.URI import java.nio.file.Files import java.util.UUID +import scala.concurrent.duration._ import scala.language.implicitConversions import scala.util.control.NonFatal import org.apache.hadoop.fs.Path import org.scalatest.BeforeAndAfterAll +import org.scalatest.concurrent.Eventually import org.apache.spark.SparkFunSuite import org.apache.spark.sql._ @@ -49,7 +51,7 @@ import org.apache.spark.util.{UninterruptibleThread, Utils} * prone to leaving multiple overlapping [[org.apache.spark.SparkContext]]s in the same JVM. 
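+ * With this change the trait also mixes in [[org.scalatest.concurrent.Eventually]], so
+ * that helpers can retry assertions that depend on asynchronous cleanup, as in the
+ * waitForTasksToFinish helper added below:
+ * {{{
+ *   eventually(timeout(10.seconds)) {
+ *     assert(spark.sparkContext.statusTracker
+ *       .getExecutorInfos.map(_.numRunningTasks()).sum == 0)
+ *   }
+ * }}}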
*/ private[sql] trait SQLTestUtils - extends SparkFunSuite + extends SparkFunSuite with Eventually with BeforeAndAfterAll with SQLTestData { self => @@ -138,6 +140,15 @@ private[sql] trait SQLTestUtils } } + /** + * Waits for all tasks on all executors to be finished. + */ + protected def waitForTasksToFinish(): Unit = { + eventually(timeout(10.seconds)) { + assert(spark.sparkContext.statusTracker + .getExecutorInfos.map(_.numRunningTasks()).sum == 0) + } + } /** * Creates a temporary directory, which is then passed to `f` and will be deleted after `f` * returns. @@ -146,7 +157,11 @@ private[sql] trait SQLTestUtils */ protected def withTempDir(f: File => Unit): Unit = { val dir = Utils.createTempDir().getCanonicalFile - try f(dir) finally Utils.deleteRecursively(dir) + try f(dir) finally { + // wait for all tasks to finish before deleting files + waitForTasksToFinish() + Utils.deleteRecursively(dir) + } } /** diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSQLContext.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSQLContext.scala index e122b39f6fc40..3d76e05f616d5 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSQLContext.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSQLContext.scala @@ -17,17 +17,18 @@ package org.apache.spark.sql.test +import scala.concurrent.duration._ + import org.scalatest.BeforeAndAfterEach +import org.scalatest.concurrent.Eventually import org.apache.spark.{DebugFilesystem, SparkConf} import org.apache.spark.sql.{SparkSession, SQLContext} -import org.apache.spark.sql.internal.SQLConf - /** * Helper trait for SQL test suites where all tests share a single [[TestSparkSession]]. */ -trait SharedSQLContext extends SQLTestUtils with BeforeAndAfterEach { +trait SharedSQLContext extends SQLTestUtils with BeforeAndAfterEach with Eventually { protected val sparkConf = new SparkConf() @@ -84,6 +85,10 @@ trait SharedSQLContext extends SQLTestUtils with BeforeAndAfterEach { protected override def afterEach(): Unit = { super.afterEach() - DebugFilesystem.assertNoOpenStreams() + // files can be closed from other threads, so wait a bit + // normally this doesn't take more than 1s + eventually(timeout(10.seconds)) { + DebugFilesystem.assertNoOpenStreams() + } } } From 72cf1d117890abe45aa30c6b91a7e2c527fc4969 Mon Sep 17 00:00:00 2001 From: Bogdan Raducanu Date: Thu, 20 Apr 2017 13:01:40 +0200 Subject: [PATCH 2/9] reverted mistake commit --- .../parquet/ParquetQuerySuite.scala | 35 +------------------ .../apache/spark/sql/test/SQLTestUtils.scala | 19 ++-------- .../spark/sql/test/SharedSQLContext.scala | 13 +++---- 3 files changed, 7 insertions(+), 60 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala index 32c73495a0f9e..c36609586c807 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala @@ -23,7 +23,7 @@ import java.sql.Timestamp import org.apache.hadoop.fs.{FileSystem, Path} import org.apache.parquet.hadoop.ParquetOutputFormat -import org.apache.spark.{DebugFilesystem, SparkException} +import org.apache.spark.SparkException import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier} import 
org.apache.spark.sql.catalyst.expressions.SpecificInternalRow @@ -316,39 +316,6 @@ class ParquetQuerySuite extends QueryTest with ParquetTest with SharedSQLContext } } - /** - * this is part of test 'Enabling/disabling ignoreCorruptFiles' but run in a loop - * to increase the chance of failure - */ - test("SPARK-20407 ParquetQuerySuite 'Enabling/disabling ignoreCorruptFiles' flaky test") { - def testIgnoreCorruptFiles(): Unit = { - withTempDir { dir => - val basePath = dir.getCanonicalPath - spark.range(1).toDF("a").write.parquet(new Path(basePath, "first").toString) - spark.range(1, 2).toDF("a").write.parquet(new Path(basePath, "second").toString) - spark.range(2, 3).toDF("a").write.json(new Path(basePath, "third").toString) - val df = spark.read.parquet( - new Path(basePath, "first").toString, - new Path(basePath, "second").toString, - new Path(basePath, "third").toString) - checkAnswer( - df, - Seq(Row(0), Row(1))) - } - } - - for (i <- 1 to 100) { - DebugFilesystem.clearOpenStreams() - withSQLConf(SQLConf.IGNORE_CORRUPT_FILES.key -> "false") { - val exception = intercept[SparkException] { - testIgnoreCorruptFiles() - } - assert(exception.getMessage().contains("is not a Parquet file")) - } - DebugFilesystem.assertNoOpenStreams() - } - } - test("SPARK-8990 DataFrameReader.parquet() should respect user specified options") { withTempPath { dir => val basePath = dir.getCanonicalPath diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala index 71ffd316ea8ce..6a4cc95d36bea 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala @@ -22,13 +22,11 @@ import java.net.URI import java.nio.file.Files import java.util.UUID -import scala.concurrent.duration._ import scala.language.implicitConversions import scala.util.control.NonFatal import org.apache.hadoop.fs.Path import org.scalatest.BeforeAndAfterAll -import org.scalatest.concurrent.Eventually import org.apache.spark.SparkFunSuite import org.apache.spark.sql._ @@ -51,7 +49,7 @@ import org.apache.spark.util.{UninterruptibleThread, Utils} * prone to leaving multiple overlapping [[org.apache.spark.SparkContext]]s in the same JVM. */ private[sql] trait SQLTestUtils - extends SparkFunSuite with Eventually + extends SparkFunSuite with BeforeAndAfterAll with SQLTestData { self => @@ -140,15 +138,6 @@ private[sql] trait SQLTestUtils } } - /** - * Waits for all tasks on all executors to be finished. - */ - protected def waitForTasksToFinish(): Unit = { - eventually(timeout(10.seconds)) { - assert(spark.sparkContext.statusTracker - .getExecutorInfos.map(_.numRunningTasks()).sum == 0) - } - } /** * Creates a temporary directory, which is then passed to `f` and will be deleted after `f` * returns. 
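 * A typical use is a sketch like the following, where any files written under dir are
 * cleaned up automatically:
 * {{{
 *   withTempDir { dir =>
 *     spark.range(10).toDF("a").write.parquet(dir.getCanonicalPath)
 *   }
 * }}}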
@@ -157,11 +146,7 @@ private[sql] trait SQLTestUtils */ protected def withTempDir(f: File => Unit): Unit = { val dir = Utils.createTempDir().getCanonicalFile - try f(dir) finally { - // wait for all tasks to finish before deleting files - waitForTasksToFinish() - Utils.deleteRecursively(dir) - } + try f(dir) finally Utils.deleteRecursively(dir) } /** diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSQLContext.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSQLContext.scala index 3d76e05f616d5..e122b39f6fc40 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSQLContext.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSQLContext.scala @@ -17,18 +17,17 @@ package org.apache.spark.sql.test -import scala.concurrent.duration._ - import org.scalatest.BeforeAndAfterEach -import org.scalatest.concurrent.Eventually import org.apache.spark.{DebugFilesystem, SparkConf} import org.apache.spark.sql.{SparkSession, SQLContext} +import org.apache.spark.sql.internal.SQLConf + /** * Helper trait for SQL test suites where all tests share a single [[TestSparkSession]]. */ -trait SharedSQLContext extends SQLTestUtils with BeforeAndAfterEach with Eventually { +trait SharedSQLContext extends SQLTestUtils with BeforeAndAfterEach { protected val sparkConf = new SparkConf() @@ -85,10 +84,6 @@ trait SharedSQLContext extends SQLTestUtils with BeforeAndAfterEach with Eventua protected override def afterEach(): Unit = { super.afterEach() - // files can be closed from other threads, so wait a bit - // normally this doesn't take more than 1s - eventually(timeout(10.seconds)) { - DebugFilesystem.assertNoOpenStreams() - } + DebugFilesystem.assertNoOpenStreams() } } From 84c0746d3d71c5a7f7e99c427ab68b72871817a3 Mon Sep 17 00:00:00 2001 From: Bogdan Raducanu Date: Wed, 24 May 2017 11:14:30 +0200 Subject: [PATCH 3/9] new syntax + tests --- .../spark/sql/catalyst/parser/SqlBase.g4 | 4 +- .../sql/catalyst/analysis/ResolveHints.scala | 8 ++- .../sql/catalyst/parser/AstBuilder.scala | 6 +- .../plans/logical/basicLogicalOperators.scala | 2 +- .../sql/catalyst/parser/PlanParserSuite.scala | 57 ++++++++++++++----- 5 files changed, 55 insertions(+), 22 deletions(-) diff --git a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 index dc11e536efc45..e2f796e1b1774 100644 --- a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 +++ b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 @@ -371,7 +371,7 @@ querySpecification (RECORDREADER recordReader=STRING)? fromClause? (WHERE where=booleanExpression)?) - | ((kind=SELECT hint? setQuantifier? namedExpressionSeq fromClause? + | ((kind=SELECT hint* setQuantifier? namedExpressionSeq fromClause? | fromClause (kind=SELECT setQuantifier? namedExpressionSeq)?) lateralView* (WHERE where=booleanExpression)? 
@@ -386,7 +386,7 @@ hint hintStatement : hintName=identifier - | hintName=identifier '(' parameters+=identifier (',' parameters+=identifier)* ')' + | hintName=identifier '(' parameters+=primaryExpression (',' parameters+=primaryExpression)* ')' ; fromClause diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveHints.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveHints.scala index df688fa0e58ae..c2c4c8f18bf6f 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveHints.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveHints.scala @@ -19,6 +19,7 @@ package org.apache.spark.sql.catalyst.analysis import java.util.Locale +import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.catalyst.trees.CurrentOrigin @@ -91,7 +92,12 @@ object ResolveHints { BroadcastHint(h.child) } else { // Otherwise, find within the subtree query plans that should be broadcasted. - applyBroadcastHint(h.child, h.parameters.toSet) + applyBroadcastHint(h.child, h.parameters.map { + case tableName: String => tableName + case tableId: UnresolvedAttribute => tableId.name + case unsupported => throw new AnalysisException("Broadcast hint parameter should be " + + s" identifier or string but was $unsupported (${unsupported.getClass}") + }.toSet) } } } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala index f033fd4834c96..63524bd775461 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala @@ -407,7 +407,7 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging val withWindow = withDistinct.optionalMap(windows)(withWindows) // Hint - withWindow.optionalMap(hint)(withHints) + hint.asScala.foldRight(withWindow)(withHints) } } @@ -533,13 +533,13 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging } /** - * Add a [[Hint]] to a logical plan. + * Add [[Hint]]s to a logical plan. */ private def withHints( ctx: HintContext, query: LogicalPlan): LogicalPlan = withOrigin(ctx) { val stmt = ctx.hintStatement - Hint(stmt.hintName.getText, stmt.parameters.asScala.map(_.getText), query) + Hint(stmt.hintName.getText, stmt.parameters.asScala.map(expression), query) } /** diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala index d291ca0020838..4ccd4a95246ab 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala @@ -390,7 +390,7 @@ case class BroadcastHint(child: LogicalPlan) extends UnaryNode { * A general hint for the child. This node will be eliminated post analysis. * A pair of (name, parameters). 
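+ * For example, after this change the parser turns BROADCASTJOIN(u) over table t into the
+ * following plan (a sketch, written in the PlanParserSuite test DSL):
+ * {{{
+ *   Hint("BROADCASTJOIN", Seq($"u"), table("t").select(star()))
+ * }}}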
*/ -case class Hint(name: String, parameters: Seq[String], child: LogicalPlan) extends UnaryNode { +case class Hint(name: String, parameters: Seq[Any], child: LogicalPlan) extends UnaryNode { override lazy val resolved: Boolean = false override def output: Seq[Attribute] = child.output } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala index d78741d032f38..a7b011f86ec2e 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala @@ -18,7 +18,7 @@ package org.apache.spark.sql.catalyst.parser import org.apache.spark.sql.catalyst.FunctionIdentifier -import org.apache.spark.sql.catalyst.analysis.{UnresolvedGenerator, UnresolvedInlineTable, UnresolvedTableValuedFunction} +import org.apache.spark.sql.catalyst.analysis.{UnresolvedAttribute, UnresolvedFunction, UnresolvedGenerator, UnresolvedInlineTable, UnresolvedTableValuedFunction} import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.plans._ import org.apache.spark.sql.catalyst.plans.logical._ @@ -518,19 +518,13 @@ class PlanParserSuite extends PlanTest { val m = intercept[ParseException] { parsePlan("SELECT /*+ HINT() */ * FROM t") }.getMessage - assert(m.contains("no viable alternative at input")) - - // Hive compatibility: No database. - val m2 = intercept[ParseException] { - parsePlan("SELECT /*+ MAPJOIN(default.t) */ * from default.t") - }.getMessage - assert(m2.contains("mismatched input '.' expecting {')', ','}")) + assert(m.contains("mismatched input")) // Disallow space as the delimiter. val m3 = intercept[ParseException] { parsePlan("SELECT /*+ INDEX(a b c) */ * from default.t") }.getMessage - assert(m3.contains("mismatched input 'b' expecting {')', ','}")) + assert(m3.contains("mismatched input 'b' expecting")) comparePlans( parsePlan("SELECT /*+ HINT */ * FROM t"), @@ -538,26 +532,59 @@ class PlanParserSuite extends PlanTest { comparePlans( parsePlan("SELECT /*+ BROADCASTJOIN(u) */ * FROM t"), - Hint("BROADCASTJOIN", Seq("u"), table("t").select(star()))) + Hint("BROADCASTJOIN", Seq($"u"), table("t").select(star()))) comparePlans( parsePlan("SELECT /*+ MAPJOIN(u) */ * FROM t"), - Hint("MAPJOIN", Seq("u"), table("t").select(star()))) + Hint("MAPJOIN", Seq($"u"), table("t").select(star()))) comparePlans( parsePlan("SELECT /*+ STREAMTABLE(a,b,c) */ * FROM t"), - Hint("STREAMTABLE", Seq("a", "b", "c"), table("t").select(star()))) + Hint("STREAMTABLE", Seq($"a", $"b", $"c"), table("t").select(star()))) comparePlans( parsePlan("SELECT /*+ INDEX(t, emp_job_ix) */ * FROM t"), - Hint("INDEX", Seq("t", "emp_job_ix"), table("t").select(star()))) + Hint("INDEX", Seq($"t", $"emp_job_ix"), table("t").select(star()))) comparePlans( parsePlan("SELECT /*+ MAPJOIN(`default.t`) */ * from `default.t`"), - Hint("MAPJOIN", Seq("default.t"), table("default.t").select(star()))) + Hint("MAPJOIN", Seq(UnresolvedAttribute.quoted("default.t")), + table("default.t").select(star()))) comparePlans( parsePlan("SELECT /*+ MAPJOIN(t) */ a from t where true group by a order by a"), - Hint("MAPJOIN", Seq("t"), table("t").where(Literal(true)).groupBy('a)('a)).orderBy('a.asc)) + Hint("MAPJOIN", Seq($"t"), table("t").where(Literal(true)).groupBy('a)('a)).orderBy('a.asc)) + } + + test("SPARK-20854: select hint syntax with expressions") { + comparePlans( + 
parsePlan("SELECT /*+ HINT1(a, array(1, 2, 3)) */ * from t"), + Hint("HINT1", Seq($"a", + UnresolvedFunction("array", Literal(1) :: Literal(2) :: Literal(3) :: Nil, false)), + table("t").select(star()) + ) + ) + + comparePlans( + parsePlan("SELECT /*+ HINT1(a, array(1, 2, 3)) */ * from t"), + Hint("HINT1", Seq($"a", + UnresolvedFunction("array", Literal(1) :: Literal(2) :: Literal(3) :: Nil, false)), + table("t").select(star()) + ) + ) + + comparePlans( + parsePlan("SELECT /*+ HINT1(a, 5, 'a', b) */ * from t"), + Hint("HINT1", Seq($"a", Literal(5), Literal("a"), $"b"), + table("t").select(star()) + ) + ) + + comparePlans( + parsePlan("SELECT /*+ HINT1('a', (b, c), (1, 2)) */ * from t"), + Hint("HINT1", Seq(Literal("a"), Literal(5), Literal("a"), $"b"), + table("t").select(star()) + ) + ) } } From 5439468f85c01dc2c5ae36a4c1a0fceb210d9852 Mon Sep 17 00:00:00 2001 From: Bogdan Raducanu Date: Wed, 24 May 2017 13:11:57 +0200 Subject: [PATCH 4/9] multiple hints syntaxes + more tests --- .../spark/sql/catalyst/parser/SqlBase.g4 | 4 +- .../sql/catalyst/parser/AstBuilder.scala | 9 ++- .../sql/catalyst/plans/logical/hints.scala | 2 +- .../sql/catalyst/parser/PlanParserSuite.scala | 44 +++++++++++---- .../scala/org/apache/spark/sql/Dataset.scala | 2 +- .../apache/spark/sql/DataFrameHintSuite.scala | 56 +++++++++++++++++++ 6 files changed, 100 insertions(+), 17 deletions(-) create mode 100644 sql/core/src/test/scala/org/apache/spark/sql/DataFrameHintSuite.scala diff --git a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 index e2f796e1b1774..cffdde8365d03 100644 --- a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 +++ b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 @@ -371,7 +371,7 @@ querySpecification (RECORDREADER recordReader=STRING)? fromClause? (WHERE where=booleanExpression)?) - | ((kind=SELECT hint* setQuantifier? namedExpressionSeq fromClause? + | ((kind=SELECT (hints+=hint)* setQuantifier? namedExpressionSeq fromClause? | fromClause (kind=SELECT setQuantifier? namedExpressionSeq)?) lateralView* (WHERE where=booleanExpression)? 
@@ -381,7 +381,7 @@ querySpecification ; hint - : '/*+' hintStatement '*/' + : '/*+' hintStatements+=hintStatement (',' hintStatements+=hintStatement)* '*/' ; hintStatement diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala index de577b2b1aa2f..199384be51fde 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala @@ -407,7 +407,7 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging val withWindow = withDistinct.optionalMap(windows)(withWindows) // Hint - hint.asScala.foldRight(withWindow)(withHints) + hints.asScala.foldRight(withWindow)(withHints) } } @@ -538,8 +538,11 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging private def withHints( ctx: HintContext, query: LogicalPlan): LogicalPlan = withOrigin(ctx) { - val stmt = ctx.hintStatement - UnresolvedHint(stmt.hintName.getText, stmt.parameters.asScala.map(expression), query) + var plan = query + ctx.hintStatements.asScala.foreach { case stmt => + plan = UnresolvedHint(stmt.hintName.getText, stmt.parameters.asScala.map(expression), plan) + } + plan } /** diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/hints.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/hints.scala index 9bcbfbb4d1397..fb9a5014788cf 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/hints.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/hints.scala @@ -25,7 +25,7 @@ import org.apache.spark.sql.internal.SQLConf * should be removed This node will be eliminated post analysis. * A pair of (name, parameters). 
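+ * Parameters may now be arbitrary expressions rather than just strings, e.g. a hint such
+ * as HINT1(a, 5, 'a', b) carries (a sketch, where query stands for the child plan):
+ * {{{
+ *   UnresolvedHint("HINT1", Seq($"a", Literal(5), Literal("a"), $"b"), query)
+ * }}}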
*/ -case class UnresolvedHint(name: String, parameters: Seq[String], child: LogicalPlan) +case class UnresolvedHint(name: String, parameters: Seq[Any], child: LogicalPlan) extends UnaryNode { override lazy val resolved: Boolean = false diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala index ccb57cfe73a7d..f341868470484 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala @@ -532,34 +532,35 @@ class PlanParserSuite extends PlanTest { comparePlans( parsePlan("SELECT /*+ BROADCASTJOIN(u) */ * FROM t"), - UnresolvedHint("BROADCASTJOIN", Seq("u"), table("t").select(star()))) + UnresolvedHint("BROADCASTJOIN", Seq($"u"), table("t").select(star()))) comparePlans( parsePlan("SELECT /*+ MAPJOIN(u) */ * FROM t"), - UnresolvedHint("MAPJOIN", Seq("u"), table("t").select(star()))) + UnresolvedHint("MAPJOIN", Seq($"u"), table("t").select(star()))) comparePlans( parsePlan("SELECT /*+ STREAMTABLE(a,b,c) */ * FROM t"), - UnresolvedHint("STREAMTABLE", Seq("a", "b", "c"), table("t").select(star()))) + UnresolvedHint("STREAMTABLE", Seq($"a", $"b", $"c"), table("t").select(star()))) comparePlans( parsePlan("SELECT /*+ INDEX(t, emp_job_ix) */ * FROM t"), - UnresolvedHint("INDEX", Seq("t", "emp_job_ix"), table("t").select(star()))) + UnresolvedHint("INDEX", Seq($"t", $"emp_job_ix"), table("t").select(star()))) comparePlans( parsePlan("SELECT /*+ MAPJOIN(`default.t`) */ * from `default.t`"), - UnresolvedHint("MAPJOIN", Seq("default.t"), table("default.t").select(star()))) + UnresolvedHint("MAPJOIN", Seq(UnresolvedAttribute.quoted("default.t")), + table("default.t").select(star()))) comparePlans( parsePlan("SELECT /*+ MAPJOIN(t) */ a from t where true group by a order by a"), - UnresolvedHint("MAPJOIN", Seq("t"), + UnresolvedHint("MAPJOIN", Seq($"t"), table("t").where(Literal(true)).groupBy('a)('a)).orderBy('a.asc)) } test("SPARK-20854: select hint syntax with expressions") { comparePlans( parsePlan("SELECT /*+ HINT1(a, array(1, 2, 3)) */ * from t"), - Hint("HINT1", Seq($"a", + UnresolvedHint("HINT1", Seq($"a", UnresolvedFunction("array", Literal(1) :: Literal(2) :: Literal(3) :: Nil, false)), table("t").select(star()) ) @@ -567,7 +568,7 @@ class PlanParserSuite extends PlanTest { comparePlans( parsePlan("SELECT /*+ HINT1(a, array(1, 2, 3)) */ * from t"), - Hint("HINT1", Seq($"a", + UnresolvedHint("HINT1", Seq($"a", UnresolvedFunction("array", Literal(1) :: Literal(2) :: Literal(3) :: Nil, false)), table("t").select(star()) ) @@ -575,16 +576,39 @@ class PlanParserSuite extends PlanTest { comparePlans( parsePlan("SELECT /*+ HINT1(a, 5, 'a', b) */ * from t"), - Hint("HINT1", Seq($"a", Literal(5), Literal("a"), $"b"), + UnresolvedHint("HINT1", Seq($"a", Literal(5), Literal("a"), $"b"), table("t").select(star()) ) ) comparePlans( parsePlan("SELECT /*+ HINT1('a', (b, c), (1, 2)) */ * from t"), - Hint("HINT1", Seq(Literal("a"), Literal(5), Literal("a"), $"b"), + UnresolvedHint("HINT1", + Seq(Literal("a"), + CreateStruct($"b" :: $"c" :: Nil), + CreateStruct(Literal(1) :: Literal(2) :: Nil)), table("t").select(star()) ) ) } + + test("SPARK-20854: multiple hints") { + comparePlans( + parsePlan("SELECT /*+ HINT1(a, 1), hint2(b, 2) */ * from t"), + UnresolvedHint("hint2", Seq($"b", Literal(2)), + UnresolvedHint("HINT1", Seq($"a", Literal(1)), + 
table("t").select(star()) + ) + ) + ) + + comparePlans( + parsePlan("SELECT /*+ HINT1(a, 1) */ /*+ hint2(b, 2) */ * from t"), + UnresolvedHint("HINT1", Seq($"a", Literal(1)), + UnresolvedHint("hint2", Seq($"b", Literal(2)), + table("t").select(star()) + ) + ) + ) + } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala index cbab029b87b2a..04c720af30c23 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala @@ -1173,7 +1173,7 @@ class Dataset[T] private[sql]( * @since 2.2.0 */ @scala.annotation.varargs - def hint(name: String, parameters: String*): Dataset[T] = withTypedPlan { + def hint(name: String, parameters: Any*): Dataset[T] = withTypedPlan { UnresolvedHint(name, parameters, logicalPlan) } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameHintSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameHintSuite.scala new file mode 100644 index 0000000000000..142d5464e68f3 --- /dev/null +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameHintSuite.scala @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql + +import org.apache.spark.sql.catalyst.expressions.Literal +import org.apache.spark.sql.catalyst.plans.PlanTest +import org.apache.spark.sql.catalyst.plans.logical.UnresolvedHint +import org.apache.spark.sql.test.SharedSQLContext + +class DataFrameHintSuite extends PlanTest with SharedSQLContext { + import testImplicits._ + lazy val df = spark.range(10) + + test("various hint parameters") { + comparePlans( + df.hint("hint1").queryExecution.logical, + UnresolvedHint("hint1", Seq(), + df.logicalPlan + ) + ) + + comparePlans( + df.hint("hint1", 1, "a").queryExecution.logical, + UnresolvedHint("hint1", Seq(1, "a"), df.logicalPlan) + ) + + comparePlans( + df.hint("hint1", 1, $"a").queryExecution.logical, + UnresolvedHint("hint1", Seq(1, $"a"), + df.logicalPlan + ) + ) + + comparePlans( + df.hint("hint1", Seq(1, 2, 3), Seq($"a", $"b", $"c")).queryExecution.logical, + UnresolvedHint("hint1", Seq(Seq(1, 2, 3), Seq($"a", $"b", $"c")), + df.logicalPlan + ) + ) + } +} From 6e4030160fa12934dccacea59b06033fdc8ced80 Mon Sep 17 00:00:00 2001 From: Bogdan Raducanu Date: Fri, 26 May 2017 13:55:18 +0200 Subject: [PATCH 5/9] fixed merged + space instead of comma for multiple hints syntax --- .../main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 | 2 +- .../org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala | 2 +- sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 index cffdde8365d03..2fcf2800b81f1 100644 --- a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 +++ b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 @@ -381,7 +381,7 @@ querySpecification ; hint - : '/*+' hintStatements+=hintStatement (',' hintStatements+=hintStatement)* '*/' + : '/*+' hintStatements+=hintStatement (hintStatements+=hintStatement)* '*/' ; hintStatement diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala index f341868470484..c7087b3ae8bd4 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala @@ -594,7 +594,7 @@ class PlanParserSuite extends PlanTest { test("SPARK-20854: multiple hints") { comparePlans( - parsePlan("SELECT /*+ HINT1(a, 1), hint2(b, 2) */ * from t"), + parsePlan("SELECT /*+ HINT1(a, 1) hint2(b, 2) */ * from t"), UnresolvedHint("hint2", Seq($"b", Literal(2)), UnresolvedHint("HINT1", Seq($"a", Literal(1)), table("t").select(star()) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala index f9bd8f3d278ad..d2d2b6cd5dfa3 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala @@ -1176,7 +1176,7 @@ class Dataset[T] private[sql]( * @since 2.2.0 */ @scala.annotation.varargs - def hint(name: String, parameters: String*): Dataset[T] = withTypedPlan { + def hint(name: String, parameters: Any*): Dataset[T] = withTypedPlan { UnresolvedHint(name, parameters, planWithBarrier) } From 14a6150ee8f64cff2196d8141823eb787aa34c89 Mon Sep 17 00:00:00 2001 From: Bogdan 
Raducanu Date: Tue, 30 May 2017 13:59:44 +0200 Subject: [PATCH 6/9] dsl test and hint(), minor fixes, parser: made comma separating hints optional --- .../spark/sql/catalyst/parser/SqlBase.g4 | 2 +- .../sql/catalyst/analysis/ResolveHints.scala | 2 +- .../spark/sql/catalyst/dsl/package.scala | 3 +++ .../sql/catalyst/parser/AstBuilder.scala | 2 +- .../sql/catalyst/plans/logical/hints.scala | 4 ++- .../sql/catalyst/parser/PlanParserSuite.scala | 22 ++++++++++++++- ...ataFrameSuite.scala => DSLHintSuite.scala} | 0 .../apache/spark/sql/DataFrameHintSuite.scala | 27 ++++++++++++------- 8 files changed, 47 insertions(+), 15 deletions(-) rename sql/core/src/test/scala/org/apache/spark/sql/{DataFrameSuite.scala => DSLHintSuite.scala} (100%) diff --git a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 index 2fcf2800b81f1..dcf074cc64d6e 100644 --- a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 +++ b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 @@ -386,7 +386,7 @@ hint hintStatement : hintName=identifier - | hintName=identifier '(' parameters+=primaryExpression (',' parameters+=primaryExpression)* ')' + | hintName=identifier '(' parameters+=primaryExpression (','? parameters+=primaryExpression)* ')' ; fromClause diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveHints.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveHints.scala index 874ac529083ad..62a3482d9fac1 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveHints.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveHints.scala @@ -96,7 +96,7 @@ object ResolveHints { case tableName: String => tableName case tableId: UnresolvedAttribute => tableId.name case unsupported => throw new AnalysisException("Broadcast hint parameter should be " + - s" identifier or string but was $unsupported (${unsupported.getClass}") + s"an identifier or string but was $unsupported (${unsupported.getClass}") }.toSet) } } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala index ed423e7e334b6..beee93d906f0f 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala @@ -381,6 +381,9 @@ package object dsl { def analyze: LogicalPlan = EliminateSubqueryAliases(analysis.SimpleAnalyzer.execute(logicalPlan)) + + def hint(name: String, parameters: Any*): LogicalPlan = + UnresolvedHint(name, parameters, logicalPlan) } } } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala index 199384be51fde..8c827665e832b 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala @@ -533,7 +533,7 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging } /** - * Add a [[UnresolvedHint]]s to a logical plan. + * Add [[UnresolvedHint]]s to a logical plan. 
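+ * Each statement in the hint clause becomes its own [[UnresolvedHint]] node; with the
+ * current fold the last statement ends up outermost, e.g. HINT1(a, 1) hint2(b, 2) parses
+ * to (a sketch, where plan stands for the hinted query):
+ * {{{
+ *   UnresolvedHint("hint2", Seq($"b", Literal(2)),
+ *     UnresolvedHint("HINT1", Seq($"a", Literal(1)), plan))
+ * }}}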
*/ private def withHints( ctx: HintContext, diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/hints.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/hints.scala index e2b7a5c94ad70..c64920566ff99 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/hints.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/hints.scala @@ -23,7 +23,9 @@ import org.apache.spark.sql.internal.SQLConf /** * A general hint for the child that is not yet resolved. This node is generated by the parser and * should be removed This node will be eliminated post analysis. - * A pair of (name, parameters). + * @param name the name of the hint + * @param parameters the parameters of the hint + * @param child the [[LogicalPlan]] on which this hint applies */ case class UnresolvedHint(name: String, parameters: Seq[Any], child: LogicalPlan) extends UnaryNode { diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala index c7087b3ae8bd4..2e389117fb591 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala @@ -597,7 +597,16 @@ class PlanParserSuite extends PlanTest { parsePlan("SELECT /*+ HINT1(a, 1) hint2(b, 2) */ * from t"), UnresolvedHint("hint2", Seq($"b", Literal(2)), UnresolvedHint("HINT1", Seq($"a", Literal(1)), - table("t").select(star()) + table("t").select(star()) + ) + ) + ) + + comparePlans( + parsePlan("SELECT /*+ HINT1(a, 1),hint2(b, 2) */ * from t"), + UnresolvedHint("hint2", Seq($"b", Literal(2)), + UnresolvedHint("HINT1", Seq($"a", Literal(1)), + table("t").select(star()) ) ) ) @@ -610,5 +619,16 @@ class PlanParserSuite extends PlanTest { ) ) ) + + comparePlans( + parsePlan("SELECT /*+ HINT1(a, 1), hint2(b, 2) */ /*+ hint3(c, 3) */ * from t"), + UnresolvedHint("HINT1", Seq($"a", Literal(1)), + UnresolvedHint("hint2", Seq($"b", Literal(2)), + UnresolvedHint("hint3", Seq($"c", Literal(3)), + table("t").select(star()) + ) + ) + ) + ) } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DSLHintSuite.scala similarity index 100% rename from sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala rename to sql/core/src/test/scala/org/apache/spark/sql/DSLHintSuite.scala diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameHintSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameHintSuite.scala index 142d5464e68f3..be6781860bd74 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameHintSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameHintSuite.scala @@ -17,37 +17,44 @@ package org.apache.spark.sql -import org.apache.spark.sql.catalyst.expressions.Literal +import org.apache.spark.sql.catalyst.analysis.EliminateBarriers import org.apache.spark.sql.catalyst.plans.PlanTest -import org.apache.spark.sql.catalyst.plans.logical.UnresolvedHint +import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.test.SharedSQLContext class DataFrameHintSuite extends PlanTest with SharedSQLContext { import testImplicits._ lazy val df = spark.range(10) - test("various hint parameters") { + private def check(df: Dataset[_], expected: LogicalPlan) = { comparePlans( - 
df.hint("hint1").queryExecution.logical, + EliminateBarriers(df.queryExecution.logical), + expected + ) + } + + test("various hint parameters") { + check( + df.hint("hint1"), UnresolvedHint("hint1", Seq(), df.logicalPlan ) ) - comparePlans( - df.hint("hint1", 1, "a").queryExecution.logical, + check( + df.hint("hint1", 1, "a"), UnresolvedHint("hint1", Seq(1, "a"), df.logicalPlan) ) - comparePlans( - df.hint("hint1", 1, $"a").queryExecution.logical, + check( + df.hint("hint1", 1, $"a"), UnresolvedHint("hint1", Seq(1, $"a"), df.logicalPlan ) ) - comparePlans( - df.hint("hint1", Seq(1, 2, 3), Seq($"a", $"b", $"c")).queryExecution.logical, + check( + df.hint("hint1", Seq(1, 2, 3), Seq($"a", $"b", $"c")), UnresolvedHint("hint1", Seq(Seq(1, 2, 3), Seq($"a", $"b", $"c")), df.logicalPlan ) From 394d6442aeea80f2ad60cd959decc22ec56d02e4 Mon Sep 17 00:00:00 2001 From: Bogdan Raducanu Date: Tue, 30 May 2017 13:59:55 +0200 Subject: [PATCH 7/9] DSLHintSuite --- .../sql/catalyst/analysis/DSLHintSuite.scala | 59 +++++++++++++++++++ 1 file changed, 59 insertions(+) create mode 100644 sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/DSLHintSuite.scala diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/DSLHintSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/DSLHintSuite.scala new file mode 100644 index 0000000000000..118db5e3d4465 --- /dev/null +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/DSLHintSuite.scala @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql + +import org.apache.spark.sql.catalyst.analysis.AnalysisTest +import org.apache.spark.sql.catalyst.dsl.expressions._ +import org.apache.spark.sql.catalyst.dsl.plans._ +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.plans.logical._ + +class DSLHintSuite extends AnalysisTest { + lazy val a = 'a.int + lazy val b = 'b.string + lazy val c = 'c.string + lazy val r1 = LocalRelation(a, b, c) + + test("various hint parameters") { + comparePlans( + r1.hint("hint1"), + UnresolvedHint("hint1", Seq(), + r1 + ) + ) + + comparePlans( + r1.hint("hint1", 1, "a"), + UnresolvedHint("hint1", Seq(1, "a"), r1) + ) + + comparePlans( + r1.hint("hint1", 1, $"a"), + UnresolvedHint("hint1", Seq(1, $"a"), + r1 + ) + ) + + comparePlans( + r1.hint("hint1", Seq(1, 2, 3), Seq($"a", $"b", $"c")), + UnresolvedHint("hint1", Seq(Seq(1, 2, 3), Seq($"a", $"b", $"c")), + r1 + ) + ) + } +} From 8daa05e1ec595c54f8aac7c041d5c6571a759aaf Mon Sep 17 00:00:00 2001 From: Bogdan Raducanu Date: Tue, 30 May 2017 14:19:51 +0200 Subject: [PATCH 8/9] comma between hints optional, apply hints in order --- .../org/apache/spark/sql/catalyst/parser/SqlBase.g4 | 4 ++-- .../apache/spark/sql/catalyst/parser/AstBuilder.scala | 2 +- .../spark/sql/catalyst/parser/PlanParserSuite.scala | 10 +++++----- 3 files changed, 8 insertions(+), 8 deletions(-) diff --git a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 index b9718f8c2bd3e..2ae57986977c6 100644 --- a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 +++ b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 @@ -381,12 +381,12 @@ querySpecification ; hint - : '/*+' hintStatements+=hintStatement (hintStatements+=hintStatement)* '*/' + : '/*+' hintStatements+=hintStatement (','? hintStatements+=hintStatement)* '*/' ; hintStatement : hintName=identifier - | hintName=identifier '(' parameters+=primaryExpression (','? 
parameters+=primaryExpression)* ')' + | hintName=identifier '(' parameters+=primaryExpression (',' parameters+=primaryExpression)* ')' ; fromClause diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala index 7f69532c496fc..72501f9a988ee 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala @@ -539,7 +539,7 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging ctx: HintContext, query: LogicalPlan): LogicalPlan = withOrigin(ctx) { var plan = query - ctx.hintStatements.asScala.foreach { case stmt => + ctx.hintStatements.asScala.reverse.foreach { case stmt => plan = UnresolvedHint(stmt.hintName.getText, stmt.parameters.asScala.map(expression), plan) } plan diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala index ec5dc2d48f03f..6fb149519fc4f 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala @@ -18,7 +18,7 @@ package org.apache.spark.sql.catalyst.parser import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier} -import org.apache.spark.sql.catalyst.analysis.{UnresolvedFunction, UnresolvedGenerator, UnresolvedInlineTable, UnresolvedRelation, UnresolvedTableValuedFunction} +import org.apache.spark.sql.catalyst.analysis.{UnresolvedAttribute, UnresolvedFunction, UnresolvedGenerator, UnresolvedInlineTable, UnresolvedRelation, UnresolvedTableValuedFunction} import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.plans._ import org.apache.spark.sql.catalyst.plans.logical._ @@ -602,8 +602,8 @@ class PlanParserSuite extends PlanTest { test("SPARK-20854: multiple hints") { comparePlans( parsePlan("SELECT /*+ HINT1(a, 1) hint2(b, 2) */ * from t"), - UnresolvedHint("hint2", Seq($"b", Literal(2)), - UnresolvedHint("HINT1", Seq($"a", Literal(1)), + UnresolvedHint("HINT1", Seq($"a", Literal(1)), + UnresolvedHint("hint2", Seq($"b", Literal(2)), table("t").select(star()) ) ) @@ -611,8 +611,8 @@ class PlanParserSuite extends PlanTest { comparePlans( parsePlan("SELECT /*+ HINT1(a, 1),hint2(b, 2) */ * from t"), - UnresolvedHint("hint2", Seq($"b", Literal(2)), - UnresolvedHint("HINT1", Seq($"a", Literal(1)), + UnresolvedHint("HINT1", Seq($"a", Literal(1)), + UnresolvedHint("hint2", Seq($"b", Literal(2)), table("t").select(star()) ) ) From 09635a9b1350596961f2bb3f824f9730c3c348d2 Mon Sep 17 00:00:00 2001 From: Bogdan Raducanu Date: Tue, 30 May 2017 18:21:25 +0200 Subject: [PATCH 9/9] reverted mistake rename of DataFrameSuite --- .../apache/spark/sql/{DSLHintSuite.scala => DataFrameSuite.scala} | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename sql/core/src/test/scala/org/apache/spark/sql/{DSLHintSuite.scala => DataFrameSuite.scala} (100%) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DSLHintSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala similarity index 100% rename from sql/core/src/test/scala/org/apache/spark/sql/DSLHintSuite.scala rename to sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
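
Note on the end state of the series: hint statements inside a single hint block may be
separated by whitespace or commas and are applied in order (the first statement becomes
the outermost UnresolvedHint), and both the SQL and Dataset APIs accept arbitrary hint
parameters. A short sketch, with hint and table names taken from the tests above:

    // SQL: two hint statements in one block
    spark.sql("SELECT /*+ HINT1(a, 1) hint2(b, 2) */ * FROM t")

    // Dataset API: non-string hint parameters
    df.hint("hint1", Seq(1, 2, 3), Seq($"a", $"b", $"c"))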