Skip to content

Commit

Permalink
Implement new Codec that uses [][]byte instead of []byte
Browse files Browse the repository at this point in the history
This new codec, as outlined in grpc#6619 will allow the reuse of buffers to their fullest extent. Note that this deliberately does not (yet) implement support for stathandlers, however all the relevant APIs have been updated and both old and new Codec implementations are supported.
  • Loading branch information
PapaCharlie committed Apr 10, 2024
1 parent 0baa668 commit 12bf91d
Show file tree
Hide file tree
Showing 21 changed files with 852 additions and 340 deletions.
75 changes: 68 additions & 7 deletions codec.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,16 +23,77 @@ import (
_ "google.golang.org/grpc/encoding/proto" // to register the Codec for "proto"
)

// baseCodec contains the functionality of both Codec and encoding.Codec, but
// omits the name/string, which vary between the two and are not needed for
// anything besides the registry in the encoding package.
// baseCodec captures the new encoding.CodecV2 interface without the Name
// function, allowing it to be implemented by older Codec and encoding.Codec
// implementations. The omitted Name function is only needed for the register in
// the encoding package and is not part of the core functionality.
type baseCodec interface {
Marshal(v any) ([]byte, error)
Unmarshal(data []byte, v any) error
Marshal(v any) (encoding.BufferSlice, error)
Unmarshal(data encoding.BufferSlice, v any) error
}

type namedBaseCodec interface {
baseCodec
Name() string
}

func getCodec(name string) namedBaseCodec {
codecV2 := encoding.GetCodecV2(name)
if codecV2 != nil {
return codecV2
}

codecV1 := encoding.GetCodec(name)
if codecV1 != nil {
return newCodecV1Bridge(codecV1)
}

return nil
}

func newCodecV0Bridge(c Codec) baseCodec {
return codecV0Bridge{c}
}

func newCodecV1Bridge(c encoding.Codec) namedBaseCodec {
return codecV1Bridge{
codecV0Bridge: codecV0Bridge{c},
name: c.Name(),
}
}

var _ baseCodec = Codec(nil)
var _ baseCodec = encoding.Codec(nil)
var _ baseCodec = codecV0Bridge{}

type codecV0Bridge struct {
codec interface {
Marshal(v any) ([]byte, error)
Unmarshal(data []byte, v any) error
}
}

func (c codecV0Bridge) Marshal(v any) (encoding.BufferSlice, error) {
data, err := c.codec.Marshal(v)
if err != nil {
return nil, err
} else {
return encoding.BufferSlice{encoding.NewBuffer(data, nil)}, nil
}
}

func (c codecV0Bridge) Unmarshal(data encoding.BufferSlice, v any) (err error) {
return c.codec.Unmarshal(data.Materialize(), v)
}

var _ namedBaseCodec = codecV1Bridge{}

type codecV1Bridge struct {
codecV0Bridge
name string
}

func (c codecV1Bridge) Name() string {
return c.name
}

// Codec defines the interface gRPC uses to encode and decode messages.
// Note that implementations of this interface must be thread safe;
Expand Down
3 changes: 2 additions & 1 deletion dialoptions.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"google.golang.org/grpc/channelz"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/encoding"
"google.golang.org/grpc/internal"
internalbackoff "google.golang.org/grpc/internal/backoff"
"google.golang.org/grpc/internal/binarylog"
Expand Down Expand Up @@ -653,7 +654,7 @@ func defaultDialOptions() dialOptions {
bs: internalbackoff.DefaultExponential,
healthCheckFunc: internal.HealthCheckFunc,
idleTimeout: 30 * time.Minute,
recvBufferPool: nopBufferPool{},
recvBufferPool: encoding.NopBufferPool{},
defaultScheme: "dns",
}
}
Expand Down
138 changes: 138 additions & 0 deletions encoding/buffers.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
package encoding

import (
"io"
"sync/atomic"
)

type BufferSlice []*Buffer

type Buffer struct {
data []byte
refs atomic.Int32
free func([]byte)
}

type BufferProvider interface {
GetAndSetBuffer(length int, write func([]byte)) *Buffer
}

type BufferProviderFunc func(length int, write func(b []byte)) *Buffer

func (b BufferProviderFunc) GetAndSetBuffer(length int, write func([]byte)) *Buffer {
return b(length, write)
}

var NoopBufferProvider BufferProvider = BufferProviderFunc(func(length int, write func(b []byte)) *Buffer {
buf := make([]byte, length)
write(buf)
return NewBuffer(buf, nil)
})

func NewBuffer(data []byte, free func([]byte)) *Buffer {
return (&Buffer{data: data, free: free}).Ref()
}

func (b *Buffer) ReadOnlyData() []byte {
return b.data
}

func (b *Buffer) Ref() *Buffer {
b.refs.Add(1)
return b
}

func (b *Buffer) Free() {
if b.refs.Add(-1) == 0 && b.free != nil {
b.free(b.data)
b.data = nil
}
}

type Writer struct {
buffers *BufferSlice
provider BufferProvider
}

func (s *Writer) Write(p []byte) (n int, err error) {
*s.buffers = append(*s.buffers, s.provider.GetAndSetBuffer(len(p), func(b []byte) { copy(b, p) }))

return len(p), nil
}

func NewWriter(buffers *BufferSlice, provider BufferProvider) *Writer {
return &Writer{buffers: buffers, provider: provider}
}

type Reader struct {
data BufferSlice
len int
idx int
}

func (r *Reader) Len() int {
return r.len
}

func (r *Reader) Read(buf []byte) (n int, _ error) {
for len(buf) != 0 && r.len != 0 {
data := r.data[r.idx].ReadOnlyData()
copied := copy(buf, data[r.idx:])
r.len -= copied

buf = buf[copied:]

if copied == len(data) {
r.data = r.data[1:]
r.idx = 0
} else {
r.idx += copied
}
n += copied
}

if n == 0 {
return 0, io.EOF
}

return n, nil
}

func (s BufferSlice) Reader() *Reader {
return &Reader{
data: s,
len: s.Len(),
}
}

func (s BufferSlice) Len() (length int) {
for _, b := range s {
length += len(b.ReadOnlyData())
}
return length
}

func (s BufferSlice) Ref() {
for _, b := range s {
b.Ref()
}
}

func (s BufferSlice) Free() {
for _, b := range s {
b.Free()
}
}

func (s BufferSlice) WriteTo(out []byte) {
out = out[:0]
for _, b := range s {
out = append(out, b.ReadOnlyData()...)
}
}

func (s BufferSlice) Materialize() []byte {
out := make([]byte, s.Len())
s.WriteTo(out)
return out
}
73 changes: 73 additions & 0 deletions encoding/encoding_v2.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
/*
*
* Copyright 2017 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package encoding

import (
"strings"
)

// CodecV2 defines the interface gRPC uses to encode and decode messages. Note
// that implementations of this interface must be thread safe; a CodecV2's
// methods can be called from concurrent goroutines.
type CodecV2 interface {
Marshal(v any) (out BufferSlice, err error)
Unmarshal(data BufferSlice, v any) error
// Name returns the name of the Codec implementation. The returned string
// will be used as part of content type in transmission. The result must be
// static; the result cannot change between calls.
Name() string
}

var registeredV2Codecs = make(map[string]CodecV2)

// RegisterCodecV2 registers the provided CodecV2 for use with all gRPC clients and
// servers.
//
// The CodecV2 will be stored and looked up by result of its Name() method, which
// should match the content-subtype of the encoding handled by the CodecV2. This
// is case-insensitive, and is stored and looked up as lowercase. If the
// result of calling Name() is an empty string, RegisterCodecV2 will panic. See
// Content-Type on
// https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md#requests for
// more details.
//
// If both a Codec and CodecV2 are registered with the same name, the CodecV2
// will be used.
//
// NOTE: this function must only be called during initialization time (i.e. in
// an init() function), and is not thread-safe. If multiple Codecs are
// registered with the same name, the one registered last will take effect.
func RegisterCodecV2(codec CodecV2) {
if codec == nil {
panic("cannot register a nil CodecV2")
}
if codec.Name() == "" {
panic("cannot register CodecV2 with empty string result for Name()")
}
contentSubtype := strings.ToLower(codec.Name())
registeredV2Codecs[contentSubtype] = codec
}

// GetCodecV2 gets a registered CodecV2 by content-subtype, or nil if no CodecV2 is
// registered for the content-subtype.
//
// The content-subtype is expected to be lowercase.
func GetCodecV2(contentSubtype string) CodecV2 {
return registeredV2Codecs[contentSubtype]
}
73 changes: 73 additions & 0 deletions encoding/proto/proto_v2.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
/*
*
* Copyright 2018 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package proto

import (
"fmt"

"google.golang.org/grpc/encoding"
"google.golang.org/protobuf/proto"
)

func init() {
encoding.RegisterCodecV2(&codecV2{SharedBufferPool: encoding.NewSharedBufferPool()})
}

// codec is a CodecV2 implementation with protobuf. It is the default codec for
// gRPC.
type codecV2 struct {
encoding.SharedBufferPool
}

var _ encoding.CodecV2 = (*codecV2)(nil)

func (c *codecV2) Marshal(v any) (encoding.BufferSlice, error) {
vv := messageV2Of(v)
if vv == nil {
return nil, fmt.Errorf("proto: failed to marshal, message is %T, want proto.Message", v)
}

buf := c.Get(proto.Size(vv))
_, err := proto.MarshalOptions{}.MarshalAppend(buf[:0], vv)
if err != nil {
c.Put(buf)
return nil, err
} else {
return encoding.BufferSlice{encoding.NewBuffer(buf, c.Put)}, nil
}
}

func (c *codecV2) Unmarshal(data encoding.BufferSlice, v any) (err error) {
vv := messageV2Of(v)
if vv == nil {
return fmt.Errorf("failed to unmarshal, message is %T, want proto.Message", v)
}

defer data.Free()

buf := c.Get(data.Len())
defer c.Put(buf)
data.WriteTo(buf)
// TODO: Upgrade proto.Unmarshal to support encoding.BufferSlice
return proto.Unmarshal(buf, vv)
}

func (c *codecV2) Name() string {
return Name
}
Loading

0 comments on commit 12bf91d

Please sign in to comment.