[SPARK-26277][SQL][TEST] WholeStageCodegen metrics should be tested with whole-stage codegen enabled #23224

Status: Closed (wants to merge 4 commits)
Showing changes from 3 commits
@@ -77,11 +77,16 @@ class SQLMetricsSuite extends SparkFunSuite with SQLMetricsTestUtils with Shared
   }

   test("WholeStageCodegen metrics") {
-    // Assume the execution plan is
-    // WholeStageCodegen(nodeId = 0, Range(nodeId = 2) -> Filter(nodeId = 1))
+    // Assume the execution plan with node id is
+    // WholeStageCodegen(nodeId = 0)
+    //   Filter(nodeId = 1)
+    //     Range(nodeId = 2)
     // TODO: update metrics in generated operators
     val ds = spark.range(10).filter('id < 5)
-    testSparkPlanMetrics(ds.toDF(), 1, Map.empty)
+    testSparkPlanMetricsWithPredicates(ds.toDF(), 1, Map(
+      0L -> (("WholeStageCodegen", Map(
+        "duration total (min, med, max)" -> {_.toString.matches(timingMetricPattern)})))
+    ), true)
   }

   test("Aggregate metrics") {
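The new test matches the WholeStageCodegen "duration total (min, med, max)" metric against a regex rather than an exact value, since timings vary between runs. A standalone sketch of such a predicate follows; the real `timingMetricPattern` is defined elsewhere in `SQLMetricsTestUtils`, so the regex below is only an assumed stand-in.

```scala
object TimingPredicateSketch {
  // Assumed stand-in for SQLMetricsTestUtils.timingMetricPattern: a total
  // time followed by (min, med, max), e.g. "12 ms (1 ms, 2 ms, 12 ms)".
  val timingMetricPattern: String =
    """\d+(\.\d+)? ms \(\d+(\.\d+)? ms, \d+(\.\d+)? ms, \d+(\.\d+)? ms\)"""

  // Shape of the predicate passed for "duration total (min, med, max)":
  // accept any value whose string form looks like a timing summary.
  val durationPredicate: Any => Boolean =
    (v: Any) => v.toString.matches(timingMetricPattern)
}
```

Because `String.matches` requires the whole string to match, this rejects values with trailing garbage as well as plain numbers.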
@@ -144,6 +144,7 @@ trait SQLMetricsTestUtils extends SQLTestUtils {
    * @param df `DataFrame` to run
    * @param expectedNumOfJobs number of jobs that will run
    * @param expectedNodeIds the node ids of the metrics to collect from execution data.
+   * @param enableWholeStage enable whole-stage code generation or not.
    */
   protected def getSparkPlanMetrics(
       df: DataFrame,

@@ -192,16 +193,19 @@
    * @param expectedNumOfJobs number of jobs that will run
    * @param expectedMetrics the expected metrics. The format is
    *                        `nodeId -> (operatorName, metric name -> metric value)`.
+   * @param enableWholeStage enable whole-stage code generation or not.
    */
   protected def testSparkPlanMetrics(
       df: DataFrame,
       expectedNumOfJobs: Int,
-      expectedMetrics: Map[Long, (String, Map[String, Any])]): Unit = {
+      expectedMetrics: Map[Long, (String, Map[String, Any])],
+      enableWholeStage: Boolean = false): Unit = {
Contributor: Is this needed? IIUC it is never set to true...

Member: It's set to true in the new test that was introduced.

Contributor: No, testSparkPlanMetricsWithPredicates is used there, not testSparkPlanMetrics. So for testSparkPlanMetrics it is never used.

Member: Oh yes, good point. Nothing calls this with a different value here. Yeah, this should just pass false, not take a new arg?

Contributor: I think this should not take any new value here and should pass nothing to testSparkPlanMetricsWithPredicates, as false is the default value there. So basically, no change to the testSparkPlanMetrics method is needed.

Contributor (Author): Yes, currently testSparkPlanMetrics is always used with whole-stage codegen disabled. I'd argue it's possible we will want to use testSparkPlanMetrics with whole-stage codegen enabled, just like testSparkPlanMetricsWithPredicates. Do you mean we should limit the change scope as much as we can, or shall we do something for the future?

Contributor: Yes, we should make only the changes that are strictly needed. If we need this in the future, we will add the flag. But until then, we shouldn't add something which is not needed. Thanks.

Contributor (Author): I see. I have removed the unused flag from testSparkPlanMetrics in the new commit.

     val expectedMetricsPredicates = expectedMetrics.mapValues { case (nodeName, nodeMetrics) =>
       (nodeName, nodeMetrics.mapValues(expectedMetricValue =>
         (actualMetricValue: Any) => expectedMetricValue.toString === actualMetricValue))
     }
-    testSparkPlanMetricsWithPredicates(df, expectedNumOfJobs, expectedMetricsPredicates)
+    testSparkPlanMetricsWithPredicates(
+      df, expectedNumOfJobs, expectedMetricsPredicates, enableWholeStage)
   }
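testSparkPlanMetrics reduces to the predicate-based variant by turning each expected metric value into an equality check on string forms, so Int, Long, and String expected values all compare uniformly. A self-contained sketch of that conversion; the object and method names here are illustrative, not the real Spark helpers:

```scala
object MetricPredicateConversion {
  // For each node, keep the operator name and turn every concrete expected
  // value into a predicate that compares string representations. (The real
  // code compares the expected value's string form to the actual value via
  // ===; this sketch stringifies both sides.)
  def toPredicates(
      expected: Map[Long, (String, Map[String, Any])])
      : Map[Long, (String, Map[String, Any => Boolean])] =
    expected.map { case (nodeId, (nodeName, metrics)) =>
      nodeId -> ((nodeName, metrics.map { case (metricName, expectedValue) =>
        metricName -> ((actual: Any) => expectedValue.toString == actual.toString)
      }))
    }
}
```

Stringifying both sides is what lets a test declare `"number of output rows" -> 5` while the metrics system reports the value as a Long or a formatted String.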

/**
@@ -210,13 +214,15 @@
    * @param expectedNumOfJobs number of jobs that will run
    * @param expectedMetricsPredicates the expected metrics predicates. The format is
    *                                  `nodeId -> (operatorName, metric name -> metric predicate)`.
+   * @param enableWholeStage enable whole-stage code generation or not.
    */
   protected def testSparkPlanMetricsWithPredicates(
       df: DataFrame,
       expectedNumOfJobs: Int,
-      expectedMetricsPredicates: Map[Long, (String, Map[String, Any => Boolean])]): Unit = {
+      expectedMetricsPredicates: Map[Long, (String, Map[String, Any => Boolean])],
+      enableWholeStage: Boolean = false): Unit = {
     val optActualMetrics =
-      getSparkPlanMetrics(df, expectedNumOfJobs, expectedMetricsPredicates.keySet)
+      getSparkPlanMetrics(df, expectedNumOfJobs, expectedMetricsPredicates.keySet, enableWholeStage)
     optActualMetrics.foreach { actualMetrics =>
       assert(expectedMetricsPredicates.keySet === actualMetrics.keySet)
       for ((nodeId, (expectedNodeName, expectedMetricsPredicatesMap))
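The enableWholeStage flag ultimately has to toggle whole-stage codegen around the query run; in Spark that is the SQL conf "spark.sql.codegen.wholeStage". A minimal sketch of the set-run-restore pattern a test utility like getSparkPlanMetrics would use, with a plain mutable map standing in for the session's real SQLConf:

```scala
object WholeStageToggleSketch {
  import scala.collection.mutable

  // Stand-in for the session SQLConf. The key is the real Spark conf name;
  // the plumbing around it is simplified for illustration.
  val conf: mutable.Map[String, String] =
    mutable.Map("spark.sql.codegen.wholeStage" -> "true")

  // Set the conf, run the body, then restore the previous value so one
  // test's setting cannot leak into the next.
  def withWholeStage[T](enable: Boolean)(body: => T): T = {
    val key = "spark.sql.codegen.wholeStage"
    val previous = conf(key)
    conf(key) = enable.toString
    try body finally conf(key) = previous
  }
}
```

The try/finally restore is the important part: metrics tests run many plans in one session, so a flag left flipped would silently change which operators (and which metrics) later tests see.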