[SPARK-56510][SQL] Fix ReplaceData DML without metadata attributes not projecting out the operation column#55372
[SPARK-56510][SQL] Fix ReplaceData DML without metadata attributes not projecting out the operation column#55372ZiyaZa wants to merge 6 commits into
Conversation
szehon-ho
left a comment
There was a problem hiding this comment.
Good bug fix for the no-metadata-attributes code path in ReplaceData. The core change (introducing DataWithProjectionWritingSparkTask) is correct and well-targeted — it ensures the __row_operation column is projected out even when there are no metadata attributes. A few suggestions below.
| package org.apache.spark.sql.connector | ||
|
|
||
| class GroupBasedNoMetadataDeleteFromTableSuite extends DeleteFromTableSuiteBase { | ||
|
|
There was a problem hiding this comment.
suggestion: Six new test files, each ~10 lines of actual code, is significant boilerplate. Consider adding a noMetadata flag to the existing suites and running them with both configurations (e.g., via a shared trait or parameterization). This would avoid class proliferation and keep the test matrix more maintainable.
There was a problem hiding this comment.
This follows the existing structure for DML suites, having one suite per file. I don't think it is good to have it this way, but without a large diff, I cannot change this.
Note that we still need to have different classes for each configuration, but we could place multiple suites in a single file.
| override def build(): Write = new Write with RequiresDistributionAndOrdering { | ||
| override def requiredDistribution: Distribution = { | ||
| Distributions.clustered(Array(PARTITION_COLUMN_REF)) | ||
| override def build(): Write = if (noMetadata) { |
There was a problem hiding this comment.
question: Is it intentional that the noMetadata path bypasses RequiresDistributionAndOrdering? This exercises a different physical plan (no shuffle/sort). If the goal is just to test the no-metadata code path, consider keeping the distribution/ordering requirements so these tests cover the same physical plan shape as the existing suites.
There was a problem hiding this comment.
It's intentional. Without metadata, we don't have PARTITION_COLUMN_REF. Without it, we can't guarantee partitioning and can't know which column to use for distribution / ordering. This still exercises the code paths for different writing tasks, so it should be enough.
| val pk = id.getInt(0) | ||
| buffer.deletes += pk | ||
| val logEntry = new GenericInternalRow(Array[Any](DELETE, pk, meta.copy(), null)) | ||
| val metaCopy = if (meta != null) meta.copy() else null |
There was a problem hiding this comment.
This null guard is needed because DeltaWritingSparkTask passes null for metadata when requiredMetadataAttributes() is empty. However, the DeltaWriter API methods (delete(meta, id), update(meta, id, row), reinsert(meta, row)) don't document that meta can be null. Third-party connectors could hit the same NPE. Consider adding Javadoc on those API methods to clarify the contract.
There was a problem hiding this comment.
I believe this falls outside the scope of this PR.
|
Let me take a look. |
|
I would explore the possibility of fixing this by introducing correct row operation types for |
|
that is a nice idea (fixing this and #55141 ) |
aokolnychyi
left a comment
There was a problem hiding this comment.
Looks great to me, two minor questions and good to go. Thanks for the patience!
| addOperationColumn(Literal(operation, IntegerType), plan) | ||
| } | ||
|
|
||
| protected def addOperationColumn(operation: Expression, plan: LogicalPlan): LogicalPlan = { |
There was a problem hiding this comment.
Is this being used? Is it needed?
There was a problem hiding this comment.
Good catch, it must have been left from my previous attempts. It's not used anymore, removed it.
| @@ -72,12 +72,13 @@ object RewriteUpdateTable extends RewriteRowLevelCommand { | |||
| val readRelation = buildRelationWithAttrs(relation, operationTable, metadataAttrs) | |||
|
|
|||
| // build a plan with updated and copied over records | |||
| val updatedAndRemainingRowsPlan = buildReplaceDataUpdateProjection( | |||
| readRelation, assignments, cond) | |||
| // the conditional operation column needs to be added in the same Projection as cond is | |||
There was a problem hiding this comment.
Why can't we have this inside buildReplaceDataUpdateProjection and rely on the optimizer to fold this?
If(TrueLiteral, UPDATE_OPERATION, COPY_OPERATION) -> UPDATE_OPERATION
We already call buildReplaceDataUpdateProjection WITHOUT the condition in the union path.
// buildReplaceDataUpdateProjection
val operation = If(cond, Literal(UPDATE_OPERATION), Literal(COPY_OPERATION))
Project(Alias(operation, OPERATION_COLUMN)() +: updatedValues, plan)
There was a problem hiding this comment.
We can, I changed it now to be inside buildReplaceDataUpdateProjection.
What changes were proposed in this pull request?
Previously, all DSv2 tests used an in-memory table that had some metadata attributes. This caused the code path for no-metadata attributes to be missed. This PR introduces a new property
no-metadatafor testing with an InMemoryTable without metadata attributes.Previous implementation had a bug for ReplaceData plans that it would use
DataWritingSparkTaskwithout projection, which means that the connector would receive one more column (the__row_operationcolumn) in addition to the row data to write. This is fixed in this PR by creating a new Writing TaskDataWithProjectionWritingSparkTaskthat supports projecting only row data.Additionally, following changes are done to clean-up the code:
WRITE_WITH_METADATA_OPERATIONandWRITE_OPERATION, and instead createdCOPY_OPERATIONto be used along with other existing operations.RowLevelWriteExecas a parent ofReplaceDataExec/WriteDeltaExec, which now holds a helpergetMetricValuefor metric computationWhy are the changes needed?
To fix a bug.
Does this PR introduce any user-facing change?
No.
How was this patch tested?
New unit tests.
Was this patch authored or co-authored using generative AI tooling?
Generated-by: Claude Opus 4.6