diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TruncatableTable.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TruncatableTable.java index 4bc2aa6e18ae..058e105758c7 100644 --- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TruncatableTable.java +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TruncatableTable.java @@ -18,6 +18,8 @@ package org.apache.spark.sql.connector.catalog; import org.apache.spark.annotation.Evolving; +import org.apache.spark.sql.connector.metric.CustomMetric; +import org.apache.spark.sql.connector.metric.CustomTaskMetric; /** * Represents a table which can be atomically truncated. @@ -34,4 +36,25 @@ public interface TruncatableTable extends Table { * @since 3.2.0 */ boolean truncateTable(); + + /** + * Returns an array of supported custom metrics with name and description. + * By default it returns an empty array. + * + * @since 4.2.0 + */ + default CustomMetric[] supportedCustomMetrics() { + return new CustomMetric[]{}; + } + + /** + * Returns an array of custom metrics whose values are collected on the driver side only. + * Note that these metrics must be included in the supported custom metrics reported by + * `supportedCustomMetrics`. + * + * @since 4.2.0 + */ + default CustomTaskMetric[] reportDriverMetrics() { + return new CustomTaskMetric[]{}; + } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExec.scala index 28f4d12d366b..e9a18833ed9a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExec.scala @@ -129,7 +129,7 @@ case class BatchScanExec( new DataSourceRDD( sparkContext, filteredPartitions, readerFactory, supportsColumnar, customMetrics) } - postDriverMetrics() + postDriverMetrics(scan.reportDriverMetrics()) rdd } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ContinuousScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ContinuousScanExec.scala index e9e5f0f3175c..f4e5db153617 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ContinuousScanExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ContinuousScanExec.scala @@ -65,7 +65,7 @@ case class ContinuousScanExec( schema, readerFactory, customMetrics) - postDriverMetrics() + postDriverMetrics(scan.reportDriverMetrics()) inputRDD } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExecBase.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExecBase.scala index 0bec91803977..f00d8b9b82cb 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExecBase.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExecBase.scala @@ -24,23 +24,22 @@ import org.apache.spark.sql.catalyst.plans.physical import org.apache.spark.sql.catalyst.plans.physical.KeyedPartitioning import org.apache.spark.sql.catalyst.util.truncatedString import org.apache.spark.sql.connector.read.{HasPartitionKey, InputPartition, PartitionReaderFactory, Scan} -import org.apache.spark.sql.execution.{ExplainUtils, LeafExecNode, SafeForKWayMerge, SQLExecution}
-import org.apache.spark.sql.execution.metric.SQLMetrics +import org.apache.spark.sql.execution.{ExplainUtils, LeafExecNode, SafeForKWayMerge} +import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics} import org.apache.spark.sql.internal.connector.SupportsMetadata import org.apache.spark.sql.vectorized.ColumnarBatch -import org.apache.spark.util.ArrayImplicits._ import org.apache.spark.util.Utils -trait DataSourceV2ScanExecBase extends LeafExecNode with SafeForKWayMerge { +trait DataSourceV2ScanExecBase + extends LeafExecNode + with SafeForKWayMerge + with SupportsCustomDriverMetrics { - lazy val customMetrics = scan.supportedCustomMetrics().map { customMetric => - customMetric.name() -> SQLMetrics.createV2CustomMetric(sparkContext, customMetric) - }.toMap + override lazy val customMetrics: Map[String, SQLMetric] = + createCustomMetrics(scan.supportedCustomMetrics()) - override lazy val metrics = { - Map("numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows")) ++ - customMetrics - } + override protected lazy val sparkMetrics: Map[String, SQLMetric] = + Map("numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows")) def scan: Scan @@ -145,18 +144,6 @@ trait DataSourceV2ScanExecBase extends LeafExecNode with SafeForKWayMerge { } } - protected def postDriverMetrics(): Unit = { - val driveSQLMetrics = scan.reportDriverMetrics().map(customTaskMetric => { - val metric = metrics(customTaskMetric.name()) - metric.set(customTaskMetric.value()) - metric - }) - - val executionId = sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY) - SQLMetrics.postDriverMetricUpdates(sparkContext, executionId, - driveSQLMetrics.toImmutableArraySeq) - } - override def doExecuteColumnar(): RDD[ColumnarBatch] = { val numOutputRows = longMetric("numOutputRows") inputRDD.asInstanceOf[RDD[ColumnarBatch]].map { b => diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DeleteFromTableExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DeleteFromTableExec.scala index c6b1bae89b15..ea61bca26660 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DeleteFromTableExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DeleteFromTableExec.scala @@ -23,18 +23,29 @@ import org.apache.spark.sql.catalyst.transactions.TransactionUtils import org.apache.spark.sql.connector.catalog.SupportsDeleteV2 import org.apache.spark.sql.connector.catalog.transactions.Transaction import org.apache.spark.sql.connector.expressions.filter.Predicate +import org.apache.spark.sql.execution.metric.SQLMetric case class DeleteFromTableExec( table: SupportsDeleteV2, condition: Array[Predicate], refreshCache: () => Unit, - transaction: Option[Transaction] = None) extends LeafV2CommandExec with TransactionalExec { + transaction: Option[Transaction] = None) + extends LeafV2CommandExec + with TransactionalExec + with SupportsCustomDriverMetrics { + + override lazy val customMetrics: Map[String, SQLMetric] = + createCustomMetrics(table.supportedCustomMetrics()) override def withTransaction(txn: Option[Transaction]): DeleteFromTableExec = copy(transaction = txn) override protected def run(): Seq[InternalRow] = { - table.deleteWhere(condition) + try { + table.deleteWhere(condition) + } finally { + postDriverMetrics(table.reportDriverMetrics()) + } transaction.foreach(TransactionUtils.commit) refreshCache() Seq.empty diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/MicroBatchScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/MicroBatchScanExec.scala index f81ca001fbe2..bf958ba5aad4 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/MicroBatchScanExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/MicroBatchScanExec.scala @@ -55,7 +55,7 @@ case class MicroBatchScanExec( override lazy val inputRDD: RDD[InternalRow] = { val inputRDD = new DataSourceRDD(sparkContext, partitions, readerFactory, supportsColumnar, customMetrics) - postDriverMetrics() + postDriverMetrics(scan.reportDriverMetrics()) inputRDD } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/RealTimeStreamScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/RealTimeStreamScanExec.scala index 9fff2d91af14..cb4867cf8c6a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/RealTimeStreamScanExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/RealTimeStreamScanExec.scala @@ -166,7 +166,7 @@ case class RealTimeStreamScanExec( supportsColumnar, customMetrics ) - postDriverMetrics() + postDriverMetrics(scan.reportDriverMetrics()) inputRDD } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/SupportsCustomDriverMetrics.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/SupportsCustomDriverMetrics.scala new file mode 100644 index 000000000000..dc2dd2fb02a1 --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/SupportsCustomDriverMetrics.scala @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.datasources.v2 + +import org.apache.spark.sql.connector.metric.{CustomMetric, CustomTaskMetric} +import org.apache.spark.sql.execution.{SparkPlan, SQLExecution} +import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics} +import org.apache.spark.util.ArrayImplicits._ + +/** + * A mixin for Spark plan nodes that expose driver-side custom metrics reported by a connector. + * Implementations declare the connector-owned metrics via [[customMetrics]]; after the underlying + * operation has executed they call [[postDriverMetrics]] with the connector's reported values so + * they are visible in the SQL UI. + * + * Nodes that also expose Spark-owned metrics supply them via [[sparkMetrics]]. Names in + * [[sparkMetrics]] are reserved: if the connector happens to report a value under the same name, + * Spark's value wins and the connector's is dropped. 
+ */ +trait SupportsCustomDriverMetrics { self: SparkPlan => + + /** + * The custom metrics the connector supports for this operation, keyed by name. + */ + def customMetrics: Map[String, SQLMetric] + + /** + * Spark-owned metrics that should appear alongside the connector-declared ones. Values under + * these names are owned by Spark and take precedence on a name collision. + */ + protected def sparkMetrics: Map[String, SQLMetric] = Map.empty + + override lazy val metrics: Map[String, SQLMetric] = customMetrics ++ sparkMetrics + + /** + * Converts an array of connector-declared metrics into the map shape [[customMetrics]] uses. + */ + protected def createCustomMetrics(metrics: Array[CustomMetric]): Map[String, SQLMetric] = { + metrics.map { m => + m.name -> SQLMetrics.createV2CustomMetric(sparkContext, m) + }.toMap + } + + /** + * Applies the values reported by the connector to the declared metrics and posts them so the + * SQL UI reflects the final values. Metrics not declared via [[customMetrics]] are ignored. + * Metrics whose name collides with [[sparkMetrics]] are also ignored so Spark-owned values + * are preserved. + */ + protected def postDriverMetrics(taskMetrics: Array[CustomTaskMetric]): Unit = { + val updated = taskMetrics.flatMap { t => + if (sparkMetrics.contains(t.name())) { + // Spark metrics take precedence on collisions. + None + } else { + metrics.get(t.name()).map { metric => + metric.set(t.value()) + metric + } + } + } + val executionId = sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY) + SQLMetrics.postDriverMetricUpdates(sparkContext, executionId, updated.toImmutableArraySeq) + } +} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/TruncateTableExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/TruncateTableExec.scala index 948dc1bc8c87..0bcfc4f36418 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/TruncateTableExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/TruncateTableExec.scala @@ -20,18 +20,28 @@ package org.apache.spark.sql.execution.datasources.v2 import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.Attribute import org.apache.spark.sql.connector.catalog.TruncatableTable +import org.apache.spark.sql.execution.metric.SQLMetric /** * Physical plan node for table truncation. 
*/ case class TruncateTableExec( table: TruncatableTable, - refreshCache: () => Unit) extends LeafV2CommandExec { + refreshCache: () => Unit) + extends LeafV2CommandExec + with SupportsCustomDriverMetrics { + + override lazy val customMetrics: Map[String, SQLMetric] = + createCustomMetrics(table.supportedCustomMetrics()) override def output: Seq[Attribute] = Seq.empty override protected def run(): Seq[InternalRow] = { - if (table.truncateTable()) refreshCache() - Seq.empty + try { + if (table.truncateTable()) refreshCache() + Seq.empty + } finally { + postDriverMetrics(table.reportDriverMetrics()) + } } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V1FallbackWriters.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V1FallbackWriters.scala index 3eadffb8f0ae..9e3f5e3e1d4f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V1FallbackWriters.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V1FallbackWriters.scala @@ -23,8 +23,8 @@ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.classic.Dataset import org.apache.spark.sql.connector.catalog.SupportsWrite import org.apache.spark.sql.connector.write.V1Write -import org.apache.spark.sql.execution.{SparkPlan, SQLExecution} -import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics} +import org.apache.spark.sql.execution.SparkPlan +import org.apache.spark.sql.execution.metric.SQLMetric import org.apache.spark.sql.sources.InsertableRelation /** @@ -56,13 +56,15 @@ case class OverwriteByExpressionExecV1( write: V1Write) extends V1FallbackWriters /** Some helper interfaces that use V2 write semantics through the V1 writer interface. 
*/ -sealed trait V1FallbackWriters extends LeafV2CommandExec with SupportsV1Write { +sealed trait V1FallbackWriters + extends LeafV2CommandExec + with SupportsV1Write + with SupportsCustomDriverMetrics { + override def output: Seq[Attribute] = Nil - override val metrics: Map[String, SQLMetric] = - write.supportedCustomMetrics().map { customMetric => - customMetric.name() -> SQLMetrics.createV2CustomMetric(sparkContext, customMetric) - }.toMap + override lazy val customMetrics: Map[String, SQLMetric] = + createCustomMetrics(write.supportedCustomMetrics()) def table: SupportsWrite def refreshCache: () => Unit @@ -75,12 +77,7 @@ sealed trait V1FallbackWriters extends LeafV2CommandExec with SupportsV1Write { Nil } finally { - write.reportDriverMetrics().foreach { customTaskMetric => - metrics.get(customTaskMetric.name()).foreach(_.set(customTaskMetric.value())) - } - - val executionId = sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY) - SQLMetrics.postDriverMetricUpdates(sparkContext, executionId, metrics.values.toSeq) + postDriverMetrics(write.reportDriverMetrics()) } } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala index 9e579ae779f3..ccfcdc1855f0 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala @@ -36,7 +36,7 @@ import org.apache.spark.sql.connector.metric.CustomMetric import org.apache.spark.sql.connector.write.{BatchWrite, DataWriter, DataWriterFactory, DeleteSummaryImpl, DeltaWrite, DeltaWriter, MergeSummaryImpl, PhysicalWriteInfoImpl, RowLevelOperation, RowLevelOperationTable, UpdateSummaryImpl, Write, WriterCommitMessage, WriteSummary} import org.apache.spark.sql.connector.write.RowLevelOperation.Command._ import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryExecutionErrors} -import org.apache.spark.sql.execution.{QueryExecution, SparkPlan, SQLExecution, UnaryExecNode} +import org.apache.spark.sql.execution.{QueryExecution, SparkPlan, UnaryExecNode} import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper import org.apache.spark.sql.execution.metric.{CustomMetrics, SQLMetric, SQLMetrics} import org.apache.spark.sql.types.StructType @@ -354,9 +354,9 @@ case class ReplaceDataExec( override def writingTask: WritingSparkTask[_] = { projections.metadataProjection match { case Some(metadataProj) => - DataAndMetadataWritingSparkTask(projections.rowProjection, metadataProj, operationMetrics) + DataAndMetadataWritingSparkTask(projections.rowProjection, metadataProj, sparkMetrics) case None => - DataWithProjectionWritingSparkTask(projections.rowProjection, operationMetrics) + DataWithProjectionWritingSparkTask(projections.rowProjection, sparkMetrics) } } @@ -400,9 +400,9 @@ case class WriteDeltaExec( override lazy val writingTask: WritingSparkTask[_] = { if (projections.metadataProjection.isDefined) { - DeltaWithMetadataWritingSparkTask(projections, operationMetrics) + DeltaWithMetadataWritingSparkTask(projections, sparkMetrics) } else { - DeltaWritingSparkTask(projections, operationMetrics) + DeltaWritingSparkTask(projections, sparkMetrics) } } @@ -424,9 +424,8 @@ case class WriteToDataSourceV2Exec( override def stringArgs: Iterator[Any] = Iterator(batchWrite, query) - override val customMetrics: Map[String, SQLMetric] = 
writeMetrics.map { customMetric => - customMetric.name() -> SQLMetrics.createV2CustomMetric(sparkContext, customMetric) - }.toMap + override lazy val customMetrics: Map[String, SQLMetric] = + createCustomMetrics(writeMetrics.toArray) override protected def run(): Seq[InternalRow] = { val writtenRows = writeWithV2(batchWrite) @@ -457,33 +456,19 @@ trait V2ExistingTableWriteExec extends V2TableWriteExec with TransactionalExec { override def nodeName: String = s"${super.nodeName} $tableName" - override val customMetrics: Map[String, SQLMetric] = - write.supportedCustomMetrics().map { customMetric => - customMetric.name() -> SQLMetrics.createV2CustomMetric(sparkContext, customMetric) - }.toMap + override lazy val customMetrics: Map[String, SQLMetric] = + createCustomMetrics(write.supportedCustomMetrics()) override protected def run(): Seq[InternalRow] = { val writtenRows = try { writeWithV2(write.toBatch) } finally { - postDriverMetrics() + postDriverMetrics(write.reportDriverMetrics()) } transaction.foreach(TransactionUtils.commit) refreshCache() writtenRows } - - protected def postDriverMetrics(): Unit = { - val driveSQLMetrics = write.reportDriverMetrics().map(customTaskMetric => { - val metric = metrics(customTaskMetric.name()) - metric.set(customTaskMetric.value()) - metric - }) - - val executionId = sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY) - SQLMetrics.postDriverMetricUpdates(sparkContext, executionId, - driveSQLMetrics.toImmutableArraySeq) - } } /** @@ -492,7 +477,7 @@ trait V2ExistingTableWriteExec extends V2TableWriteExec with TransactionalExec { trait RowLevelWriteExec extends V2ExistingTableWriteExec { def rowLevelCommand: RowLevelOperation.Command - override lazy val operationMetrics: Map[String, SQLMetric] = rowLevelCommand match { + override protected lazy val sparkMetrics: Map[String, SQLMetric] = rowLevelCommand match { case UPDATE => Map( "numUpdatedRows" -> SQLMetrics.createMetric(sparkContext, "number of updated rows"), @@ -528,12 +513,12 @@ trait RowLevelWriteExec extends V2ExistingTableWriteExec { } case UPDATE => Some(UpdateSummaryImpl( - getMetricValue(operationMetrics, "numUpdatedRows"), - getMetricValue(operationMetrics, "numCopiedRows"))) + getMetricValue(sparkMetrics, "numUpdatedRows"), + getMetricValue(sparkMetrics, "numCopiedRows"))) case DELETE => Some(DeleteSummaryImpl( - getMetricValue(operationMetrics, "numDeletedRows"), - getMetricValue(operationMetrics, "numCopiedRows"))) + getMetricValue(sparkMetrics, "numDeletedRows"), + getMetricValue(sparkMetrics, "numCopiedRows"))) } } } @@ -541,7 +526,12 @@ trait RowLevelWriteExec extends V2ExistingTableWriteExec { /** * The base physical plan for writing data into data source v2. 
*/ -trait V2TableWriteExec extends V2CommandExec with UnaryExecNode with AdaptiveSparkPlanHelper { +trait V2TableWriteExec + extends V2CommandExec + with UnaryExecNode + with AdaptiveSparkPlanHelper + with SupportsCustomDriverMetrics { + def query: SparkPlan def writingTask: WritingSparkTask[_] = DataWritingSparkTask @@ -550,10 +540,7 @@ trait V2TableWriteExec extends V2CommandExec with UnaryExecNode with AdaptiveSpa override def child: SparkPlan = query override def output: Seq[Attribute] = Nil - protected val customMetrics: Map[String, SQLMetric] = Map.empty - protected def operationMetrics: Map[String, SQLMetric] = Map.empty - - override lazy val metrics = customMetrics ++ operationMetrics + override def customMetrics: Map[String, SQLMetric] = Map.empty protected def writeWithV2(batchWrite: BatchWrite): Seq[InternalRow] = { val rdd: RDD[InternalRow] = { @@ -725,7 +712,7 @@ trait WritingSparkTask[W <: DataWriter[InternalRow]] extends Logging with Serial case class DataAndMetadataWritingSparkTask( dataProj: ProjectingInternalRow, metadataProj: ProjectingInternalRow, - operationMetrics: Map[String, SQLMetric]) + sparkMetrics: Map[String, SQLMetric]) extends WritingSparkTask[DataWriter[InternalRow]] { override protected def write( @@ -759,14 +746,14 @@ case class DataAndMetadataWritingSparkTask( } } - operationMetrics.get("numUpdatedRows").foreach(_.add(numUpdatedRows)) - operationMetrics.get("numCopiedRows").foreach(_.add(numCopiedRows)) + sparkMetrics.get("numUpdatedRows").foreach(_.add(numUpdatedRows)) + sparkMetrics.get("numCopiedRows").foreach(_.add(numCopiedRows)) } } case class DataWithProjectionWritingSparkTask( dataProj: ProjectingInternalRow, - operationMetrics: Map[String, SQLMetric]) + sparkMetrics: Map[String, SQLMetric]) extends WritingSparkTask[DataWriter[InternalRow]] { override protected def write( @@ -798,8 +785,8 @@ case class DataWithProjectionWritingSparkTask( } } - operationMetrics.get("numUpdatedRows").foreach(_.add(numUpdatedRows)) - operationMetrics.get("numCopiedRows").foreach(_.add(numCopiedRows)) + sparkMetrics.get("numUpdatedRows").foreach(_.add(numUpdatedRows)) + sparkMetrics.get("numCopiedRows").foreach(_.add(numCopiedRows)) } } @@ -812,7 +799,7 @@ object DataWritingSparkTask extends WritingSparkTask[DataWriter[InternalRow]] { case class DeltaWritingSparkTask( projections: WriteDeltaProjections, - operationMetrics: Map[String, SQLMetric]) + sparkMetrics: Map[String, SQLMetric]) extends WritingSparkTask[DeltaWriter[InternalRow]] { private lazy val rowProjection = projections.rowProjection.orNull @@ -855,14 +842,14 @@ case class DeltaWritingSparkTask( } } - operationMetrics.get("numUpdatedRows").foreach(_.add(numUpdatedRows)) - operationMetrics.get("numDeletedRows").foreach(_.add(numDeletedRows)) + sparkMetrics.get("numUpdatedRows").foreach(_.add(numUpdatedRows)) + sparkMetrics.get("numDeletedRows").foreach(_.add(numDeletedRows)) } } case class DeltaWithMetadataWritingSparkTask( projections: WriteDeltaProjections, - operationMetrics: Map[String, SQLMetric]) + sparkMetrics: Map[String, SQLMetric]) extends WritingSparkTask[DeltaWriter[InternalRow]] { private lazy val rowProjection = projections.rowProjection.orNull @@ -909,8 +896,8 @@ case class DeltaWithMetadataWritingSparkTask( } } - operationMetrics.get("numUpdatedRows").foreach(_.add(numUpdatedRows)) - operationMetrics.get("numDeletedRows").foreach(_.add(numDeletedRows)) + sparkMetrics.get("numUpdatedRows").foreach(_.add(numUpdatedRows)) + sparkMetrics.get("numDeletedRows").foreach(_.add(numDeletedRows)) } }
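To make the new `TruncatableTable` hooks concrete, here is a minimal connector-side sketch, illustrative only and not part of the patch: `RowsRemovedMetric` and `InMemoryTruncatableTable` are hypothetical names invented for this example, while `CustomSumMetric`, `CustomTaskMetric`, `TableCapability`, and `StructType` are existing Spark APIs.

import java.util

import org.apache.spark.sql.connector.catalog.{TableCapability, TruncatableTable}
import org.apache.spark.sql.connector.metric.{CustomMetric, CustomSumMetric, CustomTaskMetric}
import org.apache.spark.sql.types.StructType

// Hypothetical metric. The name/description declared here is what
// TruncateTableExec turns into a SQLMetric via
// createCustomMetrics(table.supportedCustomMetrics()).
class RowsRemovedMetric extends CustomSumMetric {
  override def name(): String = "rowsRemoved"
  override def description(): String = "number of rows removed by TRUNCATE"
}

// Hypothetical in-memory table; only the pieces relevant to this patch are shown.
class InMemoryTruncatableTable(private var rows: Seq[String]) extends TruncatableTable {

  @volatile private var lastTruncateCount: Long = 0L

  override def name(): String = "in_memory_table"
  override def schema(): StructType = new StructType().add("value", "string")
  override def capabilities(): util.Set[TableCapability] =
    util.EnumSet.of(TableCapability.TRUNCATE)

  override def truncateTable(): Boolean = {
    lastTruncateCount = rows.size.toLong
    rows = Seq.empty
    true
  }

  // Everything reported by reportDriverMetrics() must be declared here first.
  override def supportedCustomMetrics(): Array[CustomMetric] =
    Array(new RowsRemovedMetric)

  // Driver-side values. TruncateTableExec posts these from a `finally` block,
  // so the metric reaches the SQL UI even if truncateTable() throws.
  override def reportDriverMetrics(): Array[CustomTaskMetric] =
    Array(new CustomTaskMetric {
      override def name(): String = "rowsRemoved"
      override def value(): Long = lastTruncateCount
    })
}

Because `SupportsDeleteV2` extends `TruncatableTable`, these same two hooks are what `DeleteFromTableExec` invokes above, so a table like this sketch would have its delete-side driver metrics reported through the identical path.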