
Improve upsert memory pressure #1995


Merged

merged 20 commits into apache:main on Jun 20, 2025

Conversation

koenvo
Contributor

@koenvo koenvo commented May 13, 2025

Summary

This PR updates the upsert logic to use batch processing. The main goal is to prevent out-of-memory (OOM) issues when updating large tables by avoiding loading all data at once.
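As a rough illustration of the idea (a minimal sketch, not the PR's actual code; the function name, the single key join column, and the plain-PyArrow approach are all assumptions for the example), the scan is consumed one batch at a time so that memory is bounded by the batch size rather than the table size:

import pyarrow as pa
import pyarrow.compute as pc

# Illustrative sketch only: stream the existing table's scan batch by batch and
# keep the rows whose key also appears in the incoming data, instead of
# materializing the whole table with to_arrow() first.
def matching_rows(reader: pa.RecordBatchReader, incoming: pa.Table, key: str) -> pa.Table:
    incoming_keys = pc.unique(incoming[key])
    matched = [batch.filter(pc.is_in(batch[key], value_set=incoming_keys)) for batch in reader]
    return pa.Table.from_batches(matched, schema=reader.schema)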

Note: This has only been tested against the unit tests—no real-world datasets have been evaluated yet.

This PR partially depends on functionality introduced in #1817.


Notes

  • Duplicate detection across multiple batches is not possible with this approach, since only one batch is held in memory at a time.
  • All data is read sequentially, which may be slower than the parallel read used by to_arrow. (Update: fixed by adding a concurrent_tasks parameter.)

Performance Comparison

In setups with many small files, network and metadata overhead become the dominant factor. This impacts batch reading performance, as each file contributes relatively more overhead than payload. In the test setup used here, metadata access was the largest cost.

Using to_arrow_batch_reader (sequential):

  • Scan: 9993.50 ms
  • To list: 19811.09 ms

Using to_arrow (parallel):

  • Scan: 10607.88 ms

@jayceslesar
Contributor

fwiw I think we should try to get this merged in at some point. Some ideas:

  1. Make it a flag to use the batch reader or not; some users might have basically infinite memory.
  2. Is there a more optimal way to batch the data? Thinking along the lines of using partitions, although that may already happen under the hood.

@koenvo
Contributor Author

koenvo commented Jun 2, 2025

fwiw I think we should try to get this merged in at some point. Some ideas:

  1. Make it a flag to use the batch reader or not; some users might have basically infinite memory.
  2. Is there a more optimal way to batch the data? Thinking along the lines of using partitions, although that may already happen under the hood.

I've been thinking about what I (as a developer) actually want. The answer: to set a maximum memory usage.

Some ideas:

  1. Determine which partitions fit together in memory and batch-load those together
  2. Fetch the Parquet files in parallel and only do the loading sequentially (see the sketch below)
  3. Combine 1 and 2
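
A toy sketch of idea 2 (fetch_then_load, fetch, and max_in_flight are made-up names for illustration): download files concurrently but hand them to the consumer in submission order, with a sliding window bounding how many fetched files sit in memory at once.

from collections import deque
from concurrent.futures import ThreadPoolExecutor

# Illustrative sketch: fetch is a placeholder for downloading one file.
# At most max_in_flight fetched results are buffered; loading stays sequential.
def fetch_then_load(paths, fetch, max_in_flight=4):
    with ThreadPoolExecutor(max_workers=max_in_flight) as pool:
        window = deque()
        for path in paths:
            window.append(pool.submit(fetch, path))
            if len(window) >= max_in_flight:
                yield window.popleft().result()
        while window:
            yield window.popleft().result()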

@koenvo
Contributor Author

koenvo commented Jun 2, 2025

Did an update and ran a quick benchmark with different concurrent_tasks settings on to_arrow_batch_reader():

import tqdm
import pyarrow as pa

pool = pa.default_memory_pool()
table = catalog.load_table("some_table")

# Benchmark loop
reader = table.scan().to_arrow_batch_reader(concurrent_tasks=100)
for batch in tqdm.tqdm(reader):
    print(pool.max_memory())

Results (including pool.max_memory()):

  • concurrent_tasks=1: 52it [00:06, 7.73it/s] | Max memory: 7.4 MB
  • concurrent_tasks=10: 391it [00:06, 61.98it/s] | Max memory: 36.3 MB
  • concurrent_tasks=20: 1412it [00:15, 83.54it/s] | Max memory: 147 MB
  • concurrent_tasks=100: 1030it [00:09, 106.84it/s] | Max memory: 1.76 GB

Some more testing (on a 100 Mbit connection):

scan.to_arrow_batch_reader(concurrent_tasks=10)
2025-06-03 11:02:48.986 INFO Starting
2025-06-03 11:05:10.927 INFO Rows: 13584102
2025-06-03 11:05:10.927 INFO Memory usage: 78.4MB

scan.to_arrow()
2025-06-03 11:05:47.211 INFO Starting
2025-06-03 11:08:09.907 INFO Rows: 13584102
2025-06-03 11:08:09.907 INFO Memory usage: 11GB

Note: Performance also depends on the network connection.

@koenvo koenvo marked this pull request as ready for review June 3, 2025 09:59
@koenvo koenvo changed the title Use batchreader in upsert Improve to_arrow_batch_reader performance + use to_arrow_batch_reader in upsert to lower memory pressure Jun 3, 2025
@koenvo
Contributor Author

koenvo commented Jun 3, 2025

Did another update to get rid of the concurrent_tasks argument. It now defaults to the max-workers config.

I also refactored to_arrow to use to_arrow_batch_reader under the hood, to avoid two implementations of the same functionality.
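
Conceptually the refactor boils down to something like this (a simplified sketch, not the actual implementation):

import pyarrow as pa

# Simplified sketch: to_arrow() drains the streaming reader into a single table,
# so both code paths share one read implementation underneath.
def to_arrow(self) -> pa.Table:
    return self.to_arrow_batch_reader().read_all()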

@Fokko
Contributor

Fokko commented Jun 13, 2025

Make it a flag to use the batch reader or not; some users might have basically infinite memory.

Typically, I'm not a big fan of this kind of flag, since it delegates the responsibility to the user, and not every user knows the implications. Instead, let's optimize for the best out-of-the-box experience.

@Fokko
Contributor

Fokko commented Jun 13, 2025

@koenvo Looks like there are some issues with list vs large_list; do you have time to dig into it? In the original implementation we had cast operations to ensure that we keep the original types.

@koenvo
Contributor Author

koenvo commented Jun 13, 2025

@koenvo Looks like there are some issues with list vs large_list; do you have time to dig into it? In the original implementation we had cast operations to ensure that we keep the original types.

I was kind of scared off by this:

        if property_as_bool(self._io.properties, PYARROW_USE_LARGE_TYPES_ON_READ, False):
            deprecation_message(
                deprecated_in="0.10.0",
                removed_in="0.11.0",
                help_message=f"Property `{PYARROW_USE_LARGE_TYPES_ON_READ}` will be removed.",
            )
            result = result.cast(arrow_schema)

Currently I do the cast in my own application code. I ran into the same issue but thought it was a problem in my implementation, so I had to add the cast there.
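
Roughly, that workaround looks like this (a sketch of my application code; schema_to_pyarrow is one way to rebuild the declared Arrow schema):

from pyiceberg.io.pyarrow import schema_to_pyarrow

# Sketch of the application-side workaround: the scan result may come back with
# large_* types, so cast it back to the table's declared Arrow schema.
result = table.scan().to_arrow()
result = result.cast(schema_to_pyarrow(table.schema()))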

And happy to look into this.

Contributor

@Fokko Fokko left a comment

Two minor comments, this looks great 👍

koenvo and others added 2 commits June 20, 2025 17:08
Use arrow_schema.empty_table()

Co-authored-by: Fokko Driesprong <fokko@apache.org>
Co-authored-by: Fokko Driesprong <fokko@apache.org>
@Fokko Fokko changed the title Improve to_arrow_batch_reader performance + use to_arrow_batch_reader in upsert to lower memory pressure Improve memory pressure by using batching Jun 20, 2025
@Fokko Fokko changed the title Improve memory pressure by using batching Improve upsert memory pressure Jun 20, 2025
@Fokko Fokko merged commit 1ac8ffb into apache:main Jun 20, 2025
10 checks passed
@Fokko
Contributor

Fokko commented Jun 20, 2025

Moving this forward, this looks great @koenvo 🙌

@koenvo koenvo deleted the feat/use-batchreader-in-upsert branch June 20, 2025 22:07