/
wsconn.go
94 lines (81 loc) · 2.12 KB
/
wsconn.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
package relay
import (
"bytes"
"encoding/json"
"strings"
"github.com/RabbyHub/derelay/log"
"github.com/RabbyHub/derelay/metrics"
"github.com/gorilla/websocket"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
)
// client holds the per-connection state of one websocket peer attached to
// the relay server.
type client struct {
	// conn is the underlying websocket connection that read and write
	// operate on.
	conn *websocket.Conn
	// ws is the server this client belongs to; decoded messages and
	// unregister events are handed to it over its channels.
	ws *WsServer

	// id is randomly generated and only used for logging.
	id string
	// role is either dapp or wallet.
	role RoleType
	// pubTopics and subTopics are presumably the topics this client has
	// published to and subscribed to (TODO confirm against TopicSet users).
	pubTopics *TopicSet
	subTopics *TopicSet

	// sendbuf is the outgoing send buffer drained by the write loop.
	sendbuf chan SocketMessage
	// quit tells the write loop to stop.
	quit chan struct{}
}
// MarshalLogObject implements zapcore.ObjectMarshaler so a client can be
// attached to structured log entries. It emits the client's id, role and both
// topic sets. A nil client encodes as an empty object.
//
// Errors reported by the encoder while adding the topic arrays are returned
// instead of being dropped; both arrays are always attempted and the first
// failure wins.
func (c *client) MarshalLogObject(encoder zapcore.ObjectEncoder) error {
	if c == nil {
		return nil
	}

	encoder.AddString("id", c.id)
	encoder.AddString("role", string(c.role))

	// Add both arrays before looking at the errors so that a failure in the
	// first one does not hide the second from the log entry.
	pubErr := encoder.AddArray("pubTopics", c.pubTopics)
	subErr := encoder.AddArray("subTopics", c.subTopics)
	if pubErr != nil {
		return pubErr
	}
	return subErr
}
// read is the client's inbound loop. It reads messages from the websocket
// connection, decodes each payload as a JSON SocketMessage and forwards it to
// the server's localCh.
//
// A read error terminates the client and ends the loop. A payload that does
// not decode is logged and skipped.
func (c *client) read() {
	for {
		_, raw, readErr := c.conn.ReadMessage()
		if readErr != nil {
			c.terminate(readErr)
			return
		}

		var incoming SocketMessage
		decoder := json.NewDecoder(bytes.NewReader(raw))
		if decodeErr := decoder.Decode(&incoming); decodeErr != nil {
			log.Warn("[wsconn] received malformed text message", zap.Error(decodeErr), zap.String("raw", string(raw)))
			continue
		}

		incoming.client = c

		// Remember the client role. This is a custom extension to the
		// official v1 spec: Rabby dapps always send `"role": "dapp"` in the
		// messages they post to the relay server. The role is refreshed from
		// every decoded message.
		c.role = RoleType(strings.ToLower(incoming.Role))

		c.ws.localCh <- incoming
	}
}
// write is the client's outbound loop. It takes messages off sendbuf,
// JSON-encodes them and writes each one to the websocket connection as a text
// message. The loop ends when a value is received on quit.
//
// Encoding and write failures are logged and the loop moves on to the next
// message; neither stops the loop.
func (c *client) write() {
	for {
		select {
		case <-c.quit:
			return

		case outgoing := <-c.sendbuf:
			var payload bytes.Buffer
			if encodeErr := json.NewEncoder(&payload).Encode(outgoing); encodeErr != nil {
				log.Warn("sending malformed text message", zap.Error(encodeErr))
				continue
			}

			if writeErr := c.conn.WriteMessage(websocket.TextMessage, payload.Bytes()); writeErr != nil {
				log.Error("client write error", writeErr, zap.Any("client", c), zap.Any("message", outgoing))
			}
		}
	}
}
// send queues message on the client's send buffer without ever blocking the
// caller. If the buffer cannot accept the message right now, the message is
// dropped and the send-blocking metric is incremented.
func (c *client) send(message SocketMessage) {
	select {
	case c.sendbuf <- message:
		// Queued; the write loop will pick it up.
	default:
		// Buffer not ready: count the drop instead of stalling the caller.
		metrics.IncSendBlocking()
	}
}
// terminate shuts the client down: it signals the write loop to stop, closes
// the websocket connection and then notifies the server through its
// unregister channel, passing along reason.
//
// NOTE(review): the send on quit waits for a receiver. If quit is unbuffered
// (confirm where the client is constructed), a second call to terminate, or a
// call made after the write loop has already returned, would block here.
func (c *client) terminate(reason error) {
	event := ClientUnregisterEvent{client: c, reason: reason}

	// Stop the writer before closing so it does not write to a closed
	// connection.
	c.quit <- struct{}{}

	// The close error is deliberately ignored: the connection is going away
	// regardless and the triggering reason is reported to the server below.
	_ = c.conn.Close()

	c.ws.unregister <- event
}