
Merge is slower than expected and loads more than expected into memory. #2573

Open
adamfaulkner-at opened this issue Jun 5, 2024 · 5 comments
Labels: bug (Something isn't working), question (Further information is requested)

Comments

@adamfaulkner-at

Environment

Delta-rs version: 0.17.3

Binding: Rust

Environment:

  • Cloud provider: AWS
  • OS: Ubuntu 22.04.3 LTS
  • Other:

Bug

What happened:

Given a table with ~100M rows in it, stored as a Delta Lake table in S3 and sorted by "rideid" (about 38 Parquet files of roughly 100MB each), I'm trying to "upsert" 1 row using code that looks like this:

        (table, _) = DeltaOps(table)
            .merge(source_df, col("source.rideid").eq(col("target.rideid")))
            .with_source_alias("source")
            .with_target_alias("target")
            .when_not_matched_insert(|insert| {
                COLUMNS
                    .iter()
                    .fold(insert, |insert, &column| {
                        insert.set(column, col(format!("source.{}", column)))
                    })
            })?
            .when_matched_update(|update| {
                COLUMNS.iter().fold(update, |update, &column| {
                    update.update(column, col(format!("source.{}", column)))
                })
            })?
            .await?;

(COLUMNS is simply an array that contains all 13 of the columns in the table.) This consumes all of my computer's memory and then crashes.

I've tried partitioning the data by a hash of the rideid, but that doesn't change anything: I still run out of memory and cannot run this operation.

What you expected to happen:

This is pretty surprising, because I can run the join in DataFusion pretty efficiently:

        let overlap_results = ctx.sql("SELECT target.rideid FROM target LEFT JOIN source ON target.rideid = source.rideid WHERE source.rideid IS NOT NULL").await?.collect().await?;

This query takes about 2 seconds and consumes only ~500MB of memory. I can build my own upsert on top of this sort of DataFusion query plus a delete and a write, and that seems to work fine.
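For reference, a minimal sketch of that query-then-delete-then-append path is below. The function and variable names (manual_upsert, new_rows) are illustrative, and exact module paths may differ slightly between deltalake versions:

    use deltalake::arrow::record_batch::RecordBatch;
    use deltalake::datafusion::prelude::{col, lit};
    use deltalake::protocol::SaveMode;
    use deltalake::{DeltaOps, DeltaTable, DeltaTableError};

    // Sketch of a manual "upsert": delete any existing row with the same key,
    // then append the replacement rows. `rideid` and `new_rows` are illustrative.
    async fn manual_upsert(
        table: DeltaTable,
        rideid: i64,
        new_rows: RecordBatch,
    ) -> Result<DeltaTable, DeltaTableError> {
        // Delete any existing row with this key.
        let (table, _metrics) = DeltaOps(table)
            .delete()
            .with_predicate(col("rideid").eq(lit(rideid)))
            .await?;

        // Append the new version of the row.
        let table = DeltaOps(table)
            .write(vec![new_rows])
            .with_save_mode(SaveMode::Append)
            .await?;

        Ok(table)
    }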

How to reproduce it:

Implement an "upsert" operation using merge on a table with ~100M rows, observe how much memory this consumes.


@adamfaulkner-at added the bug label Jun 5, 2024
@ion-elgreco
Collaborator

It needs to scan the entire table if you don't use partitioning. If you do partition, then you need to give an explicit partition predicate to reduce the number of partitions you read.
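One reading of that advice, based on the snippet from the issue: if the table were partitioned by a (hypothetical) ride_bucket column, the partition constraint can be AND-ed into the merge predicate so the scan can prune files from other partitions. Untested sketch, with lit coming from the DataFusion prelude:

    // Hypothetical partition column `ride_bucket`; here the incoming row falls
    // into bucket 17. Constraining the partition column in the merge predicate
    // lets the scan skip files from other partitions.
    (table, _) = DeltaOps(table)
        .merge(
            source_df,
            col("target.ride_bucket")
                .eq(lit(17))
                .and(col("source.rideid").eq(col("target.rideid"))),
        )
        .with_source_alias("source")
        .with_target_alias("target")
        .when_not_matched_insert(|insert| {
            COLUMNS.iter().fold(insert, |insert, &column| {
                insert.set(column, col(format!("source.{}", column)))
            })
        })?
        .when_matched_update(|update| {
            COLUMNS.iter().fold(update, |update, &column| {
                update.update(column, col(format!("source.{}", column)))
            })
        })?
        .await?;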

@ion-elgreco added the question label Jun 7, 2024
@adamfaulkner-at
Author

Thanks for the reply @ion-elgreco. Why does it need to scan the entire table into memory before it starts writing data? Is this just a lack of optimization, or is there something fundamental to what merge is doing that prevents this kind of optimization?

@vegarsti
Contributor

> Thanks for the reply @ion-elgreco. Why does it need to scan the entire table into memory before it starts writing data? Is this just a lack of optimization, or is there something fundamental to what merge is doing that prevents this kind of optimization?

It needs to scan the entire table because it has to find out which rows the merge condition applies to.

@adamfaulkner-at
Author

> It needs to scan the entire table because it has to find out which rows the merge condition applies to.

I understand this. However, it doesn't need to hold the entire table in memory while it is performing the merge. It could do this in a streaming fashion; this is more or less what you get out of the box with DataFusion.

@adamfaulkner-at
Author

To answer my own questions here:

> why does it need to scan the entire table into memory before it starts writing data

MergeBarrier holds on to all records for a particular file until either a delete, update, or insert is encountered, or until the input data is fully exhausted. This means that in workloads with a large input set, where the merge does not typically produce deletes, updates, or inserts, the entire dataset will usually be buffered in memory.

This could be avoided if we somehow explicitly told DataFusion to fully exhaust one file at a time so that data could be flushed.

I can imagine using partitioning to break the merge into many operations so that all the data is not pulled into memory at once. But in my case, I'd probably rather put the effort towards not using Merge.
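A rough sketch of that per-partition idea, assuming a hypothetical ride_bucket partition column, a NUM_BUCKETS constant, and a merge_one_bucket helper that wraps the merge call from the original snippet with the bucket AND-ed into the predicate (untested):

    // Untested sketch: merge one partition at a time so only that partition's
    // files are buffered at once. `NUM_BUCKETS`, `ride_bucket`, and
    // `merge_one_bucket` are hypothetical; the helper is the merge call from
    // the snippet above with `col("target.ride_bucket").eq(lit(bucket))`
    // added to the predicate.
    for bucket in 0..NUM_BUCKETS {
        // Only the source rows that belong to this bucket.
        let bucket_df = source_df
            .clone()
            .filter(col("ride_bucket").eq(lit(bucket)))?;

        table = merge_one_bucket(table, bucket_df, bucket).await?;
    }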
