
[SPARK-46207][SQL] Support MergeInto in DataFrameWriterV2 #44119

Closed · wants to merge 19 commits

Conversation

@huaxingao (Contributor) commented Dec 2, 2023

What changes were proposed in this pull request?

Add MergeInto support in DataFrameWriterV2

Why are the changes needed?

Spark currently supports the MERGE INTO SQL statement. We want the DataFrame API to have the same capability.

Does this PR introduce any user-facing change?

Yes. This PR introduces a new API, used like the following:

      spark.table("source")
        .mergeInto("target", $"source.id" === $"target.id")
        .whenNotMatched()
        .insertAll()
        .merge()

How was this patch tested?

New tests.

Was this patch authored or co-authored using generative AI tooling?

No

@github-actions bot added the SQL label Dec 2, 2023
@huaxingao (Contributor Author)

cc @aokolnychyi @cloud-fan @dongjoon-hyun @viirya
This PR is ready for review. Could you please take a look when you have a moment? Thanks a lot!
There is still a MiMa check failure. It passed locally, though; I will figure out what is wrong.

@github-actions bot added the DOCS label Dec 20, 2023
"message" : [
"df.mergeInto needs to be followed by at least one of whenMatched/whenNotMatched/whenNotMatchedBySource."
],
"sqlState" : "23K02"
Contributor

Suggested change
"sqlState" : "23K02"
"sqlState" : "42K0E"

Contributor Author

Fixed. Thanks
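The error message above implies a guard in merge(): the builder must have collected at least one when-clause before executing. A minimal, Spark-free sketch of that check, with hypothetical names that are not the PR's actual code:

```scala
// Hypothetical, Spark-free sketch of the guard implied by the error message:
// merge() must be preceded by at least one whenMatched / whenNotMatched /
// whenNotMatchedBySource clause. Names are illustrative, not from the PR.
case class MergeClauses(
    matched: Seq[String],
    notMatched: Seq[String],
    notMatchedBySource: Seq[String])

def validateMerge(c: MergeClauses): Unit = {
  val total = c.matched.size + c.notMatched.size + c.notMatchedBySource.size
  require(
    total > 0,
    "df.mergeInto needs to be followed by at least one of " +
      "whenMatched/whenNotMatched/whenNotMatchedBySource.")
}
```

In Spark itself this surfaces as an error class with the 42K0E sqlState agreed on above, rather than a plain require.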

qe.assertCommandExecuted()
}

def withNewMatchedUpdateAction(condition: Option[Expression]): MergeIntoWriter[T] = {
Contributor

This seems like too many helper functions...

def withNewMatchedAction(action: MergeAction): MergeIntoWriter[T] = {
  this.matchedActions = this.matchedActions :+ action
  this
}

I think 3 helper functions should be good enough for 3 different action types.
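The suggested shape (one helper per action type, each appending to its list and returning the writer for chaining) can be sketched Spark-free. The class and field names below are illustrative stand-ins, not the PR's code, and a String stands in for a real Catalyst MergeAction:

```scala
// Illustrative sketch of the three-helper builder shape suggested above.
// A real MergeIntoWriter would accumulate Catalyst MergeAction nodes; a
// String stands in here so the sketch is self-contained.
class MergeIntoWriterSketch {
  private var matchedActions: Seq[String] = Seq.empty
  private var notMatchedActions: Seq[String] = Seq.empty
  private var notMatchedBySourceActions: Seq[String] = Seq.empty

  def withNewMatchedAction(action: String): this.type = {
    matchedActions = matchedActions :+ action; this
  }
  def withNewNotMatchedAction(action: String): this.type = {
    notMatchedActions = notMatchedActions :+ action; this
  }
  def withNewNotMatchedBySourceAction(action: String): this.type = {
    notMatchedBySourceActions = notMatchedBySourceActions :+ action; this
  }
  // Expose sizes so the accumulation behavior is observable in the sketch.
  def actionCounts: (Int, Int, Int) =
    (matchedActions.size, notMatchedActions.size, notMatchedBySourceActions.size)
}
```

Returning `this` from each helper is what makes the fluent whenMatched/whenNotMatched chaining in the user-facing API possible.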

Contributor Author

Fixed. Thanks

import org.apache.spark.sql.Row
import org.apache.spark.sql.functions._

class MergeIntoDataFrameSuite extends RowLevelOperationSuiteBase {
@cloud-fan (Contributor) commented Dec 20, 2023

+1. We only need to make sure the new scala API works. We don't need to test the underlying v2 sources extensively, which should have been covered already by other tests

Comment on lines 4149 to 4150
*
* @since 4.0.0
Member

Similar to `def write`, add `@group basic`?

Contributor Author

Added. Thanks

* Initialize a `WhenNotMatched` action without any condition.
*
* This `WhenNotMatched` can be followed by one of the following merge actions:
* - `insertAll`: Insert all the target table with source dataset records.
Member

Insert all the columns of the target table with ....?

Contributor

Insert all rows from the source that are not already in the target table.
Please refer to https://docs.databricks.com/en/sql/language-manual/delta-merge-into.html#when-not-matched-[by-target]

Contributor Author

I have fixed this and a few other places.

Member

Thanks.

Comment on lines 92 to 93
* - `insert(Map)`: Insert all the target table records while changing only
* a subset of fields based on the provided assignment.
Member

Insert the specified columns ...

* Initialize a `WhenNotMatchedBySource` action without any condition.
*
* This `WhenNotMatchedBySource` can be followed by one of the following merge actions:
* - `updateAll`: Update all the target table fields with source dataset fields.
Member

Update all the columns of the target table ...

*
* This `WhenNotMatchedBySource` can be followed by one of the following merge actions:
* - `updateAll`: Update all the target table fields with source dataset fields.
* - `update(Map)`: Update all the target table records while changing only
Member

Update the specified columns of the target table ...

* - `updateAll`: Update all the target table fields with source dataset fields.
* - `update(Map)`: Update all the target table records while changing only
* a subset of fields based on the provided assignment.
* - `delete`: Delete all the target table records.
Member

Delete the target table row.

* - `updateAll`: Update all the target table fields with source dataset fields.
* - `update(Map)`: Update all the target table records while changing only
* a subset of fields based on the provided assignment.
* - `delete`: Delete all the target table records.
Member

Delete the matching target table row

Contributor

How about "Delete all target rows that have a match in the source table."?
Please refer to https://docs.databricks.com/en/sql/language-manual/delta-merge-into.html#when-matched

Member

Yeah, I was referring to the doc too. I tried to combine @huaxingao's original sentence and the doc. If @huaxingao wants to use the descriptions from the doc, that is good too.
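The wording being debated above describes MERGE semantics. As a purely illustrative, Spark-free sketch (not the PR's code), one pass of MERGE over key-value maps with whenMatched().updateAll(), whenNotMatched().insertAll(), and whenNotMatchedBySource().delete() behaves like:

```scala
// Spark-free illustration of MERGE semantics on Map[Int, String]:
// rows matched in both sides are updated from the source, source-only rows
// are inserted, and target-only rows are deleted (whenNotMatchedBySource).
def simulateMerge(
    target: Map[Int, String],
    source: Map[Int, String]): Map[Int, String] = {
  // whenMatched().updateAll(): matched keys take the source's values
  val updated = target.keySet.intersect(source.keySet).map(k => k -> source(k))
  // whenNotMatched().insertAll(): source-only rows are inserted
  val inserted = (source.keySet -- target.keySet).map(k => k -> source(k))
  // whenNotMatchedBySource().delete(): target-only keys are dropped
  (updated ++ inserted).toMap
}
```

This also shows why per-row wording ("Delete the matching target table row") and set wording ("Delete all target rows that have a match in the source") describe the same operation.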

@@ -4129,6 +4129,36 @@ class Dataset[T] private[sql](
new DataFrameWriterV2[T](table, this)
}

/**
* Create a [[MergeIntoWriter]] for MergeInto action.
Member

This is user-facing API doc. Not sure if it is proper to put `MergeIntoWriter` there. For example, we don't put `DataFrameWriter` in the `write` API doc.

Member

Maybe just describing what the function is used for. E.g., "Merges a set of updates, insertions, and deletions based on a source table into a target table"

https://docs.databricks.com/en/sql/language-manual/delta-merge-into.html

Contributor

The key question is whether `MergeIntoWriter` is a public API or a developer API.
cc @cloud-fan

Contributor Author

Fixed. Thanks

* - `updateAll`: Update all the target table fields with source dataset fields.
* - `update(Map)`: Update all the target table records while changing only
* a subset of fields based on the provided assignment.
* - `delete`: Delete all the target table records.
Contributor

How about "Delete all target rows that have a match in the source table."?
Please refer to https://docs.databricks.com/en/sql/language-manual/delta-merge-into.html#when-matched

* Initialize a `WhenNotMatched` action without any condition.
*
* This `WhenNotMatched` can be followed by one of the following merge actions:
* - `insertAll`: Insert all the target table with source dataset records.
Contributor

Insert all rows from the source that are not already in the target table.
Please refer to https://docs.databricks.com/en/sql/language-manual/delta-merge-into.html#when-not-matched-[by-target]

@huaxingao (Contributor Author)

The test failure doesn't seem to be related to my changes.

@beliefer beliefer closed this in 56dc7f8 Dec 21, 2023
@beliefer (Contributor) commented Dec 21, 2023

Merged to master.
Thank you @huaxingao @cloud-fan @viirya @HyukjinKwon

@huaxingao (Contributor Author)

Thank you all very much for reviewing the PR!
