/
packet.go
executable file
·204 lines (156 loc) · 4.14 KB
/
packet.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
package nets
import (
"encoding/binary"
"fmt"
"sync"
"sync/atomic"
"github.com/0x00b/gobbq/engine/bytespool"
"github.com/0x00b/gobbq/engine/codec"
"github.com/0x00b/gobbq/proto/bbq"
)
// Packet is a reference-counted packet used for sending data.
//
// NOTE: if a packet is handed to another goroutine, through a channel or by
// any other means, it MUST be retained (Retain) by the new holder and
// released (Release) once that holder is done with it.
type Packet struct {
// refcount is the number of outstanding references. It is set to 1 by reset
// and modified atomically by Retain and Release.
refcount int32
Src *Conn // not nil indicates this is a request packet
// Header may only be used within the packet's lifetime.
Header *bbq.Header
// totalLen is the number of valid bytes in bytes (header + body); WriteBody
// sets it to headerLen plus the body length.
totalLen uint32
// headerLen is the offset at which the body starts; WriteBody sets it to
// 4 (the length prefix) plus the size of the marshaled header.
headerLen uint32
// bytes holds the packet data (header + body); nil until a buffer is attached.
bytes *bytespool.Bytes
}
const (
// minPacketBufferLen aliases bytespool.MinBufferCap.
// NOTE(review): not referenced in the visible part of this file — confirm it
// is still used elsewhere in the package.
minPacketBufferLen = bytespool.MinBufferCap
// MaxPacketBodyLength aliases bytespool.MaxBufferCap. extendPacketData panics
// when asked for more than this many bytes; note that the size checked there
// is the whole packet (length prefix + header + body), not the body alone.
MaxPacketBodyLength = bytespool.MaxBufferCap
)
var (
	// packetEndian is the byte order used for the length prefixes written by
	// WriteBody and Serialize.
	packetEndian = binary.LittleEndian

	// packetPool recycles Packet values. Packets created by the pool start
	// with a non-nil, zero-valued Header.
	packetPool = &sync.Pool{
		New: func() any {
			return &Packet{Header: &bbq.Header{}}
		},
	}
)
// PacketType identifies the kind of a packet.
type PacketType uint8

// Known packet types.
const (
	PacketRPC PacketType = 0x0
	PacketSys PacketType = 0x1
)

// packetName maps every known packet type to its display name.
// NOTE(review): PacketSys is displayed as "PING" — confirm this is intended.
var packetName = map[PacketType]string{
	PacketRPC: "RPC",
	PacketSys: "PING",
}

// String returns the display name of t, or "UNKNOWN_MESSAGE_TYPE_<n>" (with n
// the numeric value of t) when t has no entry in packetName.
func (t PacketType) String() string {
	name, known := packetName[t]
	if !known {
		return fmt.Sprintf("UNKNOWN_MESSAGE_TYPE_%d", uint8(t))
	}
	return name
}
// HasFlags reports whether every bit set in flags is also set in v. An empty
// mask (flags == 0) is always contained.
func HasFlags(v, flags uint32) bool {
	// Bits requested by flags that v lacks; none missing means v has them all.
	missing := flags &^ v
	return missing == 0
}
const (
// FlagDataChecksumIEEE is flag bit 0x01, suitable for testing with HasFlags.
// NOTE(review): presumably marks packet data as carrying an IEEE CRC-32
// checksum — confirm against the code that sets and checks this flag.
FlagDataChecksumIEEE uint32 = 0x01
)
// NewPacket takes a packet from packetPool, resets it (which leaves it with a
// reference count of 1) and returns it.
//
// The caller owns that reference and must always call Release when done.
func NewPacket() *Packet {
	p := packetPool.Get().(*Packet)
	p.reset()
	return p
}
// String returns a short description of the packet, formatted as
// "[hdrlen:H, totallen:T, hdr[HEADER]]", where HEADER is p.Header.String().
func (p *Packet) String() string {
	// Both opening brackets are closed so the output stays balanced.
	return fmt.Sprintf("[hdrlen:%d, totallen:%d, hdr[%s]]", p.headerLen, p.totalLen, p.Header.String())
}
// Retain atomically adds one reference to the packet.
//
// Anyone who wants to hold on to a packet must call Retain and pair it with a
// later Release.
func (p *Packet) Retain() {
	atomic.AddInt32(&p.refcount, 1)
}
// Release atomically drops one reference to the packet.
//
// When the last reference is dropped, the buffer is handed back to bytespool
// and the packet itself to packetPool; the packet must not be used afterwards.
// Release panics if the reference count falls below zero, which indicates an
// unbalanced Retain/Release pair.
func (p *Packet) Release() {
	refcount := atomic.AddInt32(&p.refcount, -1)
	switch {
	case refcount == 0:
		// Drop the references before pooling so an idle pooled packet neither
		// points at a buffer that now belongs to bytespool nor keeps the
		// source connection reachable.
		buf := p.bytes
		p.bytes = nil
		p.Src = nil
		bytespool.Put(buf)
		packetPool.Put(p)
	case refcount < 0:
		// Report the value produced by this decrement; reading p.refcount
		// again would be an unsynchronized read that may already have changed.
		panic(fmt.Errorf("releasing packet with refcount=%d", refcount))
	}
}
// WriteBody marshals p.Header and writes it, followed by the body b, into the
// packet buffer, replacing whatever the packet held before. The layout is:
//
//	[4-byte little-endian headerLen][marshaled header][body]
//
// where headerLen includes its own 4 bytes.
//
// It returns the error reported by the codec when the header cannot be
// marshaled, in which case the packet is left unchanged. It panics with
// errPacketBodyTooLarge when the complete packet would exceed
// MaxPacketBodyLength; the packet lengths are left unchanged in that case too.
func (p *Packet) WriteBody(b []byte) error {
	header, err := codec.DefaultCodec.Marshal(p.Header)
	if err != nil {
		return err
	}

	// Size the packet in 64 bits first: converting len(b) straight to uint32
	// (or summing in uint32) would silently wrap for huge inputs and let an
	// oversized packet slip past the limit check.
	headerLen := 4 + uint64(len(header))
	totalLen := headerLen + uint64(len(b))
	if totalLen > uint64(MaxPacketBodyLength) {
		panic(errPacketBodyTooLarge)
	}

	// extendPacketData returns the buffer cut to p.totalLen, so the lengths
	// have to be in place before it is called.
	p.headerLen = uint32(headerLen)
	p.totalLen = uint32(totalLen)

	data := p.extendPacketData(p.totalLen)
	packetEndian.PutUint32(data, p.headerLen)
	copy(data[4:p.headerLen], header)
	copy(data[p.headerLen:p.totalLen], b)
	return nil
}
// Serialize returns a newly allocated slice holding the 4-byte little-endian
// length of Data() followed by a copy of Data() itself. A packet without a
// buffer serializes to a zero length prefix only.
func (p *Packet) Serialize() []byte {
	pdata := p.Data()

	// Allocate the exact size once rather than growing a 4-byte slice, which
	// would cost a second allocation and a copy of the prefix.
	out := make([]byte, 4+len(pdata))
	packetEndian.PutUint32(out, uint32(len(pdata)))
	copy(out[4:], pdata)
	return out
}
// PacketBody returns the body part of the packet, i.e. the bytes of the
// buffer between headerLen and totalLen. It returns nil when the packet has
// no buffer.
func (p *Packet) PacketBody() []byte {
	if buf := p.bytes; buf != nil {
		return buf.Bytes()[p.headerLen:p.totalLen]
	}
	return nil
}
// GetPacketCap returns the capacity of the packet's current buffer, or 0 when
// the packet has no buffer.
func (p *Packet) GetPacketCap() uint32 {
	var capacity uint32
	if p.bytes != nil {
		capacity = uint32(cap(p.bytes.Bytes()))
	}
	return capacity
}
// reset puts the packet back into its initial state: the header is reset, the
// source connection and buffer references are dropped, both lengths are zero
// and the reference count is 1.
//
// The buffer is only forgotten here, not handed back to bytespool.
func (p *Packet) reset() {
	p.Header.Reset()
	p.Src, p.bytes = nil, nil
	p.headerLen, p.totalLen = 0, 0
	p.refcount = 1
}
// Data returns the first totalLen bytes of the packet buffer (header followed
// by body), or nil when the packet has no buffer.
func (p *Packet) Data() []byte {
	if buf := p.bytes; buf != nil {
		return buf.Bytes()[:p.totalLen]
	}
	return nil
}
// extendPacketData makes sure the packet owns a buffer with capacity for at
// least size bytes and then returns Data().
//
// The returned slice starts at the beginning of the buffer and is cut to
// p.totalLen (not to size), so callers set p.totalLen before calling.
//
// When the current buffer is too small, a new one is taken from bytespool and
// the old one is handed back; the old contents are NOT copied over.
//
// It panics with errPacketBodyTooLarge when size exceeds MaxPacketBodyLength,
// and with a string message when bytespool.Get returns nil.
func (p *Packet) extendPacketData(size uint32) []byte {
	if size > MaxPacketBodyLength {
		panic(errPacketBodyTooLarge)
	}

	// Most of the time the existing buffer is already large enough and this
	// branch is skipped.
	if p.GetPacketCap() < size {
		fresh := bytespool.Get(size)
		if fresh == nil {
			panic("bytespool get bytes error")
		}

		// Install the new buffer first, then recycle the previous one.
		stale := p.bytes
		p.bytes = fresh
		bytespool.Put(stale)
	}
	return p.Data()
}