
Implement merge command #850

Closed
darabos opened this issue Sep 26, 2022 · 15 comments · Fixed by #2042
Labels
enhancement New feature or request

Comments

@darabos

darabos commented Sep 26, 2022

If I understand Delta's docs (https://docs.delta.io/latest/delta-update.html#upsert-into-a-table-using-merge) correctly, merge is different from a generic write. Rather than appending to or replacing an existing table, it updates (rewrites) the affected target files. It looks like it is also implemented separately (MergeIntoCommand.scala).

Use Case

Writing a new table to Delta is not so exciting for me; I could just write a Parquet table. But updating an existing table has no substitute. I'm working on integrating a non-Spark computation system with Delta, and the merge command looks like the best way to put results back into the original table.

Related Issue(s)

#542 added basic writing support.

@darabos darabos added the enhancement New feature or request label Sep 26, 2022
@darabos
Author

darabos commented Sep 26, 2022

From MergeIntoCommand.scala it looks like a really complex operation. I may be able to work on this. It would be great if you could help me estimate how much effort this would be. Thanks for any input!

@wjones127
Collaborator

Merge is indeed a complex operation. We started with just append/overwrite because that doesn't require a query engine; you can just pass some data and write to parquet. But for things like delete and merge, we'll have to integrate a query engine and/or allow plugging one in. (Since we need a way to query for which rows match.) We're in the early stages of designing that, so it might be a couple months before we are even ready to start implementing merge.

That being said, we'd definitely like to support this and would welcome contributions 😄. Are you planning on building your integration on top of the Rust library or the Python library? And for your use case, are you more interested in having the query engine parts provided for you (probably DataFusion) or being able to pass in your own implementation?

@darabos
Author

darabos commented Sep 26, 2022

Thanks! We would be using the Python interface and already have a Dask system on hand. It would be great if we could use that!

@nkarpov

nkarpov commented Apr 18, 2023

I'm prototyping this and found that DataFusion does not have an equivalent to input_file_name in Spark. This built-in UDF is used in the delta-spark implementation to help filter out unmatched files in all of UPDATE/DELETE/MERGE commands (without this, the entire table would be rewritten every time).

It looks like there were a few starts on this work years ago, but they didn't make it through the repo split. The most recent issue I'm able to find is apache/arrow#18601, and the two previous efforts are apache/arrow#9944 and apache/arrow#9976.

I think this built-in UDF should ultimately exist in DataFusion, but I'm wondering if anyone more familiar with DataFusion has ideas on how we might accomplish this in the interim. I'm still getting to know DataFusion, so the only thing that comes to mind so far is a pretty horrendous "read each file as a DF + project the file name, and then union all the dataframes".
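
A minimal sketch of that interim idea, assuming a DataFusion `SessionContext` and a list of file URIs obtained from the table (the function name and the `__source_file` column are invented for illustration, and exact DataFusion signatures vary by version):

use datafusion::error::{DataFusionError, Result};
use datafusion::prelude::*;

// Read each data file as its own DataFrame, tag every row with the file it
// came from (emulating Spark's input_file_name()), and union everything into
// a single pre-join target DataFrame.
async fn target_with_file_names(
    ctx: &SessionContext,
    file_uris: Vec<String>,
) -> Result<DataFrame> {
    let mut combined: Option<DataFrame> = None;
    for uri in file_uris {
        let df = ctx
            .read_parquet(uri.as_str(), ParquetReadOptions::default())
            .await?
            .with_column("__source_file", lit(uri.clone()))?;
        combined = Some(match combined {
            Some(acc) => acc.union(df)?,
            None => df,
        });
    }
    combined.ok_or_else(|| DataFusionError::Plan("table has no files".to_string()))
}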

@wjones127
Collaborator

Perhaps a decent first pass is to have a query per file?

Something vaguely like (for a delete query):

let file_stream = futures::stream::iter(delta_table.files_uris());

let action_stream = file_stream.map(|file| async {
    let orig_num_rows = parquet_row_count(file).await?;
    let df = ctx.read_parquet(file).await?;
    // Keep only the rows that survive the delete predicate.
    let res = df.filter(!delete_clause)?.collect().await?;
    if res.num_rows() == 0 {
        // Every row matched the delete: drop the file entirely.
        (Some(RemoveAction { file }), None)
    } else if res.num_rows() == orig_num_rows {
        // No row matched: leave the file untouched.
        (None, None)
    } else {
        // Some rows survive: write them to a new file and swap it in.
        let new_file = generate_new_file_name();
        write_parquet(res, new_file).await?;
        (Some(RemoveAction { file }), Some(AddAction { new_file }))
    }
});

// Run the per-file futures and gather the resulting actions.
let actions = action_stream.buffered(8).collect::<Vec<_>>().await;

@nkarpov

nkarpov commented Apr 18, 2023

I think that works great for UPDATE/DELETE, but I don't see how it would work for MERGE in the eventual distributed (Ballista) case (maybe that's too much to think about for now). We need the origin of each row at join time (or after the join), unlike the static predicates in UPDATE/DELETE, which we can evaluate one file at a time.

Your example is actually close to how I envisioned the messy way of supporting this in merge: project lit(filename) onto each df (1:1 with file), then union them all to produce the pre-join targetDF; we'd then have the filename available in the join result.

Anyway, in a single-node version, I suppose we could just perform the entire MERGE one file at a time, same as your DELETE example.

@wjones127
Collaborator

That's fair. I do think we should solve UPDATE/DELETE first. That will get a lot of useful stuff out of the way.

Anyway, in a single-node version, I suppose we could just perform the entire MERGE one file at a time, same as your DELETE example.

I think that's a decent first pass to prototype.

Could you file a new issue in https://github.com/apache/arrow-datafusion? That's where the upstream issue should live now.

@Blajda
Collaborator

Blajda commented Jul 9, 2023

I have a functional prototype of merge done and would like to start a discussion on the public rust interface.

Currently I have something like this, which follows the same pattern as the other operations.
Keep in mind that merge supports updates, deletes, and inserts, and each sub-operation has an optional predicate condition too.

let (table, _metrics) = DeltaOps(table)
    .merge(source, col("id").eq(col("id_src")))
    .when_matched_update()
    .with_predicate("value >= 20")
    .with_update("value", col("value_src"))
    .with_update("modified", col("modified_src"))
    .build()
    .when_matched_delete()
    .build()
    .when_not_matched_insert()
    .with_set("id", col("id_src"))
    .with_set("value", col("value_src"))
    .with_set("modified", col("modified_src"))
    .build()
    .await?;

I don't like this approach since cargo fmt forces each child builder to be vertically aligned with the parent.

One possible approach is to have a closure that accepts the child builder. It would look something like this:

let (table, _metrics) = DeltaOps(table)
    .merge(source, col("id").eq(col("id_src")))
    .when_matched_update(|update| {
        update
            .predicate("value >= 20")
            .update("value", col("value_src"))
            .update("modified", col("modified_src"))
    })
    .when_matched_delete(|delete| delete)
    .when_not_matched_insert(|insert| {
        insert
            .set("id", col("id_src"))
            .set("value", col("value_src"))
            .set("modified", col("modified_src"))
    })
    .await?;

@wjones127
Collaborator

One possible approach is to have a closure that accepts the child builder.

@Blajda I agree, that one looks nice. It looks closer to the API in PySpark, which I assume is what you are modeling it on.

@roeap
Collaborator

roeap commented Jul 17, 2023

I like the proposed API as well. I guess it would also save us from having to keep two APIs for the child builder in sync?

@nitindatta

this is great @Blajda

@Blajda
Collaborator

Blajda commented Jul 18, 2023

I guess it would also help us to not keep two apis for the child builder in sync?

@roeap I don't quite follow what you mean by this. Seems like the consensus is on the second proposal and I intended to implement only one. Is your suggestion to do both?

@roeap
Collaborator

roeap commented Jul 18, 2023

Is your suggestion to do both?

Absolutely not. Yes, the second one seems to be the winner :).

My understanding was that the closure for when_matched_update would essentially receive an UpdateBuilder for the update (sub-)operation; in that case, any change to that builder would also be reflected in this API without any adjustments.

I may also just have misunderstood things. In any case the effort would be manageable :).
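
To make that concrete, here's a hedged sketch of the shape the closure-based API could take (type and field names are illustrative, not the final delta-rs signatures). Because the closure receives the child builder itself, any method later added to UpdateBuilder becomes available inside merge with no extra glue:

// Illustrative stand-ins for the real builders.
#[derive(Default)]
struct UpdateBuilder {
    predicate: Option<String>,
    updates: Vec<(String, String)>,
}

impl UpdateBuilder {
    fn predicate(mut self, p: &str) -> Self {
        self.predicate = Some(p.to_string());
        self
    }
    fn update(mut self, column: &str, expr: &str) -> Self {
        self.updates.push((column.to_string(), expr.to_string()));
        self
    }
}

#[derive(Default)]
struct MergeBuilder {
    match_operations: Vec<UpdateBuilder>,
}

impl MergeBuilder {
    // The closure gets the same UpdateBuilder a standalone UPDATE would use,
    // so the two APIs cannot drift apart.
    fn when_matched_update(mut self, f: impl FnOnce(UpdateBuilder) -> UpdateBuilder) -> Self {
        self.match_operations.push(f(UpdateBuilder::default()));
        self
    }
}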

wjones127 added a commit that referenced this issue Sep 11, 2023
# Description
Implement the Merge operation using DataFusion.

Currently the implementation rewrites the entire DeltaTable; limiting the rewrite to only the affected files will be done in future work.

# Related Issue(s)
- progresses #850



---------

Co-authored-by: Will Jones <willjones127@gmail.com>
Co-authored-by: Robert Pack <42610831+roeap@users.noreply.github.com>
@ion-elgreco
Collaborator

ion-elgreco commented Sep 11, 2023

@darabos @wjones127 @Blajda should this issue be closed and a new one created for an improved version of MERGE that does a partial rewrite instead of a full one?

@Blajda
Collaborator

Blajda commented Sep 12, 2023

I think we should keep this open so we have a single issue to track progress. We should only close this issue once merge does not perform a full rewrite and the join operator uses a HashJoin or SortMergeJoin.

Blajda added a commit that referenced this issue Nov 19, 2023
# Description
This refactors the merge operation to use DataFusion's DataFrame and LogicalPlan APIs.

The NLJ is eliminated and the query planner can pick the optimal join operator. This also enables the operation to use multiple threads and should result in a significant speed-up. Merge is still limited to a single thread in some areas. When collecting benchmarks, I encountered multiple OoM issues with DataFusion's hash join implementation; there are multiple open tickets upstream about this. For now, I've limited the number of partitions to just 1 to prevent this.

Predicates passed as SQL are also easier to use now. Previously, manual casting was required to ensure data types were aligned; now the logical plan performs type coercion when the plan is optimized.

# Related Issues
- enhances #850
- closes #1790 
- closes #1753
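
(For reference, a hedged sketch of how such a partition cap can be applied via DataFusion's session configuration; with_target_partitions is a real DataFusion setting, though constructor names vary across versions:)

use datafusion::execution::context::{SessionConfig, SessionContext};

fn main() {
    // Cap DataFusion at a single target partition, as described above, to
    // sidestep the hash-join OoM issues seen while benchmarking.
    let config = SessionConfig::new().with_target_partitions(1);
    let _ctx = SessionContext::new_with_config(config);
}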
Blajda added a commit that referenced this issue Dec 30, 2023
# Description
Implements a new DataFusion node called `MergeBarrier` that determines which files have modifications. For files with no modifications, a remove action is no longer created.

# Related Issue(s)
- enhances #850
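
(A conceptual sketch of the idea only, not the actual `MergeBarrier` code: after the join, picture each surviving row carrying the path of the file it came from plus the kind of change applied to it; a file then needs a remove action only if at least one of its rows was modified. All types below are invented for illustration:)

use std::collections::HashMap;

#[derive(PartialEq)]
enum RowOp {
    Copy,   // row passed through unchanged
    Update, // row rewritten by a matched clause
    Delete, // row removed by a matched clause
}

// Collect the files that actually need a remove/add pair; files whose rows
// are all plain copies are left untouched.
fn files_needing_rewrite(rows: &[(String, RowOp)]) -> Vec<String> {
    let mut dirty: HashMap<&str, bool> = HashMap::new();
    for (file, op) in rows {
        *dirty.entry(file.as_str()).or_insert(false) |= *op != RowOp::Copy;
    }
    dirty
        .into_iter()
        .filter_map(|(file, is_dirty)| is_dirty.then(|| file.to_string()))
        .collect()
}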
Blajda added a commit that referenced this issue Jan 6, 2024
# Description
Update the documentation to reflect the current state of merge. Since merge now supports upserts without performing a full rewrite, I'd like to mark it as "done".

There are, of course, further optimizations that can be performed, but it is now in a usable state.

# Related Issue(s)
- closes #850