Fix massive spill files for StringView/BinaryView columns II #21633
Changes from all commits
a42f659
cd8ecda
06b0df1
68c3403
7fd21a5
4fa9cf2
8627423
92a286b
1530ba7
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -24,7 +24,10 @@ use arrow::array::RecordBatch; | |
| use datafusion_common::exec_datafusion_err; | ||
| use datafusion_execution::disk_manager::RefCountedTempFile; | ||
|
|
||
| - use super::{IPCStreamWriter, spill_manager::SpillManager}; | ||
| + use super::{ | ||
| +     IPCStreamWriter, gc_view_arrays, | ||
| +     spill_manager::{GetSlicedSize, SpillManager}, | ||
| + }; | ||
|
|
||
| /// Represents an in-progress spill file used for writing `RecordBatch`es to disk, created by `SpillManager`. | ||
| /// Caller is able to use this struct to incrementally append in-memory batches to | ||
|
|
@@ -51,16 +54,25 @@ impl InProgressSpillFile { | |
|
|
||
| /// Appends a `RecordBatch` to the spill file, initializing the writer if necessary. | ||
| /// | ||
| /// Before writing, performs GC on StringView/BinaryView arrays to compact backing | ||
|
Contributor
FWIW, I think the same general approach might be useful or needed for other "view" type arrays. I am thinking of sliced ListView, for example, as well as sliced ListArray and sliced Utf8 🤔 Maybe as a follow-on PR.
Contributor
Author
Yes, I think there's a larger discussion around the footgun of view arrays sharing data and how slicing in general can result in fragmentation of data in arrays and wasted memory. I don't know what the big-picture story is, but personally I feel it would be reasonable to have some heuristic like "if an array becomes more than 50% dead references, gc it".
||
| /// buffers. When a view array is sliced, it still references the original full buffers, | ||
| /// causing massive spill files without GC (see issue #19414: 820MB → 33MB after GC). | ||
| /// | ||
| /// Returns the post-GC sliced memory size of the batch for memory accounting. | ||
| /// | ||
| /// # Errors | ||
| /// - Returns an error if the file is not active (has been finalized) | ||
| /// - Returns an error if appending would exceed the disk usage limit configured | ||
| /// by `max_temp_directory_size` in `DiskManager` | ||
| - pub fn append_batch(&mut self, batch: &RecordBatch) -> Result<()> { | ||
| + pub fn append_batch(&mut self, batch: &RecordBatch) -> Result<usize> { | ||
| if self.in_progress_file.is_none() { | ||
| return Err(exec_datafusion_err!( | ||
| "Append operation failed: No active in-progress file. The file may have already been finalized." | ||
| )); | ||
| } | ||
|
|
||
| let gc_batch = gc_view_arrays(batch)?; | ||
|
|
||
| if self.writer.is_none() { | ||
| // Use the SpillManager's declared schema rather than the batch's schema. | ||
| // Individual batches may have different schemas (e.g., different nullability) | ||
|
|
@@ -87,7 +99,7 @@ impl InProgressSpillFile { | |
| } | ||
| } | ||
| if let Some(writer) = &mut self.writer { | ||
| - let (spilled_rows, _) = writer.write(batch)?; | ||
| + let (spilled_rows, _) = writer.write(&gc_batch)?; | ||
| if let Some(in_progress_file) = &mut self.in_progress_file { | ||
| let pre_size = in_progress_file.current_disk_usage(); | ||
| in_progress_file.update_disk_usage()?; | ||
|
|
@@ -102,7 +114,7 @@ impl InProgressSpillFile { | |
| unreachable!() // Already checked inside current function | ||
| } | ||
| } | ||
| - Ok(()) | ||
| + gc_batch.get_sliced_size() | ||
| } | ||
|
|
||
| pub fn flush(&mut self) -> Result<()> { | ||
|
|
||
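To make the motivation in the `append_batch` doc comment concrete, here is a minimal sketch of why a sliced view array drags its full backing buffers along and what compaction buys. It is a simplified, std-only model, not the arrow-rs API: `View`, `ViewArray`, `gc`, and `should_gc` are hypothetical names, the model ignores Arrow's inlining of short values, and `should_gc` only illustrates the "more than 50% dead" heuristic floated in the review thread (it is not part of this PR).

```rust
/// Hypothetical stand-in for one view: a (buffer, offset, len) reference.
#[derive(Clone, Copy, Debug, PartialEq)]
struct View {
    buffer: usize, // index into `ViewArray::buffers`
    offset: usize, // start of the value inside that buffer
    len: usize,    // length of the value in bytes
}

/// Hypothetical, simplified model of a StringView/BinaryView-style array.
#[derive(Clone, Debug)]
struct ViewArray {
    views: Vec<View>,
    buffers: Vec<Vec<u8>>,
}

impl ViewArray {
    /// Pack all values into a single backing buffer.
    fn from_strs(values: &[&str]) -> Self {
        let mut buffer = Vec::new();
        let mut views = Vec::with_capacity(values.len());
        for v in values {
            views.push(View { buffer: 0, offset: buffer.len(), len: v.len() });
            buffer.extend_from_slice(v.as_bytes());
        }
        ViewArray { views, buffers: vec![buffer] }
    }

    /// Slicing narrows the views but keeps every backing buffer.
    /// (This model clones the buffers; a real implementation would share them.)
    fn slice(&self, start: usize, len: usize) -> Self {
        ViewArray {
            views: self.views[start..start + len].to_vec(),
            buffers: self.buffers.clone(),
        }
    }

    /// Bytes that would be written if the buffers were serialized as-is.
    fn buffer_bytes(&self) -> usize {
        self.buffers.iter().map(Vec::len).sum()
    }

    /// Bytes actually referenced by the remaining views.
    fn live_bytes(&self) -> usize {
        self.views.iter().map(|v| v.len).sum()
    }

    /// Raw bytes of the value at index `i`.
    fn value(&self, i: usize) -> &[u8] {
        let v = self.views[i];
        &self.buffers[v.buffer][v.offset..v.offset + v.len]
    }

    /// Compact: copy only referenced bytes into a fresh buffer and rewrite the views.
    fn gc(&self) -> Self {
        let mut buffer = Vec::with_capacity(self.live_bytes());
        let mut views = Vec::with_capacity(self.views.len());
        for i in 0..self.views.len() {
            let bytes = self.value(i);
            views.push(View { buffer: 0, offset: buffer.len(), len: bytes.len() });
            buffer.extend_from_slice(bytes);
        }
        ViewArray { views, buffers: vec![buffer] }
    }

    /// Illustrative heuristic: compact when more than half of the buffer bytes are dead.
    fn should_gc(&self) -> bool {
        let total = self.buffer_bytes();
        let dead = total.saturating_sub(self.live_bytes());
        total > 0 && dead * 2 > total
    }
}
```

In this model, slicing one value out of a four-value array leaves `buffer_bytes()` unchanged while `live_bytes()` drops to the size of that one value; calling `gc()` before serializing brings the two back in line, which is the effect the PR relies on when it calls `gc_view_arrays` ahead of `writer.write`.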
This dependency seems unused.
The only occurrence of `arrow_data` is at https://github.com/apache/datafusion/pull/21633/changes#diff-1f7d15c867929af294664ebbde4e8c9038186222cbb95ed86e527406cf066e84R463 for a test helper.