/
ws.go
167 lines (138 loc) · 3.89 KB
/
ws.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
package api
import (
"encoding/json"
"net/http"
"strconv"
"time"
"github.com/bcext/cashutil"
"github.com/copernet/whccommon/log"
"github.com/copernet/whcwallet/config"
"github.com/copernet/whcwallet/logic/ws"
"github.com/copernet/whcwallet/util"
"github.com/gin-gonic/gin"
"github.com/gorilla/websocket"
"github.com/satori/go.uuid"
"github.com/sirupsen/logrus"
)
var connMgr = ws.NewConnMgr()
var upgrader = websocket.Upgrader{
CheckOrigin: func(r *http.Request) bool {
return true
},
}
type action string
const (
add action = "add"
remove action = "remove"
)
// type only select from add/remove
type OperateAddrsMsg struct {
Type action `json:"type"`
Addresses []string `json:"addresses"`
}
func NotifyBalanceUpdated(ctx *gin.Context) {
conn, err := upgrader.Upgrade(ctx.Writer, ctx.Request, nil)
if err != nil {
log.WithCtx(ctx).Errorf("upgrade to websocket failed: %v", err)
return
}
// generate uuid fro websocket connection
uniqueID, _ := uuid.NewV4()
uid := uniqueID.String()
ctx.Set(log.DefaultTraceLabel, uid)
log.WithCtx(ctx).Debug("websocket connect incoming")
conn.SetCloseHandler(func(code int, text string) error {
message := websocket.FormatCloseMessage(code, "")
conn.WriteControl(websocket.CloseMessage, message,
time.Now().Add(time.Second))
conn.Close()
return nil
})
c := ws.NewConnection(connMgr, conn, uid)
go func() {
defer c.Close()
for {
select {
case <-c.Quit:
logrus.WithFields(logrus.Fields{
log.DefaultTraceLabel: c.Uid,
}).Info("stop reading message, the websocket connection closed")
return
default:
}
_, mes, err := conn.ReadMessage()
if err != nil {
log.WithCtx(ctx).Warnf("read from client message failed: %v", err)
return
}
log.WithCtx(ctx).Debugf("receive message from websocket: %s", string(mes))
var msg OperateAddrsMsg
err = json.Unmarshal(mes, &msg)
if err != nil {
log.WithCtx(ctx).Errorf("read from client message json unmarshal error: %v", err)
return
}
// limit address encode format
addrs, err := util.ConvToCashAddr(msg.Addresses, config.GetChainParam())
if err != nil {
log.WithCtx(ctx).Errorf("address list encode error: %v", err)
return
}
// limit request added address list length
if len(addrs) > maxRequestAddressList || len(addrs) == 0 {
log.WithCtx(ctx).Warnf("address list length illegal, total: %d", len(addrs))
return
}
// add addresses
if msg.Type == add {
// Must add websocket connection to cache firstly.
c.AddAddrs(addrs)
OnAddressAdd(addrs)
log.WithCtx(ctx).Debug("add address via websocket: ", msg.Addresses)
}
// remove addresses
if msg.Type == remove {
c.DelAddr(addrs)
log.WithCtx(ctx).Debug("delete addresses via websocket: ", msg.Addresses)
}
// ignore if the Type is other string
}
}()
// ensure the websocket connection is available.
go func() {
timer := time.NewTicker(10 * time.Second)
defer timer.Stop()
defer c.Close()
n := 1
for {
select {
case <-c.Quit:
logrus.WithFields(logrus.Fields{
log.DefaultTraceLabel: c.Uid,
}).Info("stop ping, the websocket connection closed")
return
case <-timer.C:
err := conn.WriteMessage(websocket.TextMessage, []byte(`{"ping":`+strconv.Itoa(n)+`}`))
if err != nil {
log.WithCtx(ctx).Warnf("ping to client via websocket failed: %v", err)
return
}
n++
}
}
}()
}
func OnAddressAdd(addrs []string) {
// start track the bch balance for these addresses
for _, addr := range addrs {
// ignore the unexpected error, because upstream logic ensure that
// the addresses is encoded correctly.
address, _ := cashutil.DecodeAddress(addr, config.GetChainParam())
// the electrum only recognises base32 format addresses.
// SyncBalanceForBCH handles the situation the subscribe balance update fail.
SyncBalanceForBCH(address)
}
}
func init() {
go connMgr.Notify()
}