
[SPARK-26277][SQL][TEST] WholeStageCodegen metrics should be tested with whole-stage codegen enabled #23224

Status: Closed (4 commits)
SQLMetricsSuite.scala

@@ -77,11 +77,16 @@ class SQLMetricsSuite extends SparkFunSuite with SQLMetricsTestUtils with Shared
   }
 
   test("WholeStageCodegen metrics") {
-    // Assume the execution plan is
-    // WholeStageCodegen(nodeId = 0, Range(nodeId = 2) -> Filter(nodeId = 1))
+    // Assume the execution plan with node id is
+    // WholeStageCodegen(nodeId = 0)
+    //   Filter(nodeId = 1)
+    //     Range(nodeId = 2)
+    // TODO: update metrics in generated operators
     val ds = spark.range(10).filter('id < 5)
-    testSparkPlanMetrics(ds.toDF(), 1, Map.empty)
+    testSparkPlanMetricsWithPredicates(ds.toDF(), 1, Map(
+      0L -> (("WholeStageCodegen", Map(
+        "duration total (min, med, max)" -> {_.toString.matches(timingMetricPattern)})))
+    ), true)
   }
 
   test("Aggregate metrics") {
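Note on the predicate above: the test matches the rendered value of the WholeStageCodegen node's "duration total (min, med, max)" metric against `timingMetricPattern`, a regex defined in `SQLMetricsTestUtils`. The stand-alone sketch below shows how such a predicate behaves; the pattern here is an assumption for illustration (timing metrics render as a total plus a `(min, med, max)` triple), and `TimingMetricPredicateDemo` is a hypothetical name, not part of the suite.

object TimingMetricPredicateDemo {
  // Assumed shape of a rendered timing metric, e.g. "10 ms (1 ms, 3 ms, 6 ms)".
  // The exact regex in SQLMetricsTestUtils may differ.
  val timingMetricPattern: String = {
    val duration = "[0-9]+(\\.[0-9]+)? ms"
    s"(?s).*$duration \\($duration, $duration, $duration\\).*"
  }

  def main(args: Array[String]): Unit = {
    // Same shape as the predicate in the test: stringify the metric value,
    // then regex-match it.
    val predicate: Any => Boolean = _.toString.matches(timingMetricPattern)
    println(predicate("duration total (min, med, max):\n10 ms (1 ms, 3 ms, 6 ms)")) // true
    println(predicate("number of output rows: 5"))                                  // false
  }
}

A predicate is used instead of an exact expected value because the codegen duration varies from run to run; only the shape of the metric can be asserted deterministically.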
SQLMetricsTestUtils.scala

@@ -144,6 +144,7 @@ trait SQLMetricsTestUtils extends SQLTestUtils {
    * @param df `DataFrame` to run
    * @param expectedNumOfJobs number of jobs that will run
    * @param expectedNodeIds the node ids of the metrics to collect from execution data.
+   * @param enableWholeStage enable whole-stage code generation or not.
    */
   protected def getSparkPlanMetrics(
       df: DataFrame,
@@ -210,13 +211,15 @@ trait SQLMetricsTestUtils extends SQLTestUtils {
    * @param expectedNumOfJobs number of jobs that will run
    * @param expectedMetricsPredicates the expected metrics predicates. The format is
    *                                  `nodeId -> (operatorName, metric name -> metric predicate)`.
+   * @param enableWholeStage enable whole-stage code generation or not.
    */
   protected def testSparkPlanMetricsWithPredicates(
       df: DataFrame,
       expectedNumOfJobs: Int,
-      expectedMetricsPredicates: Map[Long, (String, Map[String, Any => Boolean])]): Unit = {
+      expectedMetricsPredicates: Map[Long, (String, Map[String, Any => Boolean])],
+      enableWholeStage: Boolean = false): Unit = {
     val optActualMetrics =
-      getSparkPlanMetrics(df, expectedNumOfJobs, expectedMetricsPredicates.keySet)
+      getSparkPlanMetrics(df, expectedNumOfJobs, expectedMetricsPredicates.keySet, enableWholeStage)
     optActualMetrics.foreach { actualMetrics =>
       assert(expectedMetricsPredicates.keySet === actualMetrics.keySet)
       for ((nodeId, (expectedNodeName, expectedMetricsPredicatesMap))
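The diff above passes `enableWholeStage` through to `getSparkPlanMetrics` but does not show how that helper applies it. A minimal sketch of the likely mechanism follows, assuming the flag simply toggles the `spark.sql.codegen.wholeStage` conf around the DataFrame action; `runWithWholeStage` is a hypothetical stand-in for the suite's own plumbing (the real tests use the `SQLTestUtils.withSQLConf` helper).

import org.apache.spark.sql.SparkSession

def runWithWholeStage[T](spark: SparkSession, enableWholeStage: Boolean)(body: => T): T = {
  val key = "spark.sql.codegen.wholeStage"  // conf that gates whole-stage codegen
  val previous = spark.conf.getOption(key)  // remember the caller's setting
  spark.conf.set(key, enableWholeStage.toString)
  try body
  finally previous match {
    case Some(value) => spark.conf.set(key, value)  // restore the original value
    case None => spark.conf.unset(key)
  }
}

With whole-stage codegen forced on, the physical plan gains the WholeStageCodegen node at id 0, which is what lets the updated test assert on that node's duration metric; with codegen disabled (the helper's old behavior) the node never appeared, so the old test could only assert an empty metric map.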