[Gluten-986] Metric "time to collect" is always 0 in ColumnarBroadcastExchange (#989)

Remove totalTime, fix collectTime, add buildTime.
Gluten uses BackendsApiManager.getSparkPlanExecApiInstance.createBroadcastRelation to collect data and build the relation, so we don't need to keep collectTime and buildTime separate, because they are almost equal.

Closes #986
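The fix boils down to a simple pattern: take one timestamp before the combined collect-and-build step, another before the broadcast, and report each phase's duration as its own timing metric. The sketch below is a hypothetical, standalone illustration of that pattern (not the Gluten source): `TimingSketch`, its `metrics` map, and the `run` helper are invented stand-ins for Spark's `SQLMetric` machinery.

```scala
import java.util.concurrent.TimeUnit.NANOSECONDS
import scala.collection.mutable

// Hypothetical sketch of the fixed metric pattern. A plain mutable map of
// millisecond counters stands in for Spark's SQLMetric objects.
object TimingSketch {
  val metrics: mutable.Map[String, Long] =
    mutable.Map("collectTime" -> 0L, "broadcastTime" -> 0L)

  def run(collectAndBuild: () => Unit, broadcast: () => Unit): Unit = {
    // Timestamp BEFORE the work starts -- the bug was taking this
    // timestamp only after collect-and-build had already finished.
    val beforeCollect = System.nanoTime()
    collectAndBuild() // covers both collecting rows and building the relation
    val beforeBroadcast = System.nanoTime()
    metrics("collectTime") += NANOSECONDS.toMillis(beforeBroadcast - beforeCollect)
    broadcast()
    metrics("broadcastTime") += NANOSECONDS.toMillis(System.nanoTime() - beforeBroadcast)
  }

  def main(args: Array[String]): Unit = {
    run(() => Thread.sleep(20), () => Thread.sleep(10))
    // collectTime now reflects the collect-and-build phase instead of ~0.
    println(metrics)
  }
}
```

In the buggy version, `beforeCollect` was captured immediately before `beforeBroadcast`, so their difference was always about zero; moving the first timestamp ahead of the collect call is the whole fix.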
Yohahaha committed Feb 22, 2023
1 parent 3f3f207 commit 191cd5a
Showing 1 changed file with 2 additions and 5 deletions.
@@ -42,8 +42,7 @@ case class ColumnarBroadcastExchangeExec(mode: BroadcastMode, child: SparkPlan)
   extends BroadcastExchangeLike {
   override lazy val metrics = Map(
     "dataSize" -> SQLMetrics.createSizeMetric(sparkContext, "data size"),
-    "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of Rows"),
-    "totalTime" -> SQLMetrics.createTimingMetric(sparkContext, "totaltime_broadcastExchange"),
+    "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"),
     "collectTime" -> SQLMetrics.createTimingMetric(sparkContext, "time to collect"),
     "broadcastTime" -> SQLMetrics.createTimingMetric(sparkContext, "time to broadcast")
   )
@@ -66,6 +65,7 @@ case class ColumnarBroadcastExchangeExec(mode: BroadcastMode, child: SparkPlan)
       runId.toString,
       s"broadcast exchange (runId $runId)",
       interruptOnCancel = true)
+    val beforeCollect = System.nanoTime()

     // this created relation ignore HashedRelationBroadcastMode isNullAware, because we cannot
     // get child output rows, then compare the hash key is null, if not null, compare the
@@ -76,16 +76,13 @@ case class ColumnarBroadcastExchangeExec(mode: BroadcastMode, child: SparkPlan)
       child,
       longMetric("numOutputRows"),
       longMetric("dataSize"))
-    val beforeCollect = System.nanoTime()
     val beforeBroadcast = System.nanoTime()

     longMetric("collectTime") += NANOSECONDS.toMillis(beforeBroadcast - beforeCollect)

     // Broadcast the relation
     val broadcasted = sparkContext.broadcast(relation.asInstanceOf[Any])
     longMetric("broadcastTime") += NANOSECONDS.toMillis(System.nanoTime() - beforeBroadcast)
-    longMetric("totalTime").merge(longMetric("collectTime"))
-    longMetric("totalTime").merge(longMetric("broadcastTime"))

     // Update driver metrics
     val executionId = sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)
