-
Notifications
You must be signed in to change notification settings - Fork 0
/
xtcp_conn_pkg.go
166 lines (156 loc) · 5.16 KB
/
xtcp_conn_pkg.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
package xtcp
import (
"encoding/binary"
"fmt"
"github.com/go-xe2/x/core/exception"
"time"
)
const (
// 默认允许最大的简单协议包大小(byte), 65535 byte
mPKG_DEFAULT_MAX_DATA_SIZE = 65535
// 默认简单协议包头大小
mPKG_DEFAULT_HEADER_SIZE = 2
// 协议头最大大小
mPKG_MAX_HEADER_SIZE = 4
)
// 数据读取选项
type TPkgOption struct {
HeaderSize int // 自定义头大小(默认为2字节,最大不能超过4字节)
MaxDataSize int // (byte)数据读取的最大包大小,默认最大不能超过2字节(65535 byte)
Retry TRetry // 失败重试
}
// getTPkgOption wraps and returns the TPkgOption.
// If no option given, it returns a new option with default value.
func getPkgOption(option ...TPkgOption) (*TPkgOption, error) {
pkgOption := TPkgOption{}
if len(option) > 0 {
pkgOption = option[0]
}
if pkgOption.HeaderSize == 0 {
pkgOption.HeaderSize = mPKG_DEFAULT_HEADER_SIZE
}
if pkgOption.MaxDataSize == 0 {
pkgOption.MaxDataSize = mPKG_DEFAULT_MAX_DATA_SIZE
} else if pkgOption.MaxDataSize > 0xFFFFFF {
return nil, fmt.Errorf(`package size %d exceeds allowed max size %d`, pkgOption.MaxDataSize, 0xFFFFFF)
}
return &pkgOption, nil
}
// 根据简单协议发送数据包。
//
// 简单协议数据格式:数据长度(24bit)|数据字段(变长)。
//
// 注意:
// 1. "数据长度"仅为"数据字段"的长度,不包含头信息的长度字段3字节。
// 2. 由于"数据长度"为3字节,并且使用的BigEndian字节序,因此这里最后返回的buffer使用了buffer[1:]。
func (c *TConn) SendPkg(data []byte, option ...TPkgOption) error {
pkgOption, err := getPkgOption(option...)
if err != nil {
return err
}
length := len(data)
if length > pkgOption.MaxDataSize {
return fmt.Errorf(`data size %d exceeds max pkg size %d`, length, pkgOption.MaxDataSize)
}
offset := mPKG_MAX_HEADER_SIZE - pkgOption.HeaderSize
buffer := make([]byte, mPKG_MAX_HEADER_SIZE+len(data))
binary.BigEndian.PutUint32(buffer[0:], uint32(length))
copy(buffer[mPKG_MAX_HEADER_SIZE:], data)
if pkgOption.Retry.Count > 0 {
return c.Send(buffer[offset:], pkgOption.Retry)
}
//fmt.Println("SendPkg:", buffer[offset:])
return c.Send(buffer[offset:])
}
// 简单协议: 带超时时间的数据发送
func (c *TConn) SendPkgWithTimeout(data []byte, timeout time.Duration, option ...TPkgOption) (err error) {
if err := c.SetSendDeadline(time.Now().Add(timeout)); err != nil {
return err
}
defer func() {
err = exception.Wrap(c.SetSendDeadline(time.Time{}), "SetSendDeadline error")
}()
err = c.SendPkg(data, option...)
return
}
// 简单协议: 发送数据并等待接收返回数据
func (c *TConn) SendRecvPkg(data []byte, option ...TPkgOption) ([]byte, error) {
if err := c.SendPkg(data, option...); err == nil {
return c.RecvPkg(option...)
} else {
return nil, err
}
}
// 简单协议: 发送数据并等待接收返回数据(带返回超时等待时间)
func (c *TConn) SendRecvPkgWithTimeout(data []byte, timeout time.Duration, option ...TPkgOption) ([]byte, error) {
if err := c.SendPkg(data, option...); err == nil {
return c.RecvPkgWithTimeout(timeout, option...)
} else {
return nil, err
}
}
// 简单协议: 获取一个数据包。
func (c *TConn) RecvPkg(option ...TPkgOption) (result []byte, err error) {
var temp []byte
var length int
pkgOption, err := getPkgOption(option...)
if err != nil {
return nil, err
}
for {
// 先根据对象的缓冲区数据进行计算
for {
if len(c.buffer) >= pkgOption.HeaderSize {
// 不满足4个字节的uint32类型,因此这里"低位"补0
if length <= 0 {
switch pkgOption.HeaderSize {
case 1:
length = int(binary.BigEndian.Uint32([]byte{0, 0, 0, c.buffer[0]}))
case 2:
length = int(binary.BigEndian.Uint32([]byte{0, 0, c.buffer[0], c.buffer[1]}))
case 3:
length = int(binary.BigEndian.Uint32([]byte{0, c.buffer[0], c.buffer[1], c.buffer[2]}))
default:
length = int(binary.BigEndian.Uint32([]byte{c.buffer[0], c.buffer[1], c.buffer[2], c.buffer[3]}))
}
}
// 解析的大小是否符合规范,清空从该连接接收到的所有数据包
if length < 0 || length > pkgOption.MaxDataSize {
c.buffer = c.buffer[:0]
return nil, fmt.Errorf(`invalid package size %d`, length)
}
// 不满足包大小,需要继续读取
if len(c.buffer) < length+pkgOption.HeaderSize {
break
}
result = c.buffer[pkgOption.HeaderSize : pkgOption.HeaderSize+length]
c.buffer = c.buffer[pkgOption.HeaderSize+length:]
length = 0
return
} else {
break
}
}
// 读取系统socket当前缓冲区的数据
temp, err = c.Recv(0, pkgOption.Retry)
if err != nil {
break
}
if len(temp) > 0 {
c.buffer = append(c.buffer, temp...)
}
//fmt.Println("RecvPkg:", c.buffer)
}
return
}
// 简单协议: 带超时时间的消息包获取
func (c *TConn) RecvPkgWithTimeout(timeout time.Duration, option ...TPkgOption) (data []byte, err error) {
if err := c.SetRecvDeadline(time.Now().Add(timeout)); err != nil {
return nil, err
}
defer func() {
err = exception.Wrap(c.SetRecvDeadline(time.Time{}), "SetRecvDeadline error")
}()
data, err = c.RecvPkg(option...)
return
}