forked from pachyderm/pachyderm
/
pbutil.go
93 lines (80 loc) · 2 KB
/
pbutil.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
package pbutil
import (
"encoding/binary"
"io"
"unsafe"
"github.com/gogo/protobuf/proto"
)
// Reader is the proto.Message counterpart of io.Reader: it yields whole
// messages rather than arbitrary byte ranges.
type Reader interface {
	// Read decodes the next message from the stream into msg.
	Read(msg proto.Message) error
	// ReadBytes returns the still-encoded bytes of the next message.
	// NOTE(review): the implementation in this file hands back a slice of an
	// internal buffer that is reused by later reads, so callers should copy
	// the data if they need to keep it.
	ReadBytes() (data []byte, err error)
}
// Writer is the proto.Message counterpart of io.Writer: it emits whole
// messages rather than arbitrary byte ranges.
type Writer interface {
	// Write encodes msg and writes it to the stream, returning the number of
	// bytes written.
	Write(msg proto.Message) (n int64, err error)
	// WriteBytes writes an already-encoded message to the stream, returning
	// the number of bytes written. The implementation in this file counts the
	// length prefix it emits as part of that total.
	WriteBytes(data []byte) (n int64, err error)
}
// ReadWriter is io.ReadWriter for proto.Message instead of []byte.
// It groups the methods of Reader and Writer into a single interface.
type ReadWriter interface {
Reader
Writer
}
// readWriter implements Reader, Writer and ReadWriter on top of plain
// io.Reader / io.Writer values. Each message is framed as a little-endian
// int64 length followed by that many payload bytes.
type readWriter struct {
	w   io.Writer // sink for the write methods; nil when built by NewReader
	r   io.Reader // source for the read methods; nil when built by NewWriter
	buf []byte    // scratch buffer reused across ReadBytes calls
}

// lengthError is a string-backed error type, which lets the sentinel below be
// declared as a constant.
type lengthError string

// Error implements the error interface.
func (e lengthError) Error() string { return string(e) }

// errInvalidLength is returned by ReadBytes when the length prefix is negative
// or does not fit in an int on this platform.
const errInvalidLength = lengthError("pbutil: invalid length prefix")

// ReadBytes reads one length-prefixed frame from the underlying reader and
// returns its payload.
//
// The returned slice aliases an internal buffer that is reused by subsequent
// calls; callers that need to retain the data must copy it.
//
// Errors from reading the length prefix are returned unchanged (per the
// encoding/binary documentation, io.EOF is returned only when no bytes of
// the prefix could be read). A prefix that is negative or too large for an
// int yields errInvalidLength instead of panicking. If the stream ends before
// the full payload has been read, io.ErrUnexpectedEOF is returned. No other
// upper bound is enforced on the length.
func (r *readWriter) ReadBytes() ([]byte, error) {
	var l int64
	if err := binary.Read(r.r, binary.LittleEndian, &l); err != nil {
		return nil, err
	}
	// The prefix comes straight off the wire: a negative value would make
	// make() or the slice expression below panic, and a value that does not
	// round-trip through int would be silently truncated on 32-bit platforms.
	if l < 0 || int64(int(l)) != l {
		return nil, errInvalidLength
	}
	n := int(l)
	// Grow the scratch buffer only when the current one is too small.
	if len(r.buf) < n {
		r.buf = make([]byte, n)
	}
	buf := r.buf[:n]
	if _, err := io.ReadFull(r.r, buf); err != nil {
		// ReadFull reports a bare io.EOF when zero payload bytes were read;
		// since the prefix promised a payload, that is still a truncation.
		if err == io.EOF {
			return nil, io.ErrUnexpectedEOF
		}
		return nil, err
	}
	return buf, nil
}
// Read pulls the next frame from r via ReadBytes and unmarshals its payload
// into val. An error from either step is returned as-is.
func (r *readWriter) Read(val proto.Message) error {
	payload, err := r.ReadBytes()
	if err == nil {
		err = proto.Unmarshal(payload, val)
	}
	return err
}
// WriteBytes writes bytes to r as a single frame: a little-endian int64
// holding len(bytes), followed by the bytes themselves.
//
// If the length prefix cannot be written, it returns 0 and that error.
// Otherwise it returns the size of the prefix plus the number of payload bytes
// the underlying writer reported, together with the payload write's error.
func (r *readWriter) WriteBytes(bytes []byte) (int64, error) {
	size := int64(len(bytes))
	if err := binary.Write(r.w, binary.LittleEndian, size); err != nil {
		return 0, err
	}
	written, err := r.w.Write(bytes)
	// unsafe.Sizeof(size) is the width of the int64 prefix just emitted.
	return int64(unsafe.Sizeof(size)) + int64(written), err
}
// Write marshals val and hands the encoded bytes to WriteBytes, returning
// whatever WriteBytes returns. If marshaling fails, nothing is written and the
// result is 0 with the marshal error.
func (r *readWriter) Write(val proto.Message) (int64, error) {
	encoded, marshalErr := proto.Marshal(val)
	if marshalErr != nil {
		return 0, marshalErr
	}
	return r.WriteBytes(encoded)
}
// NewReader wraps r in a Reader that takes its messages from r.
// The returned value has no sink configured.
func NewReader(r io.Reader) Reader {
	src := new(readWriter)
	src.r = r
	return src
}
// NewWriter wraps w in a Writer that sends its messages to w.
// The returned value has no source configured.
func NewWriter(w io.Writer) Writer {
	sink := new(readWriter)
	sink.w = w
	return sink
}
// NewReadWriter wraps rw in a ReadWriter that uses rw both as the source it
// reads messages from and as the sink it writes messages to.
func NewReadWriter(rw io.ReadWriter) ReadWriter {
	both := new(readWriter)
	both.r = rw
	both.w = rw
	return both
}