RowLevelOperation.java
@@ -105,4 +105,45 @@ default String description() {
default NamedReference[] requiredMetadataAttributes() {
return new NamedReference[0];
}


/**
* Controls whether to send only the required data columns to the connector rather than the
* full row.
* <p>
* When true, Spark narrows the data column schema ({@link LogicalWriteInfo#schema()}) to only
* the columns declared via {@link #requiredDataAttributes()}. Metadata columns (from
* {@link #requiredMetadataAttributes()}) and row ID columns (from
* {@link SupportsDelta#rowId()}) are unaffected and always projected separately.
* <p>
* If {@link #requiredDataAttributes()} returns a non-empty array, the write schema is exactly
* those columns in declared order. The connector must include all columns it wants to receive,
* including the columns being updated. If {@link #requiredDataAttributes()} returns an empty
* array, Spark falls back to sending only the non-identity assigned columns (the heuristic path).
*
* @since 4.2.0
*/
default boolean supportsColumnUpdates() {
return false;
}

/**
* Returns data column references required to perform this row-level operation.
* <p>
* This method is only consulted by Spark when {@link #supportsColumnUpdates()} returns
* {@code true}. If {@code supportsColumnUpdates()} returns {@code false}, the returned array
* is ignored and the full table row is sent (the default behavior).
* <p>
* When non-empty, the returned columns become the write schema in declared order.
* The connector must declare all columns it wants to receive, including the columns being
* updated. Use {@link RowLevelOperationInfo#updatedColumns()} to learn which columns are being
* assigned, then add any extra columns needed for row lookup or routing (e.g., primary key).
* <p>
* When empty (the default), Spark falls back to sending only the non-identity assigned columns.
*
* @since 4.2.0
*/
default NamedReference[] requiredDataAttributes() {
return new NamedReference[0];
}
}
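
For illustration, a minimal connector-side sketch of the new contract. Everything here is hypothetical: `PkAwareUpdateOperation`, `MyScanBuilder`, and `MyWriteBuilder` are assumed helpers, and the primary-key handling is made up; only `supportsColumnUpdates()` and `requiredDataAttributes()` come from this change.

```scala
import org.apache.spark.sql.connector.expressions.{Expressions, NamedReference}
import org.apache.spark.sql.connector.read.ScanBuilder
import org.apache.spark.sql.connector.write.{LogicalWriteInfo, RowLevelOperation, WriteBuilder}
import org.apache.spark.sql.connector.write.RowLevelOperation.Command
import org.apache.spark.sql.util.CaseInsensitiveStringMap

// Hypothetical UPDATE operation that opts into column updates and asks for
// the assigned columns plus the primary key it needs for row lookup.
class PkAwareUpdateOperation(
    pkColumn: String,
    updatedColumns: Array[NamedReference]) extends RowLevelOperation {

  override def command(): Command = Command.UPDATE

  override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder =
    new MyScanBuilder(options) // assumed connector scan builder

  override def newWriteBuilder(info: LogicalWriteInfo): WriteBuilder =
    new MyWriteBuilder(info) // info.schema() now holds only the declared columns

  // Opt in: Spark narrows LogicalWriteInfo#schema() to requiredDataAttributes().
  override def supportsColumnUpdates(): Boolean = true

  // Declare every column we want to receive, updated columns included.
  override def requiredDataAttributes(): Array[NamedReference] = {
    val hasPk = updatedColumns.exists(_.fieldNames().sameElements(Array(pkColumn)))
    if (hasPk) updatedColumns else updatedColumns :+ Expressions.column(pkColumn)
  }
}
```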
RowLevelOperationInfo.java
@@ -18,6 +18,7 @@
package org.apache.spark.sql.connector.write;

import org.apache.spark.annotation.Experimental;
import org.apache.spark.sql.connector.expressions.NamedReference;
import org.apache.spark.sql.connector.write.RowLevelOperation.Command;
import org.apache.spark.sql.util.CaseInsensitiveStringMap;

Expand All @@ -37,4 +38,20 @@ public interface RowLevelOperationInfo {
* Returns the row-level SQL command (e.g. DELETE, UPDATE, MERGE).
*/
Command command();

/**
* Returns the columns being updated in an UPDATE statement, as non-identity assignments.
*
* <p>For DELETE and MERGE, returns an empty array.
*
* <p>Connectors can use this to decide what {@link RowLevelOperation#requiredDataAttributes()}
* to declare. For instance, a connector that needs its primary key for row lookup can check
* whether the primary key is already among the updated columns and, if not, add it to
* {@code requiredDataAttributes()}.
*
* @since 4.2.0
*/
default NamedReference[] updatedColumns() {
return new NamedReference[0];
}
}
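
A sketch of how a connector's builder might consume `updatedColumns()`, reusing the hypothetical `PkAwareUpdateOperation` from the sketch above:

```scala
import org.apache.spark.sql.connector.write.{RowLevelOperation, RowLevelOperationBuilder, RowLevelOperationInfo}

// Hypothetical builder: reads updatedColumns() from the info and lets the
// operation decide whether the primary key must be appended.
class PkAwareOperationBuilder(pkColumn: String, info: RowLevelOperationInfo)
    extends RowLevelOperationBuilder {

  override def build(): RowLevelOperation =
    new PkAwareUpdateOperation(pkColumn, info.updatedColumns())
}
```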
RewriteRowLevelCommand.scala
@@ -22,12 +22,13 @@ import scala.collection.mutable
import org.apache.spark.SparkException
import org.apache.spark.sql.catalyst.ProjectingInternalRow
import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeReference, AttributeSet, Expression, ExprId, If, Literal, MetadataAttribute, NamedExpression, V2ExpressionUtils}
import org.apache.spark.sql.catalyst.expressions.Literal.TrueLiteral
import org.apache.spark.sql.catalyst.plans.logical.{Assignment, Expand, LogicalPlan, MergeRows, Project, Union}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.catalyst.util.{ReplaceDataProjections, WriteDeltaProjections}
import org.apache.spark.sql.catalyst.util.RowDeltaUtils._
import org.apache.spark.sql.connector.catalog.SupportsRowLevelOperations
-import org.apache.spark.sql.connector.expressions.FieldReference
+import org.apache.spark.sql.connector.expressions.{FieldReference, NamedReference}
import org.apache.spark.sql.connector.write.{RowLevelOperation, RowLevelOperationInfoImpl, RowLevelOperationTable, SupportsDelta}
import org.apache.spark.sql.connector.write.RowLevelOperation.Command
import org.apache.spark.sql.errors.QueryCompilationErrors
@@ -50,20 +51,35 @@ trait RewriteRowLevelCommand extends Rule[LogicalPlan] {
protected def buildOperationTable(
table: SupportsRowLevelOperations,
command: Command,
-      options: CaseInsensitiveStringMap): RowLevelOperationTable = {
-    val info = RowLevelOperationInfoImpl(command, options)
+      options: CaseInsensitiveStringMap,
+      updatedColumns: Seq[NamedReference] = Nil): RowLevelOperationTable = {
+    val info = RowLevelOperationInfoImpl(command, options, updatedColumns)
val operation = table.newRowLevelOperationBuilder(info).build()
RowLevelOperationTable(table, operation)
}

// Builds a DataSourceV2Relation for a row-level operation, optionally narrowing its output.
//
// When dataAttrs is non-empty, the relation output is narrowed to include only columns
// required for a column-update write. When dataAttrs is empty, the full relation.output is
// preserved.
protected def buildRelationWithAttrs(
relation: DataSourceV2Relation,
table: RowLevelOperationTable,
metadataAttrs: Seq[AttributeReference],
-      rowIdAttrs: Seq[AttributeReference] = Nil): DataSourceV2Relation = {
-
-    val attrs = dedupAttrs(relation.output ++ rowIdAttrs ++ metadataAttrs)
-    relation.copy(table = table, output = attrs)
+      rowIdAttrs: Seq[AttributeReference] = Nil,
+      dataAttrs: Seq[AttributeReference] = Nil,
+      cond: Expression = TrueLiteral): DataSourceV2Relation = {
+
+    if (dataAttrs.nonEmpty) {
+      val required =
+        AttributeSet(dataAttrs) ++ AttributeSet(Seq(cond)) ++ AttributeSet(rowIdAttrs)
+      val narrowOutput = relation.output.filter(required.contains)
+      relation.copy(table = table, output = dedupAttrs(narrowOutput ++ rowIdAttrs ++ metadataAttrs))
[Review comment from a Contributor on lines +75 to +78]
Can an attribute in required be missing from relation.output?
rowIdAttrs seems to be added twice.
If we already have dedupAttrs(), it probably doesn't make sense to build AttributeSets.
+    } else {
+      val attrs = dedupAttrs(relation.output ++ rowIdAttrs ++ metadataAttrs)
+      relation.copy(table = table, output = attrs)
+    }
}
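
To make the narrowing concrete, a standalone sketch that mirrors the filter above using catalyst classes directly. The table, condition, and declared attributes are all made up; this is not part of the change:

```scala
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, AttributeSet}
import org.apache.spark.sql.types.{DoubleType, IntegerType, StringType}

object NarrowingSketch extends App {
  // Made-up table: (id INT, dept STRING, salary DOUBLE, note STRING).
  val id = AttributeReference("id", IntegerType)()
  val dept = AttributeReference("dept", StringType)()
  val salary = AttributeReference("salary", DoubleType)()
  val note = AttributeReference("note", StringType)()
  val output = Seq(id, dept, salary, note)

  val dataAttrs = Seq(salary, id)   // declared via requiredDataAttributes()
  val condAttrs = Seq(dept)         // referenced by the UPDATE condition
  val rowIdAttrs = Seq(id)          // from SupportsDelta#rowId()

  // Anything not required is dropped from the relation output.
  val required = AttributeSet(dataAttrs ++ condAttrs ++ rowIdAttrs)
  val narrowed = output.filter(required.contains)
  println(narrowed.map(_.name))     // List(id, dept, salary): note is dropped
}
```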

protected def dedupAttrs(attrs: Seq[AttributeReference]): Seq[AttributeReference] = {
@@ -87,6 +103,14 @@ trait RewriteRowLevelCommand extends Rule[LogicalPlan] {
relation)
}

protected def resolveRequiredDataAttrs(
relation: DataSourceV2Relation,
operation: RowLevelOperation): Seq[AttributeReference] = {
V2ExpressionUtils.resolveRefs[AttributeReference](
operation.requiredDataAttributes.toImmutableArraySeq,
relation)
}

protected def resolveRowIdAttrs(
relation: DataSourceV2Relation,
operation: SupportsDelta): Seq[AttributeReference] = {
@@ -211,11 +235,13 @@ trait RewriteRowLevelCommand extends Rule[LogicalPlan] {
metadataAttrs: Seq[Attribute]): WriteDeltaProjections = {
val outputs = extractOutputs(plan)

// Always produce Some(rowProjection) even for empty rowAttrs (identity-only column updates).
// Physical execution calls rowProjection.project(row) unconditionally; None causes NPE.
val rowProjection = if (rowAttrs.nonEmpty) {
val outputsWithRow = filterOutputs(outputs, OPERATIONS_WITH_ROW)
Some(newLazyProjection(plan, outputsWithRow, rowAttrs))
} else {
-      None
+      Some(ProjectingInternalRow(StructType(Nil), Nil))
}
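
A quick check of why the empty projection is safe to hand to the physical write. This sketch uses catalyst internals directly, and the two-column input row is made up:

```scala
import org.apache.spark.sql.catalyst.ProjectingInternalRow
import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
import org.apache.spark.sql.types.StructType

object EmptyProjectionCheck extends App {
  // Zero-column projection: project() is callable and yields an empty row,
  // whereas a None here would NPE in the physical delta write.
  val emptyProjection = ProjectingInternalRow(StructType(Nil), Nil)
  emptyProjection.project(new GenericInternalRow(Array[Any]("x", 1)))
  assert(emptyProjection.numFields == 0)
}
```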

val outputsWithRowId = filterOutputs(outputs, OPERATIONS_WITH_ROW_ID)