[SPARK-56551][SQL] Add operation metrics for DELETE queries in DSv2#55428
ZiyaZa wants to merge 24 commits into apache:master
Conversation
…e operation column
# Conflicts:
#   sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/RewriteRowLevelCommand.scala
#   sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/RewriteUpdateTable.scala
#   sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/RowDeltaUtils.scala
#   sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala
# Conflicts:
#   sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala
#   sql/core/src/test/scala/org/apache/spark/sql/connector/DeltaBasedNoMetadataUpdateTableSuite.scala
      case b: BatchScanExec if b.table.isInstanceOf[RowLevelOperationTable] =>
        getMetricValue(b.metrics, "numOutputRows")
    }
    val numCopiedRows = getMetricValue(metrics, "numCopiedRows")
This seems like a reasonable approach to handle DELETE. Is there a metric for the overall number of output rows? Can we add some sanity checks that we only had copied rows and that their count matches the number of output rows?
We could do this using totalNumRowsAccumulator.value and verify that the numbers in WriteSummary match what we expect. Since this can be done for other commands too, let's do this in a follow-up PR for all commands.
+1 to the idea (check whether it matches numCopiedRows in the ReplaceData case, and numDeletedRows in the WriteDelta case)
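The cross-check proposed in this thread could be sketched roughly as below. This is a hypothetical, self-contained sketch, not actual Spark code: the function name and the plain `Long` parameters (standing in for `totalNumRowsAccumulator.value` and the real metric values) are illustrative assumptions.

```scala
// Hypothetical sketch of the proposed follow-up sanity check: verify that
// per-command metrics are consistent with the total number of written rows.
// All names here are illustrative, not actual Spark internals.
def validateRowLevelMetrics(
    operation: String,      // "ReplaceData" or "WriteDelta"
    totalNumRows: Long,     // e.g. totalNumRowsAccumulator.value
    numCopiedRows: Long,
    numDeletedRows: Long): Unit = operation match {
  case "ReplaceData" =>
    // A DELETE via ReplaceData writes back only the surviving (copied) rows.
    assert(totalNumRows == numCopiedRows,
      s"expected $totalNumRows written rows to match $numCopiedRows copied rows")
  case "WriteDelta" =>
    // A DELETE via WriteDelta writes one delta record per deleted row.
    assert(totalNumRows == numDeletedRows,
      s"expected $totalNumRows written rows to match $numDeletedRows deleted rows")
  case other =>
    throw new IllegalArgumentException(s"unknown operation: $other")
}
```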
# Conflicts:
#   sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
#   sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala
andreaschat-db left a comment
Looks nice overall. Left two comments.
    }
    val numCopiedRows = getMetricValue(metrics, "numCopiedRows")
    val numDeletedRows = if (numScannedRows.exists(_ >= 0) && numCopiedRows >= 0) {
      numScannedRows.get - numCopiedRows
Can numScannedRows be less than numCopiedRows? It looks like this needs a sanity check.
It would mean the plan is creating new rows, and it shouldn't do that. We can add a sanity check, together with others, in a follow-up PR. See #55428 (comment). This also has a dependency on #55371.
Metric values can get overcounted on retries. The scan and the write can be executed in different stages, so they can have different retries, so technically you can get an overcounted numCopiedRows that turns this negative. Using the metrics infrastructure from #55371 that I want to get in would fix that.
    // DELETE ReplaceData plans filter out the deleted rows early in the plan, and they don't
    // reach this node. We need to calculate this value as numScannedRows - numCopiedRows.
    val numScannedRows = collectFirst(query) {
      case b: BatchScanExec if b.table.isInstanceOf[RowLevelOperationTable] =>
Is this the only scan type we have today?
DataSourceV2ScanExecBase has 4 child scan types; seemingly only this one is used for DELETEs. The other scans seem to be used only for streaming reads.
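Put together, the derivation discussed in this thread amounts to something like the sketch below. It is a simplified, self-contained illustration using plain values in place of the real metric objects; the function name is hypothetical, and -1 is used as the "metric unavailable" sentinel discussed in this review.

```scala
// Simplified sketch of the DELETE metric derivation for ReplaceData plans.
// numScannedRows is None when no qualifying BatchScanExec node was found
// (e.g. the scan was optimized away).
def deriveNumDeletedRows(numScannedRows: Option[Long], numCopiedRows: Long): Long = {
  if (numScannedRows.exists(_ >= 0) && numCopiedRows >= 0) {
    // Deleted rows never reach the write node, so they are the difference
    // between what the row-level scan produced and what was copied back.
    numScannedRows.get - numCopiedRows
  } else {
    -1L // sentinel: one of the inputs was missing or invalid
  }
}
```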
szehon-ho left a comment
Looks good to me. +1 to putting assertions in the follow-up PRs for the expected presence or values of various metrics.
    // DELETE ReplaceData plans filter out the deleted rows early in the plan, and they don't
    // reach this node. We need to calculate this value as numScannedRows - numCopiedRows.
    val numScannedRows = collectFirst(query) {
      case b: BatchScanExec if b.table.isInstanceOf[RowLevelOperationTable] =>
We don't expect this to happen, right? Should we log a warning if we cannot find a BatchScanExec, or combine it with one of the assertions?
Currently, if we can't find such a scan node, we set the metric to -1. Is this good enough, what do you think? It fits the theme we have been going with so far: if there is some problem, we use -1 as the metric value. If any metric value is -1, it means we have a problem somewhere (excluding insert-only merges, where this is intentional at the moment, but we'll fix that). When we discussed this previously, we decided against throwing an error and letting connectors handle it, but we didn't discuss logging.
A case to consider is when the optimizer squashes and replaces the scan with an empty relation. This can happen in multiple cases. What is our behavior in that case?
What changes were proposed in this pull request?
Added numDeletedRows and numCopiedRows metrics for DELETE operations in DSv2. These metrics are calculated in the WritingSparkTask. Metadata-only DELETEs are excluded from this PR and will be tackled in a future PR.
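As a rough illustration of the kind of per-row accounting a writing task can do, consider the sketch below. It is hypothetical: the operation codes, type, and function names are illustrative placeholders, not the actual constants from RowDeltaUtils or the real WritingSparkTask logic.

```scala
// Hypothetical sketch of per-row metric accounting inside a writing task.
// Operation codes are illustrative; Spark defines the real ones elsewhere.
final case class DeleteMetrics(var numDeletedRows: Long = 0L, var numCopiedRows: Long = 0L)

val DELETE = 1 // illustrative operation code for a deleted row
val COPY   = 4 // illustrative operation code for an unmodified, copied-back row

// Bump the matching counter based on the row's operation column value.
def recordRow(metrics: DeleteMetrics, operation: Int): Unit = operation match {
  case DELETE => metrics.numDeletedRows += 1
  case COPY   => metrics.numCopiedRows += 1
  case _      => // other operations (update, insert) are out of scope here
}
```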
Why are the changes needed?
For better visibility into what happened as a result of a DELETE query.
Does this PR introduce any user-facing change?
Yes. DELETE queries now report numDeletedRows and numCopiedRows operation metrics.
How was this patch tested?
Added metric value validation to most DELETE unit tests.
Was this patch authored or co-authored using generative AI tooling?
Generated-by: Claude Opus 4.7