forked from hashicorp/consul
-
Notifications
You must be signed in to change notification settings - Fork 0
/
store.go
309 lines (249 loc) · 9.2 KB
/
store.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: MPL-2.0
package token
import (
"sync"
"crypto/subtle"
)
type TokenSource bool
const (
TokenSourceConfig TokenSource = false
TokenSourceAPI TokenSource = true
)
type TokenKind int
const (
TokenKindAgent TokenKind = iota
TokenKindAgentRecovery
TokenKindUser
TokenKindReplication
TokenKindConfigFileRegistration
)
type watcher struct {
kind TokenKind
ch chan<- struct{}
}
// Notifier holds the channel used to notify a watcher
// of token updates as well as some internal tracking
// information to allow for deregistering the notifier.
type Notifier struct {
id int
Ch <-chan struct{}
}
// Store is used to hold the special ACL tokens used by Consul agents. It is
// designed to update the tokens on the fly, so the token store itself should be
// plumbed around and used to get tokens at runtime, don't save the resulting
// tokens.
type Store struct {
// l synchronizes access to the token store.
l sync.RWMutex
// userToken is passed along for requests when the user didn't supply a
// token, and may be left blank to use the anonymous token. This will
// also be used for agent operations if the agent token isn't set.
userToken string
// userTokenSource indicates where this token originated from
userTokenSource TokenSource
// agentToken is used for internal agent operations like self-registering
// with the catalog and anti-entropy, but should never be used for
// user-initiated operations.
agentToken string
// agentTokenSource indicates where this token originated from
agentTokenSource TokenSource
// agentRecoveryToken is a special token that's only used locally for
// access to the /v1/agent utility operations if the servers aren't
// available.
agentRecoveryToken string
// agentRecoveryTokenSource indicates where this token originated from
agentRecoveryTokenSource TokenSource
// replicationToken is a special token that's used by servers to
// replicate data from the primary datacenter.
replicationToken string
// replicationTokenSource indicates where this token originated from
replicationTokenSource TokenSource
// configFileRegistrationToken is used to register services and checks
// that are defined in configuration files.
configFileRegistrationToken string
// configFileRegistrationTokenSource indicates where this token originated from
configFileRegistrationTokenSource TokenSource
watchers map[int]watcher
watcherIndex int
persistence *fileStore
// persistenceLock is used to synchronize access to the persisted token store
// within the data directory. This will prevent loading while writing as well as
// multiple concurrent writes.
persistenceLock sync.RWMutex
// enterpriseTokens contains tokens only used in consul-enterprise
enterpriseTokens
}
// Notify will set up a watch for when tokens of the desired kind is changed
func (t *Store) Notify(kind TokenKind) Notifier {
// buffering ensures that notifications aren't missed if the watcher
// isn't already in a select and that our notifications don't
// block returning from the Update* methods.
ch := make(chan struct{}, 1)
w := watcher{
kind: kind,
ch: ch,
}
t.l.Lock()
defer t.l.Unlock()
if t.watchers == nil {
t.watchers = make(map[int]watcher)
}
// we specifically want to avoid the zero-value to prevent accidental stop-notification requests
t.watcherIndex += 1
t.watchers[t.watcherIndex] = w
return Notifier{id: t.watcherIndex, Ch: ch}
}
// StopNotify stops the token store from sending notifications to the specified notifiers chan
func (t *Store) StopNotify(n Notifier) {
t.l.Lock()
defer t.l.Unlock()
delete(t.watchers, n.id)
}
// anyKindAllowed returns true if any of the kinds in the `check` list are
// set to be allowed in the `allowed` map.
//
// Note: this is mostly just a convenience to simplify the code in
// sendNotificationLocked and prevent more nested looping with breaks/continues
// and other state tracking.
func anyKindAllowed(allowed TokenKind, check []TokenKind) bool {
for _, kind := range check {
if allowed == kind {
return true
}
}
return false
}
// sendNotificationLocked will iterate through all watchers and notify them that a
// token they are watching has been updated.
//
// NOTE: this function explicitly does not attempt to send the kind or new token value
// along through the channel. With that approach watchers could potentially miss updates
// if the buffered chan fills up. Instead with this approach we just notify that any
// token they care about has been udpated and its up to the caller to retrieve the
// new value (after receiving from the chan). With this approach its entirely possible
// for the watcher to be notified twice before actually retrieving the token after the first
// read from the chan. This is better behavior than missing events. It can cause some
// churn temporarily but in common cases its not expected that these tokens would be updated
// frequently enough to cause this to happen.
func (t *Store) sendNotificationLocked(kinds ...TokenKind) {
for _, watcher := range t.watchers {
if !anyKindAllowed(watcher.kind, kinds) {
// ignore this watcher as it doesn't want events for these kinds of token
continue
}
select {
case watcher.ch <- struct{}{}:
default:
// its already pending a notification
}
}
}
// UpdateUserToken replaces the current user token in the store.
// Returns true if it was changed.
func (t *Store) UpdateUserToken(token string, source TokenSource) bool {
return t.updateToken(token, source, &t.userToken, &t.userTokenSource, TokenKindUser)
}
// UpdateAgentToken replaces the current agent token in the store.
// Returns true if it was changed.
func (t *Store) UpdateAgentToken(token string, source TokenSource) bool {
return t.updateToken(token, source, &t.agentToken, &t.agentTokenSource, TokenKindAgent)
}
// UpdateAgentRecoveryToken replaces the current agent recovery token in the store.
// Returns true if it was changed.
func (t *Store) UpdateAgentRecoveryToken(token string, source TokenSource) bool {
return t.updateToken(token, source, &t.agentRecoveryToken,
&t.agentRecoveryTokenSource, TokenKindAgentRecovery)
}
// UpdateReplicationToken replaces the current replication token in the store.
// Returns true if it was changed.
func (t *Store) UpdateReplicationToken(token string, source TokenSource) bool {
return t.updateToken(token, source, &t.replicationToken,
&t.replicationTokenSource, TokenKindReplication)
}
// UpdateConfigFileRegistrationToken replaces the current config file registration token
// in the store. Returns true if it was changed.
func (t *Store) UpdateConfigFileRegistrationToken(token string, source TokenSource) bool {
return t.updateToken(token, source, &t.configFileRegistrationToken,
&t.configFileRegistrationTokenSource, TokenKindConfigFileRegistration)
}
func (t *Store) updateToken(token string, source TokenSource, dstToken *string, dstSource *TokenSource, kind TokenKind) bool {
t.l.Lock()
changed := *dstToken != token || *dstSource != source
*dstToken = token
*dstSource = source
if changed {
t.sendNotificationLocked(kind)
}
t.l.Unlock()
return changed
}
// UserToken returns the best token to use for user operations.
func (t *Store) UserToken() string {
t.l.RLock()
defer t.l.RUnlock()
return t.userToken
}
// AgentToken returns the best token to use for internal agent operations.
func (t *Store) AgentToken() string {
t.l.RLock()
defer t.l.RUnlock()
if tok := t.enterpriseAgentToken(); tok != "" {
return tok
}
if t.agentToken != "" {
return t.agentToken
}
return t.userToken
}
func (t *Store) AgentRecoveryToken() string {
t.l.RLock()
defer t.l.RUnlock()
return t.agentRecoveryToken
}
// ReplicationToken returns the replication token.
func (t *Store) ReplicationToken() string {
t.l.RLock()
defer t.l.RUnlock()
return t.replicationToken
}
func (t *Store) ConfigFileRegistrationToken() string {
t.l.RLock()
defer t.l.RUnlock()
return t.configFileRegistrationToken
}
// UserToken returns the best token to use for user operations.
func (t *Store) UserTokenAndSource() (string, TokenSource) {
t.l.RLock()
defer t.l.RUnlock()
return t.userToken, t.userTokenSource
}
// AgentToken returns the best token to use for internal agent operations.
func (t *Store) AgentTokenAndSource() (string, TokenSource) {
t.l.RLock()
defer t.l.RUnlock()
return t.agentToken, t.agentTokenSource
}
func (t *Store) AgentRecoveryTokenAndSource() (string, TokenSource) {
t.l.RLock()
defer t.l.RUnlock()
return t.agentRecoveryToken, t.agentRecoveryTokenSource
}
// ReplicationTokenAndSource returns the replication token and its source.
func (t *Store) ReplicationTokenAndSource() (string, TokenSource) {
t.l.RLock()
defer t.l.RUnlock()
return t.replicationToken, t.replicationTokenSource
}
func (t *Store) ConfigFileRegistrationTokenAndSource() (string, TokenSource) {
t.l.RLock()
defer t.l.RUnlock()
return t.configFileRegistrationToken, t.configFileRegistrationTokenSource
}
// IsAgentRecoveryToken checks to see if a given token is the agent recovery token.
// This will never match an empty token for safety.
func (t *Store) IsAgentRecoveryToken(token string) bool {
t.l.RLock()
defer t.l.RUnlock()
return (token != "") && (subtle.ConstantTimeCompare([]byte(token), []byte(t.agentRecoveryToken)) == 1)
}