Skip to content

Commit

Permalink
Implement new Codec that uses [][]byte instead of []byte
Browse files Browse the repository at this point in the history
This new codec, as outlined in grpc#6619 will
allow the reuse of buffers to their fullest extent. Note that this deliberately
does not (yet) implement support for stathandlers, however all the relevant APIs
have been updated and both old and new Codec implementations are supported.
  • Loading branch information
PapaCharlie committed Apr 9, 2024
1 parent 0baa668 commit ed1ff3a
Show file tree
Hide file tree
Showing 17 changed files with 854 additions and 284 deletions.
152 changes: 152 additions & 0 deletions bufslice/bufslice.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
package bufslice

import (
"io"
)

type BufferProvider interface {
GetBuffer(size int) []byte
ReturnBuffer([]byte)
}

type NoopBufferProvider struct{}

func (n NoopBufferProvider) GetBuffer(size int) []byte {
return make([]byte, size)
}

func (n NoopBufferProvider) ReturnBuffer(bytes []byte) {}

type Writer struct {
buffers *[][]byte
provider BufferProvider
}

func (s *Writer) appendNewBuffer(size int) []byte {
buf := s.provider.GetBuffer(size)[:0]
*s.buffers = append(*s.buffers, buf)
return buf
}

func (s *Writer) Write(p []byte) (n int, err error) {
n = len(p)

lastIdx := len(*s.buffers) - 1
var lastBuffer []byte
if lastIdx != -1 {
lastBuffer = (*s.buffers)[lastIdx]
}

if lastIdx == -1 || cap(lastBuffer) == len(lastBuffer) {
lastBuffer = s.appendNewBuffer(len(p))
lastIdx++
}

if availableCapacity := cap(lastBuffer) - len(lastBuffer); availableCapacity < len(p) {
(*s.buffers)[lastIdx] = append(lastBuffer, p[:availableCapacity]...)
p = p[availableCapacity:]

lastBuffer = s.appendNewBuffer(len(p) - availableCapacity)
lastIdx++
}

(*s.buffers)[lastIdx] = append(lastBuffer, p...)

return n, nil
}

func (s *Writer) ReadFrom(r io.Reader) (n int64, err error) {
// TODO(PapaCharlie): default to the max http2 frame size used by the underlying
// http/2 transport, however this can likely be improved. Maybe BufferProvider
// can optionally implement an interface that hints at the optimal buffer size?
const chunkSize = 16 * 1024

for {
buf := s.appendNewBuffer(chunkSize)
// Always maximize how much of the buffer is reused if the provider returned a
// larger buffer.
buf = buf[:cap(buf)]
read, err := r.Read(buf)
buf = buf[:read]
(*s.buffers)[len(*s.buffers)-1] = buf
n += int64(read)
if err != nil {
return n, err
}
}
}

func NewWriter(buffers *[][]byte, provider BufferProvider) *Writer {
return &Writer{buffers: buffers, provider: provider}
}

func Len(buffers [][]byte) (l int) {
for _, b := range buffers {
l += len(b)
}
return l
}

func WriteTo(buffers [][]byte, out []byte) {
out = out[:0]
for _, b := range buffers {
out = append(out, b...)
}
}

func Materialize(buffers [][]byte) []byte {
buf := make([]byte, 0, Len(buffers))
WriteTo(buffers, buf)
return buf
}

type Reader struct {
data [][]byte
len int
dataIdx, sliceIdx int
}

func (r *Reader) Data() [][]byte {
return r.data
}

func (r *Reader) Len() int {
return r.len
}

func (r *Reader) Read(buf []byte) (n int, _ error) {
for len(buf) != 0 && r.len != 0 {
data := r.data[r.dataIdx]
copied := copy(buf, data[r.sliceIdx:])
r.len -= copied

buf = buf[copied:]

if copied == len(data) {
r.dataIdx++
r.sliceIdx = 0
} else {
r.sliceIdx += copied
}
n += copied
}

if n == 0 {
return 0, io.EOF
}

return n, nil
}

func NewReader(data [][]byte) *Reader {
return &Reader{
data: data,
len: Len(data),
}
}

func ReturnAll(data [][]byte, provider BufferProvider) {
for _, b := range data {
provider.ReturnBuffer(b)
}
}
45 changes: 38 additions & 7 deletions codec.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,20 +19,51 @@
package grpc

import (
"google.golang.org/grpc/bufslice"
"google.golang.org/grpc/encoding"
_ "google.golang.org/grpc/encoding/proto" // to register the Codec for "proto"
)

// baseCodec contains the functionality of both Codec and encoding.Codec, but
// omits the name/string, which vary between the two and are not needed for
// anything besides the registry in the encoding package.
// baseCodec captures the new encoding.CodecV2 interface without the Name
// function, allowing it to be implemented by older Codec and encoding.Codec
// implementations. The omitted Name function is only needed for the register in
// the encoding package and is not part of the core functionality.
type baseCodec interface {
Marshal(v any) ([]byte, error)
Unmarshal(data []byte, v any) error
Marshal(v any) ([][]byte, error)
Unmarshal(data [][]byte, v any) error
}

var _ baseCodec = Codec(nil)
var _ baseCodec = encoding.Codec(nil)
func getCodec(name string) baseCodec {
var codec baseCodec
codec = encoding.GetCodecV2(name)
if codec == nil {
codecV1 := encoding.GetCodec(name)
if codecV1 != nil {
codec = codecV1Bridge{codec: codecV1}
}
}
return codec
}

type codecV1Bridge struct {
codec interface {
Marshal(v any) ([]byte, error)
Unmarshal(data []byte, v any) error
}
}

func (c codecV1Bridge) Marshal(v any) ([][]byte, error) {
data, err := c.codec.Marshal(v)
if err != nil {
return nil, err
} else {
return [][]byte{data}, nil
}
}

func (c codecV1Bridge) Unmarshal(data [][]byte, v any) (err error) {
return c.codec.Unmarshal(bufslice.Materialize(data), v)
}

// Codec defines the interface gRPC uses to encode and decode messages.
// Note that implementations of this interface must be thread safe;
Expand Down
73 changes: 73 additions & 0 deletions encoding/encoding_v2.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
/*
*
* Copyright 2017 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package encoding

import (
"strings"
)

// CodecV2 defines the interface gRPC uses to encode and decode messages. Note
// that implementations of this interface must be thread safe; a CodecV2's
// methods can be called from concurrent goroutines.
type CodecV2 interface {
Marshal(v any) (out [][]byte, err error)
Unmarshal(data [][]byte, v any) error
// Name returns the name of the Codec implementation. The returned string
// will be used as part of content type in transmission. The result must be
// static; the result cannot change between calls.
Name() string
}

var registeredV2Codecs = make(map[string]CodecV2)

// RegisterCodecV2 registers the provided CodecV2 for use with all gRPC clients and
// servers.
//
// The CodecV2 will be stored and looked up by result of its Name() method, which
// should match the content-subtype of the encoding handled by the CodecV2. This
// is case-insensitive, and is stored and looked up as lowercase. If the
// result of calling Name() is an empty string, RegisterCodecV2 will panic. See
// Content-Type on
// https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md#requests for
// more details.
//
// If both a Codec and CodecV2 are registered with the same name, the CodecV2
// will be used.
//
// NOTE: this function must only be called during initialization time (i.e. in
// an init() function), and is not thread-safe. If multiple Codecs are
// registered with the same name, the one registered last will take effect.
func RegisterCodecV2(codec CodecV2) {
if codec == nil {
panic("cannot register a nil CodecV2")
}
if codec.Name() == "" {
panic("cannot register CodecV2 with empty string result for Name()")
}
contentSubtype := strings.ToLower(codec.Name())
registeredV2Codecs[contentSubtype] = codec
}

// GetCodecV2 gets a registered CodecV2 by content-subtype, or nil if no CodecV2 is
// registered for the content-subtype.
//
// The content-subtype is expected to be lowercase.
func GetCodecV2(contentSubtype string) CodecV2 {
return registeredV2Codecs[contentSubtype]
}
1 change: 1 addition & 0 deletions encoding/gzip/gzip.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ func init() {
return &writer{Writer: gzip.NewWriter(io.Discard), pool: &c.poolCompressor}
}
encoding.RegisterCompressor(c)
//encoding.RegisterCompressorV2(&compressorV2{c: c})
}

type writer struct {
Expand Down
76 changes: 76 additions & 0 deletions encoding/proto/proto_v2.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
/*
*
* Copyright 2018 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

// Package proto defines the protobuf codec. Importing this package will
// register the codec.
package proto

import (
"fmt"

"google.golang.org/grpc/bufslice"
"google.golang.org/grpc/encoding"
"google.golang.org/grpc/internal"
"google.golang.org/protobuf/proto"
)

func init() {
encoding.RegisterCodecV2(&codecV2{SharedBufferPool: internal.NewSharedBufferPool()})
}

// codec is an experimental.CodecV2 implementation with protobuf. It is the
// default codec for gRPC.
type codecV2 struct {
internal.SharedBufferPool
}

var _ encoding.CodecV2 = (*codecV2)(nil)
var _ bufslice.BufferProvider = (*codecV2)(nil)

func (c *codecV2) Marshal(v any) ([][]byte, error) {
vv := messageV2Of(v)
if vv == nil {
return nil, fmt.Errorf("proto: failed to marshal, message is %T, want proto.Message", v)
}

buf := c.GetBuffer(proto.Size(vv))
_, err := proto.MarshalOptions{}.MarshalAppend(buf[:0], vv)
if err != nil {
c.ReturnBuffer(buf)
return nil, err
} else {
return [][]byte{buf}, nil
}
}

func (c *codecV2) Unmarshal(data [][]byte, v any) (err error) {
vv := messageV2Of(v)
if vv == nil {
return fmt.Errorf("failed to unmarshal, message is %T, want proto.Message", v)
}

buf := c.GetBuffer(bufslice.Len(data))
defer c.ReturnBuffer(buf)
bufslice.WriteTo(data, buf)
// TODO: Upgrade proto.Unmarshal to support [][]byte
return proto.Unmarshal(buf, vv)
}

func (c *codecV2) Name() string {
return Name
}
Loading

0 comments on commit ed1ff3a

Please sign in to comment.