diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/RewriteDeleteFromTable.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/RewriteDeleteFromTable.scala index 13cfc6b73ccbc..f8881e2077103 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/RewriteDeleteFromTable.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/RewriteDeleteFromTable.scala @@ -86,7 +86,7 @@ object RewriteDeleteFromTable extends RewriteRowLevelCommand { // build a plan to replace read groups in the table val writeRelation = relation.copy(table = operationTable) - val query = addOperationColumn(WRITE_WITH_METADATA_OPERATION, remainingRowsPlan) + val query = addOperationColumn(COPY_OPERATION, remainingRowsPlan) val projections = buildReplaceDataProjections(query, relation.output, metadataAttrs) val groupFilterCond = if (groupFilterEnabled) Some(cond) else None ReplaceData(writeRelation, cond, query, relation, projections, groupFilterCond) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/RewriteMergeIntoTable.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/RewriteMergeIntoTable.scala index 8ff734c7a9a09..f21f53a28300d 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/RewriteMergeIntoTable.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/RewriteMergeIntoTable.scala @@ -24,7 +24,7 @@ import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression import org.apache.spark.sql.catalyst.plans.{FullOuter, Inner, JoinType, LeftAnti, LeftOuter, RightOuter} import org.apache.spark.sql.catalyst.plans.logical.{AppendData, DeleteAction, Filter, HintInfo, InsertAction, Join, JoinHint, LogicalPlan, MergeAction, MergeIntoTable, MergeRows, NO_BROADCAST_AND_REPLICATION, Project, ReplaceData, UpdateAction, WriteDelta} import org.apache.spark.sql.catalyst.plans.logical.MergeRows.{Copy, Delete, Discard, Insert, Instruction, Keep, ROW_ID, Split, Update} -import org.apache.spark.sql.catalyst.util.RowDeltaUtils.{OPERATION_COLUMN, WRITE_OPERATION, WRITE_WITH_METADATA_OPERATION} +import org.apache.spark.sql.catalyst.util.RowDeltaUtils.{COPY_OPERATION, INSERT_OPERATION, OPERATION_COLUMN, UPDATE_OPERATION} import org.apache.spark.sql.connector.catalog.SupportsRowLevelOperations import org.apache.spark.sql.connector.write.{RowLevelOperationTable, SupportsDelta} import org.apache.spark.sql.connector.write.RowLevelOperation.Command.MERGE @@ -202,7 +202,7 @@ object RewriteMergeIntoTable extends RewriteRowLevelCommand with PredicateHelper // that's why an extra unconditional instruction that would produce the original row is added // as the last MATCHED and NOT MATCHED BY SOURCE instruction // this logic is specific to data sources that replace groups of data - val carryoverRowsOutput = Literal(WRITE_WITH_METADATA_OPERATION) +: targetTable.output + val carryoverRowsOutput = Literal(COPY_OPERATION) +: targetTable.output val keepCarryoverRowsInstruction = Keep(Copy, TrueLiteral, carryoverRowsOutput) val matchedInstructions = matchedActions.map { action => @@ -439,7 +439,7 @@ object RewriteMergeIntoTable extends RewriteRowLevelCommand with PredicateHelper case UpdateAction(cond, assignments, _) => val rowValues = assignments.map(_.value) val metadataValues = nullifyMetadataOnUpdate(metadataAttrs) - val output = Seq(Literal(WRITE_WITH_METADATA_OPERATION)) ++ rowValues ++ metadataValues + val output = Seq(Literal(UPDATE_OPERATION)) ++ rowValues ++ metadataValues Keep(Update, cond.getOrElse(TrueLiteral), output) case DeleteAction(cond) => @@ -448,7 +448,7 @@ object RewriteMergeIntoTable extends RewriteRowLevelCommand with PredicateHelper case InsertAction(cond, assignments) => val rowValues = assignments.map(_.value) val metadataValues = metadataAttrs.map(attr => Literal(null, attr.dataType)) - val output = Seq(Literal(WRITE_OPERATION)) ++ rowValues ++ metadataValues + val output = Seq(Literal(INSERT_OPERATION)) ++ rowValues ++ metadataValues Keep(Insert, cond.getOrElse(TrueLiteral), output) case other => diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/RewriteRowLevelCommand.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/RewriteRowLevelCommand.scala index c5b81dec87c96..48c48eb323bd7 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/RewriteRowLevelCommand.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/RewriteRowLevelCommand.scala @@ -21,8 +21,8 @@ import scala.collection.mutable import org.apache.spark.SparkException import org.apache.spark.sql.catalyst.ProjectingInternalRow -import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeReference, AttributeSet, Expression, ExprId, Literal, MetadataAttribute, NamedExpression, V2ExpressionUtils} -import org.apache.spark.sql.catalyst.plans.logical.{Assignment, Expand, LogicalPlan, MergeRows, Project} +import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeReference, AttributeSet, Expression, ExprId, If, Literal, MetadataAttribute, NamedExpression, V2ExpressionUtils} +import org.apache.spark.sql.catalyst.plans.logical.{Assignment, Expand, LogicalPlan, MergeRows, Project, Union} import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.catalyst.util.{ReplaceDataProjections, WriteDeltaProjections} import org.apache.spark.sql.catalyst.util.RowDeltaUtils._ @@ -38,11 +38,11 @@ import org.apache.spark.util.ArrayImplicits._ trait RewriteRowLevelCommand extends Rule[LogicalPlan] { - private final val DELTA_OPERATIONS_WITH_ROW = - Set(UPDATE_OPERATION, REINSERT_OPERATION, INSERT_OPERATION) - private final val DELTA_OPERATIONS_WITH_METADATA = - Set(DELETE_OPERATION, UPDATE_OPERATION, REINSERT_OPERATION) - private final val DELTA_OPERATIONS_WITH_ROW_ID = + private final val OPERATIONS_WITH_ROW = + Set(UPDATE_OPERATION, REINSERT_OPERATION, INSERT_OPERATION, COPY_OPERATION) + private final val OPERATIONS_WITH_METADATA = + Set(DELETE_OPERATION, UPDATE_OPERATION, REINSERT_OPERATION, COPY_OPERATION) + private final val OPERATIONS_WITH_ROW_ID = Set(DELETE_OPERATION, UPDATE_OPERATION) protected def groupFilterEnabled: Boolean = conf.runtimeRowLevelOperationGroupFilterEnabled @@ -191,11 +191,11 @@ trait RewriteRowLevelCommand extends Rule[LogicalPlan] { metadataAttrs: Seq[Attribute]): ReplaceDataProjections = { val outputs = extractOutputs(plan) - val outputsWithRow = filterOutputs(outputs, Set(WRITE_WITH_METADATA_OPERATION, WRITE_OPERATION)) + val outputsWithRow = filterOutputs(outputs, OPERATIONS_WITH_ROW) val rowProjection = newLazyProjection(plan, outputsWithRow, rowAttrs) val metadataProjection = if (metadataAttrs.nonEmpty) { - val outputsWithMetadata = filterOutputs(outputs, Set(WRITE_WITH_METADATA_OPERATION)) + val outputsWithMetadata = filterOutputs(outputs, OPERATIONS_WITH_METADATA) Some(newLazyProjection(plan, outputsWithMetadata, metadataAttrs)) } else { None @@ -212,17 +212,17 @@ trait RewriteRowLevelCommand extends Rule[LogicalPlan] { val outputs = extractOutputs(plan) val rowProjection = if (rowAttrs.nonEmpty) { - val outputsWithRow = filterOutputs(outputs, DELTA_OPERATIONS_WITH_ROW) + val outputsWithRow = filterOutputs(outputs, OPERATIONS_WITH_ROW) Some(newLazyProjection(plan, outputsWithRow, rowAttrs)) } else { None } - val outputsWithRowId = filterOutputs(outputs, DELTA_OPERATIONS_WITH_ROW_ID) + val outputsWithRowId = filterOutputs(outputs, OPERATIONS_WITH_ROW_ID) val rowIdProjection = newLazyRowIdProjection(plan, outputsWithRowId, rowIdAttrs) val metadataProjection = if (metadataAttrs.nonEmpty) { - val outputsWithMetadata = filterOutputs(outputs, DELTA_OPERATIONS_WITH_METADATA) + val outputsWithMetadata = filterOutputs(outputs, OPERATIONS_WITH_METADATA) Some(newLazyProjection(plan, outputsWithMetadata, metadataAttrs)) } else { None @@ -236,6 +236,7 @@ trait RewriteRowLevelCommand extends Rule[LogicalPlan] { case p: Project => Seq(p.projectList) case e: Expand => e.projections case m: MergeRows => m.outputs + case u: Union => u.children.flatMap(extractOutputs) case _ => throw SparkException.internalError("Can't extract outputs from plan: " + plan) } } @@ -243,11 +244,13 @@ trait RewriteRowLevelCommand extends Rule[LogicalPlan] { private def filterOutputs( outputs: Seq[Seq[Expression]], operations: Set[Int]): Seq[Seq[Expression]] = { - outputs.filter { - case Literal(operation: Integer, _) +: _ => operations.contains(operation) - case Alias(Literal(operation: Integer, _), _) +: _ => operations.contains(operation) + def matches(expr: Expression): Boolean = expr match { + case Literal(operation: Integer, _) => operations.contains(operation) + case Alias(child, _) => matches(child) + case If(_, trueValue, falseValue) => matches(trueValue) && matches(falseValue) case other => throw SparkException.internalError("Can't determine operation: " + other) } + outputs.filter(output => matches(output.head)) } private def newLazyProjection( diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/RewriteUpdateTable.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/RewriteUpdateTable.scala index caf7579da889a..3c41b6bfa5683 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/RewriteUpdateTable.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/RewriteUpdateTable.scala @@ -72,12 +72,10 @@ object RewriteUpdateTable extends RewriteRowLevelCommand { val readRelation = buildRelationWithAttrs(relation, operationTable, metadataAttrs) // build a plan with updated and copied over records - val updatedAndRemainingRowsPlan = buildReplaceDataUpdateProjection( - readRelation, assignments, cond) + val query = buildReplaceDataUpdateProjection(readRelation, assignments, cond) // build a plan to replace read groups in the table val writeRelation = relation.copy(table = operationTable) - val query = addOperationColumn(WRITE_WITH_METADATA_OPERATION, updatedAndRemainingRowsPlan) val projections = buildReplaceDataProjections(query, relation.output, metadataAttrs) val groupFilterCond = if (groupFilterEnabled) Some(cond) else None ReplaceData(writeRelation, cond, query, relation, projections, groupFilterCond) @@ -105,14 +103,14 @@ object RewriteUpdateTable extends RewriteRowLevelCommand { // build a plan that contains unmatched rows in matched groups that must be copied over val remainingRowFilter = Not(EqualNullSafe(cond, Literal.TrueLiteral)) - val remainingRowsPlan = Filter(remainingRowFilter, readRelation) + val remainingRowsPlan = addOperationColumn(COPY_OPERATION, + Filter(remainingRowFilter, readRelation)) // the new state is a union of updated and copied over records - val updatedAndRemainingRowsPlan = Union(updatedRowsPlan, remainingRowsPlan) + val query = Union(updatedRowsPlan, remainingRowsPlan) // build a plan to replace read groups in the table val writeRelation = relation.copy(table = operationTable) - val query = addOperationColumn(WRITE_WITH_METADATA_OPERATION, updatedAndRemainingRowsPlan) val projections = buildReplaceDataProjections(query, relation.output, metadataAttrs) val groupFilterCond = if (groupFilterEnabled) Some(cond) else None ReplaceData(writeRelation, cond, query, relation, projections, groupFilterCond) @@ -143,7 +141,9 @@ object RewriteUpdateTable extends RewriteRowLevelCommand { } } - Project(updatedValues, plan) + val writeOp = If(cond, Literal(UPDATE_OPERATION), Literal(COPY_OPERATION)) + val operationCol = Alias(writeOp, OPERATION_COLUMN)() + Project(operationCol +: updatedValues, plan) } // build a rewrite plan for sources that support row deltas diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/RowDeltaUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/RowDeltaUtils.scala index 72baad069b180..8b86d530550b9 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/RowDeltaUtils.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/RowDeltaUtils.scala @@ -26,7 +26,6 @@ object RowDeltaUtils { final val UPDATE_OPERATION: Int = 2 final val INSERT_OPERATION: Int = 3 final val REINSERT_OPERATION: Int = 4 - final val WRITE_OPERATION: Int = 5 - final val WRITE_WITH_METADATA_OPERATION: Int = 6 + final val COPY_OPERATION: Int = 5 final val ORIGINAL_ROW_ID_VALUE_PREFIX: String = "__original_row_id_" } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryRowLevelOperationTable.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryRowLevelOperationTable.scala index 2f7cad5992153..91e899bc1169e 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryRowLevelOperationTable.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryRowLevelOperationTable.scala @@ -51,6 +51,8 @@ class InMemoryRowLevelOperationTable( private final val INDEX_COLUMN_REF = FieldReference(IndexColumn.name) private final val SUPPORTS_DELTAS = "supports-deltas" private final val SPLIT_UPDATES = "split-updates" + private final val NO_METADATA = "no-metadata" + private final val noMetadata = properties.getOrDefault(NO_METADATA, "false") == "true" // used in row-level operation tests to verify replaced partitions var replacedPartitions: Seq[Seq[Any]] = Seq.empty @@ -73,7 +75,11 @@ class InMemoryRowLevelOperationTable( var configuredScan: InMemoryBatchScan = _ override def requiredMetadataAttributes(): Array[NamedReference] = { - Array(PARTITION_COLUMN_REF, INDEX_COLUMN_REF) + if (noMetadata) { + Array.empty + } else { + Array(PARTITION_COLUMN_REF, INDEX_COLUMN_REF) + } } override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder = { @@ -89,22 +95,29 @@ class InMemoryRowLevelOperationTable( override def newWriteBuilder(info: LogicalWriteInfo): WriteBuilder = { lastWriteInfo = info new WriteBuilder { - override def build(): Write = new Write with RequiresDistributionAndOrdering { - override def requiredDistribution: Distribution = { - Distributions.clustered(Array(PARTITION_COLUMN_REF)) + override def build(): Write = if (noMetadata) { + new Write { + override def toBatch: BatchWrite = PartitionBasedReplaceData(configuredScan) + override def description: String = "InMemoryWrite" } - - override def requiredOrdering: Array[SortOrder] = { - Array[SortOrder]( - LogicalExpressions.sort( - PARTITION_COLUMN_REF, - SortDirection.ASCENDING, - SortDirection.ASCENDING.defaultNullOrdering())) + } else { + new Write with RequiresDistributionAndOrdering { + override def requiredDistribution: Distribution = { + Distributions.clustered(Array(PARTITION_COLUMN_REF)) + } + + override def requiredOrdering: Array[SortOrder] = { + Array[SortOrder]( + LogicalExpressions.sort( + PARTITION_COLUMN_REF, + SortDirection.ASCENDING, + SortDirection.ASCENDING.defaultNullOrdering())) + } + + override def toBatch: BatchWrite = PartitionBasedReplaceData(configuredScan) + + override def description: String = "InMemoryWrite" } - - override def toBatch: BatchWrite = PartitionBasedReplaceData(configuredScan) - - override def description: String = "InMemoryWrite" } } } @@ -138,7 +151,11 @@ class InMemoryRowLevelOperationTable( private final val PK_COLUMN_REF = FieldReference("pk") override def requiredMetadataAttributes(): Array[NamedReference] = { - Array(PARTITION_COLUMN_REF, INDEX_COLUMN_REF) + if (noMetadata) { + Array.empty + } else { + Array(PARTITION_COLUMN_REF, INDEX_COLUMN_REF) + } } override def rowId(): Array[NamedReference] = Array(PK_COLUMN_REF) @@ -150,22 +167,28 @@ class InMemoryRowLevelOperationTable( override def newWriteBuilder(info: LogicalWriteInfo): DeltaWriteBuilder = { lastWriteInfo = info new DeltaWriteBuilder { - override def build(): DeltaWrite = new DeltaWrite with RequiresDistributionAndOrdering { - - override def requiredDistribution(): Distribution = { - Distributions.clustered(Array(PARTITION_COLUMN_REF)) + override def build(): DeltaWrite = if (noMetadata) { + new DeltaWrite { + override def toBatch: DeltaBatchWrite = TestDeltaBatchWrite } - - override def requiredOrdering(): Array[SortOrder] = { - Array[SortOrder]( - LogicalExpressions.sort( - PARTITION_COLUMN_REF, - SortDirection.ASCENDING, - SortDirection.ASCENDING.defaultNullOrdering()) - ) + } else { + new DeltaWrite with RequiresDistributionAndOrdering { + + override def requiredDistribution(): Distribution = { + Distributions.clustered(Array(PARTITION_COLUMN_REF)) + } + + override def requiredOrdering(): Array[SortOrder] = { + Array[SortOrder]( + LogicalExpressions.sort( + PARTITION_COLUMN_REF, + SortDirection.ASCENDING, + SortDirection.ASCENDING.defaultNullOrdering()) + ) + } + + override def toBatch: DeltaBatchWrite = TestDeltaBatchWrite } - - override def toBatch: DeltaBatchWrite = TestDeltaBatchWrite } } } @@ -208,7 +231,8 @@ private class DeltaBufferWriter(schema: StructType) extends BufferWriter(schema) override def delete(meta: InternalRow, id: InternalRow): Unit = { val pk = id.getInt(0) buffer.deletes += pk - val logEntry = new GenericInternalRow(Array[Any](DELETE, pk, meta.copy(), null)) + val metaCopy = if (meta != null) meta.copy() else null + val logEntry = new GenericInternalRow(Array[Any](DELETE, pk, metaCopy, null)) buffer.log += logEntry } @@ -216,13 +240,15 @@ private class DeltaBufferWriter(schema: StructType) extends BufferWriter(schema) val pk = id.getInt(0) buffer.deletes += pk buffer.rows.append(row.copy()) - val logEntry = new GenericInternalRow(Array[Any](UPDATE, pk, meta.copy(), row.copy())) + val metaCopy = if (meta != null) meta.copy() else null + val logEntry = new GenericInternalRow(Array[Any](UPDATE, pk, metaCopy, row.copy())) buffer.log += logEntry } override def reinsert(meta: InternalRow, row: InternalRow): Unit = { buffer.rows.append(row.copy()) - val logEntry = new GenericInternalRow(Array[Any](REINSERT, null, meta.copy(), row.copy())) + val metaCopy = if (meta != null) meta.copy() else null + val logEntry = new GenericInternalRow(Array[Any](REINSERT, null, metaCopy, row.copy())) buffer.log += logEntry } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala index 5a2da729c1b52..98e0c6f66deaf 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala @@ -27,7 +27,7 @@ import org.apache.spark.sql.catalyst.analysis.NoSuchTableException import org.apache.spark.sql.catalyst.expressions.{Attribute, Literal} import org.apache.spark.sql.catalyst.plans.logical.{AppendData, LogicalPlan, OverwriteByExpression, TableSpec, UnaryNode} import org.apache.spark.sql.catalyst.util.{removeInternalMetadata, CharVarcharUtils, ReplaceDataProjections, WriteDeltaProjections} -import org.apache.spark.sql.catalyst.util.RowDeltaUtils.{DELETE_OPERATION, INSERT_OPERATION, REINSERT_OPERATION, UPDATE_OPERATION, WRITE_OPERATION, WRITE_WITH_METADATA_OPERATION} +import org.apache.spark.sql.catalyst.util.RowDeltaUtils.{COPY_OPERATION, DELETE_OPERATION, INSERT_OPERATION, REINSERT_OPERATION, UPDATE_OPERATION} import org.apache.spark.sql.connector.catalog.{CatalogV2Util, Column, Identifier, StagedTable, StagingTableCatalog, Table, TableCatalog, TableInfo, TableWritePrivilege} import org.apache.spark.sql.connector.expressions.Transform import org.apache.spark.sql.connector.metric.CustomMetric @@ -311,14 +311,14 @@ case class ReplaceDataExec( query: SparkPlan, refreshCache: () => Unit, projections: ReplaceDataProjections, - write: Write) extends V2ExistingTableWriteExec { + write: Write) extends RowLevelWriteExec { override def writingTask: WritingSparkTask[_] = { - projections match { - case ReplaceDataProjections(dataProj, Some(metadataProj)) => - DataAndMetadataWritingSparkTask(dataProj, metadataProj) - case _ => - DataWritingSparkTask + projections.metadataProjection match { + case Some(metadataProj) => + DataAndMetadataWritingSparkTask(projections.rowProjection, metadataProj) + case None => + DataWithProjectionWritingSparkTask(projections.rowProjection) } } @@ -334,7 +334,7 @@ case class WriteDeltaExec( query: SparkPlan, refreshCache: () => Unit, projections: WriteDeltaProjections, - write: DeltaWrite) extends V2ExistingTableWriteExec { + write: DeltaWrite) extends RowLevelWriteExec { override lazy val writingTask: WritingSparkTask[_] = { if (projections.metadataProjection.isDefined) { @@ -405,6 +405,33 @@ trait V2ExistingTableWriteExec extends V2TableWriteExec { } } +/** + * A trait for row-level write operations (UPDATE, DELETE, MERGE). + */ +trait RowLevelWriteExec extends V2ExistingTableWriteExec { + /** + * Returns the value of the named metric, or -1 if the metric is not found. + */ + private def getMetricValue(metrics: Map[String, SQLMetric], name: String): Long = { + metrics.get(name).map(_.value).getOrElse(-1L) + } + + override protected def getWriteSummary(query: SparkPlan): Option[WriteSummary] = { + collectFirst(query) { case m: MergeRowsExec => m }.map { n => + val metrics = n.metrics + MergeSummaryImpl( + getMetricValue(metrics, "numTargetRowsCopied"), + getMetricValue(metrics, "numTargetRowsDeleted"), + getMetricValue(metrics, "numTargetRowsUpdated"), + getMetricValue(metrics, "numTargetRowsInserted"), + getMetricValue(metrics, "numTargetRowsMatchedUpdated"), + getMetricValue(metrics, "numTargetRowsMatchedDeleted"), + getMetricValue(metrics, "numTargetRowsNotMatchedBySourceUpdated"), + getMetricValue(metrics, "numTargetRowsNotMatchedBySourceDeleted")) + } + } +} + /** * The base physical plan for writing data into data source v2. */ @@ -489,21 +516,7 @@ trait V2TableWriteExec extends V2CommandExec with UnaryExecNode with AdaptiveSpa Nil } - private def getWriteSummary(query: SparkPlan): Option[WriteSummary] = { - collectFirst(query) { case m: MergeRowsExec => m }.map { n => - val metrics = n.metrics - MergeSummaryImpl( - metrics.get("numTargetRowsCopied").map(_.value).getOrElse(-1L), - metrics.get("numTargetRowsDeleted").map(_.value).getOrElse(-1L), - metrics.get("numTargetRowsUpdated").map(_.value).getOrElse(-1L), - metrics.get("numTargetRowsInserted").map(_.value).getOrElse(-1L), - metrics.get("numTargetRowsMatchedUpdated").map(_.value).getOrElse(-1L), - metrics.get("numTargetRowsMatchedDeleted").map(_.value).getOrElse(-1L), - metrics.get("numTargetRowsNotMatchedBySourceUpdated").map(_.value).getOrElse(-1L), - metrics.get("numTargetRowsNotMatchedBySourceDeleted").map(_.value).getOrElse(-1L) - ) - } - } + protected def getWriteSummary(query: SparkPlan): Option[WriteSummary] = None } trait WritingSparkTask[W <: DataWriter[InternalRow]] extends Logging with Serializable { @@ -612,12 +625,33 @@ case class DataAndMetadataWritingSparkTask( val operation = row.getInt(0) operation match { - case WRITE_WITH_METADATA_OPERATION => + case UPDATE_OPERATION | COPY_OPERATION => dataProj.project(row) metadataProj.project(row) writer.write(metadataProj, dataProj) - case WRITE_OPERATION => + case INSERT_OPERATION => + dataProj.project(row) + writer.write(dataProj) + + case other => + throw new SparkException(s"Unexpected operation ID: $other") + } + } + } +} + +case class DataWithProjectionWritingSparkTask( + dataProj: ProjectingInternalRow) extends WritingSparkTask[DataWriter[InternalRow]] { + + override protected def write( + writer: DataWriter[InternalRow], iter: java.util.Iterator[InternalRow]): Unit = { + while (iter.hasNext) { + val row = iter.next() + val operation = row.getInt(0) + + operation match { + case UPDATE_OPERATION | COPY_OPERATION | INSERT_OPERATION => dataProj.project(row) writer.write(dataProj) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DeltaBasedNoMetadataDeleteFromTableSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DeltaBasedNoMetadataDeleteFromTableSuite.scala new file mode 100644 index 0000000000000..73407d640923a --- /dev/null +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DeltaBasedNoMetadataDeleteFromTableSuite.scala @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.connector + +class DeltaBasedNoMetadataDeleteFromTableSuite extends DeleteFromTableSuiteBase { + + override protected def extraTableProps: java.util.Map[String, String] = { + val props = new java.util.HashMap[String, String]() + props.put("supports-deltas", "true") + props.put("no-metadata", "true") + props + } + + override def enforceCheckConstraintOnDelete: Boolean = false +} diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DeltaBasedNoMetadataMergeIntoTableSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DeltaBasedNoMetadataMergeIntoTableSuite.scala new file mode 100644 index 0000000000000..d6e1484253135 --- /dev/null +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DeltaBasedNoMetadataMergeIntoTableSuite.scala @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.connector + +class DeltaBasedNoMetadataMergeIntoTableSuite extends MergeIntoTableSuiteBase { + + override protected def deltaMerge = true + + override protected def extraTableProps: java.util.Map[String, String] = { + val props = new java.util.HashMap[String, String]() + props.put("supports-deltas", "true") + props.put("no-metadata", "true") + props + } +} diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DeltaBasedNoMetadataUpdateTableSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DeltaBasedNoMetadataUpdateTableSuite.scala new file mode 100644 index 0000000000000..15ff7688a26b1 --- /dev/null +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DeltaBasedNoMetadataUpdateTableSuite.scala @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.connector + +class DeltaBasedNoMetadataUpdateTableSuite extends UpdateTableSuiteBase { + + override protected def extraTableProps: java.util.Map[String, String] = { + val props = new java.util.HashMap[String, String]() + props.put("supports-deltas", "true") + props.put("no-metadata", "true") + props + } +} diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/GroupBasedNoMetadataDeleteFromTableSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/GroupBasedNoMetadataDeleteFromTableSuite.scala new file mode 100644 index 0000000000000..6ac4f6e32fb18 --- /dev/null +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/GroupBasedNoMetadataDeleteFromTableSuite.scala @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.connector + +class GroupBasedNoMetadataDeleteFromTableSuite extends DeleteFromTableSuiteBase { + + override protected def extraTableProps: java.util.Map[String, String] = { + val props = new java.util.HashMap[String, String]() + props.put("no-metadata", "true") + props + } +} diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/GroupBasedNoMetadataMergeIntoTableSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/GroupBasedNoMetadataMergeIntoTableSuite.scala new file mode 100644 index 0000000000000..5feadcdec23e3 --- /dev/null +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/GroupBasedNoMetadataMergeIntoTableSuite.scala @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.connector + +class GroupBasedNoMetadataMergeIntoTableSuite extends MergeIntoTableSuiteBase { + + override protected def extraTableProps: java.util.Map[String, String] = { + val props = new java.util.HashMap[String, String]() + props.put("no-metadata", "true") + props + } +} diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/GroupBasedNoMetadataUpdateTableSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/GroupBasedNoMetadataUpdateTableSuite.scala new file mode 100644 index 0000000000000..31db56b8d5594 --- /dev/null +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/GroupBasedNoMetadataUpdateTableSuite.scala @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.connector + +class GroupBasedNoMetadataUpdateTableSuite extends UpdateTableSuiteBase { + + override protected def extraTableProps: java.util.Map[String, String] = { + val props = new java.util.HashMap[String, String]() + props.put("no-metadata", "true") + props + } +}