From f1474977e61adb4f14ff72432a398e48593fabcd Mon Sep 17 00:00:00 2001 From: Takeshi Yamamuro Date: Mon, 25 Sep 2017 20:12:50 +0900 Subject: [PATCH 1/4] Respect With in TPCDSQueryBenchmark --- .../benchmark/TPCDSQueryBenchmark.scala | 40 ++++++++++--------- 1 file changed, 21 insertions(+), 19 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/TPCDSQueryBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/TPCDSQueryBenchmark.scala index 99c6df7389205..592c3c816ebac 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/TPCDSQueryBenchmark.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/TPCDSQueryBenchmark.scala @@ -17,13 +17,15 @@ package org.apache.spark.sql.execution.benchmark +import scala.collection.mutable + import org.apache.spark.SparkConf import org.apache.spark.internal.Logging import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation import org.apache.spark.sql.catalyst.expressions.SubqueryExpression -import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias, With} import org.apache.spark.sql.catalyst.util._ import org.apache.spark.util.Benchmark @@ -66,25 +68,25 @@ object TPCDSQueryBenchmark extends Logging { classLoader = Thread.currentThread().getContextClassLoader) // This is an indirect hack to estimate the size of each query's input by traversing the - // logical plan and adding up the sizes of all tables that appear in the plan. Note that this - // currently doesn't take WITH subqueries into account which might lead to fairly inaccurate - // per-row processing time for those cases. - val queryRelations = scala.collection.mutable.HashSet[String]() - spark.sql(queryString).queryExecution.logical.map { - case UnresolvedRelation(t: TableIdentifier) => - queryRelations.add(t.table) - case lp: LogicalPlan => - lp.expressions.foreach { _ foreach { - case subquery: SubqueryExpression => - subquery.plan.foreach { - case UnresolvedRelation(t: TableIdentifier) => - queryRelations.add(t.table) - case _ => - } - case _ => - } + // logical plan and adding up the sizes of all tables that appear in the plan. + val planToCheck = mutable.Stack[LogicalPlan](spark.sql(queryString).queryExecution.logical) + val queryRelations = mutable.HashSet[String]() + while (planToCheck.nonEmpty) { + planToCheck.pop() match { + case UnresolvedRelation(t: TableIdentifier) => + queryRelations.add(t.table) + case With(_, cteRelations) => + cteRelations.foreach { case (_, SubqueryAlias(_, child)) => + planToCheck.push(child) + } + case lp => + lp.expressions.foreach { _ foreach { + case subquery: SubqueryExpression => + planToCheck.push(subquery.plan) + case _ => + }} + planToCheck.pushAll(lp.children) } - case _ => } val numRows = queryRelations.map(tableSizes.getOrElse(_, 0L)).sum val benchmark = new Benchmark(s"TPCDS Snappy", numRows, 5) From 489f2a242bf24cc61fd85882c798455daf72e296 Mon Sep 17 00:00:00 2001 From: Takeshi Yamamuro Date: Fri, 29 Sep 2017 00:39:49 +0900 Subject: [PATCH 2/4] Fix --- .../benchmark/TPCDSQueryBenchmark.scala | 31 +++++-------------- 1 file changed, 7 insertions(+), 24 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/TPCDSQueryBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/TPCDSQueryBenchmark.scala index 592c3c816ebac..6932c85214101 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/TPCDSQueryBenchmark.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/TPCDSQueryBenchmark.scala @@ -17,16 +17,12 @@ package org.apache.spark.sql.execution.benchmark -import scala.collection.mutable - import org.apache.spark.SparkConf import org.apache.spark.internal.Logging import org.apache.spark.sql.SparkSession -import org.apache.spark.sql.catalyst.TableIdentifier -import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation -import org.apache.spark.sql.catalyst.expressions.SubqueryExpression -import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias, With} +import org.apache.spark.sql.catalyst.plans.logical.SubqueryAlias import org.apache.spark.sql.catalyst.util._ +import org.apache.spark.sql.execution.datasources.LogicalRelation import org.apache.spark.util.Benchmark /** @@ -69,24 +65,11 @@ object TPCDSQueryBenchmark extends Logging { // This is an indirect hack to estimate the size of each query's input by traversing the // logical plan and adding up the sizes of all tables that appear in the plan. - val planToCheck = mutable.Stack[LogicalPlan](spark.sql(queryString).queryExecution.logical) - val queryRelations = mutable.HashSet[String]() - while (planToCheck.nonEmpty) { - planToCheck.pop() match { - case UnresolvedRelation(t: TableIdentifier) => - queryRelations.add(t.table) - case With(_, cteRelations) => - cteRelations.foreach { case (_, SubqueryAlias(_, child)) => - planToCheck.push(child) - } - case lp => - lp.expressions.foreach { _ foreach { - case subquery: SubqueryExpression => - planToCheck.push(subquery.plan) - case _ => - }} - planToCheck.pushAll(lp.children) - } + val queryRelations = scala.collection.mutable.HashSet[String]() + spark.sql(queryString).queryExecution.analyzed.map { + case SubqueryAlias(name, _: LogicalRelation) => + queryRelations.add(name) + case _ => } val numRows = queryRelations.map(tableSizes.getOrElse(_, 0L)).sum val benchmark = new Benchmark(s"TPCDS Snappy", numRows, 5) From 5691cf6f7bda6732b7d610701c2485397a6a94b5 Mon Sep 17 00:00:00 2001 From: Takeshi Yamamuro Date: Fri, 29 Sep 2017 19:18:08 +0900 Subject: [PATCH 3/4] Fix --- .../spark/sql/execution/benchmark/TPCDSQueryBenchmark.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/TPCDSQueryBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/TPCDSQueryBenchmark.scala index 6932c85214101..a07f2ccf1c758 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/TPCDSQueryBenchmark.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/TPCDSQueryBenchmark.scala @@ -66,7 +66,7 @@ object TPCDSQueryBenchmark extends Logging { // This is an indirect hack to estimate the size of each query's input by traversing the // logical plan and adding up the sizes of all tables that appear in the plan. val queryRelations = scala.collection.mutable.HashSet[String]() - spark.sql(queryString).queryExecution.analyzed.map { + spark.sql(queryString).queryExecution.analyzed.foreach { case SubqueryAlias(name, _: LogicalRelation) => queryRelations.add(name) case _ => From 8d8a9ff8a249aeb2442ccaf9ba1989bbe8c9bda9 Mon Sep 17 00:00:00 2001 From: Takeshi Yamamuro Date: Sat, 30 Sep 2017 09:16:58 +0900 Subject: [PATCH 4/4] Fix more --- .../sql/execution/benchmark/TPCDSQueryBenchmark.scala | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/TPCDSQueryBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/TPCDSQueryBenchmark.scala index a07f2ccf1c758..69247d7f4e9aa 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/TPCDSQueryBenchmark.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/TPCDSQueryBenchmark.scala @@ -20,6 +20,7 @@ package org.apache.spark.sql.execution.benchmark import org.apache.spark.SparkConf import org.apache.spark.internal.Logging import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.catalyst.catalog.HiveTableRelation import org.apache.spark.sql.catalyst.plans.logical.SubqueryAlias import org.apache.spark.sql.catalyst.util._ import org.apache.spark.sql.execution.datasources.LogicalRelation @@ -67,8 +68,12 @@ object TPCDSQueryBenchmark extends Logging { // logical plan and adding up the sizes of all tables that appear in the plan. val queryRelations = scala.collection.mutable.HashSet[String]() spark.sql(queryString).queryExecution.analyzed.foreach { - case SubqueryAlias(name, _: LogicalRelation) => - queryRelations.add(name) + case SubqueryAlias(alias, _: LogicalRelation) => + queryRelations.add(alias) + case LogicalRelation(_, _, Some(catalogTable), _) => + queryRelations.add(catalogTable.identifier.table) + case HiveTableRelation(tableMeta, _, _) => + queryRelations.add(tableMeta.identifier.table) case _ => } val numRows = queryRelations.map(tableSizes.getOrElse(_, 0L)).sum