/
ws_client.go
370 lines (303 loc) · 8.59 KB
/
ws_client.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
/*
* Copyright 2018 The openwallet Authors
* This file is part of the openwallet library.
*
* The openwallet library is free software: you can redistribute it and/or modify
* it under the terms of the GNU Lesser General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* The openwallet library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Lesser General Public License for more details.
*/
package owtp
import (
"encoding/json"
"errors"
"fmt"
"github.com/blocktree/openwallet/log"
"github.com/blocktree/go-owcrypt"
"github.com/gorilla/websocket"
"github.com/mr-tron/base58/base58"
"github.com/tidwall/gjson"
"net"
"net/http"
"sync"
"time"
)
//局部常量
const (
WriteWait = 60 * time.Second
PongWait = 30 * time.Second
PingPeriod = (PongWait * 9) / 10
MaxMessageSize = 1 * 1024
)
//WSClient 基于websocket的通信客户端
type WSClient struct {
_auth Authorization
ws *websocket.Conn
handler PeerHandler
_send chan []byte
isHost bool
ReadBufferSize int
WriteBufferSize int
pid string
isConnect bool
mu sync.RWMutex //读写锁
closeOnce sync.Once
done func()
config ConnectConfig //节点配置
}
// Dial connects a client to the given URL.
func Dial(
pid, url string,
handler PeerHandler,
header map[string]string,
ReadBufferSize, WriteBufferSize int) (*WSClient, error) {
var (
httpHeader http.Header
)
if handler == nil {
return nil, errors.New("handler should not be nil! ")
}
log.Debug("Connecting URL:", url)
dialer := websocket.Dialer{
Proxy: http.ProxyFromEnvironment,
HandshakeTimeout: 60 * time.Second,
ReadBufferSize: ReadBufferSize,
WriteBufferSize: WriteBufferSize,
}
if header != nil {
httpHeader = make(http.Header)
for key, value := range header {
httpHeader.Add(key, value)
}
}
ws, _, err := dialer.Dial(url, httpHeader)
if err != nil {
return nil, err
}
client, err := NewWSClient(pid, ws, handler, nil, nil)
if err != nil {
return nil, err
}
client.isConnect = true
client.isHost = true //我方主动连接
client.handler.OnPeerOpen(client)
return client, nil
}
func NewWSClientWithHeader(header http.Header, cert Certificate, conn *websocket.Conn, handler PeerHandler, enableSignature bool, done func()) (*WSClient, error) {
/*
| 参数名称 | 类型 | 是否可空 | 描述 |
|----------|--------|----------------|---------------------------------------------------------------------------------|
| a | string | 否 | 节点公钥,base58,http带入将开启签名 |
| n | uint32 | (websocket必填) | 请求序号。为了保证请求对应响应按序执行,并防御重放攻击,序号可以为随机数,但不可重复。 |
| t | uint32 | (websocket必填) | 时间戳。限制请求在特定时间范围内有效,如10分钟。 |
| s | string | (websocket必填) | 组合[a+n+t]并sha256两次,使用钱包工具配置的本地私钥签名,最后base58编码 |
*/
var (
//enableSig bool
//isConsult bool
tmpPublicKey []byte
remotePublicKey []byte
err error
//nodeID string
)
//log.Debug("http header:", header)
a := header.Get("a")
if len(a) == 0 {
//HTTP的节点ID都采用随机生成,因为是短连接
_, tmpPublicKey = owcrypt.KeyAgreement_initiator_step1(owcrypt.ECC_CURVE_SM2_STANDARD)
//没有授权公钥,不授权的HTTP访问,不建立协商密码,不进行签名授权
remotePublicKey = tmpPublicKey
} else {
//有授权公钥,必须授权的HTTP访问,不建立协商密码,进行签名授权
remotePublicKey, err = base58.Decode(a)
if err != nil {
return nil, err
}
}
//开启签名授权,验证header的签名是否合法
//if enableSignature {
// //校验header的签名
// if !VerifyHeaderSignature(header, remotePublicKey) {
// return nil, fmt.Errorf("the signature in http header is not invalid")
// }
//}
auth := &OWTPAuth{
remotePublicKey: remotePublicKey,
enable: enableSignature,
localPublicKey: cert.PublicKeyBytes(),
localPrivateKey: cert.PrivateKeyBytes(),
}
client, err := NewWSClient(auth.RemotePID(), conn, handler, auth, done)
return client, nil
}
func NewWSClient(pid string, conn *websocket.Conn, handler PeerHandler, auth Authorization, done func()) (*WSClient, error) {
if handler == nil {
return nil, errors.New("handler should not be nil! ")
}
client := &WSClient{
pid: pid,
ws: conn,
_send: make(chan []byte, MaxMessageSize),
_auth: auth,
done: done,
config: ConnectConfig{
ConnectType: Websocket,
Address: conn.RemoteAddr().String(),
},
}
client.isConnect = true
client.setHandler(handler)
return client, nil
}
func (c *WSClient) PID() string {
return c.pid
}
func (c *WSClient) EnableKeyAgreement() bool {
return c._auth.EnableKeyAgreement()
}
func (c *WSClient) auth() Authorization {
return c._auth
}
func (c *WSClient) setHandler(handler PeerHandler) error {
c.handler = handler
return nil
}
func (c *WSClient) IsHost() bool {
return c.isHost
}
func (c *WSClient) IsConnected() bool {
return c.isConnect
}
func (c *WSClient) ConnectConfig() ConnectConfig {
return c.config
}
//Close 关闭连接
func (c *WSClient) close() error {
var err error
//保证节点只关闭一次
c.closeOnce.Do(func() {
if !c.isConnect {
//log.Debug("end close")
return
}
//调用关闭函数通知上级
if c.done != nil {
c.done()
// Be nice to GC
c.done = nil
}
err = c.ws.Close()
c.isConnect = false
c.handler.OnPeerClose(c, "client close")
})
return err
}
//LocalAddr 本地节点地址
func (c *WSClient) LocalAddr() net.Addr {
if c.ws == nil {
return nil
}
return c.ws.LocalAddr()
}
//RemoteAddr 远程节点地址
func (c *WSClient) RemoteAddr() net.Addr {
if c.ws == nil {
return nil
}
return c.ws.RemoteAddr()
}
//Send 发送消息
func (c *WSClient) send(data DataPacket) error {
//log.Emergency("Send DataPacket:", data)
respBytes, err := json.Marshal(data)
if err != nil {
return err
}
//if c.auth != nil && c.auth.EnableAuth() {
// respBytes, err = c.auth.EncryptData(respBytes)
// if err != nil {
// return errors.New("OWTP: EncryptData failed")
// }
//}
//log.Printf("Send: %s\n", string(respBytes))
c._send <- respBytes
return nil
}
//OpenPipe 打开通道
func (c *WSClient) openPipe() error {
if !c.IsConnected() {
return fmt.Errorf("client is not connect")
}
//发送通道
go c.writePump()
//监听消息
go c.readPump()
return nil
}
// WritePump 发送消息通道
func (c *WSClient) writePump() {
ticker := time.NewTicker(PingPeriod) //发送心跳间隔事件要<等待时间
defer func() {
ticker.Stop()
c.close()
//log.Debug("writePump end")
}()
for {
select {
case message, ok := <-c._send:
//发送消息
if !ok {
c.write(websocket.CloseMessage, []byte{})
return
}
if Debug {
log.Debug("Send: ", string(message))
}
if err := c.write(websocket.TextMessage, message); err != nil {
return
}
case <-ticker.C:
//定时器的回调,发送心跳检查,
err := c.write(websocket.PingMessage, []byte{})
if err != nil {
return //客户端不响应心跳就停止
}
}
}
}
// write 输出数据
func (c *WSClient) write(mt int, message []byte) error {
c.ws.SetWriteDeadline(time.Now().Add(WriteWait)) //设置发送的超时时间点
return c.ws.WriteMessage(mt, message)
}
// ReadPump 监听消息
func (c *WSClient) readPump() {
c.ws.SetReadDeadline(time.Now().Add(PongWait)) //设置客户端心跳响应的最后限期
c.ws.SetPongHandler(func(string) error {
c.ws.SetReadDeadline(time.Now().Add(PongWait)) //设置下一次心跳响应的最后限期
return nil
})
defer func() {
c.close()
//log.Debug("readPump end")
}()
for {
_, message, err := c.ws.ReadMessage()
if err != nil {
log.Error("peer:", c.PID(), "Read unexpected error: ", err)
//close(c.send) //读取通道异常,关闭读通道
return
}
if Debug {
log.Debug("Read: ", string(message))
}
packet := NewDataPacket(gjson.ParseBytes(message))
//开一个goroutine处理消息
go c.handler.OnPeerNewDataPacketReceived(c, packet)
}
}