-
Notifications
You must be signed in to change notification settings - Fork 5
/
packet-rw.go
149 lines (126 loc) · 3.2 KB
/
packet-rw.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
package srpc
import (
"bytes"
"context"
"encoding/binary"
"io"
"sync"
"github.com/pkg/errors"
)
// maxMessageSize is the max message size in bytes
var maxMessageSize = 1e7
// PacketReadWriter reads and writes packets from a io.ReadWriter.
// Uses a LittleEndian uint32 length prefix.
type PacketReadWriter struct {
// rw is the io.ReadWriterCloser
rw io.ReadWriteCloser
// buf is the buffered data
buf bytes.Buffer
// writeMtx is the write mutex
writeMtx sync.Mutex
}
// NewPacketReadWriter constructs a new read/writer.
func NewPacketReadWriter(rw io.ReadWriteCloser) *PacketReadWriter {
return &PacketReadWriter{rw: rw}
}
// Write writes raw data to the remote.
func (r *PacketReadWriter) Write(p []byte) (n int, err error) {
r.writeMtx.Lock()
defer r.writeMtx.Unlock()
return r.rw.Write(p)
}
// WritePacket writes a packet to the writer.
func (r *PacketReadWriter) WritePacket(p *Packet) error {
r.writeMtx.Lock()
defer r.writeMtx.Unlock()
msgSize := p.SizeVT()
data := make([]byte, 4+msgSize)
binary.LittleEndian.PutUint32(data, uint32(msgSize))
_, err := p.MarshalToSizedBufferVT(data[4:])
if err != nil {
return err
}
var written, n int
for written < len(data) {
n, err = r.rw.Write(data)
if err != nil {
return err
}
written += n
}
return nil
}
// ReadPump executes the read pump in a goroutine.
//
// calls the handler when closed or returning an error
func (r *PacketReadWriter) ReadPump(cb PacketDataHandler, closed CloseHandler) {
err := r.ReadToHandler(cb)
// signal that the stream is now closed.
if closed != nil {
closed(err)
}
}
// ReadToHandler reads data to the given handler.
// Does not handle closing the stream, use ReadPump instead.
func (r *PacketReadWriter) ReadToHandler(cb PacketDataHandler) error {
var currLen uint32
buf := make([]byte, 2048)
isOpen := true
for isOpen {
// read some data into the buffer
n, err := r.rw.Read(buf)
if err != nil {
if err == io.EOF || err == context.Canceled {
isOpen = false
} else {
return err
}
}
// push the data to r.buf
_, err = r.buf.Write(buf[:n])
if err != nil {
return err
}
EmitIfEnough:
// check if we have enough data for a length prefix
bufLen := r.buf.Len()
if bufLen < 4 {
continue
}
// parse the length prefix if not done already
if currLen == 0 {
currLen = r.readLengthPrefix(r.buf.Bytes()[:4])
if currLen == 0 {
return errors.New("unexpected zero len prefix")
}
if currLen > uint32(maxMessageSize) {
return errors.Errorf("message size %v greater than maximum %v", currLen, maxMessageSize)
}
}
// emit the packet if fully buffered
if currLen != 0 && bufLen >= int(currLen)+4 {
pkt := r.buf.Next(int(currLen + 4))[4:]
currLen = 0
if err := cb(pkt); err != nil {
return err
}
// check if there's still enough in the buffer
goto EmitIfEnough
}
}
// closed
return nil
}
// Close closes the packet rw.
func (r *PacketReadWriter) Close() error {
return r.rw.Close()
}
// readLengthPrefix reads the length prefix.
func (r *PacketReadWriter) readLengthPrefix(b []byte) uint32 {
if len(b) < 4 {
return 0
}
return binary.LittleEndian.Uint32(b)
}
// _ is a type assertion
var _ PacketWriter = (*PacketReadWriter)(nil)