/
subscription.go
178 lines (143 loc) · 4.35 KB
/
subscription.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
package stream
import (
"context"
"errors"
"sync/atomic"
"github.com/hashicorp/nomad/nomad/structs"
)
// Lifecycle states of a Subscription. They are stored in Subscription.state,
// which is read with atomic loads.
const (
	// subscriptionStateOpen (0) is the zero value and therefore the default
	// state of a subscription. An open subscription may receive new events.
	subscriptionStateOpen uint32 = iota

	// subscriptionStateClosed (1) indicates that the subscription was closed,
	// possibly as a result of a change to an ACL token, and will not receive
	// new events. The subscriber must issue a new Subscribe request.
	subscriptionStateClosed
)
// Sentinel errors of the stream package.
var (
	// ErrSubscriptionClosed is an error signalling that the subscription has
	// been closed. The client should Unsubscribe, then re-Subscribe.
	ErrSubscriptionClosed = errors.New("subscription closed by server, client should resubscribe")

	// ErrACLInvalid is an error signalling that the provided ACL token is not
	// valid for the requested topics.
	ErrACLInvalid = errors.New("Provided ACL token is invalid for requested topics")
)
// Subscription tracks a single subscriber's position in the event buffer
// together with the request used to filter the events handed back by Next and
// NextNoBlock.
type Subscription struct {
// state must be accessed atomically. 0 (subscriptionStateOpen) means open,
// 1 (subscriptionStateClosed) means closed; once closed, Next and
// NextNoBlock return ErrSubscriptionClosed.
state uint32
// req is the request the subscription was created with. Its Namespace and
// Topics are used to filter every batch of events before it is returned.
req *SubscribeRequest
// currentItem stores the current buffer item we are on. It
// is mutated by calls to Next and NextNoBlock.
currentItem *bufferItem
// forceClosed is closed when forceClose is called. It is used by
// EventBroker to cancel Next().
forceClosed chan struct{}
// unsub is a function set by EventBroker that is called to free resources
// when the subscription is no longer needed (see Unsubscribe).
// It must be safe to call the function from multiple goroutines and the function
// must be idempotent.
unsub func()
}
// SubscribeRequest describes which events a subscriber wants to receive.
type SubscribeRequest struct {
// Token is not read anywhere in this file.
// NOTE(review): presumably the subscriber's ACL token — confirm against the
// EventBroker.
Token string
// Index is the index the subscription is requested to start at (see
// StartExactlyAtIndex).
Index uint64
// Namespace limits the delivered events to a single namespace. The value
// "*" matches every namespace, and events whose own Namespace is empty are
// never excluded by this field.
Namespace string
// Topics maps each subscribed topic to the keys of interest for that topic.
// Keys listed under structs.TopicAll apply to events of every topic, and a
// key list consisting solely of string(structs.TopicAll) matches any key.
Topics map[structs.Topic][]string
// StartExactlyAtIndex specifies if a subscription needs to
// start exactly at the requested Index. If set to false,
// the closest index in the buffer will be returned if there is not
// an exact match
StartExactlyAtIndex bool
}
// newSubscription builds a Subscription for req whose read position starts at
// item. unsub is stored as the cleanup callback invoked by Unsubscribe. The
// state field is left at its zero value, i.e. the subscription starts open.
func newSubscription(req *SubscribeRequest, item *bufferItem, unsub func()) *Subscription {
	sub := new(Subscription)
	sub.req = req
	sub.currentItem = item
	sub.unsub = unsub
	// A fresh, unbuffered signalling channel handed to bufferItem.Next.
	sub.forceClosed = make(chan struct{})
	return sub
}
// Next advances the subscription through the event buffer until it reaches an
// item that contains at least one event matching the request, and returns
// those events together with the item's index.
//
// ctx and the subscription's forceClosed channel are handed to the buffer
// item's Next (presumably so that the wait for a new item can be interrupted;
// the buffer implementation is not visible here).
//
// ErrSubscriptionClosed is returned when the subscription is already closed on
// entry, or when fetching the next item fails and the subscription has been
// closed in the meantime. Any other error from the buffer is returned as is.
func (s *Subscription) Next(ctx context.Context) (structs.Events, error) {
	if atomic.LoadUint32(&s.state) == subscriptionStateClosed {
		return structs.Events{}, ErrSubscriptionClosed
	}

	for {
		item, err := s.currentItem.Next(ctx, s.forceClosed)
		if err != nil {
			// A failure on a closed subscription is reported as the closure
			// itself rather than as the underlying error.
			if atomic.LoadUint32(&s.state) == subscriptionStateClosed {
				return structs.Events{}, ErrSubscriptionClosed
			}
			return structs.Events{}, err
		}

		// Remember the position even when nothing in the item matches, so the
		// next iteration moves forward.
		s.currentItem = item

		if matched := filter(s.req, item.Events.Events); len(matched) > 0 {
			return structs.Events{Index: item.Events.Index, Events: matched}, nil
		}
	}
}
// NextNoBlock advances the subscription through the buffer items reported by
// the current item's NextNoBlock and returns the first non-empty set of
// events matching the request.
//
// It returns (nil, nil) as soon as the buffer item's NextNoBlock returns nil
// (presumably meaning no further item is available yet), and
// ErrSubscriptionClosed when the subscription is already closed on entry.
func (s *Subscription) NextNoBlock() ([]structs.Event, error) {
	if atomic.LoadUint32(&s.state) == subscriptionStateClosed {
		return nil, ErrSubscriptionClosed
	}

	for item := s.currentItem.NextNoBlock(); item != nil; item = s.currentItem.NextNoBlock() {
		// Record the new position before filtering so that items without any
		// matching event are skipped on the following iteration.
		s.currentItem = item

		if matched := filter(s.req, item.Events.Events); len(matched) > 0 {
			return matched, nil
		}
	}
	return nil, nil
}
// Unsubscribe invokes the unsub callback that was supplied when the
// subscription was created; that callback is expected to free the resources
// held for the subscription.
func (s *Subscription) Unsubscribe() {
	release := s.unsub
	release()
}
// filter returns the events that match the namespace and topic/key selection
// of req, in their original order. Every event appears at most once in the
// result, regardless of how many subscribed keys it matches.
//
// An event is kept when both of the following hold:
//   - req.Namespace is "*", the event has no namespace, or the two
//     namespaces are equal;
//   - the keys subscribed under structs.TopicAll plus the keys subscribed
//     under the event's own topic either consist of the single wildcard key
//     string(structs.TopicAll), or contain a key equal to the event's Key or
//     one of its FilterKeys.
//
// NOTE(review): a wildcard key only acts as a wildcard when it is the sole key
// that applies to the event; mixed with other keys it is compared literally.
//
// When the request subscribes to every topic in every namespace, the input
// slice itself is returned without copying. nil is returned for empty input
// or when nothing matches. req.Topics is never modified.
func filter(req *SubscribeRequest, events []structs.Event) []structs.Event {
	if len(events) == 0 {
		return nil
	}

	allTopicKeys := req.Topics[structs.TopicAll]

	// *[*]: every key of every topic was requested. This does not depend on
	// the event, so it is evaluated once instead of per iteration.
	matchAll := len(allTopicKeys) == 1 && allTopicKeys[0] == string(structs.TopicAll)

	// Return all events if subscribed to all namespaces and all topics.
	if matchAll && req.Namespace == "*" {
		return events
	}

	var result []structs.Event
	for _, event := range events {
		if req.Namespace != "*" && event.Namespace != "" && event.Namespace != req.Namespace {
			continue
		}

		// *[*] always matches.
		if matchAll {
			result = append(result, event)
			continue
		}

		// The all-topic keys and the per-topic keys are inspected separately
		// rather than appended into one slice: appending to allTopicKeys could
		// write into spare capacity of the slice stored in req.Topics and
		// thereby corrupt the shared request.
		topicKeys := req.Topics[event.Topic]

		// The applicable keys are exactly ["*"]. With matchAll already ruled
		// out, that is only possible when the wildcard comes from the event's
		// own topic and no all-topic keys exist.
		if len(allTopicKeys) == 0 && len(topicKeys) == 1 && topicKeys[0] == string(structs.TopicAll) {
			result = append(result, event)
			continue
		}

		// Stop at the first matching key so that an event matching several
		// keys is delivered only once.
		if eventMatchesAnyKey(event, allTopicKeys) || eventMatchesAnyKey(event, topicKeys) {
			result = append(result, event)
		}
	}

	return result
}

// eventMatchesAnyKey reports whether event matches at least one of keys.
func eventMatchesAnyKey(event structs.Event, keys []string) bool {
	for _, key := range keys {
		if eventMatchesKey(event, key) {
			return true
		}
	}
	return false
}
// eventMatchesKey reports whether key is equal to the event's Key or to any
// entry of the event's FilterKeys.
func eventMatchesKey(event structs.Event, key string) bool {
	matched := event.Key == key
	if !matched {
		// Fall back to the secondary keys only when the primary key differs.
		for _, candidate := range event.FilterKeys {
			if candidate == key {
				matched = true
				break
			}
		}
	}
	return matched
}