Skip to content

Commit

Permalink
codec/proto: allow reuse of marshal byte buffers
Browse files Browse the repository at this point in the history
Performance benchmarks can be found below. Obviously, a 10KB request and
10KB response is tailored to showcase this improvement as this is where
codec buffer re-use shines, but I've run other benchmarks too (like
1-byte requests and responses) and there's no discernable impact on
performance.

To no one's surprise, the number of bytes allocated per operation goes
down by almost exactly 10 KB across 60k+ queries, which suggests
excellent buffer re-use. The number of allocations itself increases by
5-ish, but that's probably because of a few additional slice pointers
that we need to store; these are 8-byte allocations and should have
virtually no impact on the allocator and garbage collector.

    streaming-networkMode_none-bufConn_false-keepalive_false-benchTime_10s-trace_false-latency_0s-kbps_0-MTU_0-maxConcurrentCalls_1-reqSize_10240B-respSize_10240B-compressor_off-channelz_false-preloader_false

	 Title       Before        After Percentage
      TotalOps        61821        65568     6.06%
       SendOps            0            0      NaN%
       RecvOps            0            0      NaN%
      Bytes/op    116033.83    105560.37    -9.03%
     Allocs/op       111.79       117.89     5.37%
       ReqT/op 506437632.00 537133056.00     6.06%
      RespT/op 506437632.00 537133056.00     6.06%
      50th-Lat    143.303µs    136.558µs    -4.71%
      90th-Lat    197.926µs    188.623µs    -4.70%
      99th-Lat    521.575µs    507.591µs    -2.68%
       Avg-Lat    161.294µs    152.038µs    -5.74%

Closes grpc#2816
  • Loading branch information
Adhityaa Chandrasekar committed Nov 8, 2019
1 parent 4717e3b commit 606cd73
Show file tree
Hide file tree
Showing 9 changed files with 111 additions and 56 deletions.
9 changes: 9 additions & 0 deletions codec.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,15 @@ type baseCodec interface {
Unmarshal(data []byte, v interface{}) error
}

// A bufferedBaseCodec is exactly like a baseCodec, but also requires a
// ReturnBuffer method to be implemented. Once a Marshal caller is done with
// the returned byte buffer, they can choose to return it back to the encoding
// library for re-use using this method.
type bufferedBaseCodec interface {
baseCodec
ReturnBuffer(buf []byte)
}

var _ baseCodec = Codec(nil)
var _ baseCodec = encoding.Codec(nil)

Expand Down
9 changes: 9 additions & 0 deletions encoding/encoding.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,15 @@ type Codec interface {
Name() string
}

// A BufferedCodec is exactly like a Codec, but also requires a ReturnBuffer
// method to be implemented. Once a Marshal caller is done with the returned
// byte buffer, they can choose to return it back to the encoding library for
// re-use using this method.
type BufferedCodec interface {
Codec
ReturnBuffer(buf []byte)
}

var registeredCodecs = make(map[string]Codec)

// RegisterCodec registers the provided Codec for use with all gRPC clients and
Expand Down
59 changes: 26 additions & 33 deletions encoding/proto/proto.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
package proto

import (
"math"
"sync"

"github.com/golang/protobuf/proto"
Expand All @@ -38,29 +37,16 @@ func init() {
// codec is a Codec implementation with protobuf. It is the default codec for gRPC.
type codec struct{}

type cachedProtoBuffer struct {
lastMarshaledSize uint32
proto.Buffer
}

func capToMaxInt32(val int) uint32 {
if val > math.MaxInt32 {
return uint32(math.MaxInt32)
}
return uint32(val)
}

func marshal(v interface{}, cb *cachedProtoBuffer) ([]byte, error) {
func marshal(v interface{}, pb *proto.Buffer) ([]byte, error) {
protoMsg := v.(proto.Message)
newSlice := make([]byte, 0, cb.lastMarshaledSize)
newSlice := returnBufferPool.Get().([]byte)

cb.SetBuf(newSlice)
cb.Reset()
if err := cb.Marshal(protoMsg); err != nil {
pb.SetBuf(newSlice)
pb.Reset()
if err := pb.Marshal(protoMsg); err != nil {
return nil, err
}
out := cb.Bytes()
cb.lastMarshaledSize = capToMaxInt32(len(out))
out := pb.Bytes()
return out, nil
}

Expand All @@ -70,12 +56,12 @@ func (codec) Marshal(v interface{}) ([]byte, error) {
return pm.Marshal()
}

cb := protoBufferPool.Get().(*cachedProtoBuffer)
out, err := marshal(v, cb)
pb := protoBufferPool.Get().(*proto.Buffer)
out, err := marshal(v, pb)

// put back buffer and lose the ref to the slice
cb.SetBuf(nil)
protoBufferPool.Put(cb)
pb.SetBuf(nil)
protoBufferPool.Put(pb)
return out, err
}

Expand All @@ -88,23 +74,30 @@ func (codec) Unmarshal(data []byte, v interface{}) error {
return pu.Unmarshal(data)
}

cb := protoBufferPool.Get().(*cachedProtoBuffer)
cb.SetBuf(data)
err := cb.Unmarshal(protoMsg)
cb.SetBuf(nil)
protoBufferPool.Put(cb)
pb := protoBufferPool.Get().(*proto.Buffer)
pb.SetBuf(data)
err := pb.Unmarshal(protoMsg)
pb.SetBuf(nil)
protoBufferPool.Put(pb)
return err
}

func (codec) ReturnBuffer(data []byte) {
returnBufferPool.Put(data[:0])
}

func (codec) Name() string {
return Name
}

var protoBufferPool = &sync.Pool{
New: func() interface{} {
return &cachedProtoBuffer{
Buffer: proto.Buffer{},
lastMarshaledSize: 16,
}
return &proto.Buffer{}
},
}

var returnBufferPool = &sync.Pool{
New: func() interface{} {
return make([]byte, 0, 16)
},
}
9 changes: 8 additions & 1 deletion internal/transport/controlbuf.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,8 @@ type dataFrame struct {
d []byte
// onEachWrite is called every time
// a part of d is written out.
onEachWrite func()
onEachWrite func()
returnBuffer func()
}

func (*dataFrame) isTransportResponseFrame() bool { return false }
Expand Down Expand Up @@ -841,6 +842,9 @@ func (l *loopyWriter) processData() (bool, error) {
if err := l.framer.fr.WriteData(dataItem.streamID, dataItem.endStream, nil); err != nil {
return false, err
}
if dataItem.returnBuffer != nil {
dataItem.returnBuffer()
}
str.itl.dequeue() // remove the empty data item from stream
if str.itl.isEmpty() {
str.state = empty
Expand Down Expand Up @@ -907,6 +911,9 @@ func (l *loopyWriter) processData() (bool, error) {

if len(dataItem.h) == 0 && len(dataItem.d) == 0 { // All the data from that message was written out.
str.itl.dequeue()
if dataItem.returnBuffer != nil {
dataItem.returnBuffer()
}
}
if str.itl.isEmpty() {
str.state = empty
Expand Down
11 changes: 9 additions & 2 deletions internal/transport/http2_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -831,9 +831,16 @@ func (t *http2Client) Write(s *Stream, hdr []byte, data []byte, opts *Options) e
} else if s.getState() != streamActive {
return errStreamDone
}
var returnBuffer func()
if opts.ReturnBuffer != nil {
returnBuffer = func() {
opts.ReturnBuffer(data)
}
}
df := &dataFrame{
streamID: s.id,
endStream: opts.Last,
streamID: s.id,
endStream: opts.Last,
returnBuffer: returnBuffer,
}
if hdr != nil || data != nil { // If it's not an empty data frame.
// Add some data to grpc message header so that we can equally
Expand Down
16 changes: 12 additions & 4 deletions internal/transport/http2_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -906,12 +906,20 @@ func (t *http2Server) Write(s *Stream, hdr []byte, data []byte, opts *Options) e
emptyLen = len(data)
}
hdr = append(hdr, data[:emptyLen]...)
origdata := data
data = data[emptyLen:]
var returnBuffer func()
if opts.ReturnBuffer != nil {
returnBuffer = func() {
opts.ReturnBuffer(origdata)
}
}
df := &dataFrame{
streamID: s.id,
h: hdr,
d: data,
onEachWrite: t.setResetPingStrikes,
streamID: s.id,
h: hdr,
d: data,
onEachWrite: t.setResetPingStrikes,
returnBuffer: returnBuffer,
}
if err := s.wq.get(int32(len(hdr) + len(data))); err != nil {
select {
Expand Down
3 changes: 3 additions & 0 deletions internal/transport/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -586,6 +586,9 @@ type Options struct {
// Last indicates whether this write is the last piece for
// this stream.
Last bool
// If non-nil, ReturnBuffer should be called to return some allocated buffer
// back to a sync pool.
ReturnBuffer func([]byte)
}

// CallHdr carries the information of a particular RPC.
Expand Down
12 changes: 11 additions & 1 deletion server.go
Original file line number Diff line number Diff line change
Expand Up @@ -842,7 +842,8 @@ func (s *Server) incrCallsFailed() {
}

func (s *Server) sendResponse(t transport.ServerTransport, stream *transport.Stream, msg interface{}, cp Compressor, opts *transport.Options, comp encoding.Compressor) error {
data, err := encode(s.getCodec(stream.ContentSubtype()), msg)
codec := s.getCodec(stream.ContentSubtype())
data, err := encode(codec, msg)
if err != nil {
grpclog.Errorln("grpc: server failed to encode response: ", err)
return err
Expand All @@ -852,6 +853,15 @@ func (s *Server) sendResponse(t transport.ServerTransport, stream *transport.Str
grpclog.Errorln("grpc: server failed to compress response: ", err)
return err
}

if bcodec, ok := codec.(bufferedBaseCodec); ok {
if compData != nil {
bcodec.ReturnBuffer(data)
} else {
opts.ReturnBuffer = bcodec.ReturnBuffer
}
}

hdr, payload := msgHeader(data, compData)
// TODO(dfawley): should we be checking len(data) instead?
if len(payload) > s.opts.maxSendMessageSize {
Expand Down
39 changes: 24 additions & 15 deletions stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -696,7 +696,7 @@ func (cs *clientStream) SendMsg(m interface{}) (err error) {
}

// load hdr, payload, data
hdr, payload, data, err := prepareMsg(m, cs.codec, cs.cp, cs.comp)
hdr, payload, data, returnBuffer, err := prepareMsg(m, cs.codec, cs.cp, cs.comp)
if err != nil {
return err
}
Expand All @@ -707,7 +707,7 @@ func (cs *clientStream) SendMsg(m interface{}) (err error) {
}
msgBytes := data // Store the pointer before setting to nil. For binary logging.
op := func(a *csAttempt) error {
err := a.sendMsg(m, hdr, payload, data)
err := a.sendMsg(m, hdr, payload, data, returnBuffer)
// nil out the message and uncomp when replaying; they are only needed for
// stats which is disabled for subsequent attempts.
m, data = nil, nil
Expand Down Expand Up @@ -833,7 +833,7 @@ func (cs *clientStream) finish(err error) {
cs.cancel()
}

func (a *csAttempt) sendMsg(m interface{}, hdr, payld, data []byte) error {
func (a *csAttempt) sendMsg(m interface{}, hdr, payld, data []byte, returnBuffer func([]byte)) error {
cs := a.cs
if a.trInfo != nil {
a.mu.Lock()
Expand All @@ -842,7 +842,8 @@ func (a *csAttempt) sendMsg(m interface{}, hdr, payld, data []byte) error {
}
a.mu.Unlock()
}
if err := a.t.Write(a.s, hdr, payld, &transport.Options{Last: !cs.desc.ClientStreams}); err != nil {

if err := a.t.Write(a.s, hdr, payld, &transport.Options{Last: !cs.desc.ClientStreams, ReturnBuffer: returnBuffer}); err != nil {
if !cs.desc.ClientStreams {
// For non-client-streaming RPCs, we return nil instead of EOF on error
// because the generated code requires it. finish is not called; RecvMsg()
Expand Down Expand Up @@ -1165,8 +1166,8 @@ func (as *addrConnStream) SendMsg(m interface{}) (err error) {
as.sentLast = true
}

// load hdr, payload, data
hdr, payld, _, err := prepareMsg(m, as.codec, as.cp, as.comp)
// load hdr, payload, data, returnBuffer
hdr, payld, _, returnBuffer, err := prepareMsg(m, as.codec, as.cp, as.comp)
if err != nil {
return err
}
Expand All @@ -1176,7 +1177,7 @@ func (as *addrConnStream) SendMsg(m interface{}) (err error) {
return status.Errorf(codes.ResourceExhausted, "trying to send message larger than max (%d vs. %d)", len(payld), *as.callInfo.maxSendMessageSize)
}

if err := as.t.Write(as.s, hdr, payld, &transport.Options{Last: !as.desc.ClientStreams}); err != nil {
if err := as.t.Write(as.s, hdr, payld, &transport.Options{Last: !as.desc.ClientStreams, ReturnBuffer: returnBuffer}); err != nil {
if !as.desc.ClientStreams {
// For non-client-streaming RPCs, we return nil instead of EOF on error
// because the generated code requires it. finish is not called; RecvMsg()
Expand Down Expand Up @@ -1408,8 +1409,8 @@ func (ss *serverStream) SendMsg(m interface{}) (err error) {
}
}()

// load hdr, payload, data
hdr, payload, data, err := prepareMsg(m, ss.codec, ss.cp, ss.comp)
// load hdr, payload, returnBuffer, data
hdr, payload, data, returnBuffer, err := prepareMsg(m, ss.codec, ss.cp, ss.comp)
if err != nil {
return err
}
Expand All @@ -1418,7 +1419,7 @@ func (ss *serverStream) SendMsg(m interface{}) (err error) {
if len(payload) > ss.maxSendMessageSize {
return status.Errorf(codes.ResourceExhausted, "trying to send message larger than max (%d vs. %d)", len(payload), ss.maxSendMessageSize)
}
if err := ss.t.Write(ss.s, hdr, payload, &transport.Options{Last: false}); err != nil {
if err := ss.t.Write(ss.s, hdr, payload, &transport.Options{Last: false, ReturnBuffer: returnBuffer}); err != nil {
return toRPCErr(err)
}
if ss.binlog != nil {
Expand Down Expand Up @@ -1510,20 +1511,28 @@ func MethodFromServerStream(stream ServerStream) (string, bool) {
// prepareMsg returns the hdr, payload and data
// using the compressors passed or using the
// passed preparedmsg
func prepareMsg(m interface{}, codec baseCodec, cp Compressor, comp encoding.Compressor) (hdr, payload, data []byte, err error) {
func prepareMsg(m interface{}, codec baseCodec, cp Compressor, comp encoding.Compressor) (hdr, payload, data []byte, returnBuffer func([]byte), err error) {
if preparedMsg, ok := m.(*PreparedMsg); ok {
return preparedMsg.hdr, preparedMsg.payload, preparedMsg.encodedData, nil
return preparedMsg.hdr, preparedMsg.payload, preparedMsg.encodedData, nil, nil
}
// The input interface is not a prepared msg.
// Marshal and Compress the data at this point
data, err = encode(codec, m)
if err != nil {
return nil, nil, nil, err
return nil, nil, nil, nil, err
}
compData, err := compress(data, cp, comp)
if err != nil {
return nil, nil, nil, err
return nil, nil, nil, nil, err
}

if bcodec, ok := codec.(bufferedBaseCodec); ok {
returnBuffer = bcodec.ReturnBuffer
if compData != nil {
bcodec.ReturnBuffer(data)
returnBuffer = nil
}
}
hdr, payload = msgHeader(data, compData)
return hdr, payload, data, nil
return hdr, payload, data, returnBuffer, nil
}

0 comments on commit 606cd73

Please sign in to comment.