From 9c55b36b78fb12fec6cac66696f159918631ff88 Mon Sep 17 00:00:00 2001 From: Charles Yu Date: Tue, 30 Sep 2025 16:56:59 -0400 Subject: [PATCH 1/2] Make tests explicit for physical plan field --- .../instrumentation/spark/AbstractSpark24SqlTest.groovy | 8 +++++++- .../instrumentation/spark/AbstractSpark32SqlTest.groovy | 8 ++++++++ 2 files changed, 15 insertions(+), 1 deletion(-) diff --git a/dd-java-agent/instrumentation/spark/src/testFixtures/groovy/datadog/trace/instrumentation/spark/AbstractSpark24SqlTest.groovy b/dd-java-agent/instrumentation/spark/src/testFixtures/groovy/datadog/trace/instrumentation/spark/AbstractSpark24SqlTest.groovy index 91ef13db3a7..714d284b6e0 100644 --- a/dd-java-agent/instrumentation/spark/src/testFixtures/groovy/datadog/trace/instrumentation/spark/AbstractSpark24SqlTest.groovy +++ b/dd-java-agent/instrumentation/spark/src/testFixtures/groovy/datadog/trace/instrumentation/spark/AbstractSpark24SqlTest.groovy @@ -137,7 +137,7 @@ abstract class AbstractSpark24SqlTest extends InstrumentationSpecification { sparkSession.stop() - def firstStagePlan = """{"node":"Exchange","nodeId":-1909876497,"metrics":[{"data size total (min, med, max)":"any","type":"size"}],"children":[{"node":"WholeStageCodegen","nodeId":724251804,"metrics":[{"duration total (min, med, max)":"any","type":"timing"}],"children":[{"node":"HashAggregate","nodeId":1128016273,"metrics":[{"aggregate time total (min, med, max)":"any","type":"timing"},{"number of output rows":"any","type":"sum"},{"peak memory total (min, med, max)":"any","type":"size"}],"children":[{"node":"InputAdapter","nodeId":180293,"children":[{"node":"LocalTableScan","nodeId":1632930767,"metrics":[{"number of output rows":3,"type":"sum"}]}]}]}]}]}""" + def firstStagePlan = """{"node":"Exchange","nodeId":-1909876497,"nodeDetailsString":"","metrics":[{"data size total (min, med, max)":"any","type":"size"}],"children":[{"node":"WholeStageCodegen","nodeId":724251804,"metrics":[{"duration total (min, med, max)":"any","type":"timing"}],"children":[{"node":"HashAggregate","nodeId":1128016273,"metrics":[{"aggregate time total (min, med, max)":"any","type":"timing"},{"number of output rows":"any","type":"sum"},{"peak memory total (min, med, max)":"any","type":"size"}],"children":[{"node":"InputAdapter","nodeId":180293,"children":[{"node":"LocalTableScan","nodeId":1632930767,"metrics":[{"number of output rows":3,"type":"sum"}]}]}]}]}]}""" def secondStagePlan = """{"node":"WholeStageCodegen","nodeId":724251804,"metrics":[{"duration total (min, med, max)":"any","type":"timing"}],"children":[{"node":"HashAggregate","nodeId":126020943,"metrics":[{"aggregate time total (min, med, max)":"any","type":"timing"},{"avg hash probe (min, med, max)":"any","type":"average"},{"number of output rows":2,"type":"sum"},{"peak memory total (min, med, max)":"any","type":"size"}],"children":[{"node":"InputAdapter","nodeId":180293}]}]}""" expect: @@ -162,6 +162,7 @@ abstract class AbstractSpark24SqlTest extends InstrumentationSpecification { spanType "spark" childOf(span(2)) assertStringSQLPlanEquals(secondStagePlan, span.tags["_dd.spark.sql_plan"].toString()) + assert span.tags["_dd.spark.physical_plan"] == null assert span.tags["_dd.spark.sql_parent_stage_ids"] == "[0]" } span { @@ -169,6 +170,7 @@ abstract class AbstractSpark24SqlTest extends InstrumentationSpecification { spanType "spark" childOf(span(2)) assertStringSQLPlanEquals(firstStagePlan, span.tags["_dd.spark.sql_plan"].toString()) + assert span.tags["_dd.spark.physical_plan"] == null assert span.tags["_dd.spark.sql_parent_stage_ids"] == "[]" } } @@ -221,6 +223,7 @@ abstract class AbstractSpark24SqlTest extends InstrumentationSpecification { spanType "spark" childOf(span(2)) assertStringSQLPlanSubset(fourthStagePlan, span.tags["_dd.spark.sql_plan"].toString()) + assert span.tags["_dd.spark.physical_plan"] == null assert span.tags["_dd.spark.sql_parent_stage_ids"] == "[2]" } span { @@ -228,6 +231,7 @@ abstract class AbstractSpark24SqlTest extends InstrumentationSpecification { spanType "spark" childOf(span(2)) assertStringSQLPlanEquals(thirdStagePlan, span.tags["_dd.spark.sql_plan"].toString()) + assert span.tags["_dd.spark.physical_plan"] == null assert ["[0, 1]", "[1, 0]"].contains(span.tags["_dd.spark.sql_parent_stage_ids"]) } span { @@ -235,6 +239,7 @@ abstract class AbstractSpark24SqlTest extends InstrumentationSpecification { spanType "spark" childOf(span(2)) assertStringSQLPlanEquals(secondStagePlan, span.tags["_dd.spark.sql_plan"].toString()) + assert span.tags["_dd.spark.physical_plan"] == null assert span.tags["_dd.spark.sql_parent_stage_ids"] == "[]" } span { @@ -242,6 +247,7 @@ abstract class AbstractSpark24SqlTest extends InstrumentationSpecification { spanType "spark" childOf(span(2)) assertStringSQLPlanEquals(firstStagePlan, span.tags["_dd.spark.sql_plan"].toString()) + assert span.tags["_dd.spark.physical_plan"] == null assert span.tags["_dd.spark.sql_parent_stage_ids"] == "[]" } } diff --git a/dd-java-agent/instrumentation/spark/src/testFixtures/groovy/datadog/trace/instrumentation/spark/AbstractSpark32SqlTest.groovy b/dd-java-agent/instrumentation/spark/src/testFixtures/groovy/datadog/trace/instrumentation/spark/AbstractSpark32SqlTest.groovy index 528545cdab3..e8b677d2191 100644 --- a/dd-java-agent/instrumentation/spark/src/testFixtures/groovy/datadog/trace/instrumentation/spark/AbstractSpark32SqlTest.groovy +++ b/dd-java-agent/instrumentation/spark/src/testFixtures/groovy/datadog/trace/instrumentation/spark/AbstractSpark32SqlTest.groovy @@ -58,6 +58,7 @@ abstract class AbstractSpark32SqlTest extends InstrumentationSpecification { spanType "spark" childOf(span(2)) actualPlans.add(span.tags["_dd.spark.sql_plan"].toString()) + assert span.tags["_dd.spark.physical_plan"] == null assert span.tags["_dd.spark.sql_parent_stage_ids"] == "[4]" } span { @@ -70,6 +71,7 @@ abstract class AbstractSpark32SqlTest extends InstrumentationSpecification { spanType "spark" childOf(span(4)) actualPlans.add(span.tags["_dd.spark.sql_plan"].toString()) + assert span.tags["_dd.spark.physical_plan"] == null assert span.tags["_dd.spark.sql_parent_stage_ids"] == "[0]" } span { @@ -82,6 +84,7 @@ abstract class AbstractSpark32SqlTest extends InstrumentationSpecification { spanType "spark" childOf(span(6)) actualPlans.add(span.tags["_dd.spark.sql_plan"].toString()) + assert span.tags["_dd.spark.physical_plan"] == null assert span.tags["_dd.spark.sql_parent_stage_ids"] == "[0]" } span { @@ -94,6 +97,7 @@ abstract class AbstractSpark32SqlTest extends InstrumentationSpecification { spanType "spark" childOf(span(8)) actualPlans.add(span.tags["_dd.spark.sql_plan"].toString()) + assert span.tags["_dd.spark.physical_plan"] == null assert span.tags["_dd.spark.sql_parent_stage_ids"] == "[]" } } @@ -152,6 +156,7 @@ abstract class AbstractSpark32SqlTest extends InstrumentationSpecification { spanType "spark" childOf(span(2)) actualPlans.add(span.tags["_dd.spark.sql_plan"].toString()) + assert span.tags["_dd.spark.physical_plan"] == null assert span.tags["_dd.spark.sql_parent_stage_ids"] == "[4]" } span { @@ -164,6 +169,7 @@ abstract class AbstractSpark32SqlTest extends InstrumentationSpecification { spanType "spark" childOf(span(4)) actualPlans.add(span.tags["_dd.spark.sql_plan"].toString()) + assert span.tags["_dd.spark.physical_plan"] == null assert ["[0, 1]", "[1, 0]"].contains(span.tags["_dd.spark.sql_parent_stage_ids"]) } span { @@ -176,6 +182,7 @@ abstract class AbstractSpark32SqlTest extends InstrumentationSpecification { spanType "spark" childOf(span(6)) actualPlans.add(span.tags["_dd.spark.sql_plan"].toString()) + assert span.tags["_dd.spark.physical_plan"] == null assert span.tags["_dd.spark.sql_parent_stage_ids"] == "[]" } span { @@ -188,6 +195,7 @@ abstract class AbstractSpark32SqlTest extends InstrumentationSpecification { spanType "spark" childOf(span(8)) actualPlans.add(span.tags["_dd.spark.sql_plan"].toString()) + assert span.tags["_dd.spark.physical_plan"] == null assert span.tags["_dd.spark.sql_parent_stage_ids"] == "[]" } } From afc6103109681395a07fc4a76232d5381e055480 Mon Sep 17 00:00:00 2001 From: Charles Yu Date: Tue, 30 Sep 2025 18:21:43 -0400 Subject: [PATCH 2/2] Extract simpleString from Spark StagePlanInfo, add to JSON payload in span --- .../instrumentation/spark/SparkSQLUtils.java | 8 + .../spark/AbstractSpark24SqlTest.groovy | 356 +++++++- .../spark/AbstractSpark32SqlTest.groovy | 836 +++++++++++++++++- 3 files changed, 1184 insertions(+), 16 deletions(-) diff --git a/dd-java-agent/instrumentation/spark/src/main/java/datadog/trace/instrumentation/spark/SparkSQLUtils.java b/dd-java-agent/instrumentation/spark/src/main/java/datadog/trace/instrumentation/spark/SparkSQLUtils.java index f3f1536c42a..e58f24d3dd2 100644 --- a/dd-java-agent/instrumentation/spark/src/main/java/datadog/trace/instrumentation/spark/SparkSQLUtils.java +++ b/dd-java-agent/instrumentation/spark/src/main/java/datadog/trace/instrumentation/spark/SparkSQLUtils.java @@ -165,6 +165,14 @@ private void toJson(JsonGenerator generator, Map acc generator.writeStringField("node", plan.nodeName()); generator.writeNumberField("nodeId", plan.hashCode()); + String nodeDetails = plan.simpleString(); + if (nodeDetails.startsWith(plan.nodeName())) { + nodeDetails = nodeDetails.substring(plan.nodeName().length()).trim(); + } + if (!nodeDetails.isEmpty()) { + generator.writeStringField("nodeDetailString", nodeDetails); + } + // Metadata is only present for FileSourceScan nodes if (!plan.metadata().isEmpty()) { generator.writeFieldName("meta"); diff --git a/dd-java-agent/instrumentation/spark/src/testFixtures/groovy/datadog/trace/instrumentation/spark/AbstractSpark24SqlTest.groovy b/dd-java-agent/instrumentation/spark/src/testFixtures/groovy/datadog/trace/instrumentation/spark/AbstractSpark24SqlTest.groovy index 714d284b6e0..7427c89e542 100644 --- a/dd-java-agent/instrumentation/spark/src/testFixtures/groovy/datadog/trace/instrumentation/spark/AbstractSpark24SqlTest.groovy +++ b/dd-java-agent/instrumentation/spark/src/testFixtures/groovy/datadog/trace/instrumentation/spark/AbstractSpark24SqlTest.groovy @@ -29,6 +29,23 @@ abstract class AbstractSpark24SqlTest extends InstrumentationSpecification { spark.createDataFrame(rows, structType) } + static assertStringSQLPlanIn(ArrayList expectedStrings, String actualString) { + def jsonSlurper = new JsonSlurper() + def actual = jsonSlurper.parseText(actualString) + + for (String expectedString : expectedStrings) { + try { + def expected = jsonSlurper.parseText(expectedString) + assertSQLPlanEquals(expected, actual) + return + } catch (AssertionError e) { + System.println("Failed to assert $expectedString, attempting next") + } + } + + throw new AssertionError("No matching SQL Plan found for $actualString in $expectedStrings") + } + static assertStringSQLPlanEquals(String expectedString, String actualString) { System.err.println("Checking if expected $expectedString SQL plan match actual $actualString") @@ -137,8 +154,115 @@ abstract class AbstractSpark24SqlTest extends InstrumentationSpecification { sparkSession.stop() - def firstStagePlan = """{"node":"Exchange","nodeId":-1909876497,"nodeDetailsString":"","metrics":[{"data size total (min, med, max)":"any","type":"size"}],"children":[{"node":"WholeStageCodegen","nodeId":724251804,"metrics":[{"duration total (min, med, max)":"any","type":"timing"}],"children":[{"node":"HashAggregate","nodeId":1128016273,"metrics":[{"aggregate time total (min, med, max)":"any","type":"timing"},{"number of output rows":"any","type":"sum"},{"peak memory total (min, med, max)":"any","type":"size"}],"children":[{"node":"InputAdapter","nodeId":180293,"children":[{"node":"LocalTableScan","nodeId":1632930767,"metrics":[{"number of output rows":3,"type":"sum"}]}]}]}]}]}""" - def secondStagePlan = """{"node":"WholeStageCodegen","nodeId":724251804,"metrics":[{"duration total (min, med, max)":"any","type":"timing"}],"children":[{"node":"HashAggregate","nodeId":126020943,"metrics":[{"aggregate time total (min, med, max)":"any","type":"timing"},{"avg hash probe (min, med, max)":"any","type":"average"},{"number of output rows":2,"type":"sum"},{"peak memory total (min, med, max)":"any","type":"size"}],"children":[{"node":"InputAdapter","nodeId":180293}]}]}""" + def firstStagePlan = """ + { + "node": "Exchange", + "nodeId": -1909876497, + "nodeDetailString": "hashpartitioning(string_col#0, 2)", + "metrics": [ + { + "data size total (min, med, max)": "any", + "type": "size" + } + ], + "children": [ + { + "node": "WholeStageCodegen", + "nodeId": 724251804, + "metrics": [ + { + "duration total (min, med, max)": "any", + "type": "timing" + } + ], + "children": [ + { + "node": "HashAggregate", + "nodeId": 1128016273, + "nodeDetailString": "(keys=[string_col#0], functions=[partial_avg(double_col#1)])", + "metrics": [ + { + "aggregate time total (min, med, max)": "any", + "type": "timing" + }, + { + "number of output rows": "any", + "type": "sum" + }, + { + "peak memory total (min, med, max)": "any", + "type": "size" + } + ], + "children": [ + { + "node": "InputAdapter", + "nodeId": 180293, + "children": [ + { + "node": "LocalTableScan", + "nodeId": 1632930767, + "nodeDetailString": "[string_col#0, double_col#1]", + "metrics": [ + { + "number of output rows": 3, + "type": "sum" + } + ] + } + ] + } + ] + } + ] + } + ] + } + """ + + def secondStagePlan = """ + { + "node": "WholeStageCodegen", + "nodeId": 724251804, + "metrics": [ + { + "duration total (min, med, max)": "any", + "type": "timing" + } + ], + "children": [ + { + "node": "HashAggregate", + "nodeId": 126020943, + "nodeDetailString": "(keys=[string_col#0], functions=[avg(double_col#1)])", + "metrics": [ + { + "aggregate time total (min, med, max)": "any", + "type": "timing" + }, + { + "avg hash probe (min, med, max)": "any", + "type": "average" + }, + { + "number of output rows": 2, + "type": "sum" + }, + { + "peak memory total (min, med, max)": "any", + "type": "size" + } + ], + "children": [ + { + "node": "InputAdapter", + "nodeId": 180293 + } + ] + } + ] + } + """ expect: assertTraces(1) { @@ -196,10 +320,226 @@ abstract class AbstractSpark24SqlTest extends InstrumentationSpecification { sparkSession.stop() - def firstStagePlan = """{"node":"Exchange","nodeId":"any","metrics":[{"data size total (min, med, max)":"any","type":"size"}],"children":[{"node":"LocalTableScan","nodeId":"any","metrics":[{"number of output rows":"any","type":"sum"}]}]}""" - def secondStagePlan = """{"node":"Exchange","nodeId":"any","metrics":[{"data size total (min, med, max)":"any","type":"size"}],"children":[{"node":"LocalTableScan","nodeId":"any","metrics":[{"number of output rows":"any","type":"sum"}]}]}""" - def thirdStagePlan = """{"node":"Exchange","nodeId":-1350402171,"metrics":[{"data size total (min, med, max)":"any","type":"size"}],"children":[{"node":"WholeStageCodegen","nodeId":724251804,"metrics":[{"duration total (min, med, max)":"any","type":"timing"}],"children":[{"node":"HashAggregate","nodeId":-879128980,"metrics":[{"aggregate time total (min, med, max)":"any","type":"timing"},{"number of output rows":"any","type":"sum"}],"children":[{"node":"Project","nodeId":1355342585,"children":[{"node":"SortMergeJoin","nodeId":-1975876610,"metrics":[{"number of output rows":"any","type":"sum"}],"children":[{"node":"InputAdapter","nodeId":180293,"children":[{"node":"WholeStageCodegen","nodeId":724251804,"metrics":[{"duration total (min, med, max)":"any","type":"timing"}],"children":[{"node":"Sort","nodeId":66807398,"metrics":[{"peak memory total (min, med, max)":"any","type":"size"},{"sort time total (min, med, max)":"any","type":"timing"}],"children":[{"node":"InputAdapter","nodeId":180293}]}]}]},{"node":"InputAdapter","nodeId":180293,"children":[{"node":"WholeStageCodegen","nodeId":724251804,"metrics":[{"duration total (min, med, max)":"any","type":"timing"}],"children":[{"node":"Sort","nodeId":-952138782,"metrics":[{"peak memory total (min, med, max)":"any","type":"size"}],"children":[{"node":"InputAdapter","nodeId":180293}]}]}]}]}]}]}]}]}""" - def fourthStagePlan = """{"node":"WholeStageCodegen","nodeId":724251804,"metrics":[{"duration total (min, med, max)":"any","type":"timing"}],"children":[{"node":"HashAggregate","nodeId":724815342,"metrics":[{"number of output rows":1,"type":"sum"}],"children":[{"node":"InputAdapter","nodeId":180293}]}]}""" + def firstStagePlan = """ + { + "node": "Exchange", + "nodeId": "any", + "nodeDetailString": "hashpartitioning(string_col#25, 2)", + "metrics": [ + { + "data size total (min, med, max)": "any", + "type": "size" + } + ], + "children": [ + { + "node": "LocalTableScan", + "nodeId": "any", + "nodeDetailString": "[string_col#25]", + "metrics": [ + { + "number of output rows": "any", + "type": "sum" + } + ] + } + ] + } + """ + def secondStagePlan = """ + { + "node": "Exchange", + "nodeId": "any", + "nodeDetailString": "hashpartitioning(string_col#21, 2)", + "metrics": [ + { + "data size total (min, med, max)": "any", + "type": "size" + } + ], + "children": [ + { + "node": "LocalTableScan", + "nodeId": "any", + "nodeDetailString": "[string_col#21]", + "metrics": [ + { + "number of output rows": "any", + "type": "sum" + } + ] + } + ] + } + """ + def thirdStagePlan = """ + { + "node": "Exchange", + "nodeId": -1350402171, + "nodeDetailString": "SinglePartition", + "metrics": [ + { + "data size total (min, med, max)": "any", + "type": "size" + } + ], + "children": [ + { + "node": "WholeStageCodegen", + "nodeId": 724251804, + "metrics": [ + { + "duration total (min, med, max)": "any", + "type": "timing" + } + ], + "children": [ + { + "node": "HashAggregate", + "nodeId": -879128980, + "nodeDetailString": "(keys=[], functions=[partial_count(1)])", + "metrics": [ + { + "aggregate time total (min, med, max)": "any", + "type": "timing" + }, + { + "number of output rows": "any", + "type": "sum" + } + ], + "children": [ + { + "node": "Project", + "nodeId": 1355342585, + "children": [ + { + "node": "SortMergeJoin", + "nodeId": -1975876610, + "nodeDetailString": "[string_col#21], [string_col#25], Inner", + "metrics": [ + { + "number of output rows": "any", + "type": "sum" + } + ], + "children": [ + { + "node": "InputAdapter", + "nodeId": 180293, + "children": [ + { + "node": "WholeStageCodegen", + "nodeId": 724251804, + "metrics": [ + { + "duration total (min, med, max)": "any", + "type": "timing" + } + ], + "children": [ + { + "node": "Sort", + "nodeId": 66807398, + "nodeDetailString": "[string_col#21 ASC NULLS FIRST], false, 0", + "metrics": [ + { + "peak memory total (min, med, max)": "any", + "type": "size" + }, + { + "sort time total (min, med, max)": "any", + "type": "timing" + } + ], + "children": [ + { + "node": "InputAdapter", + "nodeId": 180293 + } + ] + } + ] + } + ] + }, + { + "node": "InputAdapter", + "nodeId": 180293, + "children": [ + { + "node": "WholeStageCodegen", + "nodeId": 724251804, + "metrics": [ + { + "duration total (min, med, max)": "any", + "type": "timing" + } + ], + "children": [ + { + "node": "Sort", + "nodeId": -952138782, + "nodeDetailString": "[string_col#25 ASC NULLS FIRST], false, 0", + "metrics": [ + { + "peak memory total (min, med, max)": "any", + "type": "size" + } + ], + "children": [ + { + "node": "InputAdapter", + "nodeId": 180293 + } + ] + } + ] + } + ] + } + ] + } + ] + } + ] + } + ] + } + ] + } + """ + def fourthStagePlan = """ + { + "node": "WholeStageCodegen", + "nodeId": 724251804, + "metrics": [ + { + "duration total (min, med, max)": "any", + "type": "timing" + } + ], + "children": [ + { + "node": "HashAggregate", + "nodeId": 724815342, + "nodeDetailString": "(keys=[], functions=[count(1)])", + "metrics": [ + { + "number of output rows": 1, + "type": "sum" + } + ], + "children": [ + { + "node": "InputAdapter", + "nodeId": 180293 + } + ] + } + ] + } + """ expect: assertTraces(1) { @@ -238,7 +578,7 @@ abstract class AbstractSpark24SqlTest extends InstrumentationSpecification { operationName "spark.stage" spanType "spark" childOf(span(2)) - assertStringSQLPlanEquals(secondStagePlan, span.tags["_dd.spark.sql_plan"].toString()) + assertStringSQLPlanIn([firstStagePlan, secondStagePlan], span.tags["_dd.spark.sql_plan"].toString()) assert span.tags["_dd.spark.physical_plan"] == null assert span.tags["_dd.spark.sql_parent_stage_ids"] == "[]" } @@ -246,7 +586,7 @@ abstract class AbstractSpark24SqlTest extends InstrumentationSpecification { operationName "spark.stage" spanType "spark" childOf(span(2)) - assertStringSQLPlanEquals(firstStagePlan, span.tags["_dd.spark.sql_plan"].toString()) + assertStringSQLPlanIn([firstStagePlan, secondStagePlan], span.tags["_dd.spark.sql_plan"].toString()) assert span.tags["_dd.spark.physical_plan"] == null assert span.tags["_dd.spark.sql_parent_stage_ids"] == "[]" } diff --git a/dd-java-agent/instrumentation/spark/src/testFixtures/groovy/datadog/trace/instrumentation/spark/AbstractSpark32SqlTest.groovy b/dd-java-agent/instrumentation/spark/src/testFixtures/groovy/datadog/trace/instrumentation/spark/AbstractSpark32SqlTest.groovy index e8b677d2191..875cdcafdbc 100644 --- a/dd-java-agent/instrumentation/spark/src/testFixtures/groovy/datadog/trace/instrumentation/spark/AbstractSpark32SqlTest.groovy +++ b/dd-java-agent/instrumentation/spark/src/testFixtures/groovy/datadog/trace/instrumentation/spark/AbstractSpark32SqlTest.groovy @@ -30,10 +30,402 @@ abstract class AbstractSpark32SqlTest extends InstrumentationSpecification { """).show() sparkSession.stop() - def firstStagePlan = """{"node":"Exchange","nodeId":"nodeId_4","metrics":[{"data size":"any","type":"size"},{"shuffle bytes written":"any","type":"size"},{"shuffle records written":3,"type":"sum"},{"shuffle write time":"any","type":"nsTiming"}],"children":[{"node":"WholeStageCodegen (1)","nodeId":"nodeId_1","metrics":[{"duration":"any","type":"timing"}],"children":[{"node":"HashAggregate","nodeId":"nodeId_3","metrics":[{"number of output rows":3,"type":"sum"},{"peak memory":"any","type":"size"},{"time in aggregation build":"any","type":"timing"}],"children":[{"node":"LocalTableScan","nodeId":"nodeId_2","metrics":[{"number of output rows":3,"type":"sum"}]}]}]}]}""" - def secondStagePlan = """{"node":"WholeStageCodegen (2)","nodeId":"nodeId_8","metrics":[{"duration":"any","type":"timing"}],"children":[{"node":"HashAggregate","nodeId":"nodeId_9","metrics":[{"avg hash probe bucket list iters":"any","type":"average"},{"number of output rows":2,"type":"sum"},{"peak memory":"any","type":"size"},{"time in aggregation build":"any","type":"timing"}],"children":[{"node":"InputAdapter","nodeId":"nodeId_7","children":[{"node":"AQEShuffleRead","nodeId":"nodeId_5","metrics":[],"children":[{"node":"ShuffleQueryStage","nodeId":"nodeId_6","children":[{"node":"Exchange","nodeId":"nodeId_4","metrics":[{"data size":"any","type":"size"},{"fetch wait time":"any","type":"timing"},{"local blocks read":"any","type":"sum"},{"local bytes read":"any","type":"size"},{"records read":3,"type":"sum"},{"shuffle bytes written":"any","type":"size"},{"shuffle records written":3,"type":"sum"},{"shuffle write time":"any","type":"nsTiming"}]}]}]}]}]}]}""" - def thirdStagePlan = """{"node":"Exchange","nodeId":"nodeId_10","metrics":[{"data size":"any","type":"size"},{"shuffle bytes written":"any","type":"size"},{"shuffle records written":2,"type":"sum"},{"shuffle write time":"any","type":"nsTiming"}],"children":[{"node":"WholeStageCodegen (2)","nodeId":"nodeId_8","metrics":[{"duration":"any","type":"timing"}],"children":[{"node":"HashAggregate","nodeId":"nodeId_9","metrics":[{"avg hash probe bucket list iters":"any","type":"average"},{"number of output rows":"any","type":"sum"},{"peak memory":"any","type":"size"},{"time in aggregation build":"any","type":"timing"}],"children":[{"node":"InputAdapter","nodeId":"nodeId_7","children":[{"node":"AQEShuffleRead","nodeId":"nodeId_5","metrics":[],"children":[{"node":"ShuffleQueryStage","nodeId":"nodeId_6","children":[{"node":"Exchange","nodeId":"nodeId_4","metrics":[{"data size":"any","type":"size"},{"fetch wait time":"any","type":"timing"},{"local blocks read":"any","type":"sum"},{"local bytes read":"any","type":"size"},{"records read":"any","type":"sum"},{"shuffle bytes written":"any","type":"size"},{"shuffle records written":"any","type":"sum"},{"shuffle write time":"any","type":"nsTiming"}]}]}]}]}]}]}]}""" - def fourthStagePlan = """{"node":"WholeStageCodegen (3)","nodeId":"nodeId_12","metrics":[{"duration":"any","type":"timing"}],"children":[{"node":"Project","nodeId":"nodeId_11","children":[{"node":"Sort","nodeId":"nodeId_13","metrics":[{"peak memory":"any","type":"size"},{"sort time":"any","type":"timing"},{"spill size":"any","type":"size"}],"children":[{"node":"InputAdapter","nodeId":"nodeId_7","children":[{"node":"AQEShuffleRead","nodeId":"nodeId_5","metrics":[],"children":[{"node":"ShuffleQueryStage","nodeId":"nodeId_14","children":[{"node":"Exchange","nodeId":"nodeId_10","metrics":[{"data size":"any","type":"size"},{"fetch wait time":"any","type":"timing"},{"local blocks read":"any","type":"sum"},{"local bytes read":"any","type":"size"},{"records read":2,"type":"sum"},{"shuffle bytes written":"any","type":"size"},{"shuffle records written":2,"type":"sum"},{"shuffle write time":"any","type":"nsTiming"}]}]}]}]}]}]}]}""" + def firstStagePlan = """ + { + "node": "Exchange", + "nodeId": "nodeId_4", + "nodeDetailString": "hashpartitioning(string_col#0, 2), ENSURE_REQUIREMENTS, [plan_id\\u003d38]", + "metrics": [ + { + "data size": "any", + "type": "size" + }, + { + "shuffle bytes written": "any", + "type": "size" + }, + { + "shuffle records written": 3, + "type": "sum" + }, + { + "shuffle write time": "any", + "type": "nsTiming" + } + ], + "children": [ + { + "node": "WholeStageCodegen (1)", + "nodeId": "nodeId_1", + "metrics": [ + { + "duration": "any", + "type": "timing" + } + ], + "children": [ + { + "node": "HashAggregate", + "nodeId": "nodeId_3", + "nodeDetailString": "(keys=[string_col#0], functions=[partial_avg(double_col#1)])", + "metrics": [ + { + "number of output rows": 3, + "type": "sum" + }, + { + "peak memory": "any", + "type": "size" + }, + { + "time in aggregation build": "any", + "type": "timing" + } + ], + "children": [ + { + "node": "LocalTableScan", + "nodeId": "nodeId_2", + "nodeDetailString": "[string_col#0, double_col#1]", + "metrics": [ + { + "number of output rows": 3, + "type": "sum" + } + ] + } + ] + } + ] + } + ] + } + """ + def secondStagePlan = """ + { + "node": "WholeStageCodegen (2)", + "nodeId": "nodeId_8", + "metrics": [ + { + "duration": "any", + "type": "timing" + } + ], + "children": [ + { + "node": "HashAggregate", + "nodeId": "nodeId_9", + "nodeDetailString": "(keys\\u003d[string_col#0], functions\\u003d[avg(double_col#1)])", + "metrics": [ + { + "avg hash probe bucket list iters": "any", + "type": "average" + }, + { + "number of output rows": 2, + "type": "sum" + }, + { + "peak memory": "any", + "type": "size" + }, + { + "time in aggregation build": "any", + "type": "timing" + } + ], + "children": [ + { + "node": "InputAdapter", + "nodeId": "nodeId_7", + "children": [ + { + "node": "AQEShuffleRead", + "nodeId": "nodeId_5", + "nodeDetailString": "coalesced", + "metrics": [], + "children": [ + { + "node": "ShuffleQueryStage", + "nodeId": "nodeId_6", + "nodeDetailString": "0", + "children": [ + { + "node": "Exchange", + "nodeId": "nodeId_4", + "nodeDetailString": "hashpartitioning(string_col#0, 2), ENSURE_REQUIREMENTS, [plan_id\\u003d38]", + "metrics": [ + { + "data size": "any", + "type": "size" + }, + { + "fetch wait time": "any", + "type": "timing" + }, + { + "local blocks read": "any", + "type": "sum" + }, + { + "local bytes read": "any", + "type": "size" + }, + { + "records read": 3, + "type": "sum" + }, + { + "shuffle bytes written": "any", + "type": "size" + }, + { + "shuffle records written": 3, + "type": "sum" + }, + { + "shuffle write time": "any", + "type": "nsTiming" + } + ] + } + ] + } + ] + } + ] + } + ] + } + ] + } + """ + def thirdStagePlan = """ + { + "node": "Exchange", + "nodeId": "nodeId_10", + "nodeDetailString": "rangepartitioning(avg(double_col)#5 DESC NULLS LAST, 2), ENSURE_REQUIREMENTS, [plan_id\\u003d67]", + "metrics": [ + { + "data size": "any", + "type": "size" + }, + { + "shuffle bytes written": "any", + "type": "size" + }, + { + "shuffle records written": 2, + "type": "sum" + }, + { + "shuffle write time": "any", + "type": "nsTiming" + } + ], + "children": [ + { + "node": "WholeStageCodegen (2)", + "nodeId": "nodeId_8", + "metrics": [ + { + "duration": "any", + "type": "timing" + } + ], + "children": [ + { + "node": "HashAggregate", + "nodeId": "nodeId_9", + "nodeDetailString": "(keys\\u003d[string_col#0], functions\\u003d[avg(double_col#1)])", + "metrics": [ + { + "avg hash probe bucket list iters": "any", + "type": "average" + }, + { + "number of output rows": "any", + "type": "sum" + }, + { + "peak memory": "any", + "type": "size" + }, + { + "time in aggregation build": "any", + "type": "timing" + } + ], + "children": [ + { + "node": "InputAdapter", + "nodeId": "nodeId_7", + "children": [ + { + "node": "AQEShuffleRead", + "nodeId": "nodeId_5", + "nodeDetailString": "coalesced", + "metrics": [], + "children": [ + { + "node": "ShuffleQueryStage", + "nodeId": "nodeId_6", + "nodeDetailString": "0", + "children": [ + { + "node": "Exchange", + "nodeId": "nodeId_4", + "nodeDetailString": "hashpartitioning(string_col#0, 2), ENSURE_REQUIREMENTS, [plan_id\\u003d38]", + "metrics": [ + { + "data size": "any", + "type": "size" + }, + { + "fetch wait time": "any", + "type": "timing" + }, + { + "local blocks read": "any", + "type": "sum" + }, + { + "local bytes read": "any", + "type": "size" + }, + { + "records read": "any", + "type": "sum" + }, + { + "shuffle bytes written": "any", + "type": "size" + }, + { + "shuffle records written": "any", + "type": "sum" + }, + { + "shuffle write time": "any", + "type": "nsTiming" + } + ] + } + ] + } + ] + } + ] + } + ] + } + ] + } + ] + } + """ + def fourthStagePlan = """ + { + "node": "WholeStageCodegen (3)", + "nodeId": "nodeId_12", + "metrics": [ + { + "duration": "any", + "type": "timing" + } + ], + "children": [ + { + "node": "Project", + "nodeId": "nodeId_11", + "nodeDetailString": "[string_col#0, cast(avg(double_col)#5 as string) AS avg(double_col)#12]", + "children": [ + { + "node": "Sort", + "nodeId": "nodeId_13", + "nodeDetailString": "[avg(double_col)#5 DESC NULLS LAST], true, 0", + "metrics": [ + { + "peak memory": "any", + "type": "size" + }, + { + "sort time": "any", + "type": "timing" + }, + { + "spill size": "any", + "type": "size" + } + ], + "children": [ + { + "node": "InputAdapter", + "nodeId": "nodeId_7", + "children": [ + { + "node": "AQEShuffleRead", + "nodeId": "nodeId_5", + "nodeDetailString": "coalesced", + "metrics": [], + "children": [ + { + "node": "ShuffleQueryStage", + "nodeId": "nodeId_14", + "nodeDetailString": "1", + "children": [ + { + "node": "Exchange", + "nodeId": "nodeId_10", + "nodeDetailString": "rangepartitioning(avg(double_col)#5 DESC NULLS LAST, 2), ENSURE_REQUIREMENTS, [plan_id\\u003d67]", + "metrics": [ + { + "data size": "any", + "type": "size" + }, + { + "fetch wait time": "any", + "type": "timing" + }, + { + "local blocks read": "any", + "type": "sum" + }, + { + "local bytes read": "any", + "type": "size" + }, + { + "records read": 2, + "type": "sum" + }, + { + "shuffle bytes written": "any", + "type": "size" + }, + { + "shuffle records written": 2, + "type": "sum" + }, + { + "shuffle write time": "any", + "type": "nsTiming" + } + ] + } + ] + } + ] + } + ] + } + ] + } + ] + } + ] + } + """ expect: def actualPlans = [] as List @@ -128,10 +520,438 @@ abstract class AbstractSpark32SqlTest extends InstrumentationSpecification { sparkSession.stop() - def firstStagePlan = """{"node":"Exchange","nodeId":"any","metrics":[{"data size":"any","type":"size"},{"shuffle bytes written":"any","type":"size"},{"shuffle records written":"any","type":"sum"},{"shuffle write time":"any","type":"nsTiming"}],"children":[{"node":"LocalTableScan","nodeId":"any","metrics":[{"number of output rows":"any","type":"sum"}]}]}""" - def secondStagePlan = """{"node":"Exchange","nodeId":"any","metrics":[{"data size":"any","type":"size"},{"shuffle bytes written":"any","type":"size"},{"shuffle records written":"any","type":"sum"},{"shuffle write time":"any","type":"nsTiming"}],"children":[{"node":"LocalTableScan","nodeId":"any","metrics":[{"number of output rows":"any","type":"sum"}]}]}""" - def thirdStagePlan = """{"node":"Exchange","nodeId":"nodeId_7","metrics":[{"data size":"any","type":"size"},{"shuffle bytes written":"any","type":"size"},{"shuffle records written":1,"type":"sum"},{"shuffle write time":"any","type":"nsTiming"}],"children":[{"node":"WholeStageCodegen (3)","nodeId":"nodeId_9","metrics":[{"duration":"any","type":"timing"}],"children":[{"node":"HashAggregate","nodeId":"nodeId_16","metrics":[{"number of output rows":1,"type":"sum"},{"time in aggregation build":"any","type":"timing"}],"children":[{"node":"Project","nodeId":"nodeId_13","children":[{"node":"SortMergeJoin","nodeId":"nodeId_15","metrics":[{"number of output rows":"any","type":"sum"}],"children":[{"node":"InputAdapter","nodeId":"nodeId_6","children":[{"node":"WholeStageCodegen (1)","nodeId":"nodeId_8","metrics":[{"duration":"any","type":"timing"}],"children":[{"node":"Sort","nodeId":"nodeId_11","metrics":[{"peak memory":"any","type":"size"},{"sort time":"any","type":"timing"},{"spill size":"any","type":"size"}],"children":[{"node":"InputAdapter","nodeId":"nodeId_6","children":[{"node":"AQEShuffleRead","nodeId":"nodeId_5","metrics":[],"children":[{"node":"ShuffleQueryStage","nodeId":"nodeId_14","children":[{"node":"Exchange","nodeId":"nodeId_2","metrics":[{"data size":"any","type":"size"},{"fetch wait time":"any","type":"timing"},{"local blocks read":"any","type":"sum"},{"local bytes read":"any","type":"size"},{"records read":"any","type":"sum"},{"shuffle bytes written":"any","type":"size"},{"shuffle records written":"any","type":"sum"},{"shuffle write time":"any","type":"nsTiming"}]}]}]}]}]}]}]},{"node":"InputAdapter","nodeId":"nodeId_6","children":[{"node":"WholeStageCodegen (2)","nodeId":"nodeId_12","metrics":[{"duration":"any","type":"timing"}],"children":[{"node":"Sort","nodeId":"nodeId_17","metrics":[{"peak memory":"any","type":"size"},{"sort time":"any","type":"timing"},{"spill size":"any","type":"size"}],"children":[{"node":"InputAdapter","nodeId":"nodeId_6","children":[{"node":"AQEShuffleRead","nodeId":"nodeId_5","metrics":[],"children":[{"node":"ShuffleQueryStage","nodeId":"nodeId_10","children":[{"node":"Exchange","nodeId":"nodeId_4","metrics":[{"data size":"any","type":"size"},{"fetch wait time":"any","type":"timing"},{"local blocks read":"any","type":"sum"},{"local bytes read":"any","type":"size"},{"records read":"any","type":"sum"},{"shuffle bytes written":"any","type":"size"},{"shuffle records written":"any","type":"sum"},{"shuffle write time":"any","type":"nsTiming"}]}]}]}]}]}]}]}]}]}]}]}]}""" - def fourthStagePlan = """{"node":"WholeStageCodegen (4)","nodeId":"nodeId_20","metrics":[{"duration":"any","type":"timing"}],"children":[{"node":"HashAggregate","nodeId":"nodeId_18","metrics":[{"number of output rows":"any","type":"sum"},{"time in aggregation build":"any","type":"timing"}],"children":[{"node":"InputAdapter","nodeId":"nodeId_6","children":[{"node":"ShuffleQueryStage","nodeId":"nodeId_19","children":[{"node":"Exchange","nodeId":"nodeId_7","metrics":[{"data size":"any","type":"size"},{"fetch wait time":"any","type":"timing"},{"local blocks read":"any","type":"sum"},{"local bytes read":"any","type":"size"},{"records read":"any","type":"sum"},{"shuffle bytes written":"any","type":"size"},{"shuffle records written":"any","type":"sum"},{"shuffle write time":"any","type":"nsTiming"}]}]}]}]}]}""" + def firstStagePlan = """ + { + "node": "Exchange", + "nodeId": "any", + "nodeDetailString": "hashpartitioning(string_col#28, 2), ENSURE_REQUIREMENTS, [plan_id\\u003d119]", + "metrics": [ + { + "data size": "any", + "type": "size" + }, + { + "shuffle bytes written": "any", + "type": "size" + }, + { + "shuffle records written": "any", + "type": "sum" + }, + { + "shuffle write time": "any", + "type": "nsTiming" + } + ], + "children": [ + { + "node": "LocalTableScan", + "nodeId": "any", + "nodeDetailString": "[string_col#28]", + "metrics": [ + { + "number of output rows": "any", + "type": "sum" + } + ] + } + ] + } + """ + def secondStagePlan = """ + { + "node": "Exchange", + "nodeId": "any", + "nodeDetailString": "hashpartitioning(string_col#32, 2), ENSURE_REQUIREMENTS, [plan_id\\u003d120]", + "metrics": [ + { + "data size": "any", + "type": "size" + }, + { + "shuffle bytes written": "any", + "type": "size" + }, + { + "shuffle records written": "any", + "type": "sum" + }, + { + "shuffle write time": "any", + "type": "nsTiming" + } + ], + "children": [ + { + "node": "LocalTableScan", + "nodeId": "any", + "nodeDetailString": "[string_col#32]", + "metrics": [ + { + "number of output rows": "any", + "type": "sum" + } + ] + } + ] + } + """ + def thirdStagePlan = """ + { + "node": "Exchange", + "nodeId": "nodeId_7", + "nodeDetailString": "SinglePartition, ENSURE_REQUIREMENTS, [plan_id\\u003d230]", + "metrics": [ + { + "data size": "any", + "type": "size" + }, + { + "shuffle bytes written": "any", + "type": "size" + }, + { + "shuffle records written": 1, + "type": "sum" + }, + { + "shuffle write time": "any", + "type": "nsTiming" + } + ], + "children": [ + { + "node": "WholeStageCodegen (3)", + "nodeId": "nodeId_9", + "metrics": [ + { + "duration": "any", + "type": "timing" + } + ], + "children": [ + { + "node": "HashAggregate", + "nodeId": "nodeId_16", + "nodeDetailString": "(keys\\u003d[], functions\\u003d[partial_count(1)])", + "metrics": [ + { + "number of output rows": 1, + "type": "sum" + }, + { + "time in aggregation build": "any", + "type": "timing" + } + ], + "children": [ + { + "node": "Project", + "nodeId": "nodeId_13", + "children": [ + { + "node": "SortMergeJoin", + "nodeId": "nodeId_15", + "nodeDetailString": "[string_col#28], [string_col#32], Inner", + "metrics": [ + { + "number of output rows": "any", + "type": "sum" + } + ], + "children": [ + { + "node": "InputAdapter", + "nodeId": "nodeId_6", + "children": [ + { + "node": "WholeStageCodegen (1)", + "nodeId": "nodeId_8", + "metrics": [ + { + "duration": "any", + "type": "timing" + } + ], + "children": [ + { + "node": "Sort", + "nodeId": "nodeId_11", + "nodeDetailString": "[string_col#28 ASC NULLS FIRST], false, 0", + "metrics": [ + { + "peak memory": "any", + "type": "size" + }, + { + "sort time": "any", + "type": "timing" + }, + { + "spill size": "any", + "type": "size" + } + ], + "children": [ + { + "node": "InputAdapter", + "nodeId": "nodeId_6", + "children": [ + { + "node": "AQEShuffleRead", + "nodeId": "nodeId_5", + "nodeDetailString": "coalesced", + "metrics": [], + "children": [ + { + "node": "ShuffleQueryStage", + "nodeId": "nodeId_14", + "nodeDetailString": "0", + "children": [ + { + "node": "Exchange", + "nodeId": "nodeId_2", + "nodeDetailString": "hashpartitioning(string_col#28, 2), ENSURE_REQUIREMENTS, [plan_id\\u003d119]", + "metrics": [ + { + "data size": "any", + "type": "size" + }, + { + "fetch wait time": "any", + "type": "timing" + }, + { + "local blocks read": "any", + "type": "sum" + }, + { + "local bytes read": "any", + "type": "size" + }, + { + "records read": "any", + "type": "sum" + }, + { + "shuffle bytes written": "any", + "type": "size" + }, + { + "shuffle records written": "any", + "type": "sum" + }, + { + "shuffle write time": "any", + "type": "nsTiming" + } + ] + } + ] + } + ] + } + ] + } + ] + } + ] + } + ] + }, + { + "node": "InputAdapter", + "nodeId": "nodeId_6", + "children": [ + { + "node": "WholeStageCodegen (2)", + "nodeId": "nodeId_12", + "metrics": [ + { + "duration": "any", + "type": "timing" + } + ], + "children": [ + { + "node": "Sort", + "nodeId": "nodeId_17", + "nodeDetailString": "[string_col#32 ASC NULLS FIRST], false, 0", + "metrics": [ + { + "peak memory": "any", + "type": "size" + }, + { + "sort time": "any", + "type": "timing" + }, + { + "spill size": "any", + "type": "size" + } + ], + "children": [ + { + "node": "InputAdapter", + "nodeId": "nodeId_6", + "children": [ + { + "node": "AQEShuffleRead", + "nodeId": "nodeId_5", + "nodeDetailString": "coalesced", + "metrics": [], + "children": [ + { + "node": "ShuffleQueryStage", + "nodeId": "nodeId_10", + "nodeDetailString": "1", + "children": [ + { + "node": "Exchange", + "nodeId": "nodeId_4", + "nodeDetailString": "hashpartitioning(string_col#32, 2), ENSURE_REQUIREMENTS, [plan_id\\u003d120]", + "metrics": [ + { + "data size": "any", + "type": "size" + }, + { + "fetch wait time": "any", + "type": "timing" + }, + { + "local blocks read": "any", + "type": "sum" + }, + { + "local bytes read": "any", + "type": "size" + }, + { + "records read": "any", + "type": "sum" + }, + { + "shuffle bytes written": "any", + "type": "size" + }, + { + "shuffle records written": "any", + "type": "sum" + }, + { + "shuffle write time": "any", + "type": "nsTiming" + } + ] + } + ] + } + ] + } + ] + } + ] + } + ] + } + ] + } + ] + } + ] + } + ] + } + ] + } + ] + } + """ + def fourthStagePlan = """ + { + "node": "WholeStageCodegen (4)", + "nodeId": "nodeId_20", + "metrics": [ + { + "duration": "any", + "type": "timing" + } + ], + "children": [ + { + "node": "HashAggregate", + "nodeId": "nodeId_18", + "nodeDetailString": "(keys\\u003d[], functions\\u003d[count(1)])", + "metrics": [ + { + "number of output rows": "any", + "type": "sum" + }, + { + "time in aggregation build": "any", + "type": "timing" + } + ], + "children": [ + { + "node": "InputAdapter", + "nodeId": "nodeId_6", + "children": [ + { + "node": "ShuffleQueryStage", + "nodeId": "nodeId_19", + "nodeDetailString": "2", + "children": [ + { + "node": "Exchange", + "nodeId": "nodeId_7", + "nodeDetailString": "SinglePartition, ENSURE_REQUIREMENTS, [plan_id\\u003d230]", + "metrics": [ + { + "data size": "any", + "type": "size" + }, + { + "fetch wait time": "any", + "type": "timing" + }, + { + "local blocks read": "any", + "type": "sum" + }, + { + "local bytes read": "any", + "type": "size" + }, + { + "records read": "any", + "type": "sum" + }, + { + "shuffle bytes written": "any", + "type": "size" + }, + { + "shuffle records written": "any", + "type": "sum" + }, + { + "shuffle write time": "any", + "type": "nsTiming" + } + ] + } + ] + } + ] + } + ] + } + ] + } + """ expect: def actualPlans = [] as List