From 8bd8325329d03bef5c9c387b13fc8c04835b3361 Mon Sep 17 00:00:00 2001 From: Ziya Mukhtarov Date: Mon, 20 Apr 2026 17:02:29 +0000 Subject: [PATCH 1/6] Metadata-only DELETE Metrics --- .../connector/catalog/TruncatableTable.java | 19 +++++ .../metric/NumDeletedRowsMetric.java | 48 +++++++++++ .../connector/catalog/InMemoryBaseTable.scala | 36 +++++++- .../sql/connector/catalog/InMemoryTable.scala | 6 +- .../catalog/InMemoryTableWithV2Filter.scala | 8 +- .../datasources/v2/BatchScanExec.scala | 2 +- .../datasources/v2/ContinuousScanExec.scala | 2 +- .../v2/DataSourceV2ScanExecBase.scala | 33 +++---- .../datasources/v2/DeleteFromTableExec.scala | 9 +- .../datasources/v2/MicroBatchScanExec.scala | 2 +- .../v2/RealTimeStreamScanExec.scala | 2 +- .../v2/SupportsCustomDriverMetrics.scala | 82 ++++++++++++++++++ .../datasources/v2/TruncateTableExec.scala | 13 ++- .../datasources/v2/V1FallbackWriters.scala | 23 +++-- .../v2/WriteToDataSourceV2Exec.scala | 85 +++++++++---------- .../spark/sql/connector/DeleteFromTests.scala | 39 +++++++-- .../command/TruncateTableSuiteBase.scala | 32 ++++++- 17 files changed, 338 insertions(+), 103 deletions(-) create mode 100644 sql/catalyst/src/main/java/org/apache/spark/sql/connector/metric/NumDeletedRowsMetric.java create mode 100644 sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/SupportsCustomDriverMetrics.scala diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TruncatableTable.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TruncatableTable.java index 4bc2aa6e18ae6..a1d8f59248b92 100644 --- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TruncatableTable.java +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TruncatableTable.java @@ -18,6 +18,8 @@ package org.apache.spark.sql.connector.catalog; import org.apache.spark.annotation.Evolving; +import org.apache.spark.sql.connector.metric.CustomMetric; +import org.apache.spark.sql.connector.metric.CustomTaskMetric; /** * Represents a table which can be atomically truncated. @@ -34,4 +36,21 @@ public interface TruncatableTable extends Table { * @since 3.2.0 */ boolean truncateTable(); + + /** + * Returns an array of supported custom metrics with name and description. + * By default it returns empty array. + */ + default CustomMetric[] supportedCustomMetrics() { + return new CustomMetric[]{}; + } + + /** + * Returns an array of custom metrics which are collected with values at the driver side only. + * Note that these metrics must be included in the supported custom metrics reported by + * `supportedCustomMetrics`. + */ + default CustomTaskMetric[] reportDriverMetrics() { + return new CustomTaskMetric[]{}; + } } diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/metric/NumDeletedRowsMetric.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/metric/NumDeletedRowsMetric.java new file mode 100644 index 0000000000000..e8f89ceca801e --- /dev/null +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/metric/NumDeletedRowsMetric.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.connector.metric; + +import org.apache.spark.annotation.Evolving; + +/** + * Standard {@link CustomMetric} for the number of rows deleted by a DELETE or TRUNCATE + * operation. Connectors implementing + * {@link org.apache.spark.sql.connector.catalog.SupportsDeleteV2} or + * {@link org.apache.spark.sql.connector.catalog.TruncatableTable} can declare this metric from + * their {@code supportedCustomMetrics()} to have Spark surface the value uniformly in the SQL UI + * and elsewhere. + * + * @since 4.2.0 + */ +@Evolving +public class NumDeletedRowsMetric extends CustomSumMetric { + /** + * Well-known metric name used across connectors and Spark's row-level DELETE path. + */ + public static final String NAME = "numDeletedRows"; + + @Override + public String name() { + return NAME; + } + + @Override + public String description() { + return "number of deleted rows"; + } +} diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryBaseTable.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryBaseTable.scala index fd2c0f6e9c2ec..6d802786c83fc 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryBaseTable.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryBaseTable.scala @@ -34,7 +34,7 @@ import org.apache.spark.sql.connector.catalog.constraints.Constraint import org.apache.spark.sql.connector.distributions.{Distribution, Distributions} import org.apache.spark.sql.connector.expressions._ import org.apache.spark.sql.connector.expressions.{Literal => V2Literal} -import org.apache.spark.sql.connector.metric.{CustomMetric, CustomSumMetric, CustomTaskMetric} +import org.apache.spark.sql.connector.metric.{CustomMetric, CustomSumMetric, CustomTaskMetric, NumDeletedRowsMetric} import org.apache.spark.sql.connector.read._ import org.apache.spark.sql.connector.read.colstats.{ColumnStatistics, Histogram, HistogramBin} import org.apache.spark.sql.connector.read.partitioning.{KeyGroupedPartitioning, Partitioning, UnknownPartitioning} @@ -1125,6 +1125,40 @@ class RowsReadCustomMetric extends CustomSumMetric { override def description(): String = "number of rows read" } +/** + * Mixin for [[InMemoryBaseTable]] subclasses that implement [[TruncatableTable]] (directly or via + * `SupportsDelete` / `SupportsDeleteV2`) and want to surface the number of rows removed by the + * most recent metadata-only DELETE or TRUNCATE. Subclasses call [[recordDeletedRows]] from their + * `deleteWhere` implementation with the set of partition keys being removed. 
+ */ +trait InMemoryDeleteRowCountMetric extends TruncatableTable { self: InMemoryBaseTable => + + private var lastDeletedRowCount: Long = -1L + + protected def recordDeletedRows(keysToDelete: Iterable[Seq[Any]]): Unit = { + lastDeletedRowCount = keysToDelete.iterator + .flatMap(dataMap.get) + .flatten + .map(_.rows.size.toLong) + .sum + } + + override def supportedCustomMetrics(): Array[CustomMetric] = { + Array(new NumDeletedRowsMetric) + } + + override def reportDriverMetrics(): Array[CustomTaskMetric] = { + if (lastDeletedRowCount < 0) { + Array.empty + } else { + Array(new CustomTaskMetric { + override def name(): String = NumDeletedRowsMetric.NAME + override def value(): Long = lastDeletedRowCount + }) + } + } +} + case class Commit(id: Long, writeSummary: Option[WriteSummary] = None) sealed trait Operation diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryTable.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryTable.scala index d5738475031dc..ea2d90bf06e1e 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryTable.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryTable.scala @@ -49,7 +49,7 @@ class InMemoryTable( override val id: String = UUID.randomUUID().toString) extends InMemoryBaseTable(name, columns, partitioning, properties, constraints, distribution, ordering, numPartitions, advisoryPartitionSize, isDistributionStrictlyRequired, - numRowsPerSplit) with SupportsDelete { + numRowsPerSplit) with SupportsDelete with InMemoryDeleteRowCountMetric { def this( name: String, @@ -69,8 +69,10 @@ class InMemoryTable( override def deleteWhere(filters: Array[Filter]): Unit = dataMap.synchronized { import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.MultipartIdentifierHelper - dataMap --= InMemoryTable + val keysToDelete = InMemoryTable .filtersToKeys(dataMap.keys, partCols.map(_.toSeq.quoted).toImmutableArraySeq, filters) + recordDeletedRows(keysToDelete) + dataMap --= keysToDelete increaseVersion() } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryTableWithV2Filter.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryTableWithV2Filter.scala index f2827faf59435..4dbef7bc08f1b 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryTableWithV2Filter.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryTableWithV2Filter.scala @@ -34,7 +34,9 @@ class InMemoryTableWithV2Filter( columns: Array[Column], partitioning: Array[Transform], properties: util.Map[String, String]) - extends InMemoryBaseTable(name, columns, partitioning, properties) with SupportsDeleteV2 { + extends InMemoryBaseTable(name, columns, partitioning, properties) + with SupportsDeleteV2 + with InMemoryDeleteRowCountMetric { override def canDeleteWhere(predicates: Array[Predicate]): Boolean = { InMemoryTableWithV2Filter.supportsPredicates(predicates) @@ -42,8 +44,10 @@ class InMemoryTableWithV2Filter( override def deleteWhere(filters: Array[Predicate]): Unit = dataMap.synchronized { import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.MultipartIdentifierHelper - dataMap --= InMemoryTableWithV2Filter + val keysToDelete = InMemoryTableWithV2Filter .filtersToKeys(dataMap.keys, partCols.map(_.toSeq.quoted).toImmutableArraySeq, filters) + recordDeletedRows(keysToDelete) + dataMap --= keysToDelete } override def newScanBuilder(options: 
CaseInsensitiveStringMap): ScanBuilder = { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExec.scala index 28f4d12d366b2..e9a18833ed9a5 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExec.scala @@ -129,7 +129,7 @@ case class BatchScanExec( new DataSourceRDD( sparkContext, filteredPartitions, readerFactory, supportsColumnar, customMetrics) } - postDriverMetrics() + postDriverMetrics(scan.reportDriverMetrics()) rdd } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ContinuousScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ContinuousScanExec.scala index e9e5f0f3175cb..f4e5db1536178 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ContinuousScanExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ContinuousScanExec.scala @@ -65,7 +65,7 @@ case class ContinuousScanExec( schema, readerFactory, customMetrics) - postDriverMetrics() + postDriverMetrics(scan.reportDriverMetrics()) inputRDD } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExecBase.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExecBase.scala index 0bec918039775..f00d8b9b82cb4 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExecBase.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExecBase.scala @@ -24,23 +24,22 @@ import org.apache.spark.sql.catalyst.plans.physical import org.apache.spark.sql.catalyst.plans.physical.KeyedPartitioning import org.apache.spark.sql.catalyst.util.truncatedString import org.apache.spark.sql.connector.read.{HasPartitionKey, InputPartition, PartitionReaderFactory, Scan} -import org.apache.spark.sql.execution.{ExplainUtils, LeafExecNode, SafeForKWayMerge, SQLExecution} -import org.apache.spark.sql.execution.metric.SQLMetrics +import org.apache.spark.sql.execution.{ExplainUtils, LeafExecNode, SafeForKWayMerge} +import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics} import org.apache.spark.sql.internal.connector.SupportsMetadata import org.apache.spark.sql.vectorized.ColumnarBatch -import org.apache.spark.util.ArrayImplicits._ import org.apache.spark.util.Utils -trait DataSourceV2ScanExecBase extends LeafExecNode with SafeForKWayMerge { +trait DataSourceV2ScanExecBase + extends LeafExecNode + with SafeForKWayMerge + with SupportsCustomDriverMetrics { - lazy val customMetrics = scan.supportedCustomMetrics().map { customMetric => - customMetric.name() -> SQLMetrics.createV2CustomMetric(sparkContext, customMetric) - }.toMap + override lazy val customMetrics: Map[String, SQLMetric] = + createCustomMetrics(scan.supportedCustomMetrics()) - override lazy val metrics = { - Map("numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows")) ++ - customMetrics - } + override protected lazy val sparkMetrics: Map[String, SQLMetric] = + Map("numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows")) def scan: Scan @@ -145,18 +144,6 @@ trait DataSourceV2ScanExecBase extends LeafExecNode with SafeForKWayMerge { } } - protected def postDriverMetrics(): Unit = { - 
val driveSQLMetrics = scan.reportDriverMetrics().map(customTaskMetric => { - val metric = metrics(customTaskMetric.name()) - metric.set(customTaskMetric.value()) - metric - }) - - val executionId = sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY) - SQLMetrics.postDriverMetricUpdates(sparkContext, executionId, - driveSQLMetrics.toImmutableArraySeq) - } - override def doExecuteColumnar(): RDD[ColumnarBatch] = { val numOutputRows = longMetric("numOutputRows") inputRDD.asInstanceOf[RDD[ColumnarBatch]].map { b => diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DeleteFromTableExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DeleteFromTableExec.scala index 8d5ee6038e80f..aa0746184ca42 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DeleteFromTableExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DeleteFromTableExec.scala @@ -21,14 +21,21 @@ import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.Attribute import org.apache.spark.sql.connector.catalog.SupportsDeleteV2 import org.apache.spark.sql.connector.expressions.filter.Predicate +import org.apache.spark.sql.execution.metric.SQLMetric case class DeleteFromTableExec( table: SupportsDeleteV2, condition: Array[Predicate], - refreshCache: () => Unit) extends LeafV2CommandExec { + refreshCache: () => Unit) + extends LeafV2CommandExec + with SupportsCustomDriverMetrics { + + override lazy val customMetrics: Map[String, SQLMetric] = + createCustomMetrics(table.supportedCustomMetrics()) override protected def run(): Seq[InternalRow] = { table.deleteWhere(condition) + postDriverMetrics(table.reportDriverMetrics()) refreshCache() Seq.empty } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/MicroBatchScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/MicroBatchScanExec.scala index f81ca001fbe29..bf958ba5aad4d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/MicroBatchScanExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/MicroBatchScanExec.scala @@ -55,7 +55,7 @@ case class MicroBatchScanExec( override lazy val inputRDD: RDD[InternalRow] = { val inputRDD = new DataSourceRDD(sparkContext, partitions, readerFactory, supportsColumnar, customMetrics) - postDriverMetrics() + postDriverMetrics(scan.reportDriverMetrics()) inputRDD } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/RealTimeStreamScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/RealTimeStreamScanExec.scala index 9fff2d91af14c..cb4867cf8c6ac 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/RealTimeStreamScanExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/RealTimeStreamScanExec.scala @@ -166,7 +166,7 @@ case class RealTimeStreamScanExec( supportsColumnar, customMetrics ) - postDriverMetrics() + postDriverMetrics(scan.reportDriverMetrics()) inputRDD } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/SupportsCustomDriverMetrics.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/SupportsCustomDriverMetrics.scala new file mode 100644 index 0000000000000..2c6cd91d42392 --- /dev/null +++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/SupportsCustomDriverMetrics.scala @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.datasources.v2 + +import org.apache.spark.sql.connector.metric.{CustomMetric, CustomTaskMetric} +import org.apache.spark.sql.execution.{SparkPlan, SQLExecution} +import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics} +import org.apache.spark.util.ArrayImplicits._ + +/** + * A mixin for Spark plan nodes that expose driver-side custom metrics reported by a connector. + * Implementations declare the connector-owned metrics via [[customMetrics]]; after the underlying + * operation has executed they call [[postDriverMetrics]] with the connector's reported values so + * they are visible in the SQL UI. + * + * Nodes that also expose Spark-owned metrics (for example a node-computed row counter or a + * row-level operation's numDeletedRows) should supply them via [[sparkMetrics]]. Names in + * [[sparkMetrics]] are reserved: if the connector happens to report a value under the same name, + * Spark's value wins and the connector's is dropped. + */ +trait SupportsCustomDriverMetrics { self: SparkPlan => + + /** + * The custom metrics the connector supports for this operation, keyed by name. + */ + def customMetrics: Map[String, SQLMetric] + + /** + * Spark-owned metrics that should appear alongside the connector-declared ones. Defaults to + * empty. Values under these names are owned by Spark and take precedence on a name collision. + */ + protected def sparkMetrics: Map[String, SQLMetric] = Map.empty + + override lazy val metrics: Map[String, SQLMetric] = customMetrics ++ sparkMetrics + + /** + * Converts an array of connector-declared metrics into the map shape [[customMetrics]] uses. + */ + protected def createCustomMetrics(metrics: Array[CustomMetric]): Map[String, SQLMetric] = { + metrics.map { m => + m.name() -> SQLMetrics.createV2CustomMetric(sparkContext, m) + }.toMap + } + + /** + * Applies the values reported by the connector to the declared metrics and posts them so the + * SQL UI reflects the final values. Metrics not declared via [[customMetrics]] are ignored. + * Metrics whose name collides with [[sparkMetrics]] are also ignored so Spark-owned values + * are preserved. + */ + protected def postDriverMetrics(taskMetrics: Array[CustomTaskMetric]): Unit = { + val updated = taskMetrics.flatMap { t => + if (sparkMetrics.contains(t.name())) { + // Spark metrics take precedence on collisions. 
+ None + } else { + metrics.get(t.name()).map { metric => + metric.set(t.value()) + metric + } + } + } + val executionId = sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY) + SQLMetrics.postDriverMetricUpdates( + sparkContext, executionId, updated.toImmutableArraySeq) + } +} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/TruncateTableExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/TruncateTableExec.scala index 948dc1bc8c87c..084e2d54b5029 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/TruncateTableExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/TruncateTableExec.scala @@ -20,18 +20,27 @@ package org.apache.spark.sql.execution.datasources.v2 import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.Attribute import org.apache.spark.sql.connector.catalog.TruncatableTable +import org.apache.spark.sql.execution.metric.SQLMetric /** * Physical plan node for table truncation. */ case class TruncateTableExec( table: TruncatableTable, - refreshCache: () => Unit) extends LeafV2CommandExec { + refreshCache: () => Unit) + extends LeafV2CommandExec + with SupportsCustomDriverMetrics { + + override lazy val customMetrics: Map[String, SQLMetric] = + createCustomMetrics(table.supportedCustomMetrics()) override def output: Seq[Attribute] = Seq.empty override protected def run(): Seq[InternalRow] = { - if (table.truncateTable()) refreshCache() + if (table.truncateTable()) { + postDriverMetrics(table.reportDriverMetrics()) + refreshCache() + } Seq.empty } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V1FallbackWriters.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V1FallbackWriters.scala index 3eadffb8f0ae4..9e3f5e3e1d4f5 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V1FallbackWriters.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V1FallbackWriters.scala @@ -23,8 +23,8 @@ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.classic.Dataset import org.apache.spark.sql.connector.catalog.SupportsWrite import org.apache.spark.sql.connector.write.V1Write -import org.apache.spark.sql.execution.{SparkPlan, SQLExecution} -import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics} +import org.apache.spark.sql.execution.SparkPlan +import org.apache.spark.sql.execution.metric.SQLMetric import org.apache.spark.sql.sources.InsertableRelation /** @@ -56,13 +56,15 @@ case class OverwriteByExpressionExecV1( write: V1Write) extends V1FallbackWriters /** Some helper interfaces that use V2 write semantics through the V1 writer interface. 
*/ -sealed trait V1FallbackWriters extends LeafV2CommandExec with SupportsV1Write { +sealed trait V1FallbackWriters + extends LeafV2CommandExec + with SupportsV1Write + with SupportsCustomDriverMetrics { + override def output: Seq[Attribute] = Nil - override val metrics: Map[String, SQLMetric] = - write.supportedCustomMetrics().map { customMetric => - customMetric.name() -> SQLMetrics.createV2CustomMetric(sparkContext, customMetric) - }.toMap + override lazy val customMetrics: Map[String, SQLMetric] = + createCustomMetrics(write.supportedCustomMetrics()) def table: SupportsWrite def refreshCache: () => Unit @@ -75,12 +77,7 @@ sealed trait V1FallbackWriters extends LeafV2CommandExec with SupportsV1Write { Nil } finally { - write.reportDriverMetrics().foreach { customTaskMetric => - metrics.get(customTaskMetric.name()).foreach(_.set(customTaskMetric.value())) - } - - val executionId = sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY) - SQLMetrics.postDriverMetricUpdates(sparkContext, executionId, metrics.values.toSeq) + postDriverMetrics(write.reportDriverMetrics()) } } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala index 6bb1eb6f4b6d6..2b1b0e6c6abb1 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala @@ -30,11 +30,11 @@ import org.apache.spark.sql.catalyst.util.{removeInternalMetadata, CharVarcharUt import org.apache.spark.sql.catalyst.util.RowDeltaUtils.{COPY_OPERATION, DELETE_OPERATION, INSERT_OPERATION, REINSERT_OPERATION, UPDATE_OPERATION} import org.apache.spark.sql.connector.catalog.{CatalogV2Util, Column, Identifier, StagedTable, StagingTableCatalog, Table, TableCatalog, TableInfo, TableWritePrivilege} import org.apache.spark.sql.connector.expressions.Transform -import org.apache.spark.sql.connector.metric.CustomMetric +import org.apache.spark.sql.connector.metric.{CustomMetric, NumDeletedRowsMetric} import org.apache.spark.sql.connector.write.{BatchWrite, DataWriter, DataWriterFactory, DeleteSummaryImpl, DeltaWrite, DeltaWriter, MergeSummaryImpl, PhysicalWriteInfoImpl, RowLevelOperation, RowLevelOperationTable, UpdateSummaryImpl, Write, WriterCommitMessage, WriteSummary} import org.apache.spark.sql.connector.write.RowLevelOperation.Command._ import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryExecutionErrors} -import org.apache.spark.sql.execution.{QueryExecution, SparkPlan, SQLExecution, UnaryExecNode} +import org.apache.spark.sql.execution.{QueryExecution, SparkPlan, UnaryExecNode} import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper import org.apache.spark.sql.execution.metric.{CustomMetrics, SQLMetric, SQLMetrics} import org.apache.spark.sql.types.StructType @@ -325,9 +325,9 @@ case class ReplaceDataExec( override def writingTask: WritingSparkTask[_] = { projections.metadataProjection match { case Some(metadataProj) => - DataAndMetadataWritingSparkTask(projections.rowProjection, metadataProj, operationMetrics) + DataAndMetadataWritingSparkTask(projections.rowProjection, metadataProj, sparkMetrics) case None => - DataWithProjectionWritingSparkTask(projections.rowProjection, operationMetrics) + DataWithProjectionWritingSparkTask(projections.rowProjection, sparkMetrics) } } @@ -350,7 +350,7 @@ case class 
ReplaceDataExec( // One of the metrics couldn't be found, also mark numDeletedRows as not found. -1L } - metrics("numDeletedRows").set(numDeletedRows) + metrics(NumDeletedRowsMetric.NAME).set(numDeletedRows) } super.getWriteSummary(query) } @@ -368,9 +368,9 @@ case class WriteDeltaExec( override lazy val writingTask: WritingSparkTask[_] = { if (projections.metadataProjection.isDefined) { - DeltaWithMetadataWritingSparkTask(projections, operationMetrics) + DeltaWithMetadataWritingSparkTask(projections, sparkMetrics) } else { - DeltaWritingSparkTask(projections, operationMetrics) + DeltaWritingSparkTask(projections, sparkMetrics) } } @@ -387,9 +387,8 @@ case class WriteToDataSourceV2Exec( override def stringArgs: Iterator[Any] = Iterator(batchWrite, query) - override val customMetrics: Map[String, SQLMetric] = writeMetrics.map { customMetric => - customMetric.name() -> SQLMetrics.createV2CustomMetric(sparkContext, customMetric) - }.toMap + override lazy val customMetrics: Map[String, SQLMetric] = + createCustomMetrics(writeMetrics.toArray) override protected def run(): Seq[InternalRow] = { val writtenRows = writeWithV2(batchWrite) @@ -407,32 +406,18 @@ trait V2ExistingTableWriteExec extends V2TableWriteExec { override def stringArgs: Iterator[Any] = Iterator(query, write) - override val customMetrics: Map[String, SQLMetric] = - write.supportedCustomMetrics().map { customMetric => - customMetric.name() -> SQLMetrics.createV2CustomMetric(sparkContext, customMetric) - }.toMap + override lazy val customMetrics: Map[String, SQLMetric] = + createCustomMetrics(write.supportedCustomMetrics()) override protected def run(): Seq[InternalRow] = { val writtenRows = try { writeWithV2(write.toBatch) } finally { - postDriverMetrics() + postDriverMetrics(write.reportDriverMetrics()) } refreshCache() writtenRows } - - protected def postDriverMetrics(): Unit = { - val driveSQLMetrics = write.reportDriverMetrics().map(customTaskMetric => { - val metric = metrics(customTaskMetric.name()) - metric.set(customTaskMetric.value()) - metric - }) - - val executionId = sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY) - SQLMetrics.postDriverMetricUpdates(sparkContext, executionId, - driveSQLMetrics.toImmutableArraySeq) - } } /** @@ -441,14 +426,15 @@ trait V2ExistingTableWriteExec extends V2TableWriteExec { trait RowLevelWriteExec extends V2ExistingTableWriteExec { def rowLevelCommand: RowLevelOperation.Command - override lazy val operationMetrics: Map[String, SQLMetric] = rowLevelCommand match { + override protected lazy val sparkMetrics: Map[String, SQLMetric] = rowLevelCommand match { case UPDATE => Map( "numUpdatedRows" -> SQLMetrics.createMetric(sparkContext, "number of updated rows"), "numCopiedRows" -> SQLMetrics.createMetric(sparkContext, "number of copied rows")) case DELETE => Map( - "numDeletedRows" -> SQLMetrics.createMetric(sparkContext, "number of deleted rows"), + NumDeletedRowsMetric.NAME -> + SQLMetrics.createMetric(sparkContext, "number of deleted rows"), "numCopiedRows" -> SQLMetrics.createMetric(sparkContext, "number of copied rows")) case _ => Map.empty } @@ -477,12 +463,12 @@ trait RowLevelWriteExec extends V2ExistingTableWriteExec { } case UPDATE => Some(UpdateSummaryImpl( - getMetricValue(operationMetrics, "numUpdatedRows"), - getMetricValue(operationMetrics, "numCopiedRows"))) + getMetricValue(sparkMetrics, "numUpdatedRows"), + getMetricValue(sparkMetrics, "numCopiedRows"))) case DELETE => Some(DeleteSummaryImpl( - getMetricValue(operationMetrics, "numDeletedRows"), - 
getMetricValue(operationMetrics, "numCopiedRows"))) + getMetricValue(sparkMetrics, NumDeletedRowsMetric.NAME), + getMetricValue(sparkMetrics, "numCopiedRows"))) } } } @@ -490,7 +476,12 @@ trait RowLevelWriteExec extends V2ExistingTableWriteExec { /** * The base physical plan for writing data into data source v2. */ -trait V2TableWriteExec extends V2CommandExec with UnaryExecNode with AdaptiveSparkPlanHelper { +trait V2TableWriteExec + extends V2CommandExec + with UnaryExecNode + with AdaptiveSparkPlanHelper + with SupportsCustomDriverMetrics { + def query: SparkPlan def writingTask: WritingSparkTask[_] = DataWritingSparkTask @@ -499,7 +490,7 @@ trait V2TableWriteExec extends V2CommandExec with UnaryExecNode with AdaptiveSpa override def child: SparkPlan = query override def output: Seq[Attribute] = Nil - protected val customMetrics: Map[String, SQLMetric] = Map.empty + override def customMetrics: Map[String, SQLMetric] = Map.empty protected def operationMetrics: Map[String, SQLMetric] = Map.empty override lazy val metrics = customMetrics ++ operationMetrics @@ -674,7 +665,7 @@ trait WritingSparkTask[W <: DataWriter[InternalRow]] extends Logging with Serial case class DataAndMetadataWritingSparkTask( dataProj: ProjectingInternalRow, metadataProj: ProjectingInternalRow, - operationMetrics: Map[String, SQLMetric]) + sparkMetrics: Map[String, SQLMetric] = Map.empty) extends WritingSparkTask[DataWriter[InternalRow]] { override protected def write( @@ -708,14 +699,14 @@ case class DataAndMetadataWritingSparkTask( } } - operationMetrics.get("numUpdatedRows").foreach(_.add(numUpdatedRows)) - operationMetrics.get("numCopiedRows").foreach(_.add(numCopiedRows)) + sparkMetrics.get("numUpdatedRows").foreach(_.add(numUpdatedRows)) + sparkMetrics.get("numCopiedRows").foreach(_.add(numCopiedRows)) } } case class DataWithProjectionWritingSparkTask( dataProj: ProjectingInternalRow, - operationMetrics: Map[String, SQLMetric]) + sparkMetrics: Map[String, SQLMetric] = Map.empty) extends WritingSparkTask[DataWriter[InternalRow]] { override protected def write( @@ -747,8 +738,8 @@ case class DataWithProjectionWritingSparkTask( } } - operationMetrics.get("numUpdatedRows").foreach(_.add(numUpdatedRows)) - operationMetrics.get("numCopiedRows").foreach(_.add(numCopiedRows)) + sparkMetrics.get("numUpdatedRows").foreach(_.add(numUpdatedRows)) + sparkMetrics.get("numCopiedRows").foreach(_.add(numCopiedRows)) } } @@ -761,7 +752,7 @@ object DataWritingSparkTask extends WritingSparkTask[DataWriter[InternalRow]] { case class DeltaWritingSparkTask( projections: WriteDeltaProjections, - operationMetrics: Map[String, SQLMetric]) + sparkMetrics: Map[String, SQLMetric] = Map.empty) extends WritingSparkTask[DeltaWriter[InternalRow]] { private lazy val rowProjection = projections.rowProjection.orNull @@ -804,14 +795,14 @@ case class DeltaWritingSparkTask( } } - operationMetrics.get("numUpdatedRows").foreach(_.add(numUpdatedRows)) - operationMetrics.get("numDeletedRows").foreach(_.add(numDeletedRows)) + sparkMetrics.get("numUpdatedRows").foreach(_.add(numUpdatedRows)) + sparkMetrics.get("numDeletedRows").foreach(_.add(numDeletedRows)) } } case class DeltaWithMetadataWritingSparkTask( projections: WriteDeltaProjections, - operationMetrics: Map[String, SQLMetric]) + sparkMetrics: Map[String, SQLMetric] = Map.empty) extends WritingSparkTask[DeltaWriter[InternalRow]] { private lazy val rowProjection = projections.rowProjection.orNull @@ -858,8 +849,8 @@ case class DeltaWithMetadataWritingSparkTask( } } - 
operationMetrics.get("numUpdatedRows").foreach(_.add(numUpdatedRows)) - operationMetrics.get("numDeletedRows").foreach(_.add(numDeletedRows)) + sparkMetrics.get("numUpdatedRows").foreach(_.add(numUpdatedRows)) + sparkMetrics.get("numDeletedRows").foreach(_.add(numDeletedRows)) } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DeleteFromTests.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DeleteFromTests.scala index 26f64ceb33fe3..fb961657920c2 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DeleteFromTests.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DeleteFromTests.scala @@ -18,6 +18,9 @@ package org.apache.spark.sql.connector import org.apache.spark.sql._ +import org.apache.spark.sql.QueryTest.withQueryExecutionsCaptured +import org.apache.spark.sql.connector.metric.NumDeletedRowsMetric +import org.apache.spark.sql.execution.datasources.v2.{DeleteFromTableExec, TruncateTableExec} import org.apache.spark.sql.internal.SQLConf.V2_SESSION_CATALOG_IMPLEMENTATION import org.apache.spark.sql.sources.SimpleScanSource @@ -28,12 +31,30 @@ trait DeleteFromTests extends DatasourceV2SQLBase { protected val catalogAndNamespace: String + /** + * Runs `thunk` (which is expected to execute a single metadata-only DELETE or TRUNCATE) and + * asserts the number of deleted rows reported via the connector's driver metrics. + */ + protected def checkDeleteMetrics(numDeletedRows: Long)(thunk: => Unit): Unit = { + val plans = withQueryExecutionsCaptured(spark)(thunk).map(_.executedPlan).filter { + case _: DeleteFromTableExec | _: TruncateTableExec => true + case _ => false + } + assert(plans.size === 1, + s"expected exactly one metadata-only delete or truncate plan, got $plans") + val actual = plans.head.metrics(NumDeletedRowsMetric.NAME).value + assert(actual === numDeletedRows, + s"expected numDeletedRows=$numDeletedRows, got $actual") + } + test("DeleteFrom with v2 filtering: basic - delete all") { val t = s"${catalogAndNamespace}tbl" withTable(t) { sql(s"CREATE TABLE $t (id bigint, data string, p int) USING foo PARTITIONED BY (id, p)") sql(s"INSERT INTO $t VALUES (2L, 'a', 2), (2L, 'b', 3), (3L, 'c', 3)") - sql(s"DELETE FROM $t") + checkDeleteMetrics(numDeletedRows = 3) { + sql(s"DELETE FROM $t") + } checkAnswer(spark.table(t), Seq()) } } @@ -43,7 +64,9 @@ trait DeleteFromTests extends DatasourceV2SQLBase { withTable(t) { sql(s"CREATE TABLE $t (id bigint, data string, p int) USING foo PARTITIONED BY (id, p)") sql(s"INSERT INTO $t VALUES (2L, 'a', 2), (2L, 'b', 3), (3L, 'c', 3)") - sql(s"DELETE FROM $t WHERE id = 2") + checkDeleteMetrics(numDeletedRows = 2) { + sql(s"DELETE FROM $t WHERE id = 2") + } checkAnswer(spark.table(t), Seq( Row(3, "c", 3))) } @@ -54,7 +77,9 @@ trait DeleteFromTests extends DatasourceV2SQLBase { withTable(t) { sql(s"CREATE TABLE $t (id bigint, data string, p int) USING foo PARTITIONED BY (id, p)") sql(s"INSERT INTO $t VALUES (2L, 'a', 2), (2L, 'b', 3), (3L, 'c', 3)") - sql(s"DELETE FROM $t AS tbl WHERE tbl.id = 2") + checkDeleteMetrics(numDeletedRows = 2) { + sql(s"DELETE FROM $t AS tbl WHERE tbl.id = 2") + } checkAnswer(spark.table(t), Seq( Row(3, "c", 3))) } @@ -65,7 +90,9 @@ trait DeleteFromTests extends DatasourceV2SQLBase { withTable(t) { sql(s"CREATE TABLE $t (id bigint, data string, p int) USING foo PARTITIONED BY (id, p)") sql(s"INSERT INTO $t VALUES (2L, 'a', 2), (2L, 'b', 3), (3L, 'c', 3)") - sql(s"DELETE FROM $t AS tbl WHERE tbl.ID = 2") + checkDeleteMetrics(numDeletedRows = 2) { + sql(s"DELETE 
FROM $t AS tbl WHERE tbl.ID = 2") + } checkAnswer(spark.table(t), Seq( Row(3, "c", 3))) } @@ -129,7 +156,9 @@ trait DeleteFromTests extends DatasourceV2SQLBase { sql(s"CACHE TABLE view AS SELECT id FROM $t") assert(spark.table(view).count() == 3) - sql(s"DELETE FROM $t WHERE id = 2") + checkDeleteMetrics(numDeletedRows = 2) { + sql(s"DELETE FROM $t WHERE id = 2") + } assert(spark.table(view).count() == 1) } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/TruncateTableSuiteBase.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/TruncateTableSuiteBase.scala index e9d5fe1e3fb15..472d035e6fec4 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/TruncateTableSuiteBase.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/TruncateTableSuiteBase.scala @@ -18,10 +18,13 @@ package org.apache.spark.sql.execution.command import org.apache.spark.sql.{AnalysisException, QueryTest, Row} +import org.apache.spark.sql.QueryTest.withQueryExecutionsCaptured import org.apache.spark.sql.catalyst.analysis.NoSuchPartitionException import org.apache.spark.sql.catalyst.parser.CatalystSqlParser import org.apache.spark.sql.catalyst.util.quoteIdentifier import org.apache.spark.sql.connector.catalog.CatalogManager.SESSION_CATALOG_NAME +import org.apache.spark.sql.connector.metric.NumDeletedRowsMetric +import org.apache.spark.sql.execution.datasources.v2.TruncateTableExec import org.apache.spark.sql.internal.SQLConf /** @@ -37,6 +40,23 @@ import org.apache.spark.sql.internal.SQLConf trait TruncateTableSuiteBase extends QueryTest with DDLCommandTestUtils { override val command = "TRUNCATE TABLE" + /** + * Runs `thunk` (a TRUNCATE TABLE statement) and, when a v2 [[TruncateTableExec]] plan is + * produced, asserts the number of deleted rows reported via the connector's driver metrics. + * On the v1 command path no [[TruncateTableExec]] is emitted, so the assertion is skipped. 
+ */ + protected def checkTruncateMetrics(numDeletedRows: Long)(thunk: => Unit): Unit = { + val plans = withQueryExecutionsCaptured(spark)(thunk).map(_.executedPlan).collect { + case t: TruncateTableExec => t + } + if (plans.nonEmpty) { + assert(plans.size === 1, s"expected exactly one TruncateTableExec, got $plans") + val actual = plans.head.metrics(NumDeletedRowsMetric.NAME).value + assert(actual === numDeletedRows, + s"expected numDeletedRows=$numDeletedRows, got $actual") + } + } + test("table does not exist") { withNamespaceAndTable("ns", "does_not_exist") { t => val parsed = CatalystSqlParser.parseMultipartIdentifier(t) @@ -53,7 +73,9 @@ trait TruncateTableSuiteBase extends QueryTest with DDLCommandTestUtils { sql(s"CREATE TABLE $t (c0 INT, c1 INT) $defaultUsing") sql(s"INSERT INTO $t SELECT 0, 1") - sql(s"TRUNCATE TABLE $t") + checkTruncateMetrics(numDeletedRows = 1) { + sql(s"TRUNCATE TABLE $t") + } QueryTest.checkAnswer(sql(s"SELECT * FROM $t"), Nil) } } @@ -159,7 +181,9 @@ trait TruncateTableSuiteBase extends QueryTest with DDLCommandTestUtils { checkAnswer( sql(s"SELECT width, length, height FROM $t"), Seq(Row(0, 0, 0), Row(1, 1, 1), Row(1, 2, 3))) - sql(s"TRUNCATE TABLE $t") + checkTruncateMetrics(numDeletedRows = 3) { + sql(s"TRUNCATE TABLE $t") + } checkAnswer(sql(s"SELECT width, length, height FROM $t"), Nil) checkPartitions(t, Map("width" -> "0", "length" -> "0"), @@ -203,7 +227,9 @@ trait TruncateTableSuiteBase extends QueryTest with DDLCommandTestUtils { sql(s"CACHE TABLE $t") assert(spark.catalog.isCached(t)) QueryTest.checkAnswer(sql(s"SELECT * FROM $t"), Row(0) :: Nil) - sql(s"TRUNCATE TABLE $t") + checkTruncateMetrics(numDeletedRows = 1) { + sql(s"TRUNCATE TABLE $t") + } assert(spark.catalog.isCached(t)) QueryTest.checkAnswer(sql(s"SELECT * FROM $t"), Nil) } From 8904a5ff2d33703de1cdb05c233403f509d9636c Mon Sep 17 00:00:00 2001 From: Ziya Mukhtarov Date: Thu, 23 Apr 2026 14:35:06 +0000 Subject: [PATCH 2/6] Remove metadata-only DELETE Metrics --- .../metric/NumDeletedRowsMetric.java | 48 ------------------- .../connector/catalog/InMemoryBaseTable.scala | 36 +------------- .../sql/connector/catalog/InMemoryTable.scala | 6 +-- .../catalog/InMemoryTableWithV2Filter.scala | 8 +--- .../v2/WriteToDataSourceV2Exec.scala | 17 ++++--- .../spark/sql/connector/DeleteFromTests.scala | 39 ++------------- .../command/TruncateTableSuiteBase.scala | 32 ++----------- 7 files changed, 21 insertions(+), 165 deletions(-) delete mode 100644 sql/catalyst/src/main/java/org/apache/spark/sql/connector/metric/NumDeletedRowsMetric.java diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/metric/NumDeletedRowsMetric.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/metric/NumDeletedRowsMetric.java deleted file mode 100644 index e8f89ceca801e..0000000000000 --- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/metric/NumDeletedRowsMetric.java +++ /dev/null @@ -1,48 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.connector.metric; - -import org.apache.spark.annotation.Evolving; - -/** - * Standard {@link CustomMetric} for the number of rows deleted by a DELETE or TRUNCATE - * operation. Connectors implementing - * {@link org.apache.spark.sql.connector.catalog.SupportsDeleteV2} or - * {@link org.apache.spark.sql.connector.catalog.TruncatableTable} can declare this metric from - * their {@code supportedCustomMetrics()} to have Spark surface the value uniformly in the SQL UI - * and elsewhere. - * - * @since 4.2.0 - */ -@Evolving -public class NumDeletedRowsMetric extends CustomSumMetric { - /** - * Well-known metric name used across connectors and Spark's row-level DELETE path. - */ - public static final String NAME = "numDeletedRows"; - - @Override - public String name() { - return NAME; - } - - @Override - public String description() { - return "number of deleted rows"; - } -} diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryBaseTable.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryBaseTable.scala index 6d802786c83fc..fd2c0f6e9c2ec 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryBaseTable.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryBaseTable.scala @@ -34,7 +34,7 @@ import org.apache.spark.sql.connector.catalog.constraints.Constraint import org.apache.spark.sql.connector.distributions.{Distribution, Distributions} import org.apache.spark.sql.connector.expressions._ import org.apache.spark.sql.connector.expressions.{Literal => V2Literal} -import org.apache.spark.sql.connector.metric.{CustomMetric, CustomSumMetric, CustomTaskMetric, NumDeletedRowsMetric} +import org.apache.spark.sql.connector.metric.{CustomMetric, CustomSumMetric, CustomTaskMetric} import org.apache.spark.sql.connector.read._ import org.apache.spark.sql.connector.read.colstats.{ColumnStatistics, Histogram, HistogramBin} import org.apache.spark.sql.connector.read.partitioning.{KeyGroupedPartitioning, Partitioning, UnknownPartitioning} @@ -1125,40 +1125,6 @@ class RowsReadCustomMetric extends CustomSumMetric { override def description(): String = "number of rows read" } -/** - * Mixin for [[InMemoryBaseTable]] subclasses that implement [[TruncatableTable]] (directly or via - * `SupportsDelete` / `SupportsDeleteV2`) and want to surface the number of rows removed by the - * most recent metadata-only DELETE or TRUNCATE. Subclasses call [[recordDeletedRows]] from their - * `deleteWhere` implementation with the set of partition keys being removed. 
- */ -trait InMemoryDeleteRowCountMetric extends TruncatableTable { self: InMemoryBaseTable => - - private var lastDeletedRowCount: Long = -1L - - protected def recordDeletedRows(keysToDelete: Iterable[Seq[Any]]): Unit = { - lastDeletedRowCount = keysToDelete.iterator - .flatMap(dataMap.get) - .flatten - .map(_.rows.size.toLong) - .sum - } - - override def supportedCustomMetrics(): Array[CustomMetric] = { - Array(new NumDeletedRowsMetric) - } - - override def reportDriverMetrics(): Array[CustomTaskMetric] = { - if (lastDeletedRowCount < 0) { - Array.empty - } else { - Array(new CustomTaskMetric { - override def name(): String = NumDeletedRowsMetric.NAME - override def value(): Long = lastDeletedRowCount - }) - } - } -} - case class Commit(id: Long, writeSummary: Option[WriteSummary] = None) sealed trait Operation diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryTable.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryTable.scala index ea2d90bf06e1e..d5738475031dc 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryTable.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryTable.scala @@ -49,7 +49,7 @@ class InMemoryTable( override val id: String = UUID.randomUUID().toString) extends InMemoryBaseTable(name, columns, partitioning, properties, constraints, distribution, ordering, numPartitions, advisoryPartitionSize, isDistributionStrictlyRequired, - numRowsPerSplit) with SupportsDelete with InMemoryDeleteRowCountMetric { + numRowsPerSplit) with SupportsDelete { def this( name: String, @@ -69,10 +69,8 @@ class InMemoryTable( override def deleteWhere(filters: Array[Filter]): Unit = dataMap.synchronized { import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.MultipartIdentifierHelper - val keysToDelete = InMemoryTable + dataMap --= InMemoryTable .filtersToKeys(dataMap.keys, partCols.map(_.toSeq.quoted).toImmutableArraySeq, filters) - recordDeletedRows(keysToDelete) - dataMap --= keysToDelete increaseVersion() } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryTableWithV2Filter.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryTableWithV2Filter.scala index 4dbef7bc08f1b..f2827faf59435 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryTableWithV2Filter.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryTableWithV2Filter.scala @@ -34,9 +34,7 @@ class InMemoryTableWithV2Filter( columns: Array[Column], partitioning: Array[Transform], properties: util.Map[String, String]) - extends InMemoryBaseTable(name, columns, partitioning, properties) - with SupportsDeleteV2 - with InMemoryDeleteRowCountMetric { + extends InMemoryBaseTable(name, columns, partitioning, properties) with SupportsDeleteV2 { override def canDeleteWhere(predicates: Array[Predicate]): Boolean = { InMemoryTableWithV2Filter.supportsPredicates(predicates) @@ -44,10 +42,8 @@ class InMemoryTableWithV2Filter( override def deleteWhere(filters: Array[Predicate]): Unit = dataMap.synchronized { import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.MultipartIdentifierHelper - val keysToDelete = InMemoryTableWithV2Filter + dataMap --= InMemoryTableWithV2Filter .filtersToKeys(dataMap.keys, partCols.map(_.toSeq.quoted).toImmutableArraySeq, filters) - recordDeletedRows(keysToDelete) - dataMap --= keysToDelete } override def newScanBuilder(options: 
CaseInsensitiveStringMap): ScanBuilder = { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala index 2b1b0e6c6abb1..c02eb19e2b855 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala @@ -30,7 +30,7 @@ import org.apache.spark.sql.catalyst.util.{removeInternalMetadata, CharVarcharUt import org.apache.spark.sql.catalyst.util.RowDeltaUtils.{COPY_OPERATION, DELETE_OPERATION, INSERT_OPERATION, REINSERT_OPERATION, UPDATE_OPERATION} import org.apache.spark.sql.connector.catalog.{CatalogV2Util, Column, Identifier, StagedTable, StagingTableCatalog, Table, TableCatalog, TableInfo, TableWritePrivilege} import org.apache.spark.sql.connector.expressions.Transform -import org.apache.spark.sql.connector.metric.{CustomMetric, NumDeletedRowsMetric} +import org.apache.spark.sql.connector.metric.CustomMetric import org.apache.spark.sql.connector.write.{BatchWrite, DataWriter, DataWriterFactory, DeleteSummaryImpl, DeltaWrite, DeltaWriter, MergeSummaryImpl, PhysicalWriteInfoImpl, RowLevelOperation, RowLevelOperationTable, UpdateSummaryImpl, Write, WriterCommitMessage, WriteSummary} import org.apache.spark.sql.connector.write.RowLevelOperation.Command._ import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryExecutionErrors} @@ -350,7 +350,7 @@ case class ReplaceDataExec( // One of the metrics couldn't be found, also mark numDeletedRows as not found. -1L } - metrics(NumDeletedRowsMetric.NAME).set(numDeletedRows) + metrics("numDeletedRows").set(numDeletedRows) } super.getWriteSummary(query) } @@ -433,8 +433,7 @@ trait RowLevelWriteExec extends V2ExistingTableWriteExec { "numCopiedRows" -> SQLMetrics.createMetric(sparkContext, "number of copied rows")) case DELETE => Map( - NumDeletedRowsMetric.NAME -> - SQLMetrics.createMetric(sparkContext, "number of deleted rows"), + "numDeletedRows" -> SQLMetrics.createMetric(sparkContext, "number of deleted rows"), "numCopiedRows" -> SQLMetrics.createMetric(sparkContext, "number of copied rows")) case _ => Map.empty } @@ -467,7 +466,7 @@ trait RowLevelWriteExec extends V2ExistingTableWriteExec { getMetricValue(sparkMetrics, "numCopiedRows"))) case DELETE => Some(DeleteSummaryImpl( - getMetricValue(sparkMetrics, NumDeletedRowsMetric.NAME), + getMetricValue(sparkMetrics, "numDeletedRows"), getMetricValue(sparkMetrics, "numCopiedRows"))) } } @@ -665,7 +664,7 @@ trait WritingSparkTask[W <: DataWriter[InternalRow]] extends Logging with Serial case class DataAndMetadataWritingSparkTask( dataProj: ProjectingInternalRow, metadataProj: ProjectingInternalRow, - sparkMetrics: Map[String, SQLMetric] = Map.empty) + sparkMetrics: Map[String, SQLMetric]) extends WritingSparkTask[DataWriter[InternalRow]] { override protected def write( @@ -706,7 +705,7 @@ case class DataAndMetadataWritingSparkTask( case class DataWithProjectionWritingSparkTask( dataProj: ProjectingInternalRow, - sparkMetrics: Map[String, SQLMetric] = Map.empty) + sparkMetrics: Map[String, SQLMetric]) extends WritingSparkTask[DataWriter[InternalRow]] { override protected def write( @@ -752,7 +751,7 @@ object DataWritingSparkTask extends WritingSparkTask[DataWriter[InternalRow]] { case class DeltaWritingSparkTask( projections: WriteDeltaProjections, - sparkMetrics: Map[String, SQLMetric] = 
Map.empty) + sparkMetrics: Map[String, SQLMetric]) extends WritingSparkTask[DeltaWriter[InternalRow]] { private lazy val rowProjection = projections.rowProjection.orNull @@ -802,7 +801,7 @@ case class DeltaWritingSparkTask( case class DeltaWithMetadataWritingSparkTask( projections: WriteDeltaProjections, - sparkMetrics: Map[String, SQLMetric] = Map.empty) + sparkMetrics: Map[String, SQLMetric]) extends WritingSparkTask[DeltaWriter[InternalRow]] { private lazy val rowProjection = projections.rowProjection.orNull diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DeleteFromTests.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DeleteFromTests.scala index fb961657920c2..26f64ceb33fe3 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DeleteFromTests.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DeleteFromTests.scala @@ -18,9 +18,6 @@ package org.apache.spark.sql.connector import org.apache.spark.sql._ -import org.apache.spark.sql.QueryTest.withQueryExecutionsCaptured -import org.apache.spark.sql.connector.metric.NumDeletedRowsMetric -import org.apache.spark.sql.execution.datasources.v2.{DeleteFromTableExec, TruncateTableExec} import org.apache.spark.sql.internal.SQLConf.V2_SESSION_CATALOG_IMPLEMENTATION import org.apache.spark.sql.sources.SimpleScanSource @@ -31,30 +28,12 @@ trait DeleteFromTests extends DatasourceV2SQLBase { protected val catalogAndNamespace: String - /** - * Runs `thunk` (which is expected to execute a single metadata-only DELETE or TRUNCATE) and - * asserts the number of deleted rows reported via the connector's driver metrics. - */ - protected def checkDeleteMetrics(numDeletedRows: Long)(thunk: => Unit): Unit = { - val plans = withQueryExecutionsCaptured(spark)(thunk).map(_.executedPlan).filter { - case _: DeleteFromTableExec | _: TruncateTableExec => true - case _ => false - } - assert(plans.size === 1, - s"expected exactly one metadata-only delete or truncate plan, got $plans") - val actual = plans.head.metrics(NumDeletedRowsMetric.NAME).value - assert(actual === numDeletedRows, - s"expected numDeletedRows=$numDeletedRows, got $actual") - } - test("DeleteFrom with v2 filtering: basic - delete all") { val t = s"${catalogAndNamespace}tbl" withTable(t) { sql(s"CREATE TABLE $t (id bigint, data string, p int) USING foo PARTITIONED BY (id, p)") sql(s"INSERT INTO $t VALUES (2L, 'a', 2), (2L, 'b', 3), (3L, 'c', 3)") - checkDeleteMetrics(numDeletedRows = 3) { - sql(s"DELETE FROM $t") - } + sql(s"DELETE FROM $t") checkAnswer(spark.table(t), Seq()) } } @@ -64,9 +43,7 @@ trait DeleteFromTests extends DatasourceV2SQLBase { withTable(t) { sql(s"CREATE TABLE $t (id bigint, data string, p int) USING foo PARTITIONED BY (id, p)") sql(s"INSERT INTO $t VALUES (2L, 'a', 2), (2L, 'b', 3), (3L, 'c', 3)") - checkDeleteMetrics(numDeletedRows = 2) { - sql(s"DELETE FROM $t WHERE id = 2") - } + sql(s"DELETE FROM $t WHERE id = 2") checkAnswer(spark.table(t), Seq( Row(3, "c", 3))) } @@ -77,9 +54,7 @@ trait DeleteFromTests extends DatasourceV2SQLBase { withTable(t) { sql(s"CREATE TABLE $t (id bigint, data string, p int) USING foo PARTITIONED BY (id, p)") sql(s"INSERT INTO $t VALUES (2L, 'a', 2), (2L, 'b', 3), (3L, 'c', 3)") - checkDeleteMetrics(numDeletedRows = 2) { - sql(s"DELETE FROM $t AS tbl WHERE tbl.id = 2") - } + sql(s"DELETE FROM $t AS tbl WHERE tbl.id = 2") checkAnswer(spark.table(t), Seq( Row(3, "c", 3))) } @@ -90,9 +65,7 @@ trait DeleteFromTests extends DatasourceV2SQLBase { withTable(t) { sql(s"CREATE TABLE $t (id 
bigint, data string, p int) USING foo PARTITIONED BY (id, p)") sql(s"INSERT INTO $t VALUES (2L, 'a', 2), (2L, 'b', 3), (3L, 'c', 3)") - checkDeleteMetrics(numDeletedRows = 2) { - sql(s"DELETE FROM $t AS tbl WHERE tbl.ID = 2") - } + sql(s"DELETE FROM $t AS tbl WHERE tbl.ID = 2") checkAnswer(spark.table(t), Seq( Row(3, "c", 3))) } @@ -156,9 +129,7 @@ trait DeleteFromTests extends DatasourceV2SQLBase { sql(s"CACHE TABLE view AS SELECT id FROM $t") assert(spark.table(view).count() == 3) - checkDeleteMetrics(numDeletedRows = 2) { - sql(s"DELETE FROM $t WHERE id = 2") - } + sql(s"DELETE FROM $t WHERE id = 2") assert(spark.table(view).count() == 1) } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/TruncateTableSuiteBase.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/TruncateTableSuiteBase.scala index 472d035e6fec4..e9d5fe1e3fb15 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/TruncateTableSuiteBase.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/TruncateTableSuiteBase.scala @@ -18,13 +18,10 @@ package org.apache.spark.sql.execution.command import org.apache.spark.sql.{AnalysisException, QueryTest, Row} -import org.apache.spark.sql.QueryTest.withQueryExecutionsCaptured import org.apache.spark.sql.catalyst.analysis.NoSuchPartitionException import org.apache.spark.sql.catalyst.parser.CatalystSqlParser import org.apache.spark.sql.catalyst.util.quoteIdentifier import org.apache.spark.sql.connector.catalog.CatalogManager.SESSION_CATALOG_NAME -import org.apache.spark.sql.connector.metric.NumDeletedRowsMetric -import org.apache.spark.sql.execution.datasources.v2.TruncateTableExec import org.apache.spark.sql.internal.SQLConf /** @@ -40,23 +37,6 @@ import org.apache.spark.sql.internal.SQLConf trait TruncateTableSuiteBase extends QueryTest with DDLCommandTestUtils { override val command = "TRUNCATE TABLE" - /** - * Runs `thunk` (a TRUNCATE TABLE statement) and, when a v2 [[TruncateTableExec]] plan is - * produced, asserts the number of deleted rows reported via the connector's driver metrics. - * On the v1 command path no [[TruncateTableExec]] is emitted, so the assertion is skipped. 
- */ - protected def checkTruncateMetrics(numDeletedRows: Long)(thunk: => Unit): Unit = { - val plans = withQueryExecutionsCaptured(spark)(thunk).map(_.executedPlan).collect { - case t: TruncateTableExec => t - } - if (plans.nonEmpty) { - assert(plans.size === 1, s"expected exactly one TruncateTableExec, got $plans") - val actual = plans.head.metrics(NumDeletedRowsMetric.NAME).value - assert(actual === numDeletedRows, - s"expected numDeletedRows=$numDeletedRows, got $actual") - } - } - test("table does not exist") { withNamespaceAndTable("ns", "does_not_exist") { t => val parsed = CatalystSqlParser.parseMultipartIdentifier(t) @@ -73,9 +53,7 @@ trait TruncateTableSuiteBase extends QueryTest with DDLCommandTestUtils { sql(s"CREATE TABLE $t (c0 INT, c1 INT) $defaultUsing") sql(s"INSERT INTO $t SELECT 0, 1") - checkTruncateMetrics(numDeletedRows = 1) { - sql(s"TRUNCATE TABLE $t") - } + sql(s"TRUNCATE TABLE $t") QueryTest.checkAnswer(sql(s"SELECT * FROM $t"), Nil) } } @@ -181,9 +159,7 @@ trait TruncateTableSuiteBase extends QueryTest with DDLCommandTestUtils { checkAnswer( sql(s"SELECT width, length, height FROM $t"), Seq(Row(0, 0, 0), Row(1, 1, 1), Row(1, 2, 3))) - checkTruncateMetrics(numDeletedRows = 3) { - sql(s"TRUNCATE TABLE $t") - } + sql(s"TRUNCATE TABLE $t") checkAnswer(sql(s"SELECT width, length, height FROM $t"), Nil) checkPartitions(t, Map("width" -> "0", "length" -> "0"), @@ -227,9 +203,7 @@ trait TruncateTableSuiteBase extends QueryTest with DDLCommandTestUtils { sql(s"CACHE TABLE $t") assert(spark.catalog.isCached(t)) QueryTest.checkAnswer(sql(s"SELECT * FROM $t"), Row(0) :: Nil) - checkTruncateMetrics(numDeletedRows = 1) { - sql(s"TRUNCATE TABLE $t") - } + sql(s"TRUNCATE TABLE $t") assert(spark.catalog.isCached(t)) QueryTest.checkAnswer(sql(s"SELECT * FROM $t"), Nil) } From 02b0dc0ead201136d64c8272eedf6313333ff7a2 Mon Sep 17 00:00:00 2001 From: Ziya Mukhtarov Date: Thu, 23 Apr 2026 14:47:05 +0000 Subject: [PATCH 3/6] Fix --- .../sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala | 3 --- 1 file changed, 3 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala index c02eb19e2b855..8fd46f0bb1212 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala @@ -490,9 +490,6 @@ trait V2TableWriteExec override def output: Seq[Attribute] = Nil override def customMetrics: Map[String, SQLMetric] = Map.empty - protected def operationMetrics: Map[String, SQLMetric] = Map.empty - - override lazy val metrics = customMetrics ++ operationMetrics protected def writeWithV2(batchWrite: BatchWrite): Seq[InternalRow] = { val rdd: RDD[InternalRow] = { From 48db8bc31b7b3c38dde97afa4e92c7b0b409ace0 Mon Sep 17 00:00:00 2001 From: Ziya Mukhtarov Date: Tue, 28 Apr 2026 05:15:17 +0000 Subject: [PATCH 4/6] Clean-up --- .../datasources/v2/SupportsCustomDriverMetrics.scala | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/SupportsCustomDriverMetrics.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/SupportsCustomDriverMetrics.scala index 2c6cd91d42392..72e99ce6ec16e 100644 --- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/SupportsCustomDriverMetrics.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/SupportsCustomDriverMetrics.scala @@ -28,8 +28,7 @@ import org.apache.spark.util.ArrayImplicits._ * operation has executed they call [[postDriverMetrics]] with the connector's reported values so * they are visible in the SQL UI. * - * Nodes that also expose Spark-owned metrics (for example a node-computed row counter or a - * row-level operation's numDeletedRows) should supply them via [[sparkMetrics]]. Names in + * Nodes that also expose Spark-owned metrics supply them via [[sparkMetrics]]. Names in * [[sparkMetrics]] are reserved: if the connector happens to report a value under the same name, * Spark's value wins and the connector's is dropped. */ @@ -41,8 +40,8 @@ trait SupportsCustomDriverMetrics { self: SparkPlan => def customMetrics: Map[String, SQLMetric] /** - * Spark-owned metrics that should appear alongside the connector-declared ones. Defaults to - * empty. Values under these names are owned by Spark and take precedence on a name collision. + * Spark-owned metrics that should appear alongside the connector-declared ones. Values under + * these names are owned by Spark and take precedence on a name collision. */ protected def sparkMetrics: Map[String, SQLMetric] = Map.empty From 165dc360cb323c525dc3511cdc630b7abe2e5480 Mon Sep 17 00:00:00 2001 From: Ziya Mukhtarov Date: Tue, 28 Apr 2026 16:38:59 +0000 Subject: [PATCH 5/6] Address comments --- .../spark/sql/connector/catalog/TruncatableTable.java | 4 ++++ .../sql/execution/datasources/v2/DeleteFromTableExec.scala | 7 +++++-- .../datasources/v2/SupportsCustomDriverMetrics.scala | 5 ++--- 3 files changed, 11 insertions(+), 5 deletions(-) diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TruncatableTable.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TruncatableTable.java index a1d8f59248b92..058e105758c72 100644 --- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TruncatableTable.java +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TruncatableTable.java @@ -40,6 +40,8 @@ public interface TruncatableTable extends Table { /** * Returns an array of supported custom metrics with name and description. * By default it returns empty array. + * + * @since 4.2.0 */ default CustomMetric[] supportedCustomMetrics() { return new CustomMetric[]{}; @@ -49,6 +51,8 @@ default CustomMetric[] supportedCustomMetrics() { * Returns an array of custom metrics which are collected with values at the driver side only. * Note that these metrics must be included in the supported custom metrics reported by * `supportedCustomMetrics`. 
+ * + * @since 4.2.0 */ default CustomTaskMetric[] reportDriverMetrics() { return new CustomTaskMetric[]{}; diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DeleteFromTableExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DeleteFromTableExec.scala index aa0746184ca42..2ef182206b919 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DeleteFromTableExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DeleteFromTableExec.scala @@ -34,8 +34,11 @@ case class DeleteFromTableExec( createCustomMetrics(table.supportedCustomMetrics()) override protected def run(): Seq[InternalRow] = { - table.deleteWhere(condition) - postDriverMetrics(table.reportDriverMetrics()) + try { + table.deleteWhere(condition) + } finally { + postDriverMetrics(table.reportDriverMetrics()) + } refreshCache() Seq.empty } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/SupportsCustomDriverMetrics.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/SupportsCustomDriverMetrics.scala index 72e99ce6ec16e..dc2dd2fb02a1e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/SupportsCustomDriverMetrics.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/SupportsCustomDriverMetrics.scala @@ -52,7 +52,7 @@ trait SupportsCustomDriverMetrics { self: SparkPlan => */ protected def createCustomMetrics(metrics: Array[CustomMetric]): Map[String, SQLMetric] = { metrics.map { m => - m.name() -> SQLMetrics.createV2CustomMetric(sparkContext, m) + m.name -> SQLMetrics.createV2CustomMetric(sparkContext, m) }.toMap } @@ -75,7 +75,6 @@ trait SupportsCustomDriverMetrics { self: SparkPlan => } } val executionId = sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY) - SQLMetrics.postDriverMetricUpdates( - sparkContext, executionId, updated.toImmutableArraySeq) + SQLMetrics.postDriverMetricUpdates(sparkContext, executionId, updated.toImmutableArraySeq) } } From 3ab64c80a1cc637e285ff3672cbc109594315979 Mon Sep 17 00:00:00 2001 From: Ziya Mukhtarov Date: Thu, 30 Apr 2026 16:31:22 +0000 Subject: [PATCH 6/6] Wrap TruncateTableExec.run in try-finally --- .../sql/execution/datasources/v2/TruncateTableExec.scala | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/TruncateTableExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/TruncateTableExec.scala index 084e2d54b5029..0bcfc4f364182 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/TruncateTableExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/TruncateTableExec.scala @@ -37,10 +37,11 @@ case class TruncateTableExec( override def output: Seq[Attribute] = Seq.empty override protected def run(): Seq[InternalRow] = { - if (table.truncateTable()) { + try { + if (table.truncateTable()) refreshCache() + Seq.empty + } finally { postDriverMetrics(table.reportDriverMetrics()) - refreshCache() } - Seq.empty } }
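
For connector authors, the following is a minimal sketch (not part of the patches above) of how a TruncatableTable implementation could wire together the hooks this series touches — supportedCustomMetrics(), reportDriverMetrics(), and the well-known NumDeletedRowsMetric name — so that a metadata-only TRUNCATE surfaces numDeletedRows as a driver metric. The class name and the in-memory row buffer are invented purely for illustration; a real connector would consult its own storage layer.

import java.util

import org.apache.spark.sql.connector.catalog.{TableCapability, TruncatableTable}
import org.apache.spark.sql.connector.metric.{CustomMetric, CustomTaskMetric, NumDeletedRowsMetric}
import org.apache.spark.sql.types.{LongType, StructType}

class ExampleTruncatableTable extends TruncatableTable {
  // Hypothetical in-memory backing store, used here only to have something to count and clear.
  private val rows = scala.collection.mutable.ArrayBuffer[Long](1L, 2L, 3L)
  @volatile private var lastDeletedRows: Long = 0L

  override def name(): String = "example_truncatable_table"
  override def schema(): StructType = new StructType().add("id", LongType)
  override def capabilities(): util.Set[TableCapability] =
    util.EnumSet.of(TableCapability.TRUNCATE)

  override def truncateTable(): Boolean = {
    // Remember how many rows the metadata-only operation removed so it can be reported later.
    lastDeletedRows = rows.size.toLong
    rows.clear()
    true
  }

  // Declaring the standard metric lets TruncateTableExec register a matching SQLMetric up front.
  override def supportedCustomMetrics(): Array[CustomMetric] =
    Array(new NumDeletedRowsMetric)

  // Read by TruncateTableExec after truncateTable() and posted as driver-side metric updates.
  override def reportDriverMetrics(): Array[CustomTaskMetric] = Array(
    new CustomTaskMetric {
      override def name(): String = NumDeletedRowsMetric.NAME
      override def value(): Long = lastDeletedRows
    })
}

With this in place, the value reported from reportDriverMetrics() is the one TruncateTableExec posts in its finally block, so it is surfaced in the SQL UI even when the truncate itself fails partway through.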