From 59acb16c1828a79ea4ce83d3b4f850e4f603da10 Mon Sep 17 00:00:00 2001
From: seancxmao
Date: Wed, 5 Dec 2018 14:28:02 +0800
Subject: [PATCH 1/4] [MINOR][SQL][TEST] WholeStageCodegen metrics should be
 tested with whole-stage codegen enabled

---
 .../sql/execution/metric/SQLMetricsSuite.scala     | 11 ++++++++---
 .../sql/execution/metric/SQLMetricsTestUtils.scala | 14 ++++++++++----
 2 files changed, 18 insertions(+), 7 deletions(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
index 7368a6c9e1d64..6174ec4c8908c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
@@ -77,11 +77,16 @@ class SQLMetricsSuite extends SparkFunSuite with SQLMetricsTestUtils with Shared
   }
 
   test("WholeStageCodegen metrics") {
-    // Assume the execution plan is
-    // WholeStageCodegen(nodeId = 0, Range(nodeId = 2) -> Filter(nodeId = 1))
+    // Assume the execution plan with node id is
+    // WholeStageCodegen(nodeId = 0)
+    //   Filter(nodeId = 1)
+    //     Range(nodeId = 2)
     // TODO: update metrics in generated operators
     val ds = spark.range(10).filter('id < 5)
-    testSparkPlanMetrics(ds.toDF(), 1, Map.empty)
+    testSparkPlanMetricsWithPredicates(ds.toDF(), 1, Map(
+      0L -> (("WholeStageCodegen", Map(
+        "duration total (min, med, max)" -> {_.toString.matches(timingMetricPattern)})))
+    ), true)
   }
 
   test("Aggregate metrics") {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsTestUtils.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsTestUtils.scala
index 2d245d2ba1e35..0d458d947f794 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsTestUtils.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsTestUtils.scala
@@ -144,6 +144,7 @@ trait SQLMetricsTestUtils extends SQLTestUtils {
    * @param df `DataFrame` to run
    * @param expectedNumOfJobs number of jobs that will run
    * @param expectedNodeIds the node ids of the metrics to collect from execution data.
+   * @param enableWholeStage enable whole-stage code generation or not.
    */
   protected def getSparkPlanMetrics(
       df: DataFrame,
@@ -192,16 +193,19 @@ trait SQLMetricsTestUtils extends SQLTestUtils {
    * @param expectedNumOfJobs number of jobs that will run
    * @param expectedMetrics the expected metrics. The format is
    *                        `nodeId -> (operatorName, metric name -> metric value)`.
+   * @param enableWholeStage enable whole-stage code generation or not.
    */
   protected def testSparkPlanMetrics(
       df: DataFrame,
       expectedNumOfJobs: Int,
-      expectedMetrics: Map[Long, (String, Map[String, Any])]): Unit = {
+      expectedMetrics: Map[Long, (String, Map[String, Any])],
+      enableWholeStage: Boolean = false): Unit = {
     val expectedMetricsPredicates = expectedMetrics.mapValues { case (nodeName, nodeMetrics) =>
       (nodeName, nodeMetrics.mapValues(expectedMetricValue =>
         (actualMetricValue: Any) => expectedMetricValue.toString === actualMetricValue))
     }
-    testSparkPlanMetricsWithPredicates(df, expectedNumOfJobs, expectedMetricsPredicates)
+    testSparkPlanMetricsWithPredicates(
+      df, expectedNumOfJobs, expectedMetricsPredicates, enableWholeStage)
   }
 
   /**
@@ -210,13 +214,15 @@ trait SQLMetricsTestUtils extends SQLTestUtils {
    * @param expectedNumOfJobs number of jobs that will run
    * @param expectedMetricsPredicates the expected metrics predicates. The format is
    *                                  `nodeId -> (operatorName, metric name -> metric predicate)`.
+   * @param enableWholeStage enable whole-stage code generation or not.
    */
   protected def testSparkPlanMetricsWithPredicates(
       df: DataFrame,
       expectedNumOfJobs: Int,
-      expectedMetricsPredicates: Map[Long, (String, Map[String, Any => Boolean])]): Unit = {
+      expectedMetricsPredicates: Map[Long, (String, Map[String, Any => Boolean])],
+      enableWholeStage: Boolean = false): Unit = {
     val optActualMetrics =
-      getSparkPlanMetrics(df, expectedNumOfJobs, expectedMetricsPredicates.keySet)
+      getSparkPlanMetrics(df, expectedNumOfJobs, expectedMetricsPredicates.keySet, enableWholeStage)
     optActualMetrics.foreach { actualMetrics =>
       assert(expectedMetricsPredicates.keySet === actualMetrics.keySet)
       for ((nodeId, (expectedNodeName, expectedMetricsPredicatesMap))
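
[Note on PATCH 1/4] The hunks above only document and thread through the
`enableWholeStage` flag; `getSparkPlanMetrics` itself already accepted it
before this patch (only its missing `@param` doc is added). A minimal sketch
of how that flag is presumably consumed inside `getSparkPlanMetrics`, which
is not shown in this diff: `withSQLConf` comes from `SQLTestUtils`, which
`SQLMetricsTestUtils` extends, and `SQLConf.WHOLESTAGE_CODEGEN_ENABLED` is
the existing conf entry for `spark.sql.codegen.wholeStage`.

    import org.apache.spark.sql.internal.SQLConf

    // Sketch only: toggle whole-stage codegen just around the materializing
    // action, so the executed plan (and hence the metrics collected from the
    // SQL listener) reflects the flag.
    withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> enableWholeStage.toString) {
      df.collect()
    }
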
From d410a9c67be3269707604048f380ac2f61bf00af Mon Sep 17 00:00:00 2001
From: seancxmao
Date: Fri, 7 Dec 2018 20:02:48 +0800
Subject: [PATCH 2/4] check whole-stage codegen enabled

---
 .../spark/sql/execution/metric/SQLMetricsSuite.scala | 7 +++++++
 1 file changed, 7 insertions(+)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
index 6174ec4c8908c..c28dc0fadabae 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
@@ -82,11 +82,18 @@ class SQLMetricsSuite extends SparkFunSuite with SQLMetricsTestUtils with Shared
     //   Filter(nodeId = 1)
     //     Range(nodeId = 2)
     // TODO: update metrics in generated operators
+<<<<<<< HEAD
     val ds = spark.range(10).filter('id < 5)
     testSparkPlanMetricsWithPredicates(ds.toDF(), 1, Map(
       0L -> (("WholeStageCodegen", Map(
         "duration total (min, med, max)" -> {_.toString.matches(timingMetricPattern)})))
     ), true)
+=======
+    val df = spark.range(10).filter('id < 5).toDF()
+    testSparkPlanMetrics(df, 1, Map.empty, true)
+    df.queryExecution.executedPlan.find(_.isInstanceOf[WholeStageCodegenExec])
+      .getOrElse(assert(false))
+>>>>>>> check whole-stage codegen enabled
   }
 
   test("Aggregate metrics") {

From fd7c63bd7c765f9dc93c2ae2b0d3a723095d9c10 Mon Sep 17 00:00:00 2001
From: seancxmao
Date: Sat, 8 Dec 2018 14:51:02 +0800
Subject: [PATCH 3/4] make test case more readable

---
 .../spark/sql/execution/metric/SQLMetricsSuite.scala | 7 -------
 1 file changed, 7 deletions(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
index c28dc0fadabae..6174ec4c8908c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
@@ -82,18 +82,11 @@ class SQLMetricsSuite extends SparkFunSuite with SQLMetricsTestUtils with Shared
     //   Filter(nodeId = 1)
     //     Range(nodeId = 2)
     // TODO: update metrics in generated operators
-<<<<<<< HEAD
     val ds = spark.range(10).filter('id < 5)
     testSparkPlanMetricsWithPredicates(ds.toDF(), 1, Map(
       0L -> (("WholeStageCodegen", Map(
         "duration total (min, med, max)" -> {_.toString.matches(timingMetricPattern)})))
     ), true)
-=======
-    val df = spark.range(10).filter('id < 5).toDF()
-    testSparkPlanMetrics(df, 1, Map.empty, true)
-    df.queryExecution.executedPlan.find(_.isInstanceOf[WholeStageCodegenExec])
-      .getOrElse(assert(false))
->>>>>>> check whole-stage codegen enabled
   }
 
   test("Aggregate metrics") {
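
[Note on PATCH 2/4 and 3/4] PATCH 2/4 commits its merge conflict markers
(`<<<<<<< HEAD` / `=======` / `>>>>>>>`) verbatim, and PATCH 3/4 strips them,
keeping the predicate-based HEAD side. The discarded branch verified codegen
directly on the executed plan. A sketch of that check in a slightly more
direct form: since `find` returns an `Option[SparkPlan]`, asserting
`isDefined` reads better than `getOrElse(assert(false))`. This assumes the
suite's usual imports, including
`org.apache.spark.sql.execution.WholeStageCodegenExec` and the implicits that
enable the `'id` column syntax.

    val df = spark.range(10).filter('id < 5).toDF()
    testSparkPlanMetrics(df, 1, Map.empty, true)
    // With enableWholeStage = true, a WholeStageCodegenExec node must be
    // present somewhere in the physical plan.
    assert(df.queryExecution.executedPlan
      .find(_.isInstanceOf[WholeStageCodegenExec]).isDefined)
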
From 00284b6274f43d752b2ba05e90b18f3e9baaf941 Mon Sep 17 00:00:00 2001
From: seancxmao
Date: Wed, 2 Jan 2019 22:15:15 +0800
Subject: [PATCH 4/4] remove unused flag

---
 .../spark/sql/execution/metric/SQLMetricsTestUtils.scala | 7 ++-----
 1 file changed, 2 insertions(+), 5 deletions(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsTestUtils.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsTestUtils.scala
index 0d458d947f794..0e13f7dd55bae 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsTestUtils.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsTestUtils.scala
@@ -193,19 +193,16 @@ trait SQLMetricsTestUtils extends SQLTestUtils {
    * @param expectedNumOfJobs number of jobs that will run
    * @param expectedMetrics the expected metrics. The format is
    *                        `nodeId -> (operatorName, metric name -> metric value)`.
-   * @param enableWholeStage enable whole-stage code generation or not.
    */
   protected def testSparkPlanMetrics(
       df: DataFrame,
       expectedNumOfJobs: Int,
-      expectedMetrics: Map[Long, (String, Map[String, Any])],
-      enableWholeStage: Boolean = false): Unit = {
+      expectedMetrics: Map[Long, (String, Map[String, Any])]): Unit = {
     val expectedMetricsPredicates = expectedMetrics.mapValues { case (nodeName, nodeMetrics) =>
       (nodeName, nodeMetrics.mapValues(expectedMetricValue =>
         (actualMetricValue: Any) => expectedMetricValue.toString === actualMetricValue))
     }
-    testSparkPlanMetricsWithPredicates(
-      df, expectedNumOfJobs, expectedMetricsPredicates, enableWholeStage)
+    testSparkPlanMetricsWithPredicates(df, expectedNumOfJobs, expectedMetricsPredicates)
   }
 
   /**
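
[Net effect of the series] `testSparkPlanMetrics` returns to its original
three-argument signature, while `testSparkPlanMetricsWithPredicates` and
`getSparkPlanMetrics` keep `enableWholeStage`. Reassembled from the hunks
above, the test now reads as follows (`timingMetricPattern` is assumed to be
a regex for "total (min, med, max)" timing strings defined elsewhere in
`SQLMetricsTestUtils`):

    test("WholeStageCodegen metrics") {
      // Assume the execution plan with node id is
      // WholeStageCodegen(nodeId = 0)
      //   Filter(nodeId = 1)
      //     Range(nodeId = 2)
      // TODO: update metrics in generated operators
      val ds = spark.range(10).filter('id < 5)
      // enableWholeStage = true ensures the WholeStageCodegen node (and its
      // duration metric) actually appears in the executed plan under test.
      testSparkPlanMetricsWithPredicates(ds.toDF(), 1, Map(
        0L -> (("WholeStageCodegen", Map(
          "duration total (min, med, max)" -> {_.toString.matches(timingMetricPattern)})))
      ), true)
    }
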