perf(table): Decrease memory pressure by releasing records#886
Conversation
laskoviymishka
left a comment
There was a problem hiding this comment.
Fair point. The defer scoping issue is real and obvious from reading the code, but the PR claims a memory improvement without any measurement. A benchmark would prove the claim and prevent regression.
Something like:
func BenchmarkFanoutMemory(b *testing.B) {
// Feed N large record batches through the fanout writer
// Report b.ReportAllocs() and track peak memory via runtime.MemStats
}
Doesn't need to be fancy, even feeding 100 batches through the fanout and showing that peak HeapInuse stays flat (new) vs grows linearly (old) would be enough.
The code change is obviously correct from reading it, but a benchmark makes the improvement concrete and guards against regression.
|
I agree with @laskoviymishka, let's add a benchmark or something to confirm and then this is good for me. |
|
Yep, that makes sense. Let me add a benchmark |
|
Added BenchmarkFanoutMemory in table/fanout_memory_bench_test.go to measure the improvement. The benchmark uses a gated RecordReader that feeds 100 batches (50K rows each) one at a time through a single fanout writer, measuring HeapInuse between sends via runtime.GC() + runtime.ReadMemStats(). ~25% reduction in peak live heap (p=0.002), no change in wall-clock time or total allocations |
BenchmarkFanoutMemory feeds 100 large record batches through a single fanout writer using a gated reader that controls when each batch enters the pipeline. Between sends, it forces GC and samples HeapInuse to measure peak live memory. This proves the processRecord refactor (moving defer record.Release() from the function-scoped fanout loop into processRecord) reduces peak heap by ~25% (p=0.002), confirming batches are released promptly instead of accumulating until the fanout function returns.
64ef8a9 to
f076760
Compare
The fanout function currently defer's
record.Release(). This isn't a big deal if the iterable it reads from contains a small amount of data. However, if it's a long lived streaming channel or contains a lot of data, the fanout function will hold on to **all ** the records until it processes all of them. This dramatically increases memory use, especially in a streaming use case.This PR extracts out the code to process each record into it's own function, and calls
defer record.Release()there. No functional modifications were made. This will release each record after successfully processing the data.