feat(ipc): Avoid repeated heap allocations and buffer copies in IPC writer #9836
pchintar wants to merge 1 commit into apache:main from ipc-writer-avoid-repetitive-allocs
Conversation
run benchmark ipc_writer
🤖 Arrow criterion benchmark running (GKE): comparing ipc-writer-avoid-repetitive-allocs (8ce051e) to 4fa8d2f (merge-base)
🤖 Arrow criterion benchmark completed (GKE) [collapsed details: CPU details (lscpu), resource usage for base (merge-base) and branch]
Ok, so the results are positive (~15% improvement for non-compressed paths), but they're different from what I got, probably because of the different processors. I ran mine on my personal Mac, which has an Intel Core i9 x86_64 processor.
@alamb Also, it is clear that the compressed path is much slower than non-compressed overall, so I took a look into the zstd compressed path and I found out that in `compress_zstd` the current code compresses into a temporary `Vec<u8>` and then copies that into the output buffer. That's one extra allocation + one extra copy per buffer. Zstd actually provides `compress_to_buffer`, which can write directly into a caller-provided slice.

Current code (alloc --> compress --> copy):

```rust
#[cfg(feature = "zstd")]
fn compress_zstd(
    input: &[u8],
    output: &mut Vec<u8>,
    context: &mut CompressionContext,
) -> Result<(), ArrowError> {
    let result = context.zstd_compressor().compress(input)?;
    output.extend_from_slice(&result);
    Ok(())
}
```

New approach (compress --> direct write):

```rust
#[cfg(feature = "zstd")]
fn compress_zstd(
    input: &[u8],
    output: &mut Vec<u8>,
    context: &mut CompressionContext,
) -> Result<(), ArrowError> {
    use zstd_safe::compress_bound;
    let compressor = context.zstd_compressor();
    // Compute maximum compressed size
    let bound = compress_bound(input.len());
    // Reserve space and extend buffer to allow in-place write
    let offset = output.len();
    output.resize(offset + bound, 0);
    // Compress directly into output buffer
    let written = compressor
        .compress_to_buffer(input, &mut output[offset..])
        .map_err(|e| ArrowError::ExternalError(Box::new(e)))?;
    // Truncate to actual compressed size
    output.truncate(offset + written);
    Ok(())
}
```

I'll make this modification later today and re-test
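A note on why the resize-then-truncate pattern is safe (my reading of the zstd APIs, worth double-checking): `zstd_safe::compress_bound` mirrors `ZSTD_compressBound`, the worst-case compressed size for a given input length, so a destination slice of `bound` bytes is always large enough and `compress_to_buffer` cannot fail for lack of capacity; the final `truncate` only shrinks the vector's length and never reallocates.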
Thanks @pchintar I'll mark this PR as draft -- please let me know when it is ready for a look
Or maybe I misunderstood your intent -- do you want me to review this PR now? Or shall I wait for your next proposal?
Hi @alamb,

So, I took a closer look at the compression hot path:

```rust
compress_to_vec(buffer, ...) // ~52k calls to compress_to_vec per `ipc_writer` run
```

Right now the flow is strictly serial: buffers are compressed one at a time, in order. Since buffers are independent, I'm considering restructuring this to compress a bounded chunk of buffers in parallel, then append the results in their original order. This would keep the same IPC format and compression behavior, while avoiding any output-size tradeoff.

Implementation-wise, something like:

```rust
let parallelism = thread::available_parallelism()
    .map(|n| n.get())
    .unwrap_or(1)
    .min(4);
```

Then process bounded chunks:

```rust
for chunk in pending_buffers.chunks(parallelism) {
    let compressed = compress_chunk_in_parallel(chunk)?;
    append_in_original_order(compressed)?;
}
```

where each worker owns its compression context:

```rust
let mut ctx = CompressionContext::default();
codec.compress_to_vec(buffer.as_slice(), &mut out, &mut ctx)?;
```

Would this kind of bounded per-batch parallelism be acceptable in arrow-ipc?

Thanks!
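For concreteness, a minimal self-contained sketch of the bounded fan-out idea using `std::thread::scope` (everything here is hypothetical: `compress_one` stands in for a real `codec.compress_to_vec` call with a per-worker `CompressionContext`):

```rust
use std::thread;

// Hypothetical stand-in for compressing one buffer with a worker-owned
// compression context; a real version would call codec.compress_to_vec.
fn compress_one(buffer: &[u8]) -> Vec<u8> {
    buffer.to_vec() // placeholder: no actual compression performed
}

/// Compress a bounded chunk of independent buffers in parallel, returning
/// results in the original order so the IPC byte layout is unchanged.
fn compress_chunk_in_parallel(chunk: &[Vec<u8>]) -> Vec<Vec<u8>> {
    thread::scope(|s| {
        // Spawn one scoped worker per buffer in this bounded chunk.
        let handles: Vec<_> = chunk
            .iter()
            .map(|buf| s.spawn(move || compress_one(buf)))
            .collect();
        // Joining in spawn order preserves the original buffer order.
        handles.into_iter().map(|h| h.join().unwrap()).collect()
    })
}
```

Scoped threads keep the borrows local, so no buffer needs to be cloned or moved into an `Arc` just to be compressed.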
I don't think adding parallelism (e.g. threads) to the underlying library calls is acceptable, as it comes with threading and memory overhead (more buffering) that is not appropriate for all use cases. What probably would be useful is if you could find some way (maybe an example) to show people who wanted to make this tradeoff how it could be done
It seems like the approach of this PR (avoid allocations) still is a benefit -- do you think it is something worth pursuing (aka should I find time to review this PR?) Thank you for your help (as always)

Which issue does this PR close?
Rationale for this change
The current IPC writer path performs repeated heap allocations and buffer copies for every record batch, even when writing batches with identical structure.
In `arrow-ipc/src/writer.rs`, each batch is encoded into an `EncodedData` before being written out. The key issue is that `EncodedData` owns its buffers. This forces:

- fresh `Vec<u8>` allocations per batch
- copying the finished flatbuffer data via `to_vec()` into a `Vec<u8>`

This leads to unnecessary latency overhead in common scenarios such as iterative or high-frequency batch writes. The issue is purely performance-related and does not affect correctness.
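To make the copy pattern concrete, here is a simplified sketch of what owning buffers implies (the `EncodedData` field names match arrow-ipc, but the surrounding function is illustrative, not the actual writer code):

```rust
/// Mirrors arrow-ipc's `EncodedData`: both fields own their bytes.
struct EncodedData {
    ipc_message: Vec<u8>, // flatbuffer-encoded message header
    arrow_data: Vec<u8>,  // record batch body buffers
}

/// Illustrative per-batch encoding: because `EncodedData` owns its
/// buffers, every batch pays two fresh allocations plus two copies.
fn encode_batch(finished_flatbuffer: &[u8], body: &[u8]) -> EncodedData {
    EncodedData {
        ipc_message: finished_flatbuffer.to_vec(), // copy of finished_data()
        arrow_data: body.to_vec(),                 // copy of the body bytes
    }
}
```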
What changes are included in this PR?
This PR introduces a private, writer-specific fast path for non-dictionary batches in `StreamWriter` and `FileWriter`:

- Reuses a writer-owned scratch structure across batches (a sketch follows after this list):
  - the `FlatBufferBuilder`
  - the `arrow_data` buffer
  - the encoding vectors (`nodes`, `buffers`, `variadic_buffer_counts`)
- Avoids copying flatbuffer data by writing directly from `finished_data()`
- Bypasses the `EncodedData` path for non-dictionary batches while preserving it for dictionary cases
- Adds a helper to detect dictionary usage and safely fall back to the existing implementation when needed
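A minimal sketch of the reuse pattern from the first bullet above (struct and field names are hypothetical, not the PR's actual code; only `FlatBufferBuilder`, `reset()`, `finish_minimal()`, and `finished_data()` come from the flatbuffers crate):

```rust
use flatbuffers::FlatBufferBuilder;
use std::io::Write;

/// Hypothetical writer-owned scratch state: allocated once, reused per batch.
struct WriterScratch<'fbb> {
    fbb: FlatBufferBuilder<'fbb>,
    arrow_data: Vec<u8>,
}

impl WriterScratch<'_> {
    fn new() -> Self {
        Self { fbb: FlatBufferBuilder::new(), arrow_data: Vec::new() }
    }
}

fn write_batch<W: Write>(scratch: &mut WriterScratch<'_>, out: &mut W) -> std::io::Result<()> {
    scratch.fbb.reset();        // reuse the builder's internal buffer
    scratch.arrow_data.clear(); // keep the body buffer's capacity
    // ... encode the batch header into `fbb` (ending with a `finish*` call)
    // and the body bytes into `arrow_data`; stand-in payload shown here ...
    let root = scratch.fbb.create_string("stand-in header");
    scratch.fbb.finish_minimal(root);
    out.write_all(scratch.fbb.finished_data())?; // written directly, no to_vec()
    out.write_all(&scratch.arrow_data)
}
```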
Are these changes tested?
Yes.
- Benchmarks with `ipc_writer` (`cargo bench -p arrow-ipc --bench ipc_writer --features zstd -- --sample-size 50`) show improvements for non-compressed workloads
- Existing writer tests (`cargo test -p arrow-ipc --lib writer`) pass without modification, confirming overall correctness and compatibility

Are there any user-facing changes?
No.