[SPARK-30549][SQL] Fix the subquery display issue in the UI when AQE is enabled #27260
Conversation
…sync the value both in accumIdsToMetricType and metrics
@cloud-fan Please help me review. Thanks.
Test build #116925 has finished for PR 27260 at commit
Test build #116950 has finished for PR 27260 at commit
retest this please
Test build #116971 has finished for PR 27260 at commit
@@ -132,6 +134,17 @@ case class AdaptiveSparkPlanExec(
    executedPlan.resetMetrics()
  }

  private def collectSQLMetrics(plan: SparkPlan): Seq[SQLMetric] = {
    val metrics = new mutable.ArrayBuffer[SQLMetric]()
    collect(plan) {
We should use the normal `collect`. We don't need to get the SQLMetrics of already materialized query stages.
@@ -151,6 +164,9 @@ case class AdaptiveSparkPlanExec(
    currentPhysicalPlan = result.newPlan
    if (result.newStages.nonEmpty) {
      stagesToReplace = result.newStages ++ stagesToReplace
      if (isSubquery) {
Can we put the code in `onUpdatePlan`?
accumIdsToMetricType.map { case (accumulatorId, metricType) =>
  stages.foreach { stageId =>
    val liveStageMetric = stageMetrics.get(stageId)
    liveStageMetric.accumIdsToMetricType += (accumulatorId -> metricType)
How about `stageMetrics(stageId) = liveStageMetric.copy(accumIdsToMetricType = ...)`?
It's too hacky to make a UI data class mutable.
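The two approaches under discussion, in a self-contained sketch (the class and field names are simplified stand-ins for the Spark UI classes, not the real API): keep the data class immutable and replace the map entry with a `copy`, rather than mutating a field in place.

```scala
import scala.collection.mutable

// Simplified stand-in for a UI data class like LiveStageMetrics.
case class StageMetricsData(stageId: Int, accumIdsToMetricType: Map[Long, String])

object CopyVsMutate {
  def main(args: Array[String]): Unit = {
    val stageMetrics = mutable.HashMap(1 -> StageMetricsData(1, Map(10L -> "sum")))

    // Immutable-style update: build a new instance with `copy` and
    // overwrite the map entry, keeping the data class itself immutable.
    val old = stageMetrics(1)
    stageMetrics(1) = old.copy(
      accumIdsToMetricType = old.accumIdsToMetricType + (11L -> "size"))

    // The map now holds both the original and the new metric-type entries.
    println(stageMetrics(1).accumIdsToMetricType)
  }
}
```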
val SparkListenerSQLAdaptiveAccumUpdates(executionId, accumIdsToMetricType) = event

val stages = liveExecutions.get(executionId).stages
accumIdsToMetricType.map { case (accumulatorId, metricType) =>
Why do we need to loop? We can just update `liveStageMetric.accumIdsToMetricType` as
`liveStageMetric.copy(accumIdsToMetricType = liveStageMetric.accumIdsToMetricType ++ accumIdsToMetricType)`
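For reference, `++` on immutable maps already performs the per-key overwrite that the loop does by hand; a minimal standalone sketch:

```scala
object MapMergeDemo {
  def main(args: Array[String]): Unit = {
    // Existing accumulator-id -> metric-type mapping.
    val existing = Map(1L -> "sum", 2L -> "size")
    // New updates: key 2 overrides the old value, key 3 is added.
    val updates = Map(2L -> "timing", 3L -> "sum")

    // ++ merges the maps, with the right-hand side winning on conflicts,
    // so the merged map contains 1L->"sum", 2L->"timing", 3L->"sum".
    val merged = existing ++ updates
    println(merged)
  }
}
```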
@JkSelf can you try it locally and post some screenshots in the PR description?
Test build #117162 has finished for PR 27260 at commit
@@ -132,6 +133,17 @@ case class AdaptiveSparkPlanExec(
    executedPlan.resetMetrics()
  }

  private def collectSQLMetrics(plan: SparkPlan): Seq[SQLMetric] = {
    val metrics = new mutable.ArrayBuffer[SQLMetric]()
    plan.collect {
nit: `foreach`, since the result of `collect` is discarded here.
val metrics = new mutable.ArrayBuffer[SQLMetric]()
plan.collect {
  case p: SparkPlan =>
    p.metrics.map { case metric =>
ditto
@@ -484,12 +496,24 @@ case class AdaptiveSparkPlanExec(
   * Notify the listeners of the physical plan change.
   */
  private def onUpdatePlan(executionId: Long): Unit = {
    if (isSubquery) {
      onUpdateAccumulator(collectSQLMetrics(currentPhysicalPlan))
We can pass the `executionId` parameter to the `onUpdateAccumulator` method.
We cannot put this code in `onUpdatePlan`, because when the SQL query is a subquery, the `executionId` is `None` and `onUpdatePlan` will not be called.
Then can we make it not `None`, now that we have this `if` branch?
BTW, without setting the execution ID, `onUpdatePlan` will not be called at all when `isSubquery == true`.
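The execution ID under discussion is resolved from a Spark local property; a rough sketch of how that lookup can be written, following the pattern used elsewhere in Spark (the method name here is illustrative, not necessarily the exact merged code):

```scala
// Sketch: resolve the current SQL execution ID, if any, from the
// EXECUTION_ID_KEY local property. As the thread above notes, for a
// subquery this can resolve to None, so onUpdatePlan is skipped.
private def getExecutionId: Option[Long] =
  Option(session.sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY))
    .map(_.toLong)
```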
updated.
Test build #117164 has finished for PR 27260 at commit
val stages = liveExecutions.get(executionId).stages
stages.foreach { stageId =>
  val liveStageMetric = stageMetrics.get(stageId)
  stageMetrics.put(stageId, liveStageMetric.copy(
This can be called throughout the entire subquery execution, and by doing this "copy", we could lose metrics for previous stages of the subquery.
+1, some maps in `LiveStageMetrics` will lose data.
After discussion with @cloud-fan, we now update the SQL metrics in `LiveExecutionData.metrics`, so there is no need to copy here. Thanks.
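The pitfall the reviewers point out, in a standalone sketch (simplified names, not the real Spark classes): replacing a live object with a `copy` that only carries one field forward silently resets any other mutable state accumulated so far.

```scala
import scala.collection.mutable

// Simplified stand-in for LiveStageMetrics: one immutable field being
// "updated" via copy, plus internal mutable state gathered during the run.
case class LiveStage(accumIdsToMetricType: Map[Long, String]) {
  val taskMetrics: mutable.Map[Long, Long] = mutable.HashMap.empty
}

object CopyPitfall {
  def main(args: Array[String]): Unit = {
    val stage = LiveStage(Map(1L -> "sum"))
    stage.taskMetrics(100L) = 42L // metrics recorded so far

    // "Updating" via copy constructs a fresh instance whose internal
    // mutable state is initialized empty: the recorded metrics are lost.
    val updated = stage.copy(
      accumIdsToMetricType = stage.accumIdsToMetricType + (2L -> "size"))
    println(updated.taskMetrics.isEmpty) // true: previous metrics are gone
  }
}
```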
Test build #117213 has finished for PR 27260 at commit
Test build #117220 has finished for PR 27260 at commit
val sqlPlanMetrics = sqlMetrics.map { case sqlMetric =>
  SQLPlanMetric(sqlMetric.name.get, sqlMetric.id, sqlMetric.metricType)
}
val executionId = context.session.sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)
this is not needed.
@@ -34,6 +34,11 @@ case class SparkListenerSQLAdaptiveExecutionUpdate(
    sparkPlanInfo: SparkPlanInfo)
  extends SparkListenerEvent

case class SparkListenerSQLAdaptiveAccumUpdates(
  SQLExecution.getQueryExecution(executionId).toString,
  SparkPlanInfo.fromSparkPlan(this)))
if (isSubquery) {
  onUpdateAccumulator(collectSQLMetrics(currentPhysicalPlan), executionId)
Let's add some comments:

When executing subqueries, we can't update the query plan in the UI as the UI doesn't support partial updates yet. However, the subquery may have been optimized into a different plan and we must let the UI know the SQL metrics of the new plan nodes, so that it can track the valid accumulator updates later and display SQL metrics correctly.
  }
}

private def onUpdateAccumulator(sqlMetrics: Seq[SQLMetric], executionId: Long): Unit = {
nit: onUpdateSQLMetrics
  SQLPlanMetric(sqlMetric.name.get, sqlMetric.id, sqlMetric.metricType)
}
val executionId = context.session.sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)
context.session.sparkContext.listenerBus.post(SparkListenerSQLAdaptiveAccumUpdates(
Rename suggestion: `SparkListenerSQLAdaptiveSQLMetricUpdates`
@cloud-fan Resolved the comments and updated the screenshots.
Test build #117225 has finished for PR 27260 at commit
Test build #117229 has finished for PR 27260 at commit
val metrics = new mutable.ArrayBuffer[SQLMetric]()
plan.foreach {
  case p: ShuffleQueryStageExec if (p.resultOption.isEmpty) =>
    collectSQLMetrics(p.plan).foreach(metrics += _)
Can we either use `map`/`flatMap` or pass this `metrics` array into `collectSQLMetrics`?
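The reviewer's suggestion in generic form: build the result functionally rather than threading a shared buffer through the recursion. A standalone toy version (a simple tree standing in for `SparkPlan`, so none of the Spark types are used):

```scala
// Toy tree standing in for a physical plan with per-node metrics.
sealed trait Node { def children: Seq[Node]; def metrics: Seq[String] }
case class Leaf(metrics: Seq[String]) extends Node { def children = Nil }
case class Branch(metrics: Seq[String], children: Seq[Node]) extends Node

object CollectMetrics {
  // flatMap-style collection: no mutable ArrayBuffer needed.
  def collectMetrics(n: Node): Seq[String] =
    n.metrics ++ n.children.flatMap(collectMetrics)

  def main(args: Array[String]): Unit = {
    val plan = Branch(Seq("rows"), Seq(Leaf(Seq("bytes")), Leaf(Seq("time"))))
    println(collectMetrics(plan)) // List(rows, bytes, time)
  }
}
```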
  }
}

private def onUpdateSQLMetrics(sqlMetrics: Seq[SQLMetric], executionId: Long): Unit = {
nit: Why do we need the `sqlMetrics: Seq[SQLMetric]` parameter? Can't we just pass `executionId`? And IMO, I don't even think this extra method is necessary at this point.
Test build #117234 has finished for PR 27260 at commit
Test build #117238 has finished for PR 27260 at commit
Thanks! Merged to master.
What changes were proposed in this pull request?
After #25316 fixed the deadlock issue introduced in #25308, the subquery metrics could no longer be shown in the UI, as in the following screenshot.
This PR fixes the subquery UI display issue by adding a SparkListenerSQLAdaptiveSQLMetricUpdates event to update the subquery SQL metrics. With this PR, the subquery UI displays correctly, as in the following screenshot:

Why are the changes needed?
To show the subquery metrics in the UI when AQE is enabled.
Does this PR introduce any user-facing change?
No
How was this patch tested?
Existing unit tests.
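Based on the discussion above, the new listener event presumably has roughly the following shape (a sketch consistent with the thread, not necessarily the exact merged code):

```scala
// Sketch of the event added by this PR: it carries the execution ID and
// the SQL metrics of the re-optimized subquery plan, so the UI listener
// can register them and track the valid accumulator updates later.
case class SparkListenerSQLAdaptiveSQLMetricUpdates(
    executionId: Long,
    sqlPlanMetrics: Seq[SQLPlanMetric])
  extends SparkListenerEvent
```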