
[SPARK-43775][SQL] DataSource V2: Allow representing updates as deletes and inserts #41300


Conversation

@aokolnychyi (Contributor)

What changes were proposed in this pull request?

This PR adds a way for data sources to request Spark to represent updates as deletes and inserts.

Why are the changes needed?

For delta-based implementations, it may be beneficial for data sources to represent updates as deletes and inserts. Specifically, it may help to properly distribute and order records before writing.

Delete records set only row ID and metadata attributes. Update records set data, row ID, and metadata attributes. Insert records set only data attributes.

For instance, a data source may rely on a synthetic, internally generated metadata column `_row_id` to identify rows and may be partitioned by `bucket(product_id)`. Splitting updates into deletes and inserts would allow the data source to cluster all update and insert records in MERGE for the same partition into a single task. Otherwise, the clustering key for updates and inserts will differ (inserts always have `_row_id` set to null, since it is a metadata column). The new functionality is critical for reducing the number of generated files. It also makes sense in UPDATE operations when the original and new partition of a record do not match.
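As an illustration (all values here are hypothetical), an update to the row identified by `_row_id = 7` could be emitted as the following pair of delta records:

| record | `_row_id` | `product_id` | `price` |
|--------|-----------|--------------|---------|
| DELETE | 7 | null | null |
| INSERT | null | 10 | 19.99 |

Because the insert half carries only data attributes, it shares its clustering key (`bucket(product_id)`) with ordinary inserts instead of being grouped separately due to a null `_row_id`.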

Does this PR introduce any user-facing change?

This PR adds a new method to `SupportsDelta`, but the change is backward compatible.
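A minimal sketch of how a delta-based connector could opt in, assuming the hook is the boolean `representUpdateAsDeleteAndInsert` method this PR adds to `SupportsDelta` (returning `false` by default, which is what keeps the change backward compatible). The surrounding class, command wiring, and the `_row_id` column name are illustrative:

```scala
import org.apache.spark.sql.connector.expressions.{Expressions, NamedReference}
import org.apache.spark.sql.connector.read.ScanBuilder
import org.apache.spark.sql.connector.write.{DeltaWriteBuilder, LogicalWriteInfo, SupportsDelta}
import org.apache.spark.sql.connector.write.RowLevelOperation.Command
import org.apache.spark.sql.util.CaseInsensitiveStringMap

// Hypothetical connector-side row-level operation; everything except the
// SupportsDelta API itself is illustrative.
class ExampleDeltaOperation(cmd: Command) extends SupportsDelta {

  override def command(): Command = cmd

  // Synthetic row ID column used to match delta records against existing rows.
  override def rowId(): Array[NamedReference] = Array(Expressions.column("_row_id"))

  // Ask Spark to rewrite UPDATE delta records as DELETE + INSERT pairs before
  // distributing and ordering the delta write.
  override def representUpdateAsDeleteAndInsert(): Boolean = true

  // Scan and write plumbing omitted from this sketch.
  override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder = ???
  override def newWriteBuilder(info: LogicalWriteInfo): DeltaWriteBuilder = ???
}
```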

How was this patch tested?

This PR comes with tests.

@github-actions github-actions bot added the SQL label May 24, 2023
@dongjoon-hyun (Member) left a comment

```scala
@@ -91,6 +91,33 @@ trait RewriteRowLevelCommand extends Rule[LogicalPlan] {
    rowIdAttrs
  }

  protected def deltaDeleteOutput(
```
@aokolnychyi (Contributor, Author)

Added to the parent class to reuse in MERGE later.
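As a rough sketch (not the merged code), such a helper might build the DELETE-record projection along these lines; the leading operation literal, parameter names, and null handling are assumptions for illustration:

```scala
import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, Literal}

// Rough sketch only: builds the output expressions for a delta DELETE record.
// Row ID and metadata attributes are kept; data attributes are emitted as nulls.
def deltaDeleteOutput(
    dataAttrs: Seq[Attribute],
    rowIdAttrs: Seq[Attribute],
    metadataAttrs: Seq[Attribute]): Seq[Expression] = {
  val operation = Literal(1) // e.g. a DELETE marker in the row operation column
  val nulledDataAttrs = dataAttrs.map(attr => Literal(null, attr.dataType))
  Seq(operation) ++ nulledDataAttrs ++ rowIdAttrs ++ metadataAttrs
}
```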

```scala
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeSet, Expression}
import org.apache.spark.sql.catalyst.util.truncatedString

case class SplitUpdateAsDeleteAndInsert(
```
@aokolnychyi (Contributor, Author)

Maybe I can use Expand/ExpandExec. Let me explore this to see if there are any performance implications.

@aokolnychyi (Contributor, Author)

Okay, I do think this node is a bit more efficient than Expand, as it acts almost like Project:

- No need to call copy() on each row.
- Subexpression elimination, like in ProjectExec.
- Deferred evaluation of input attributes unless they are needed, like in ProjectExec.

What are your thoughts? Do you think it is worth having such a node? Technically, Expand would work too, but it would be less efficient and would probably require more memory. This use case does not require a generic approach like Expand's.

cc @dongjoon-hyun @viirya @cloud-fan @sunchao @huaxingao

@aokolnychyi (Contributor, Author)

Maybe we can implement some of those optimizations for Expand instead of adding another node. I am inclined towards using Expand, but let me know what everyone thinks.

@cloud-fan (Contributor)

I'm in favor of reusing Expand, so that we get its codegen and/or vectorization (if people install some Spark plugins) for free.

@aokolnychyi (Contributor, Author)

Switched to Expand, will follow up with improvements later.
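For context, a rough sketch of how the split can be expressed with `Expand`: two projections over the same child plan, one emitting the DELETE record (row ID and metadata only) and one emitting the INSERT record (data only). The operation literals, parameter names, and output handling are illustrative, not the exact code in this PR:

```scala
import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, Literal}
import org.apache.spark.sql.catalyst.plans.logical.{Expand, LogicalPlan}

// Rough sketch: produce two output rows per incoming UPDATE row via Expand.
// `output` must contain one attribute per projection element
// (operation column + data attrs + row ID attrs + metadata attrs).
def splitUpdateAsDeleteAndInsert(
    child: LogicalPlan,
    output: Seq[Attribute],
    dataAttrs: Seq[Attribute],
    rowIdAttrs: Seq[Attribute],
    metadataAttrs: Seq[Attribute]): Expand = {

  // DELETE record: keep row ID and metadata values, null out data attributes.
  val deleteProjection: Seq[Expression] =
    Seq(Literal(1)) ++
      dataAttrs.map(a => Literal(null, a.dataType)) ++
      rowIdAttrs ++ metadataAttrs

  // INSERT record: keep data values, null out row ID and metadata attributes.
  val insertProjection: Seq[Expression] =
    Seq(Literal(3)) ++
      dataAttrs ++
      (rowIdAttrs ++ metadataAttrs).map(a => Literal(null, a.dataType))

  Expand(Seq(deleteProjection, insertProjection), output, child)
}
```

Compared with a dedicated node, Expand copies each input row for every projection, which is the overhead discussed above, but it comes with existing codegen support.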

@dongjoon-hyun (Member) left a comment

Could you re-trigger the failed CI pipeline, @aokolnychyi?

@aokolnychyi aokolnychyi reopened this May 27, 2023
@dongjoon-hyun (Member)

It seems the re-trigger failed, @aokolnychyi. Do you have a GitHub Actions link on your commit?

@aokolnychyi (Contributor, Author)

@dongjoon-hyun, looks like something weird happened. I switched to Expand and rebased to pick up recent changes. Let's see if tests will work this time. The PR should be ready for a detailed review.

@dongjoon-hyun (Member) left a comment

+1, LGTM. Thank you, @aokolnychyi. The Expand change looks good to me, too.

Since it was @cloud-fan's comment, could you review this once more, @cloud-fan?

@dongjoon-hyun (Member)

Thank you again, @aokolnychyi, @cloud-fan, @viirya.
Merged to master for Apache Spark 3.5.0.

@aokolnychyi (Contributor, Author)

Thanks, @dongjoon-hyun @cloud-fan @viirya!

czxm pushed a commit to czxm/spark that referenced this pull request Jun 12, 2023

[SPARK-43775][SQL] DataSource V2: Allow representing updates as deletes and inserts

Closes apache#41300 from aokolnychyi/spark-43775.

Authored-by: aokolnychyi <aokolnychyi@apple.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
viirya pushed a commit to viirya/spark-1 that referenced this pull request Oct 19, 2023

[SPARK-43775][SQL] DataSource V2: Allow representing updates as deletes and inserts

Closes apache#41300 from aokolnychyi/spark-43775.

Authored-by: aokolnychyi <aokolnychyi@apple.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
(cherry picked from commit fead25a)