Skip to content
This repository has been archived by the owner on Sep 1, 2023. It is now read-only.

Commit

Permalink
Reduce marshaling allocations with MarshalAppend (connectrpc#503)
Browse files Browse the repository at this point in the history
Use the `MarshalAppend` methods introduced in v1.31 of the protobuf
runtime to more aggressively reuse buffers from Connect's internal pool
(for both binary and JSON). This reduces allocations and substantially
improves throughput:

```
BenchmarkConnect/unary-12                   8054           1318347 ns/op        14414394 B/op        251 allocs/op
BenchmarkConnect/unary-12                 10448           1153636 ns/op         6115680 B/op        236 allocs/op
```

---------

Co-authored-by: Akshay Shah <akshay@akshayshah.org>
  • Loading branch information
emcfarlane and akshayjshah committed Jun 27, 2023
1 parent a93cd05 commit 46ff1ed
Show file tree
Hide file tree
Showing 10 changed files with 111 additions and 20 deletions.
8 changes: 4 additions & 4 deletions cmd/protoc-gen-connect-go/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -234,10 +234,10 @@ func generateClientInterface(g *protogen.GeneratedFile, service *protogen.Servic
g.P("//")
deprecated(g)
}
g.Annotate(names.Client, service.Location)
g.AnnotateSymbol(names.Client, protogen.Annotation{Location: service.Location})
g.P("type ", names.Client, " interface {")
for _, method := range service.Methods {
g.Annotate(names.Client+"."+method.GoName, method.Location)
g.AnnotateSymbol(names.Client+"."+method.GoName, protogen.Annotation{Location: method.Location})
leadingComments(
g,
method.Comments.Leading,
Expand Down Expand Up @@ -369,15 +369,15 @@ func generateServerInterface(g *protogen.GeneratedFile, service *protogen.Servic
g.P("//")
deprecated(g)
}
g.Annotate(names.Server, service.Location)
g.AnnotateSymbol(names.Server, protogen.Annotation{Location: service.Location})
g.P("type ", names.Server, " interface {")
for _, method := range service.Methods {
leadingComments(
g,
method.Comments.Leading,
isDeprecatedMethod(method),
)
g.Annotate(names.Server+"."+method.GoName, method.Location)
g.AnnotateSymbol(names.Server+"."+method.GoName, protogen.Annotation{Location: method.Location})
g.P(serverSignature(g, method))
}
g.P("}")
Expand Down
31 changes: 29 additions & 2 deletions codec.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,18 @@ type Codec interface {
Unmarshal([]byte, any) error
}

// marshalAppender is an extension to Codec for appending to a byte slice.
type marshalAppender interface {
Codec

// MarshalAppend marshals the given message and appends it to the given
// byte slice.
//
// MarshalAppend may expect a specific type of message, and will error if
// this type is not given.
MarshalAppend([]byte, any) ([]byte, error)
}

// stableCodec is an extension to Codec for serializing with stable output.
type stableCodec interface {
Codec
Expand Down Expand Up @@ -93,6 +105,14 @@ func (c *protoBinaryCodec) Marshal(message any) ([]byte, error) {
return proto.Marshal(protoMessage)
}

func (c *protoBinaryCodec) MarshalAppend(dst []byte, message any) ([]byte, error) {
protoMessage, ok := message.(proto.Message)
if !ok {
return nil, errNotProto(message)
}
return proto.MarshalOptions{}.MarshalAppend(dst, protoMessage)
}

func (c *protoBinaryCodec) Unmarshal(data []byte, message any) error {
protoMessage, ok := message.(proto.Message)
if !ok {
Expand Down Expand Up @@ -132,8 +152,15 @@ func (c *protoJSONCodec) Marshal(message any) ([]byte, error) {
if !ok {
return nil, errNotProto(message)
}
var options protojson.MarshalOptions
return options.Marshal(protoMessage)
return protojson.MarshalOptions{}.Marshal(protoMessage)
}

func (c *protoJSONCodec) MarshalAppend(dst []byte, message any) ([]byte, error) {
protoMessage, ok := message.(proto.Message)
if !ok {
return nil, errNotProto(message)
}
return protojson.MarshalOptions{}.MarshalAppend(dst, protoMessage)
}

func (c *protoJSONCodec) Unmarshal(binary []byte, message any) error {
Expand Down
28 changes: 28 additions & 0 deletions codec_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,34 @@ func TestCodecRoundTrips(t *testing.T) {
}
}

func TestAppendCodec(t *testing.T) {
t.Parallel()
makeRoundtrip := func(codec marshalAppender) func(string, int64) bool {
var data []byte
return func(text string, number int64) bool {
got := pingv1.PingRequest{}
want := pingv1.PingRequest{Text: text, Number: number}
data = data[:0]
var err error
data, err = codec.MarshalAppend(data, &want)
if err != nil {
t.Fatal(err)
}
err = codec.Unmarshal(data, &got)
if err != nil {
t.Fatal(err)
}
return proto.Equal(&got, &want)
}
}
if err := quick.Check(makeRoundtrip(&protoBinaryCodec{}), nil /* config */); err != nil {
t.Error(err)
}
if err := quick.Check(makeRoundtrip(&protoJSONCodec{}), nil /* config */); err != nil {
t.Error(err)
}
}

func TestStableCodec(t *testing.T) {
t.Parallel()
makeRoundtrip := func(codec stableCodec) func(map[string]string) bool {
Expand Down
50 changes: 41 additions & 9 deletions envelope.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,16 +67,10 @@ func (w *envelopeWriter) Marshal(message any) *Error {
}
return nil
}
raw, err := w.codec.Marshal(message)
if err != nil {
return errorf(CodeInternal, "marshal message: %w", err)
if appender, ok := w.codec.(marshalAppender); ok {
return w.marshalAppend(message, appender)
}
// We can't avoid allocating the byte slice, so we may as well reuse it once
// we're done with it.
buffer := bytes.NewBuffer(raw)
defer w.bufferPool.Put(buffer)
envelope := &envelope{Data: buffer}
return w.Write(envelope)
return w.marshal(message)
}

// Write writes the enveloped message, compressing as necessary. It doesn't
Expand Down Expand Up @@ -104,6 +98,44 @@ func (w *envelopeWriter) Write(env *envelope) *Error {
})
}

func (w *envelopeWriter) marshalAppend(message any, codec marshalAppender) *Error {
// Codec supports MarshalAppend; try to re-use a []byte from the pool.
buffer := w.bufferPool.Get()
defer w.bufferPool.Put(buffer)
raw, err := codec.MarshalAppend(buffer.Bytes(), message)
if err != nil {
return errorf(CodeInternal, "marshal message: %w", err)
}
if cap(raw) > buffer.Cap() {
// The buffer from the pool was too small, so MarshalAppend grew the slice.
// Pessimistically assume that the too-small buffer is insufficient for the
// application workload, so there's no point in keeping it in the pool.
// Instead, replace it with the larger, newly-allocated slice. This
// allocates, but it's a small, constant-size allocation.
*buffer = *bytes.NewBuffer(raw)
} else {
// MarshalAppend didn't allocate, but we need to fix the internal state of
// the buffer. Compared to replacing the buffer (as above), buffer.Write
// copies but avoids allocating.
buffer.Write(raw)
}
envelope := &envelope{Data: buffer}
return w.Write(envelope)
}

func (w *envelopeWriter) marshal(message any) *Error {
// Codec doesn't support MarshalAppend; let Marshal allocate a []byte.
raw, err := w.codec.Marshal(message)
if err != nil {
return errorf(CodeInternal, "marshal message: %w", err)
}
buffer := bytes.NewBuffer(raw)
// Put our new []byte into the pool for later reuse.
defer w.bufferPool.Put(buffer)
envelope := &envelope{Data: buffer}
return w.Write(envelope)
}

func (w *envelopeWriter) write(env *envelope) *Error {
prefix := [5]byte{}
prefix[0] = env.Flags
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,5 +4,5 @@ go 1.19

require (
github.com/google/go-cmp v0.5.9
google.golang.org/protobuf v1.28.1
google.golang.org/protobuf v1.31.0
)
4 changes: 4 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -6,3 +6,7 @@ golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8T
google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw=
google.golang.org/protobuf v1.28.1 h1:d0NfwRgPtno5B1Wa6L2DAG+KivqkdutMf1UhdNx175w=
google.golang.org/protobuf v1.28.1/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I=
google.golang.org/protobuf v1.30.1-0.20230508203708-b8fc77060104 h1:3QvSsKaCYve39lBVTCnmryh9VdHhPfRJJXquZqsEqgI=
google.golang.org/protobuf v1.30.1-0.20230508203708-b8fc77060104/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I=
google.golang.org/protobuf v1.31.0 h1:g0LDEJHgrBl9N9r17Ru3sqWhkIx2NB67okBHPwC7hs8=
google.golang.org/protobuf v1.31.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I=
2 changes: 1 addition & 1 deletion internal/gen/connect/collide/v1/collide.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion internal/gen/connect/import/v1/import.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion internal/gen/connect/ping/v1/ping.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion internal/gen/connectext/grpc/status/v1/status.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit 46ff1ed

Please sign in to comment.