From 38f9e2d2de410f3bdefd54e0c9122d6e967eb288 Mon Sep 17 00:00:00 2001 From: John Zhuge Date: Fri, 27 Jul 2018 18:46:42 -0700 Subject: [PATCH 1/7] [SPARK-24940][SQL] Coalesce Hint for SQL Queries --- .../spark/sql/catalyst/parser/SqlBase.g4 | 4 +- .../sql/catalyst/analysis/Analyzer.scala | 1 + .../sql/catalyst/analysis/ResolveHints.scala | 35 ++++++++ .../sql/catalyst/parser/AstBuilder.scala | 4 + .../catalyst/analysis/ResolveHintsSuite.scala | 36 +++++++++ .../sql/catalyst/parser/PlanParserSuite.scala | 56 +++++++++++++ .../sql/execution/SparkSqlParserSuite.scala | 80 ++++++++++++++++++- 7 files changed, 212 insertions(+), 4 deletions(-) diff --git a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 index 94283f59011a8..afd41dd93e30d 100644 --- a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 +++ b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 @@ -338,7 +338,7 @@ resource ; queryNoWith - : insertInto? queryTerm queryOrganization #singleInsertQuery + : (insertInto (hints+=hint)*)? queryTerm queryOrganization #singleInsertQuery | fromClause multiInsertQueryBody+ #multiInsertQuery ; @@ -352,7 +352,7 @@ queryOrganization ; multiInsertQueryBody - : insertInto? + : (insertInto (hints+=hint)*)? querySpecification queryOrganization ; diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index 7f235ac560299..373d8cb19ceb9 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -145,6 +145,7 @@ class Analyzer( lazy val batches: Seq[Batch] = Seq( Batch("Hints", fixedPoint, new ResolveHints.ResolveBroadcastHints(conf), + new ResolveHints.ResolveCoalesceHints(conf), ResolveHints.RemoveAllHints), Batch("Simple Sanity Check", Once, LookupFunctions), diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveHints.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveHints.scala index bfe5169c25900..9e288e75ed39d 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveHints.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveHints.scala @@ -20,10 +20,12 @@ package org.apache.spark.sql.catalyst.analysis import java.util.Locale import org.apache.spark.sql.AnalysisException +import org.apache.spark.sql.catalyst.expressions.Literal import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.catalyst.trees.CurrentOrigin import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.types.{BooleanType, IntegerType} /** @@ -102,6 +104,39 @@ object ResolveHints { } } + /** + * For coalesce hint, we accept "COALESCE" and "REPARTITION". + * Its parameters include a partition number and an optional boolean to indicate + * whether shuffle is allowed. + */ + class ResolveCoalesceHints(conf: SQLConf) extends Rule[LogicalPlan] { + private val COALESCE_HINT_NAMES = Set("COALESCE", "REPARTITION") + + private def applyCoalesceHint( + plan: LogicalPlan, + numPartitions: Int, + shuffle: Boolean): LogicalPlan = { + Repartition(numPartitions, shuffle, plan) + } + + def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperators { + case h: UnresolvedHint if COALESCE_HINT_NAMES.contains(h.name.toUpperCase(Locale.ROOT)) => + h.parameters match { + case Seq(Literal(i, IntegerType)) => + val defaultShuffle = h.name.toUpperCase(Locale.ROOT) match { + case "REPARTITION" => true + case _ => false + } + applyCoalesceHint(h.child, i.asInstanceOf[Int], defaultShuffle) + case Seq(Literal(i, IntegerType), Literal(b, BooleanType)) => + applyCoalesceHint(h.child, i.asInstanceOf[Int], b.asInstanceOf[Boolean]) + case _ => + throw new AnalysisException("Coalesce hint expects a partition number" + + " and an optional boolean to indicate whether shuffle is allowed") + } + } + } + /** * Removes all the hints, used to remove invalid hints provided by the user. * This must be executed after all the other hint rules are executed. diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala index 9906a30b488b8..60532480a2544 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala @@ -155,6 +155,8 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging withQuerySpecification(body.querySpecification, from). // Add organization statements. optionalMap(body.queryOrganization)(withQueryResultClauses). + // Add hint. + optionalMap(body.hint)(withHints). // Add insert. optionalMap(body.insertInto())(withInsertInto) } @@ -174,6 +176,8 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging plan(ctx.queryTerm). // Add organization statements. optionalMap(ctx.queryOrganization)(withQueryResultClauses). + // Add hint. + optionalMap(ctx.hint)(withHints). // Add insert. optionalMap(ctx.insertInto())(withInsertInto) } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ResolveHintsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ResolveHintsSuite.scala index 9782b5fb0d266..ed1b2dd25ccb4 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ResolveHintsSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ResolveHintsSuite.scala @@ -17,8 +17,10 @@ package org.apache.spark.sql.catalyst.analysis +import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.catalyst.dsl.expressions._ import org.apache.spark.sql.catalyst.dsl.plans._ +import org.apache.spark.sql.catalyst.expressions.Literal import org.apache.spark.sql.catalyst.parser.CatalystSqlParser import org.apache.spark.sql.catalyst.plans.Inner import org.apache.spark.sql.catalyst.plans.logical._ @@ -26,6 +28,14 @@ import org.apache.spark.sql.catalyst.plans.logical._ class ResolveHintsSuite extends AnalysisTest { import org.apache.spark.sql.catalyst.analysis.TestRelations._ + private def intercept(plan: LogicalPlan, messages: String*): Unit = { + val analyzer = getAnalyzer(false) + val e = intercept[AnalysisException](analyzer.executeAndCheck(plan)) + messages.foreach { message => + assert(e.message.contains(message)) + } + } + test("invalid hints should be ignored") { checkAnalysis( UnresolvedHint("some_random_hint_that_does_not_exist", Seq("TaBlE"), table("TaBlE")), @@ -120,4 +130,30 @@ class ResolveHintsSuite extends AnalysisTest { testRelation.where('a > 1).select('a).select('a).analyze, caseSensitive = false) } + + test("coalesce hint") { + checkAnalysis( + UnresolvedHint("COALESCE", Seq(Literal(10)), table("TaBlE")), + Repartition(10, false, testRelation), // Default shuffle is false for COALESCE + caseSensitive = true) + checkAnalysis( + UnresolvedHint("COALESCE", Seq(Literal(20), Literal(true)), table("TaBlE")), + Repartition(20, true, testRelation), + caseSensitive = true) + checkAnalysis( + UnresolvedHint("REPARTITION", Seq(Literal(100)), table("TaBlE")), + Repartition(100, true, testRelation), // Default shuffle is true for REPARTITION + caseSensitive = true) + checkAnalysis( + UnresolvedHint("REPARTITION", Seq(Literal(200), Literal(false)), table("TaBlE")), + Repartition(200, false, testRelation), + caseSensitive = true) + + val errMsg = "Coalesce hint expects a partition number and an optional boolean to indicate" + + " whether shuffle is allowed" + intercept(UnresolvedHint("COALESCE", Seq.empty, table("TaBlE")), errMsg) + intercept(UnresolvedHint("COALESCE", Seq(UnresolvedAttribute("a")), table("TaBlE")), errMsg) + intercept(UnresolvedHint("COALESCE", Seq(Literal(true)), table("TaBlE")), errMsg) + intercept(UnresolvedHint("COALESCE", Seq(Literal(10), Literal(33)), table("TaBlE")), errMsg) + } } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala index 924700483dbe4..27ca19c3cbcb4 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala @@ -662,6 +662,62 @@ class PlanParserSuite extends AnalysisTest { ) } + test ("insert hint syntax") { + assertEqual( + "INSERT INTO s /*+ COALESCE(10) */ SELECT * FROM t", + InsertIntoTable(table("s"), Map.empty, + UnresolvedHint("COALESCE", Seq(Literal(10)), + table("t").select(star())), false, false)) + assertEqual( + "INSERT INTO TABLE s /*+ COALESCE(50, true) */ SELECT * FROM t", + InsertIntoTable(table("s"), Map.empty, + UnresolvedHint("COALESCE", Seq(Literal(50), Literal(true)), + table("t").select(star())), false, false)) + assertEqual( + "INSERT INTO s /*+ REPARTITION(100) */ SELECT * FROM t", + InsertIntoTable(table("s"), Map.empty, + UnresolvedHint("REPARTITION", Seq(Literal(100)), + table("t").select(star())), false, false)) + assertEqual( + "INSERT INTO TABLE s /*+ REPARTITION(20, false) */ SELECT * FROM t", + InsertIntoTable(table("s"), Map.empty, + UnresolvedHint("REPARTITION", Seq(Literal(20), Literal(false)), + table("t").select(star())), false, false)) + assertEqual( + "INSERT OVERWRITE TABLE s /*+ COALESCE(10) */ SELECT * FROM t", + InsertIntoTable(table("s"), Map.empty, + UnresolvedHint("COALESCE", Seq(Literal(10)), + table("t").select(star())), true, false)) + assertEqual( + "INSERT OVERWRITE TABLE s /*+ COALESCE(50, true) */ SELECT * FROM t", + InsertIntoTable(table("s"), Map.empty, + UnresolvedHint("COALESCE", Seq(Literal(50), Literal(true)), + table("t").select(star())), true, false)) + assertEqual( + "INSERT OVERWRITE TABLE s /*+ REPARTITION(100) */ SELECT * FROM t", + InsertIntoTable(table("s"), Map.empty, + UnresolvedHint("REPARTITION", Seq(Literal(100)), + table("t").select(star())), true, false)) + assertEqual( + "INSERT OVERWRITE TABLE s /*+ REPARTITION(20, false) */ SELECT * FROM t", + InsertIntoTable(table("s"), Map.empty, + UnresolvedHint("REPARTITION", Seq(Literal(20), Literal(false)), + table("t").select(star())), true, false)) + + // Multiple hints + assertEqual( + "INSERT INTO s /*+ REPARTITION(100), COALESCE(50, true), COALESCE(10) */ SELECT * FROM t", + InsertIntoTable(table("s"), Map.empty, + UnresolvedHint("REPARTITION", Seq(Literal(100)), + UnresolvedHint("COALESCE", Seq(Literal(50), Literal(true)), + UnresolvedHint("COALESCE", Seq(Literal(10)), + table("t").select(star())))), false, false)) + + // Wrong hint location + intercept("INSERT INTO /*+ COALESCE(10) */ s SELECT * FROM t", + "extraneous input '/*+' expecting") + } + test("TRIM function") { intercept("select ltrim(both 'S' from 'SS abc S'", "missing ')' at ''") intercept("select rtrim(trailing 'S' from 'SS abc S'", "missing ')' at ''") diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkSqlParserSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkSqlParserSuite.scala index 107a2f7109793..d98be477b5845 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkSqlParserSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkSqlParserSuite.scala @@ -17,13 +17,16 @@ package org.apache.spark.sql.execution +import java.net.URI + import org.apache.spark.sql.SaveMode import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier} import org.apache.spark.sql.catalyst.analysis.{AnalysisTest, UnresolvedAlias, UnresolvedAttribute, UnresolvedRelation, UnresolvedStar} import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogStorageFormat, CatalogTable, CatalogTableType} -import org.apache.spark.sql.catalyst.expressions.{Ascending, Concat, SortOrder} +import org.apache.spark.sql.catalyst.dsl.expressions.star +import org.apache.spark.sql.catalyst.expressions.{Ascending, Concat, Literal, SortOrder} import org.apache.spark.sql.catalyst.parser.ParseException -import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project, RepartitionByExpression, Sort} +import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.execution.command._ import org.apache.spark.sql.execution.datasources.{CreateTable, RefreshResource} import org.apache.spark.sql.internal.{HiveSerDe, SQLConf} @@ -36,6 +39,7 @@ import org.apache.spark.sql.types.{IntegerType, LongType, StringType, StructType * defined in the Catalyst module. */ class SparkSqlParserSuite extends AnalysisTest { + import org.apache.spark.sql.catalyst.dsl.plans._ val newConf = new SQLConf private lazy val parser = new SparkSqlParser(newConf) @@ -366,4 +370,76 @@ class SparkSqlParserSuite extends AnalysisTest { "SELECT a || b || c FROM t", Project(UnresolvedAlias(concat) :: Nil, UnresolvedRelation(TableIdentifier("t")))) } + + test ("insert overwrite directory hint syntax") { + val dummyStorage = CatalogStorageFormat( + locationUri = Option(URI.create("/d")), + inputFormat = None, + outputFormat = None, + serde = None, + compressed = false, + properties = Map.empty) + assertEqual( + "INSERT OVERWRITE DIRECTORY '/d' USING orc /*+ COALESCE(10) */ SELECT * FROM t", + InsertIntoDir(false, dummyStorage, Some("orc"), + UnresolvedHint("COALESCE", Seq(Literal(10)), + table("t").select(star())), true)) + assertEqual( + "INSERT OVERWRITE DIRECTORY '/d' USING orc /*+ COALESCE(50, true) */ SELECT * FROM t", + InsertIntoDir(false, dummyStorage, Some("orc"), + UnresolvedHint("COALESCE", Seq(Literal(50), Literal(true)), + table("t").select(star())), true)) + assertEqual( + "INSERT OVERWRITE DIRECTORY '/d' USING orc /*+ REPARTITION(100) */ SELECT * FROM t", + InsertIntoDir(false, dummyStorage, Some("orc"), + UnresolvedHint("REPARTITION", Seq(Literal(100)), + table("t").select(star())), true)) + assertEqual( + "INSERT OVERWRITE DIRECTORY '/d' USING orc /*+ REPARTITION(20, false) */ SELECT * FROM t", + InsertIntoDir(false, dummyStorage, Some("orc"), + UnresolvedHint("REPARTITION", Seq(Literal(20), Literal(false)), + table("t").select(star())), true)) + + val dummyHiveStorage = CatalogStorageFormat( + locationUri = Option(URI.create("/d")), + inputFormat = Some("org.apache.hadoop.mapred.TextInputFormat"), + outputFormat = Some("org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat"), + serde = Some("org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe"), + compressed = false, + properties = Map.empty) + assertEqual( + "INSERT OVERWRITE DIRECTORY '/d' /*+ COALESCE(10) */ SELECT * FROM t", + InsertIntoDir(false, dummyHiveStorage, Some("hive"), + UnresolvedHint("COALESCE", Seq(Literal(10)), + table("t").select(star())), true)) + assertEqual( + "INSERT OVERWRITE DIRECTORY '/d' /*+ COALESCE(50, true) */ SELECT * FROM t", + InsertIntoDir(false, dummyHiveStorage, Some("hive"), + UnresolvedHint("COALESCE", Seq(Literal(50), Literal(true)), + table("t").select(star())), true)) + assertEqual( + "INSERT OVERWRITE DIRECTORY '/d' /*+ REPARTITION(100) */ SELECT * FROM t", + InsertIntoDir(false, dummyHiveStorage, Some("hive"), + UnresolvedHint("REPARTITION", Seq(Literal(100)), + table("t").select(star())), true)) + assertEqual( + "INSERT OVERWRITE DIRECTORY '/d' /*+ REPARTITION(20, false) */ SELECT * FROM t", + InsertIntoDir(false, dummyHiveStorage, Some("hive"), + UnresolvedHint("REPARTITION", Seq(Literal(20), Literal(false)), + table("t").select(star())), true)) + + // Multiple hints + assertEqual( + "INSERT OVERWRITE DIRECTORY '/d' /*+ REPARTITION(100), COALESCE(50, true), COALESCE(10)" + + " */ SELECT * FROM t", + InsertIntoDir(false, dummyHiveStorage, Some("hive"), + UnresolvedHint("REPARTITION", Seq(Literal(100)), + UnresolvedHint("COALESCE", Seq(Literal(50), Literal(true)), + UnresolvedHint("COALESCE", Seq(Literal(10)), + table("t").select(star())))), true)) + + // Wrong hint location + intercept("INSERT OVERWRITE DIRECTORY /*+ COALESCE(10) */ '/d' SELECT * FROM t", + "no viable alternative at input") + } } From 93ad14e3da380dc61c78d958625aa61a4dd704d6 Mon Sep 17 00:00:00 2001 From: John Zhuge Date: Wed, 1 Aug 2018 20:55:25 -0700 Subject: [PATCH 2/7] [SPARK-24940][SQL] Incorporate review comments --- .../spark/sql/catalyst/parser/SqlBase.g4 | 4 +- .../sql/catalyst/analysis/ResolveHints.scala | 20 ++--- .../sql/catalyst/parser/AstBuilder.scala | 4 - .../catalyst/analysis/ResolveHintsSuite.scala | 23 ++---- .../sql/catalyst/parser/PlanParserSuite.scala | 72 ++++------------- .../sql/execution/SparkSqlParserSuite.scala | 80 +------------------ 6 files changed, 34 insertions(+), 169 deletions(-) diff --git a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 index afd41dd93e30d..94283f59011a8 100644 --- a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 +++ b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 @@ -338,7 +338,7 @@ resource ; queryNoWith - : (insertInto (hints+=hint)*)? queryTerm queryOrganization #singleInsertQuery + : insertInto? queryTerm queryOrganization #singleInsertQuery | fromClause multiInsertQueryBody+ #multiInsertQuery ; @@ -352,7 +352,7 @@ queryOrganization ; multiInsertQueryBody - : (insertInto (hints+=hint)*)? + : insertInto? querySpecification queryOrganization ; diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveHints.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveHints.scala index 9e288e75ed39d..0da9523ee00fc 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveHints.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveHints.scala @@ -25,7 +25,7 @@ import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.catalyst.trees.CurrentOrigin import org.apache.spark.sql.internal.SQLConf -import org.apache.spark.sql.types.{BooleanType, IntegerType} +import org.apache.spark.sql.types.IntegerType /** @@ -105,9 +105,8 @@ object ResolveHints { } /** - * For coalesce hint, we accept "COALESCE" and "REPARTITION". - * Its parameters include a partition number and an optional boolean to indicate - * whether shuffle is allowed. + * COALESCE Hint accepts name "COALESCE" and "REPARTITION". + * Its parameter includes a partition number. */ class ResolveCoalesceHints(conf: SQLConf) extends Rule[LogicalPlan] { private val COALESCE_HINT_NAMES = Set("COALESCE", "REPARTITION") @@ -122,17 +121,14 @@ object ResolveHints { def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperators { case h: UnresolvedHint if COALESCE_HINT_NAMES.contains(h.name.toUpperCase(Locale.ROOT)) => h.parameters match { - case Seq(Literal(i, IntegerType)) => - val defaultShuffle = h.name.toUpperCase(Locale.ROOT) match { + case Seq(Literal(numPartitions: Int, IntegerType)) => + val shuffle = h.name.toUpperCase(Locale.ROOT) match { case "REPARTITION" => true - case _ => false + case "COALESCE" => false } - applyCoalesceHint(h.child, i.asInstanceOf[Int], defaultShuffle) - case Seq(Literal(i, IntegerType), Literal(b, BooleanType)) => - applyCoalesceHint(h.child, i.asInstanceOf[Int], b.asInstanceOf[Boolean]) + applyCoalesceHint(h.child, numPartitions, shuffle) case _ => - throw new AnalysisException("Coalesce hint expects a partition number" + - " and an optional boolean to indicate whether shuffle is allowed") + throw new AnalysisException("COALESCE Hint expects a partition number as parameter") } } } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala index 60532480a2544..9906a30b488b8 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala @@ -155,8 +155,6 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging withQuerySpecification(body.querySpecification, from). // Add organization statements. optionalMap(body.queryOrganization)(withQueryResultClauses). - // Add hint. - optionalMap(body.hint)(withHints). // Add insert. optionalMap(body.insertInto())(withInsertInto) } @@ -176,8 +174,6 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging plan(ctx.queryTerm). // Add organization statements. optionalMap(ctx.queryOrganization)(withQueryResultClauses). - // Add hint. - optionalMap(ctx.hint)(withHints). // Add insert. optionalMap(ctx.insertInto())(withInsertInto) } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ResolveHintsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ResolveHintsSuite.scala index ed1b2dd25ccb4..1949d926ff536 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ResolveHintsSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ResolveHintsSuite.scala @@ -134,26 +134,15 @@ class ResolveHintsSuite extends AnalysisTest { test("coalesce hint") { checkAnalysis( UnresolvedHint("COALESCE", Seq(Literal(10)), table("TaBlE")), - Repartition(10, false, testRelation), // Default shuffle is false for COALESCE - caseSensitive = true) - checkAnalysis( - UnresolvedHint("COALESCE", Seq(Literal(20), Literal(true)), table("TaBlE")), - Repartition(20, true, testRelation), - caseSensitive = true) + Repartition(numPartitions = 10, shuffle = false, child = testRelation)) checkAnalysis( UnresolvedHint("REPARTITION", Seq(Literal(100)), table("TaBlE")), - Repartition(100, true, testRelation), // Default shuffle is true for REPARTITION - caseSensitive = true) - checkAnalysis( - UnresolvedHint("REPARTITION", Seq(Literal(200), Literal(false)), table("TaBlE")), - Repartition(200, false, testRelation), - caseSensitive = true) + Repartition(numPartitions = 100, shuffle = true, child = testRelation)) - val errMsg = "Coalesce hint expects a partition number and an optional boolean to indicate" + - " whether shuffle is allowed" + val errMsg = "COALESCE Hint expects a partition number as parameter" intercept(UnresolvedHint("COALESCE", Seq.empty, table("TaBlE")), errMsg) - intercept(UnresolvedHint("COALESCE", Seq(UnresolvedAttribute("a")), table("TaBlE")), errMsg) - intercept(UnresolvedHint("COALESCE", Seq(Literal(true)), table("TaBlE")), errMsg) - intercept(UnresolvedHint("COALESCE", Seq(Literal(10), Literal(33)), table("TaBlE")), errMsg) + intercept(UnresolvedHint("COALESCE", Seq(Literal(10), Literal(false)), table("TaBlE")), errMsg) + intercept(UnresolvedHint("REPARTITION", Seq(UnresolvedAttribute("a")), table("TaBlE")), errMsg) + intercept(UnresolvedHint("REPARTITION", Seq(Literal(true)), table("TaBlE")), errMsg) } } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala index 27ca19c3cbcb4..a3926c9e31f4c 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala @@ -593,6 +593,22 @@ class PlanParserSuite extends AnalysisTest { parsePlan("SELECT /*+ MAPJOIN(t) */ a from t where true group by a order by a"), UnresolvedHint("MAPJOIN", Seq($"t"), table("t").where(Literal(true)).groupBy('a)('a)).orderBy('a.asc)) + + comparePlans( + parsePlan("SELECT /*+ COALESCE(10) */ * FROM t"), + UnresolvedHint("COALESCE", Seq(Literal(10)), + table("t").select(star()))) + + comparePlans( + parsePlan("SELECT /*+ REPARTITION(100) */ * FROM t"), + UnresolvedHint("REPARTITION", Seq(Literal(100)), + table("t").select(star()))) + + comparePlans( + parsePlan("INSERT INTO s SELECT /*+ COALESCE(10) */ * FROM t"), + InsertIntoTable(table("s"), Map.empty, + UnresolvedHint("COALESCE", Seq(Literal(10)), + table("t").select(star())), overwrite = false, ifPartitionNotExists = false)) } test("SPARK-20854: select hint syntax with expressions") { @@ -662,62 +678,6 @@ class PlanParserSuite extends AnalysisTest { ) } - test ("insert hint syntax") { - assertEqual( - "INSERT INTO s /*+ COALESCE(10) */ SELECT * FROM t", - InsertIntoTable(table("s"), Map.empty, - UnresolvedHint("COALESCE", Seq(Literal(10)), - table("t").select(star())), false, false)) - assertEqual( - "INSERT INTO TABLE s /*+ COALESCE(50, true) */ SELECT * FROM t", - InsertIntoTable(table("s"), Map.empty, - UnresolvedHint("COALESCE", Seq(Literal(50), Literal(true)), - table("t").select(star())), false, false)) - assertEqual( - "INSERT INTO s /*+ REPARTITION(100) */ SELECT * FROM t", - InsertIntoTable(table("s"), Map.empty, - UnresolvedHint("REPARTITION", Seq(Literal(100)), - table("t").select(star())), false, false)) - assertEqual( - "INSERT INTO TABLE s /*+ REPARTITION(20, false) */ SELECT * FROM t", - InsertIntoTable(table("s"), Map.empty, - UnresolvedHint("REPARTITION", Seq(Literal(20), Literal(false)), - table("t").select(star())), false, false)) - assertEqual( - "INSERT OVERWRITE TABLE s /*+ COALESCE(10) */ SELECT * FROM t", - InsertIntoTable(table("s"), Map.empty, - UnresolvedHint("COALESCE", Seq(Literal(10)), - table("t").select(star())), true, false)) - assertEqual( - "INSERT OVERWRITE TABLE s /*+ COALESCE(50, true) */ SELECT * FROM t", - InsertIntoTable(table("s"), Map.empty, - UnresolvedHint("COALESCE", Seq(Literal(50), Literal(true)), - table("t").select(star())), true, false)) - assertEqual( - "INSERT OVERWRITE TABLE s /*+ REPARTITION(100) */ SELECT * FROM t", - InsertIntoTable(table("s"), Map.empty, - UnresolvedHint("REPARTITION", Seq(Literal(100)), - table("t").select(star())), true, false)) - assertEqual( - "INSERT OVERWRITE TABLE s /*+ REPARTITION(20, false) */ SELECT * FROM t", - InsertIntoTable(table("s"), Map.empty, - UnresolvedHint("REPARTITION", Seq(Literal(20), Literal(false)), - table("t").select(star())), true, false)) - - // Multiple hints - assertEqual( - "INSERT INTO s /*+ REPARTITION(100), COALESCE(50, true), COALESCE(10) */ SELECT * FROM t", - InsertIntoTable(table("s"), Map.empty, - UnresolvedHint("REPARTITION", Seq(Literal(100)), - UnresolvedHint("COALESCE", Seq(Literal(50), Literal(true)), - UnresolvedHint("COALESCE", Seq(Literal(10)), - table("t").select(star())))), false, false)) - - // Wrong hint location - intercept("INSERT INTO /*+ COALESCE(10) */ s SELECT * FROM t", - "extraneous input '/*+' expecting") - } - test("TRIM function") { intercept("select ltrim(both 'S' from 'SS abc S'", "missing ')' at ''") intercept("select rtrim(trailing 'S' from 'SS abc S'", "missing ')' at ''") diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkSqlParserSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkSqlParserSuite.scala index d98be477b5845..107a2f7109793 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkSqlParserSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkSqlParserSuite.scala @@ -17,16 +17,13 @@ package org.apache.spark.sql.execution -import java.net.URI - import org.apache.spark.sql.SaveMode import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier} import org.apache.spark.sql.catalyst.analysis.{AnalysisTest, UnresolvedAlias, UnresolvedAttribute, UnresolvedRelation, UnresolvedStar} import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogStorageFormat, CatalogTable, CatalogTableType} -import org.apache.spark.sql.catalyst.dsl.expressions.star -import org.apache.spark.sql.catalyst.expressions.{Ascending, Concat, Literal, SortOrder} +import org.apache.spark.sql.catalyst.expressions.{Ascending, Concat, SortOrder} import org.apache.spark.sql.catalyst.parser.ParseException -import org.apache.spark.sql.catalyst.plans.logical._ +import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project, RepartitionByExpression, Sort} import org.apache.spark.sql.execution.command._ import org.apache.spark.sql.execution.datasources.{CreateTable, RefreshResource} import org.apache.spark.sql.internal.{HiveSerDe, SQLConf} @@ -39,7 +36,6 @@ import org.apache.spark.sql.types.{IntegerType, LongType, StringType, StructType * defined in the Catalyst module. */ class SparkSqlParserSuite extends AnalysisTest { - import org.apache.spark.sql.catalyst.dsl.plans._ val newConf = new SQLConf private lazy val parser = new SparkSqlParser(newConf) @@ -370,76 +366,4 @@ class SparkSqlParserSuite extends AnalysisTest { "SELECT a || b || c FROM t", Project(UnresolvedAlias(concat) :: Nil, UnresolvedRelation(TableIdentifier("t")))) } - - test ("insert overwrite directory hint syntax") { - val dummyStorage = CatalogStorageFormat( - locationUri = Option(URI.create("/d")), - inputFormat = None, - outputFormat = None, - serde = None, - compressed = false, - properties = Map.empty) - assertEqual( - "INSERT OVERWRITE DIRECTORY '/d' USING orc /*+ COALESCE(10) */ SELECT * FROM t", - InsertIntoDir(false, dummyStorage, Some("orc"), - UnresolvedHint("COALESCE", Seq(Literal(10)), - table("t").select(star())), true)) - assertEqual( - "INSERT OVERWRITE DIRECTORY '/d' USING orc /*+ COALESCE(50, true) */ SELECT * FROM t", - InsertIntoDir(false, dummyStorage, Some("orc"), - UnresolvedHint("COALESCE", Seq(Literal(50), Literal(true)), - table("t").select(star())), true)) - assertEqual( - "INSERT OVERWRITE DIRECTORY '/d' USING orc /*+ REPARTITION(100) */ SELECT * FROM t", - InsertIntoDir(false, dummyStorage, Some("orc"), - UnresolvedHint("REPARTITION", Seq(Literal(100)), - table("t").select(star())), true)) - assertEqual( - "INSERT OVERWRITE DIRECTORY '/d' USING orc /*+ REPARTITION(20, false) */ SELECT * FROM t", - InsertIntoDir(false, dummyStorage, Some("orc"), - UnresolvedHint("REPARTITION", Seq(Literal(20), Literal(false)), - table("t").select(star())), true)) - - val dummyHiveStorage = CatalogStorageFormat( - locationUri = Option(URI.create("/d")), - inputFormat = Some("org.apache.hadoop.mapred.TextInputFormat"), - outputFormat = Some("org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat"), - serde = Some("org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe"), - compressed = false, - properties = Map.empty) - assertEqual( - "INSERT OVERWRITE DIRECTORY '/d' /*+ COALESCE(10) */ SELECT * FROM t", - InsertIntoDir(false, dummyHiveStorage, Some("hive"), - UnresolvedHint("COALESCE", Seq(Literal(10)), - table("t").select(star())), true)) - assertEqual( - "INSERT OVERWRITE DIRECTORY '/d' /*+ COALESCE(50, true) */ SELECT * FROM t", - InsertIntoDir(false, dummyHiveStorage, Some("hive"), - UnresolvedHint("COALESCE", Seq(Literal(50), Literal(true)), - table("t").select(star())), true)) - assertEqual( - "INSERT OVERWRITE DIRECTORY '/d' /*+ REPARTITION(100) */ SELECT * FROM t", - InsertIntoDir(false, dummyHiveStorage, Some("hive"), - UnresolvedHint("REPARTITION", Seq(Literal(100)), - table("t").select(star())), true)) - assertEqual( - "INSERT OVERWRITE DIRECTORY '/d' /*+ REPARTITION(20, false) */ SELECT * FROM t", - InsertIntoDir(false, dummyHiveStorage, Some("hive"), - UnresolvedHint("REPARTITION", Seq(Literal(20), Literal(false)), - table("t").select(star())), true)) - - // Multiple hints - assertEqual( - "INSERT OVERWRITE DIRECTORY '/d' /*+ REPARTITION(100), COALESCE(50, true), COALESCE(10)" + - " */ SELECT * FROM t", - InsertIntoDir(false, dummyHiveStorage, Some("hive"), - UnresolvedHint("REPARTITION", Seq(Literal(100)), - UnresolvedHint("COALESCE", Seq(Literal(50), Literal(true)), - UnresolvedHint("COALESCE", Seq(Literal(10)), - table("t").select(star())))), true)) - - // Wrong hint location - intercept("INSERT OVERWRITE DIRECTORY /*+ COALESCE(10) */ '/d' SELECT * FROM t", - "no viable alternative at input") - } } From 0904a1220810b51e9188d67fa0b9f2c3a4713655 Mon Sep 17 00:00:00 2001 From: John Zhuge Date: Thu, 2 Aug 2018 11:43:55 -0700 Subject: [PATCH 3/7] [SPARK-24940][SQL] Incorporate more review comments --- .../sql/catalyst/analysis/ResolveHints.scala | 5 ++- .../catalyst/analysis/ResolveHintsSuite.scala | 40 ++++++++++++------- .../sql/catalyst/parser/PlanParserSuite.scala | 17 ++++++-- 3 files changed, 42 insertions(+), 20 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveHints.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveHints.scala index 0da9523ee00fc..62c4ee2402fdd 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveHints.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveHints.scala @@ -120,15 +120,16 @@ object ResolveHints { def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperators { case h: UnresolvedHint if COALESCE_HINT_NAMES.contains(h.name.toUpperCase(Locale.ROOT)) => + val hintName = h.name.toUpperCase(Locale.ROOT) h.parameters match { case Seq(Literal(numPartitions: Int, IntegerType)) => - val shuffle = h.name.toUpperCase(Locale.ROOT) match { + val shuffle = hintName match { case "REPARTITION" => true case "COALESCE" => false } applyCoalesceHint(h.child, numPartitions, shuffle) case _ => - throw new AnalysisException("COALESCE Hint expects a partition number as parameter") + throw new AnalysisException(s"$hintName Hint expects a partition number as parameter") } } } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ResolveHintsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ResolveHintsSuite.scala index 1949d926ff536..c9f887146d2b6 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ResolveHintsSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ResolveHintsSuite.scala @@ -17,7 +17,6 @@ package org.apache.spark.sql.catalyst.analysis -import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.catalyst.dsl.expressions._ import org.apache.spark.sql.catalyst.dsl.plans._ import org.apache.spark.sql.catalyst.expressions.Literal @@ -28,14 +27,6 @@ import org.apache.spark.sql.catalyst.plans.logical._ class ResolveHintsSuite extends AnalysisTest { import org.apache.spark.sql.catalyst.analysis.TestRelations._ - private def intercept(plan: LogicalPlan, messages: String*): Unit = { - val analyzer = getAnalyzer(false) - val e = intercept[AnalysisException](analyzer.executeAndCheck(plan)) - messages.foreach { message => - assert(e.message.contains(message)) - } - } - test("invalid hints should be ignored") { checkAnalysis( UnresolvedHint("some_random_hint_that_does_not_exist", Seq("TaBlE"), table("TaBlE")), @@ -135,14 +126,33 @@ class ResolveHintsSuite extends AnalysisTest { checkAnalysis( UnresolvedHint("COALESCE", Seq(Literal(10)), table("TaBlE")), Repartition(numPartitions = 10, shuffle = false, child = testRelation)) + checkAnalysis( + UnresolvedHint("coalesce", Seq(Literal(20)), table("TaBlE")), + Repartition(numPartitions = 20, shuffle = false, child = testRelation)) checkAnalysis( UnresolvedHint("REPARTITION", Seq(Literal(100)), table("TaBlE")), Repartition(numPartitions = 100, shuffle = true, child = testRelation)) - - val errMsg = "COALESCE Hint expects a partition number as parameter" - intercept(UnresolvedHint("COALESCE", Seq.empty, table("TaBlE")), errMsg) - intercept(UnresolvedHint("COALESCE", Seq(Literal(10), Literal(false)), table("TaBlE")), errMsg) - intercept(UnresolvedHint("REPARTITION", Seq(UnresolvedAttribute("a")), table("TaBlE")), errMsg) - intercept(UnresolvedHint("REPARTITION", Seq(Literal(true)), table("TaBlE")), errMsg) + checkAnalysis( + UnresolvedHint("RePARTITion", Seq(Literal(200)), table("TaBlE")), + Repartition(numPartitions = 200, shuffle = true, child = testRelation)) + + val errMsgCoal = "COALESCE Hint expects a partition number as parameter" + assertAnalysisError( + UnresolvedHint("COALESCE", Seq.empty, table("TaBlE")), + Seq(errMsgCoal)) + assertAnalysisError( + UnresolvedHint("COALESCE", Seq(Literal(10), Literal(false)), table("TaBlE")), + Seq(errMsgCoal)) + assertAnalysisError( + UnresolvedHint("COALESCE", Seq(Literal(1.0)), table("TaBlE")), + Seq(errMsgCoal)) + + val errMsgRepa = "REPARTITION Hint expects a partition number as parameter" + assertAnalysisError( + UnresolvedHint("REPARTITION", Seq(UnresolvedAttribute("a")), table("TaBlE")), + Seq(errMsgRepa)) + assertAnalysisError( + UnresolvedHint("REPARTITION", Seq(Literal(true)), table("TaBlE")), + Seq(errMsgRepa)) } } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala index a3926c9e31f4c..d7200d0bff5d6 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala @@ -605,10 +605,21 @@ class PlanParserSuite extends AnalysisTest { table("t").select(star()))) comparePlans( - parsePlan("INSERT INTO s SELECT /*+ COALESCE(10) */ * FROM t"), + parsePlan( + "INSERT INTO s SELECT /*+ REPARTITION(100), COALESCE(500), COALESCE(10) */ * FROM t"), InsertIntoTable(table("s"), Map.empty, - UnresolvedHint("COALESCE", Seq(Literal(10)), - table("t").select(star())), overwrite = false, ifPartitionNotExists = false)) + UnresolvedHint("REPARTITION", Seq(Literal(100)), + UnresolvedHint("COALESCE", Seq(Literal(500)), + UnresolvedHint("COALESCE", Seq(Literal(10)), + table("t").select(star())))), overwrite = false, ifPartitionNotExists = false)) + + comparePlans( + parsePlan("SELECT /*+ BROADCASTJOIN(u), REPARTITION(100) */ * FROM t"), + UnresolvedHint("BROADCASTJOIN", Seq($"u"), + UnresolvedHint("REPARTITION", Seq(Literal(100)), + table("t").select(star())))) + + intercept("SELECT /*+ COALESCE(30 + 50) */ * FROM t", "mismatched input") } test("SPARK-20854: select hint syntax with expressions") { From 739aeb44e9b9bb15b74271e2b42fb3dfe6f1c8fe Mon Sep 17 00:00:00 2001 From: John Zhuge Date: Thu, 2 Aug 2018 23:28:31 -0700 Subject: [PATCH 4/7] [SPARK-24940][SQL] Add more tests --- .../sql/catalyst/analysis/ResolveHints.scala | 10 +++-- .../catalyst/analysis/ResolveHintsSuite.scala | 2 +- .../apache/spark/sql/DataFrameHintSuite.scala | 10 +++++ .../org/apache/spark/sql/SQLQuerySuite.scala | 40 +++++++++++++++++++ 4 files changed, 57 insertions(+), 5 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveHints.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveHints.scala index 62c4ee2402fdd..8a9c397779e0b 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveHints.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveHints.scala @@ -121,12 +121,14 @@ object ResolveHints { def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperators { case h: UnresolvedHint if COALESCE_HINT_NAMES.contains(h.name.toUpperCase(Locale.ROOT)) => val hintName = h.name.toUpperCase(Locale.ROOT) + val shuffle = hintName match { + case "REPARTITION" => true + case "COALESCE" => false + } h.parameters match { case Seq(Literal(numPartitions: Int, IntegerType)) => - val shuffle = hintName match { - case "REPARTITION" => true - case "COALESCE" => false - } + applyCoalesceHint(h.child, numPartitions, shuffle) + case Seq(numPartitions: Int) => applyCoalesceHint(h.child, numPartitions, shuffle) case _ => throw new AnalysisException(s"$hintName Hint expects a partition number as parameter") diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ResolveHintsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ResolveHintsSuite.scala index c9f887146d2b6..bd66ee5355f45 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ResolveHintsSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ResolveHintsSuite.scala @@ -122,7 +122,7 @@ class ResolveHintsSuite extends AnalysisTest { caseSensitive = false) } - test("coalesce hint") { + test("coalesce and repartition hint") { checkAnalysis( UnresolvedHint("COALESCE", Seq(Literal(10)), table("TaBlE")), Repartition(numPartitions = 10, shuffle = false, child = testRelation)) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameHintSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameHintSuite.scala index 0dd5bdcba2e4c..7ef8b542c79a8 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameHintSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameHintSuite.scala @@ -59,4 +59,14 @@ class DataFrameHintSuite extends AnalysisTest with SharedSQLContext { ) ) } + + test("coalesce and repartition hint") { + check( + df.hint("COALESCE", 10), + UnresolvedHint("COALESCE", Seq(10), df.logicalPlan)) + + check( + df.hint("REPARTITION", 100), + UnresolvedHint("REPARTITION", Seq(100), df.logicalPlan)) + } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala index 86083d1701c2c..6e9bd4ecd3f27 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala @@ -27,6 +27,7 @@ import org.apache.spark.scheduler.{SparkListener, SparkListenerJobStart} import org.apache.spark.sql.catalyst.util.StringUtils import org.apache.spark.sql.execution.aggregate import org.apache.spark.sql.execution.aggregate.{HashAggregateExec, SortAggregateExec} +import org.apache.spark.sql.execution.datasources.FilePartition import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, CartesianProductExec, SortMergeJoinExec} import org.apache.spark.sql.functions._ import org.apache.spark.sql.internal.SQLConf @@ -2797,4 +2798,43 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext { checkAnswer(df, Seq(Row(3, 99, 1))) } } + + + test("SPARK-24940: coalesce and repartition hint") { + def getNumFiles(tableName: String): Int = { + spark.table(tableName).rdd.partitions.map { + case fp: FilePartition => + fp.files.size + }.sum + } + + withTempView("nums1") { + val numPartitionsSrc = 10 + spark.range(0, 100, 1, numPartitionsSrc).createOrReplaceTempView("nums1") + assert(spark.table("nums1").rdd.getNumPartitions == numPartitionsSrc) + + withTable("nums") { + sql("CREATE TABLE nums (id INT) USING parquet") + + Seq(5, 20, 2).foreach { numPartitions => + sql( + s""" + |INSERT OVERWRITE TABLE nums + |SELECT /*+ REPARTITION($numPartitions) */ * + |FROM nums1 + """.stripMargin) + assert(getNumFiles("nums") == numPartitions) + + sql( + s""" + |INSERT OVERWRITE TABLE nums + |SELECT /*+ COALESCE($numPartitions) */ * + |FROM nums1 + """.stripMargin) + // Coalesce can not increase the number of partitions + assert(getNumFiles("nums") == Seq(numPartitions, numPartitionsSrc).min) + } + } + } + } } From c950b6e5e5f943155b296542238c08a412a35472 Mon Sep 17 00:00:00 2001 From: John Zhuge Date: Fri, 3 Aug 2018 10:03:34 -0700 Subject: [PATCH 5/7] [SPARK-24940][SQL] Marco's comments --- .../spark/sql/catalyst/analysis/Analyzer.scala | 2 +- .../spark/sql/catalyst/analysis/ResolveHints.scala | 13 +++---------- 2 files changed, 4 insertions(+), 11 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index 373d8cb19ceb9..b5016fdb29d92 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -145,7 +145,7 @@ class Analyzer( lazy val batches: Seq[Batch] = Seq( Batch("Hints", fixedPoint, new ResolveHints.ResolveBroadcastHints(conf), - new ResolveHints.ResolveCoalesceHints(conf), + ResolveHints.ResolveCoalesceHints, ResolveHints.RemoveAllHints), Batch("Simple Sanity Check", Once, LookupFunctions), diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveHints.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveHints.scala index 8a9c397779e0b..7959b5a30bbfb 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveHints.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveHints.scala @@ -108,16 +108,9 @@ object ResolveHints { * COALESCE Hint accepts name "COALESCE" and "REPARTITION". * Its parameter includes a partition number. */ - class ResolveCoalesceHints(conf: SQLConf) extends Rule[LogicalPlan] { + object ResolveCoalesceHints extends Rule[LogicalPlan] { private val COALESCE_HINT_NAMES = Set("COALESCE", "REPARTITION") - private def applyCoalesceHint( - plan: LogicalPlan, - numPartitions: Int, - shuffle: Boolean): LogicalPlan = { - Repartition(numPartitions, shuffle, plan) - } - def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperators { case h: UnresolvedHint if COALESCE_HINT_NAMES.contains(h.name.toUpperCase(Locale.ROOT)) => val hintName = h.name.toUpperCase(Locale.ROOT) @@ -127,9 +120,9 @@ object ResolveHints { } h.parameters match { case Seq(Literal(numPartitions: Int, IntegerType)) => - applyCoalesceHint(h.child, numPartitions, shuffle) + Repartition(numPartitions, shuffle, h.child) case Seq(numPartitions: Int) => - applyCoalesceHint(h.child, numPartitions, shuffle) + Repartition(numPartitions, shuffle, h.child) case _ => throw new AnalysisException(s"$hintName Hint expects a partition number as parameter") } From acda5e281f1e3e8fd87faf4c8582d39ea857d215 Mon Sep 17 00:00:00 2001 From: John Zhuge Date: Fri, 3 Aug 2018 13:07:50 -0700 Subject: [PATCH 6/7] [SPARK-24940][SQL] Fix a nit --- .../apache/spark/sql/catalyst/analysis/ResolveHints.scala | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveHints.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveHints.scala index 7959b5a30bbfb..1ef482b0e9f5b 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveHints.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveHints.scala @@ -118,14 +118,15 @@ object ResolveHints { case "REPARTITION" => true case "COALESCE" => false } - h.parameters match { + val numPartitions = h.parameters match { case Seq(Literal(numPartitions: Int, IntegerType)) => - Repartition(numPartitions, shuffle, h.child) + numPartitions case Seq(numPartitions: Int) => - Repartition(numPartitions, shuffle, h.child) + numPartitions case _ => throw new AnalysisException(s"$hintName Hint expects a partition number as parameter") } + Repartition(numPartitions, shuffle, h.child) } } From 2dcab4b491e38766889c9cb83b8bebd59aa507fd Mon Sep 17 00:00:00 2001 From: John Zhuge Date: Fri, 3 Aug 2018 16:26:31 -0700 Subject: [PATCH 7/7] [SPARK-24940][SQL] Simplify the unit test in SQLQuerySuite Thanks to Ryan's suggestion. --- .../scala/org/apache/spark/sql/SQLQuerySuite.scala | 11 ++--------- 1 file changed, 2 insertions(+), 9 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala index 6e9bd4ecd3f27..2cb7a04714a52 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala @@ -2801,13 +2801,6 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext { test("SPARK-24940: coalesce and repartition hint") { - def getNumFiles(tableName: String): Int = { - spark.table(tableName).rdd.partitions.map { - case fp: FilePartition => - fp.files.size - }.sum - } - withTempView("nums1") { val numPartitionsSrc = 10 spark.range(0, 100, 1, numPartitionsSrc).createOrReplaceTempView("nums1") @@ -2823,7 +2816,7 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext { |SELECT /*+ REPARTITION($numPartitions) */ * |FROM nums1 """.stripMargin) - assert(getNumFiles("nums") == numPartitions) + assert(spark.table("nums").inputFiles.length == numPartitions) sql( s""" @@ -2832,7 +2825,7 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext { |FROM nums1 """.stripMargin) // Coalesce can not increase the number of partitions - assert(getNumFiles("nums") == Seq(numPartitions, numPartitionsSrc).min) + assert(spark.table("nums").inputFiles.length == Seq(numPartitions, numPartitionsSrc).min) } } }