Commit 5710e83

Re-instate (Decompressor).DecompressedSize optimization
This is partly for parity with how gRPC v1.56.3 worked, but also because removing it showed up as a giant regression in allocations: we were now going through `io.Copy`, which allocates a temporary 32 KiB buffer, while our payloads are often much smaller.
1 parent 3093b97 commit 5710e83
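Why the `io.Copy` fallback hurts: when the destination does not implement `io.ReaderFrom` (as gRPC's `mem.NewWriter` destination does not) and the source does not implement `io.WriterTo`, `io.Copy` allocates a temporary 32 KiB scratch buffer on every call. Below is a minimal sketch of that allocation difference, not the gRPC code itself; `plainWriter`, `samplePayload` and `exactSize` are made-up names for illustration.

```go
package main

import (
	"bytes"
	"compress/gzip"
	"fmt"
	"io"
)

// plainWriter deliberately does not implement io.ReaderFrom, so io.Copy
// has to allocate its own 32 KiB scratch buffer when copying into it.
type plainWriter struct{ n int }

func (w *plainWriter) Write(p []byte) (int, error) { w.n += len(p); return len(p), nil }

func main() {
	samplePayload := []byte("typical RPC payloads are far smaller than 32 KiB")

	var compressed bytes.Buffer
	zw := gzip.NewWriter(&compressed)
	zw.Write(samplePayload)
	zw.Close()

	// Slow path: no size hint, so io.Copy allocates a temporary 32 KiB buffer
	// for every decompressed message.
	zr, _ := gzip.NewReader(bytes.NewReader(compressed.Bytes()))
	dst := &plainWriter{}
	io.Copy(dst, zr)

	// Fast path: with an exact size (as a DecompressedSize hint would report),
	// we allocate only what the message needs and fill it with io.ReadFull.
	exactSize := len(samplePayload) // assumed to come from the size hint
	zr2, _ := gzip.NewReader(bytes.NewReader(compressed.Bytes()))
	buf := make([]byte, exactSize)
	io.ReadFull(zr2, buf)

	fmt.Println(dst.n, len(buf)) // both equal len(samplePayload)
}
```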

rpc_util.go

Lines changed: 24 additions & 18 deletions
@@ -22,6 +22,7 @@ import (
 	"compress/gzip"
 	"context"
 	"encoding/binary"
+	"errors"
 	"fmt"
 	"io"
 	"math"
@@ -880,24 +881,29 @@ func decompress(compressor encoding.Compressor, d mem.BufferSlice, maxReceiveMes
 	// TODO: Can/should this still be preserved with the new BufferSlice API? Are
 	// there any actual benefits to allocating a single large buffer instead of
 	// multiple smaller ones?
-	//if sizer, ok := compressor.(interface {
-	//	DecompressedSize(compressedBytes []byte) int
-	//}); ok {
-	//	if size := sizer.DecompressedSize(d); size >= 0 {
-	//		if size > maxReceiveMessageSize {
-	//			return nil, size, nil
-	//		}
-	//		// size is used as an estimate to size the buffer, but we
-	//		// will read more data if available.
-	//		// +MinRead so ReadFrom will not reallocate if size is correct.
-	//		//
-	//		// TODO: If we ensure that the buffer size is the same as the DecompressedSize,
-	//		// we can also utilize the recv buffer pool here.
-	//		buf := bytes.NewBuffer(make([]byte, 0, size+bytes.MinRead))
-	//		bytesRead, err := buf.ReadFrom(io.LimitReader(dcReader, int64(maxReceiveMessageSize)+1))
-	//		return buf.Bytes(), int(bytesRead), err
-	//	}
-	//}
+	if sizer, ok := compressor.(interface {
+		DecompressedSize(compressedBytes io.Reader) int
+	}); ok {
+		if size := sizer.DecompressedSize(d.Reader()); size >= 0 {
+			if size > maxReceiveMessageSize {
+				return nil, size, nil
+			}
+			// DecompressedSize needs to be exact.
+			buf := pool.Get(size)
+			if _, err := io.ReadFull(dcReader, *buf); err != nil {
+				return nil, 0, err
+			}
+			// This read is crucial because some compressors optimize for having
+			// been read entirely, for example by releasing memory back to a pool.
+			if _, err := dcReader.Read(nil); err != io.EOF {
+				if err == nil {
+					err = errors.New("read after decompressed size")
+				}
+				return nil, 0, err
+			}
+			return mem.BufferSlice{mem.NewBuffer(buf, pool)}, size, nil
+		}
+	}
 
 	var out mem.BufferSlice
 	_, err = io.Copy(mem.NewWriter(&out, pool), io.LimitReader(dcReader, int64(maxReceiveMessageSize)+1))
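For context, here is a hedged sketch of how a codec could satisfy the optional `DecompressedSize(io.Reader)` probe that the re-instated fast path looks for. It uses a hypothetical length-prefixed framing, not gRPC's gzip codec, and a real codec would also implement the rest of `encoding.Compressor` (`Compress`, `Decompress`, `Name`); only the size hint is shown.

```go
package main

import (
	"bytes"
	"encoding/binary"
	"fmt"
	"io"
)

// lenPrefixedCodec (hypothetical) frames each message as a 4-byte
// big-endian uncompressed-length header followed by the compressed body.
type lenPrefixedCodec struct{}

// DecompressedSize reports the exact uncompressed size by reading the
// header, or -1 if it cannot tell. Returning -1 makes the caller fall
// back to the io.Copy path.
func (lenPrefixedCodec) DecompressedSize(compressed io.Reader) int {
	var hdr [4]byte
	if _, err := io.ReadFull(compressed, hdr[:]); err != nil {
		return -1
	}
	return int(binary.BigEndian.Uint32(hdr[:]))
}

func main() {
	// Frame a fake payload: 4-byte length header + body.
	body := []byte("hello")
	var framed bytes.Buffer
	binary.Write(&framed, binary.BigEndian, uint32(len(body)))
	framed.Write(body)

	var c lenPrefixedCodec
	fmt.Println(c.DecompressedSize(bytes.NewReader(framed.Bytes()))) // 5
}
```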
