/
subscriber.go
224 lines (182 loc) 路 5.21 KB
/
subscriber.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
package hub
import (
"net/url"
"sync"
"github.com/gofrs/uuid"
log "github.com/sirupsen/logrus"
)
type updateSource struct {
in chan *Update
buffer []*Update
}
// Subscriber represents a client subscribed to a list of topics.
type Subscriber struct {
ID string
EscapedID string
Claims *claims
Topics []string
EscapedTopics []string
RequestLastEventID string
LogFields log.Fields
Debug bool
disconnectedOnce sync.Once
out chan *Update
disconnected chan struct{}
responseLastEventID chan string
history updateSource
live updateSource
topicSelectorStore *TopicSelectorStore
}
// NewSubscriber creates a new subscriber.
func NewSubscriber(lastEventID string, tss *TopicSelectorStore) *Subscriber {
id := "urn:uuid:" + uuid.Must(uuid.NewV4()).String()
s := &Subscriber{
ID: id,
EscapedID: url.QueryEscape(id),
RequestLastEventID: lastEventID,
LogFields: log.Fields{
"subscriber_id": id,
"last_event_id": lastEventID,
},
responseLastEventID: make(chan string, 1),
history: updateSource{},
live: updateSource{in: make(chan *Update)},
out: make(chan *Update),
disconnected: make(chan struct{}),
topicSelectorStore: tss,
}
if lastEventID != "" {
s.history.in = make(chan *Update)
}
return s
}
// start stores incoming updates in an history and a live buffer and dispatch them.
// Updates coming from the history are always dispatched first.
func (s *Subscriber) start() {
defer s.cleanup()
for {
select {
case u, ok := <-s.history.in:
if !ok {
s.history.in = nil
break
}
if s.CanDispatch(u) {
s.history.buffer = append(s.history.buffer, u)
}
case u, ok := <-s.live.in:
if !ok {
// chan drained
return
}
if s.CanDispatch(u) {
s.live.buffer = append(s.live.buffer, u)
}
case s.outChan() <- s.nextUpdate():
if len(s.history.buffer) > 0 {
s.history.buffer = s.history.buffer[1:]
break
}
s.live.buffer = s.live.buffer[1:]
}
}
}
func (s *Subscriber) cleanup() {
s.topicSelectorStore.cleanup(s.Topics)
if s.Claims != nil && s.Claims.Mercure.Subscribe != nil {
s.topicSelectorStore.cleanup(s.Claims.Mercure.Subscribe)
}
}
// outChan returns the out channel if buffers aren't empty, or nil to block.
func (s *Subscriber) outChan() chan<- *Update {
if len(s.live.buffer) > 0 || len(s.history.buffer) > 0 {
return s.out
}
return nil
}
// nextUpdate returns the next update to dispatch.
// The history is always entirely flushed before starting to dispatch live updates.
func (s *Subscriber) nextUpdate() *Update {
// Always flush the history buffer first to preserve order
if s.history.in != nil || len(s.history.buffer) > 0 {
if len(s.history.buffer) > 0 {
return s.history.buffer[0]
}
return nil
}
if len(s.live.buffer) > 0 {
return s.live.buffer[0]
}
return nil
}
// Dispatch an update to the subscriber.
func (s *Subscriber) Dispatch(u *Update, fromHistory bool) bool {
var in chan<- *Update
if fromHistory {
in = s.history.in
} else {
in = s.live.in
}
select {
case <-s.disconnected:
close(s.live.in)
return false
default:
}
select {
case <-s.disconnected:
close(s.live.in)
return false
case in <- u:
}
return true
}
// Receive returns a chan when incoming updates are dispatched.
func (s *Subscriber) Receive() <-chan *Update {
return s.out
}
// HistoryDispatched must be called when all messages coming from the history have been dispatched.
func (s *Subscriber) HistoryDispatched(responseLastEventID string) {
s.responseLastEventID <- responseLastEventID
close(s.history.in)
}
// Disconnect disconnects the subscriber.
func (s *Subscriber) Disconnect() {
s.disconnectedOnce.Do(func() {
close(s.disconnected)
})
}
// CanDispatch checks if an update can be dispatched to this subsriber.
func (s *Subscriber) CanDispatch(u *Update) bool {
if !canReceive(s.topicSelectorStore, u.Topics, s.Topics, true) {
log.WithFields(createFields(u, s)).Debug("Subscriber has not subscribed to this update")
return false
}
if u.Private && (s.Claims == nil || s.Claims.Mercure.Subscribe == nil || !canReceive(s.topicSelectorStore, u.Topics, s.Claims.Mercure.Subscribe, true)) {
log.WithFields(createFields(u, s)).Debug("Subscriber not authorized to receive this update")
return false
}
return true
}
// getSubscriptions return the list of subscriptions associated to this subscriber.
func (s *Subscriber) getSubscriptions(topic, context string, active bool) []subscription {
subscriptions := make([]subscription, 0, len(s.Topics))
for k, t := range s.Topics {
if topic != "" && !canReceive(s.topicSelectorStore, []string{t}, []string{topic}, false) {
continue
}
subscription := subscription{
Context: context,
ID: "/.well-known/mercure/subscriptions/" + s.EscapedTopics[k] + "/" + s.EscapedID,
Type: "Subscription",
Subscriber: s.ID,
Topic: t,
Active: active,
}
if s.Claims != nil && s.Claims.Mercure.Payload != nil {
subscription.Payload = s.Claims.Mercure.Payload
}
subscriptions = append(subscriptions, subscription)
}
return subscriptions
}