-
Notifications
You must be signed in to change notification settings - Fork 0
/
subscribe.go
222 lines (185 loc) · 5.52 KB
/
subscribe.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
package subscribe
import (
"errors"
"sync"
"sync/atomic"
"github.com/brronsuite/lnd/queue"
)
// Sentinel errors returned by the subscription server.
var (
	// ErrServerShuttingDown is returned by Server methods when the
	// server's quit channel is closed before the requested operation
	// could be handed to the subscription handler.
	ErrServerShuttingDown = errors.New("subscription server shutting down")
)
// Client is used to get notified about updates the caller has subscribed to.
// Clients are created by Server.Subscribe.
type Client struct {
// cancel should be called in case the client no longer wants to
// subscribe for updates from the server.
cancel func()
// updates is the queue through which the server hands updates to this
// client. Its outgoing channel is what Updates exposes to the caller.
updates *queue.ConcurrentQueue
// quit is closed by the server's subscription handler once it stops
// delivering updates to this client (on cancellation or on server
// shutdown).
quit chan struct{}
}
// Updates exposes the receive-only side of the client's update queue. Every
// update the server forwards to this client is delivered on the returned
// channel.
func (c *Client) Updates() <-chan interface{} {
	var out <-chan interface{} = c.updates.ChanOut()
	return out
}
// Quit returns a receive-only view of the client's quit channel. The channel
// is closed once the server will no longer deliver updates to this client.
func (c *Client) Quit() <-chan struct{} {
	var done <-chan struct{} = c.quit
	return done
}
// Cancel ends the subscription. It should be called once the client no longer
// wants to receive updates from the server. It invokes the cancel callback
// that was installed when the client was created.
func (c *Client) Cancel() {
	unsubscribe := c.cancel
	unsubscribe()
}
// Server is a struct that manages a set of subscriptions and their
// corresponding clients. Any update will be delivered to all active clients.
type Server struct {
// clientCounter is incremented by Subscribe to hand out a fresh ID to every
// new client.
//
// NOTE: this is a uint64 accessed through sync/atomic; keeping it as the
// first field keeps it 64-bit aligned on 32-bit platforms, so do not move
// it.
clientCounter uint64 // To be used atomically.
// started is flipped from 0 to 1 by the first call to Start; later calls
// are no-ops.
started uint32 // To be used atomically.
// stopped is flipped from 0 to 1 by the first call to Stop; later calls
// are no-ops.
stopped uint32 // To be used atomically.
// clients holds the active subscriptions keyed by client ID. It is read
// and modified by subscriptionHandler.
clients map[uint64]*Client
// clientUpdates carries subscribe and cancel requests to
// subscriptionHandler.
clientUpdates chan *clientUpdate
// updates carries the values passed to SendUpdate to subscriptionHandler,
// which forwards them to the clients.
updates chan interface{}
// quit is closed by Stop to signal shutdown.
quit chan struct{}
// wg tracks the subscriptionHandler goroutine so Stop can wait for it to
// exit.
wg sync.WaitGroup
}
// clientUpdate is an internal message sent to the subscriptionHandler to
// either register a new client for subscription or cancel an existing
// subscription.
type clientUpdate struct {
// cancel indicates if the update to the client is cancelling an
// existing client's subscription. If not then this update will be to
// subscribe a new client.
cancel bool
// clientID is the unique identifier for this client. Any further
// updates (deleting or adding) to this notification client will be
// dispatched according to the target clientID.
clientID uint64
// client is the new client that will receive updates. Will be nil in
// case this is a cancellation update.
client *Client
}
// NewServer allocates a Server with an empty client set and unbuffered
// channels for client registrations, updates and shutdown signalling. The
// returned server does not process anything until Start is called.
func NewServer() *Server {
	srv := new(Server)

	srv.clients = make(map[uint64]*Client)
	srv.clientUpdates = make(chan *clientUpdate)
	srv.updates = make(chan interface{})
	srv.quit = make(chan struct{})

	return srv
}
// Start launches the subscription handler goroutine, making the Server ready
// to accept subscriptions and updates. Only the first call has any effect;
// subsequent calls return immediately. The returned error is always nil.
func (s *Server) Start() error {
	firstCall := atomic.CompareAndSwapUint32(&s.started, 0, 1)
	if firstCall {
		// Register the handler with the wait group before spawning
		// it so that Stop can wait for it to exit.
		s.wg.Add(1)
		go s.subscriptionHandler()
	}

	return nil
}
// Stop shuts the server down: it closes the quit channel and then blocks
// until the subscription handler goroutine has exited. Only the first call
// has any effect; subsequent calls return immediately. The returned error is
// always nil.
func (s *Server) Stop() error {
	if atomic.CompareAndSwapUint32(&s.stopped, 0, 1) {
		close(s.quit)
		s.wg.Wait()
	}

	return nil
}
// Subscribe registers a new Client with the Server. The returned Client
// receives every update the Server is subsequently made aware of.
//
// If the server's quit channel is closed before the registration is picked up
// by the subscription handler, a nil client and ErrServerShuttingDown are
// returned.
func (s *Server) Subscribe() (*Client, error) {
	// Each client gets a unique ID from the atomically incremented
	// counter.
	id := atomic.AddUint64(&s.clientCounter, 1)

	// unsubscribe hands a cancellation request for this ID to the
	// subscription handler, giving up silently if the server is shutting
	// down.
	unsubscribe := func() {
		request := &clientUpdate{
			cancel:   true,
			clientID: id,
		}

		select {
		case s.clientUpdates <- request:
		case <-s.quit:
		}
	}

	subscriber := &Client{
		updates: queue.NewConcurrentQueue(20),
		quit:    make(chan struct{}),
		cancel:  unsubscribe,
	}

	// Hand the new client over to the subscription handler, unless the
	// server is shutting down.
	registration := &clientUpdate{
		cancel:   false,
		clientID: id,
		client:   subscriber,
	}

	select {
	case s.clientUpdates <- registration:
		return subscriber, nil

	case <-s.quit:
		return nil, ErrServerShuttingDown
	}
}
// SendUpdate hands the passed update to the subscription handler, which
// forwards it to all currently active subscription clients. It blocks until
// the handler accepts the update, and returns ErrServerShuttingDown if the
// server's quit channel is closed first.
func (s *Server) SendUpdate(update interface{}) error {
	var err error

	select {
	case s.updates <- update:
		// Accepted by the handler; err stays nil.

	case <-s.quit:
		err = ErrServerShuttingDown
	}

	return err
}
// subscriptionHandler is the main handler for the Server. It will handle
// incoming updates and subscriptions, and forward the incoming updates to the
// registered clients.
//
// The handler runs until the server's quit channel is closed. On exit it
// stops the queue of every client that is still registered and closes that
// client's quit channel, regardless of which select observed the shutdown.
//
// NOTE: MUST be run as a goroutine.
func (s *Server) subscriptionHandler() {
	defer s.wg.Done()

	// Every return below is triggered by s.quit being closed, so the
	// client teardown is deferred to cover all of them. In particular,
	// a shutdown observed while an update is being fanned out must still
	// notify the clients; returning without doing so would leave their
	// queues running and their Quit channels open forever. Deferred calls
	// run in LIFO order, so this completes before wg.Done releases Stop.
	defer func() {
		for _, client := range s.clients {
			client.updates.Stop()
			close(client.quit)
		}
	}()

	for {
		select {
		// If a client update is received, then either a new
		// subscription becomes active, or we cancel an existing one.
		case update := <-s.clientUpdates:
			clientID := update.clientID

			// In case this is a cancellation, stop the client's
			// underlying queue, and remove the client from the set
			// of active subscription clients. Unknown IDs (for
			// example a repeated Cancel) are ignored.
			if update.cancel {
				client, ok := s.clients[clientID]
				if ok {
					client.updates.Stop()
					close(client.quit)
					delete(s.clients, clientID)
				}

				continue
			}

			// If this was not a cancellation, start the underlying
			// queue and add the client to our set of subscription
			// clients. It will be notified about any new updates
			// the server receives.
			update.client.updates.Start()
			s.clients[clientID] = update.client

		// A new update was received, forward it to all active clients.
		case upd := <-s.updates:
			for _, client := range s.clients {
				select {
				case client.updates.ChanIn() <- upd:
				case <-client.quit:
				case <-s.quit:
					// Shutdown during fan-out: the deferred
					// teardown notifies all clients.
					return
				}
			}

		// In case the server is shutting down, return and let the
		// deferred teardown stop the clients and close their quit
		// channels to notify them.
		case <-s.quit:
			return
		}
	}
}