/
websockets.go
224 lines (211 loc) · 5.4 KB
/
websockets.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
// Copyright © 2022 Kaleido, Inc.
//
// SPDX-License-Identifier: Apache-2.0
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package spievents
import (
"context"
"encoding/json"
"io"
"sync"
"time"
"github.com/gorilla/websocket"
"github.com/hyperledger/firefly-common/pkg/fftypes"
"github.com/hyperledger/firefly-common/pkg/log"
"github.com/hyperledger/firefly/pkg/core"
)
type webSocket struct {
ctx context.Context
manager *adminEventManager
wsConn *websocket.Conn
cancelCtx func()
connID string
senderDone chan struct{}
receiverDone chan struct{}
events chan *core.ChangeEvent
collections []string
filter core.ChangeEventFilter
mux sync.Mutex
closed bool
blocked *core.ChangeEvent
lastWarnTime *fftypes.FFTime
}
func newWebSocket(ae *adminEventManager, wsConn *websocket.Conn) *webSocket {
connID := fftypes.NewUUID().String()
ctx := log.WithLogField(ae.ctx, "admin.ws", connID)
ctx, cancelCtx := context.WithCancel(ctx)
wc := &webSocket{
ctx: ctx,
manager: ae,
wsConn: wsConn,
cancelCtx: cancelCtx,
connID: connID,
events: make(chan *core.ChangeEvent, ae.queueLength),
senderDone: make(chan struct{}),
receiverDone: make(chan struct{}),
}
go wc.sendLoop()
go wc.receiveLoop()
return wc
}
func (wc *webSocket) eventMatches(changeEvent *core.ChangeEvent) bool {
collectionMatches := false
for _, c := range wc.collections {
if c == changeEvent.Collection {
collectionMatches = true
break
}
}
if !collectionMatches {
return false
}
if len(wc.filter.Namespaces) > 0 {
namespaceMatches := false
for _, ns := range wc.filter.Namespaces {
if ns == changeEvent.Namespace {
namespaceMatches = true
break
}
}
if !namespaceMatches {
return false
}
}
if len(wc.filter.Types) > 0 {
typeMatches := false
for _, t := range wc.filter.Types {
if t == changeEvent.Type {
typeMatches = true
break
}
}
if !typeMatches {
return false
}
}
return true
}
func (wc *webSocket) writeObject(obj interface{}) {
writer, err := wc.wsConn.NextWriter(websocket.TextMessage)
if err == nil {
err = json.NewEncoder(writer).Encode(obj)
_ = writer.Close()
}
if err != nil {
// Log and continue - the receiver closing will be what ends our loop
log.L(wc.ctx).Errorf("Write failed on socket: %s", err)
}
}
func (wc *webSocket) sendLoop() {
l := log.L(wc.ctx)
defer close(wc.senderDone)
defer wc.close()
for {
select {
case changeEvent := <-wc.events:
wc.mux.Lock()
blocked := wc.blocked
wc.blocked = nil
wc.mux.Unlock()
if blocked != nil {
l.Debugf("Notifying client it missed %d events since %s", blocked.DroppedCount, blocked.DroppedSince)
wc.writeObject(blocked)
}
if !wc.eventMatches(changeEvent) {
continue
}
l.Tracef("Sending: %+v", changeEvent)
wc.writeObject(changeEvent)
case <-wc.receiverDone:
l.Debugf("Sender closing - receiver completed")
return
case <-wc.ctx.Done():
l.Debugf("Sender closing - context cancelled")
return
}
}
}
func (wc *webSocket) receiveLoop() {
l := log.L(wc.ctx)
defer close(wc.receiverDone)
for {
var msgData []byte
var cmd core.WSChangeEventCommand
_, reader, err := wc.wsConn.NextReader()
if err == nil {
msgData, err = io.ReadAll(reader)
if err == nil {
err = json.Unmarshal(msgData, &cmd)
}
}
if err != nil {
l.Errorf("Read failed: %s", err)
return
}
l.Tracef("Received: %s", string(msgData))
switch cmd.Type {
case core.WSChangeEventCommandTypeStart:
wc.handleStart(&cmd)
default:
l.Errorf("Invalid request sent on socket: %+v", cmd)
}
}
}
func (wc *webSocket) dispatch(event *core.ChangeEvent) {
// We take as much as we possibly can off of this function, including string matching etc.
// This function is called on the critical path of the commit for all database operations.
select {
case wc.events <- event:
default:
wc.mux.Lock()
var blocked *core.ChangeEvent
if wc.blocked == nil {
wc.blocked = &core.ChangeEvent{
Type: core.ChangeEventTypeDropped,
DroppedSince: fftypes.Now(),
}
}
blocked = wc.blocked
blocked.DroppedCount++
wc.mux.Unlock()
if wc.lastWarnTime == nil || time.Since(*wc.lastWarnTime.Time()) > wc.manager.blockedWarnInterval {
log.L(wc.ctx).Warnf("Change event listener is blocked an missing %d events (since %s)", blocked.DroppedCount, blocked.DroppedSince)
}
}
}
func (wc *webSocket) handleStart(start *core.WSChangeEventCommand) {
wc.mux.Lock()
wc.collections = start.Collections
wc.filter = start.Filter
wc.mux.Unlock()
}
func (wc *webSocket) close() {
var didClosed bool
wc.mux.Lock()
if !wc.closed {
didClosed = true
wc.closed = true
_ = wc.wsConn.Close()
wc.cancelCtx()
}
wc.mux.Unlock()
// Drop lock before callback
if didClosed {
wc.manager.wsClosed(wc.connID)
}
}
func (wc *webSocket) waitClose() {
<-wc.senderDone
<-wc.receiverDone
}