Skip to content

Commit

Permalink
client: add an option to provide a buffer pool for message encoding
Browse files Browse the repository at this point in the history
Users may now provide a `SharedBufferPool` for the purpose of encoding
messages.
  • Loading branch information
HippoBaro committed Sep 10, 2023
1 parent af4d12a commit e6aeb6d
Show file tree
Hide file tree
Showing 2 changed files with 88 additions and 47 deletions.
37 changes: 37 additions & 0 deletions rpc_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,7 @@ type callInfo struct {
codec baseCodec
maxRetryRPCBufferSize int
onFinish []func(err error)
encoderBufferPool SharedBufferPool
}

func defaultCallInfo() *callInfo {
Expand Down Expand Up @@ -559,6 +560,42 @@ func (o MaxRetryRPCBufferSizeCallOption) before(c *callInfo) error {
}
func (o MaxRetryRPCBufferSizeCallOption) after(c *callInfo, attempt *csAttempt) {}

// ClientEncoderBufferPool is a CallOption to provide a SharedBufferPool
// used for the purpose of encoding messages. Buffers from this pool are
// used when encoding messages and returned once they have been transmitted
// over the network to be reused.
//
// Note that a compatible encoding.Codec is needed for buffer reuse. See
// encoding.BufferedCodec for additional details. If a non-compatible codec
// is used, buffer reuse will not apply.
//
// # Experimental
//
// Notice: This API is EXPERIMENTAL and may be changed or removed in a
// later release.
func ClientEncoderBufferPool(bufferPool SharedBufferPool) CallOption {
return EncoderBufferPoolCallOption{BufferPool: bufferPool}
}

// EncoderBufferPoolCallOption is a CallOption providing a SharedBufferPool
// used for the purpose of encoding messages.Buffers from this pool are
// used when encoding messages and returned once they have been transmitted
// over the network to be reused.
//
// # Experimental
//
// Notice: This type is EXPERIMENTAL and may be changed or removed in a
// later release.
type EncoderBufferPoolCallOption struct {
BufferPool SharedBufferPool
}

func (o EncoderBufferPoolCallOption) before(c *callInfo) error {
c.encoderBufferPool = o.BufferPool
return nil
}
func (o EncoderBufferPoolCallOption) after(c *callInfo, attempt *csAttempt) {}

// The format of the payload: compressed or not?
type payloadFormat uint8

Expand Down
98 changes: 51 additions & 47 deletions stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -312,19 +312,20 @@ func newClientStreamWithParams(ctx context.Context, desc *StreamDesc, cc *Client
}

cs := &clientStream{
callHdr: callHdr,
ctx: ctx,
methodConfig: &mc,
opts: opts,
callInfo: c,
cc: cc,
desc: desc,
codec: c.codec,
cp: cp,
comp: comp,
cancel: cancel,
firstAttempt: true,
onCommit: onCommit,
callHdr: callHdr,
ctx: ctx,
methodConfig: &mc,
opts: opts,
callInfo: c,
cc: cc,
desc: desc,
codec: c.codec,
cp: cp,
comp: comp,
cancel: cancel,
firstAttempt: true,
onCommit: onCommit,
encoderBufferPool: c.encoderBufferPool,
}
if !cc.dopts.disableRetry {
cs.retryThrottler = cc.retryThrottler.Load().(*retryThrottler)
Expand Down Expand Up @@ -559,10 +560,11 @@ type clientStream struct {
// place where we need to check if the attempt is nil.
attempt *csAttempt
// TODO(hedging): hedging will have multiple attempts simultaneously.
committed bool // active attempt committed for retry?
onCommit func()
buffer []func(a *csAttempt) error // operations to replay on retry
bufferSize int // current size of buffer
committed bool // active attempt committed for retry?
onCommit func()
buffer []func(a *csAttempt) error // operations to replay on retry
bufferSize int // current size of buffer
encoderBufferPool SharedBufferPool
}

// csAttempt implements a single transport stream attempt within a
Expand Down Expand Up @@ -1250,17 +1252,18 @@ func newNonRetryClientStream(ctx context.Context, desc *StreamDesc, method strin

// Use a special addrConnStream to avoid retry.
as := &addrConnStream{
callHdr: callHdr,
ac: ac,
ctx: ctx,
cancel: cancel,
opts: opts,
callInfo: c,
desc: desc,
codec: c.codec,
cp: cp,
comp: comp,
t: t,
callHdr: callHdr,
ac: ac,
ctx: ctx,
cancel: cancel,
opts: opts,
callInfo: c,
desc: desc,
codec: c.codec,
cp: cp,
comp: comp,
t: t,
encoderBufferPool: c.encoderBufferPool,
}

s, err := as.t.NewStream(as.ctx, as.callHdr)
Expand Down Expand Up @@ -1295,25 +1298,26 @@ func newNonRetryClientStream(ctx context.Context, desc *StreamDesc, method strin
}

type addrConnStream struct {
s *transport.Stream
ac *addrConn
callHdr *transport.CallHdr
cancel context.CancelFunc
opts []CallOption
callInfo *callInfo
t transport.ClientTransport
ctx context.Context
sentLast bool
desc *StreamDesc
codec baseCodec
cp Compressor
comp encoding.Compressor
decompSet bool
dc Decompressor
decomp encoding.Compressor
p *parser
mu sync.Mutex
finished bool
s *transport.Stream
ac *addrConn
callHdr *transport.CallHdr
cancel context.CancelFunc
opts []CallOption
callInfo *callInfo
t transport.ClientTransport
ctx context.Context
sentLast bool
desc *StreamDesc
codec baseCodec
cp Compressor
comp encoding.Compressor
decompSet bool
dc Decompressor
decomp encoding.Compressor
p *parser
mu sync.Mutex
finished bool
encoderBufferPool SharedBufferPool
}

func (as *addrConnStream) Header() (metadata.MD, error) {
Expand Down

0 comments on commit e6aeb6d

Please sign in to comment.