[SPARK-37578][SQL] Update task metrics from ds v2 custom metrics #35028
viirya wants to merge 5 commits into apache:master
Conversation
Force-pushed from cef027f to 832453a
cc @cloud-fan
```scala
Option(TaskContext.get()).map(_.taskMetrics().outputMetrics).foreach { outputMetrics =>
  metricName match {
    case "bytesWritten" => outputMetrics.setBytesWritten(metricValue)
    case "recordsWritten" => outputMetrics.setRecordsWritten(metricValue)
```
Could you add the default no-op case?
Yea, I'll add it. Thanks.
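For illustration, the requested default no-op case could look like the simplified, self-contained sketch below. The `OutputMetricsStub` class and `updateBuiltinMetric` helper are stand-ins invented here, not Spark's actual `OutputMetrics` API; the point is only the shape of the pattern match with a catch-all branch.

```scala
// Stand-in for Spark's task OutputMetrics (hypothetical, for illustration only).
final class OutputMetricsStub {
  var bytesWritten: Long = 0L
  var recordsWritten: Long = 0L
}

// Hypothetical helper: map a recognized built-in metric name onto the holder,
// and silently ignore any other name via the default no-op case.
def updateBuiltinMetric(metrics: OutputMetricsStub, metricName: String, metricValue: Long): Unit = {
  metricName match {
    case "bytesWritten"   => metrics.bytesWritten = metricValue
    case "recordsWritten" => metrics.recordsWritten = metricValue
    case _                => // default no-op: unknown metric names are ignored
  }
}
```

Without the `case _` branch, an unrecognized metric name would throw a `MatchError` at runtime, which is why the reviewer asked for the no-op default.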
```scala
private[spark] val NUM_ROWS_PER_UPDATE = 100
```
```scala
private[spark] val builtInOutputMetrics = Set("bytesWritten", "recordsWritten")
```
builtInOutputMetrics -> BUILTIN_OUTPUT_METRICS?
```scala
// Wait until the new execution is started and being tracked.
eventually(timeout(10.seconds), interval(10.milliseconds)) {
  assert(statusStore.executionsCount() >= oldCount)
```
Suggested change:
```diff
- assert(statusStore.executionsCount() >= oldCount)
+ assert(statusStore.executionsCount() > oldCount)
```
```scala
test("SPARK-37578: Update output metrics from Datasource v2") {
  withTempDir { dir =>
    val statusStore = spark.sharedState.statusStore
    val oldCount = statusStore.executionsList().size
```
Is this different from statusStore.executionsCount()?
Then statusStore.executionsCount() looks more efficient? It's also used in the code below.
Yea, I will change it to executionsCount.
@cloud-fan Thanks. Any more comments?
All tests in GA passed; it only failed on the Python lint check.
You can ignore the Python linter error~
Yea, thanks @dongjoon-hyun @cloud-fan. Merging to master.
What changes were proposed in this pull request?
This patch proposes to update task metrics from datasource v2 custom metrics.
Why are the changes needed?
Spark updates task metrics such as `bytesWritten` and `recordsWritten` for the built-in data source v2 formats, e.g. Parquet. For other data source v2 implementations, we can define some built-in metric names so Spark can also recognize them and update task metrics.

Does this PR introduce any user-facing change?
Yes. Some special DS v2 custom metrics are updated to task metrics.
How was this patch tested?
Added unit test.
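As a rough sketch of how a data source could take advantage of this change: Spark's DS v2 metric interface `org.apache.spark.sql.connector.metric.CustomTaskMetric` exposes a metric `name()` and `value()`, and with this patch a custom metric whose name matches a built-in one (e.g. `bytesWritten`) can be reflected in task output metrics. The `TaskMetricLike` trait below is a self-contained stand-in for that interface, defined here only so the example compiles without Spark on the classpath.

```scala
// Stand-in for org.apache.spark.sql.connector.metric.CustomTaskMetric,
// which declares name() and value(). Defined locally for self-containment.
trait TaskMetricLike {
  def name(): String
  def value(): Long
}

// A DS v2 writer could report bytes written under the built-in metric name
// "bytesWritten", letting Spark fold it into the task's output metrics.
final class BytesWrittenMetric(written: Long) extends TaskMetricLike {
  override def name(): String = "bytesWritten"
  override def value(): Long = written
}
```

A non-built-in name would still surface only as a regular DS v2 custom metric in the SQL UI, not as a task metric.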