This repository has been archived by the owner on Aug 3, 2020. It is now read-only.
forked from ligato/vpp-agent
/
plugin_scheduler.go
492 lines (421 loc) · 15.3 KB
/
plugin_scheduler.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
// Copyright (c) 2018 Cisco and/or its affiliates.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at:
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package kvscheduler
import (
"context"
"errors"
"os"
"runtime/trace"
"sync"
"time"
"github.com/gogo/protobuf/proto"
"github.com/ligato/cn-infra/idxmap"
"github.com/ligato/cn-infra/idxmap/mem"
"github.com/ligato/cn-infra/infra"
"github.com/ligato/cn-infra/rpc/rest"
kvs "github.com/ligato/vpp-agent/plugins/kvscheduler/api"
"github.com/ligato/vpp-agent/plugins/kvscheduler/internal/graph"
"github.com/ligato/vpp-agent/plugins/kvscheduler/internal/registry"
"github.com/ligato/vpp-agent/plugins/kvscheduler/internal/utils"
)
const (
	// DependencyRelation identifies dependency relation for the graph.
	DependencyRelation = "depends-on"

	// DerivesRelation identifies relation of value derivation for the graph.
	DerivesRelation = "derives"

	// how often the transaction history gets trimmed to remove records too old to keep
	txnHistoryTrimmingPeriod = 1 * time.Minute

	// by default, a history of processed transactions is recorded
	defaultRecordTransactionHistory = true

	// by default, only transactions processed in the last 24 hours are kept recorded
	// (with the exception of the permanently recorded init period)
	defaultTransactionHistoryAgeLimit = 24 * 60 // in minutes

	// by default, transactions from the first hour of runtime stay permanently
	// recorded
	defaultPermanentlyRecordedInitPeriod = 60 // in minutes

	// by default, all NB transactions and SB notifications are run without
	// simulation (retries are always simulated first)
	defaultEnableTxnSimulation = false

	// by default, a concise summary of every processed transaction is printed
	// to stdout
	defaultPrintTxnSummary = true

	// name of the environment variable used to enable verification after every transaction
	verifyModeEnv = "KVSCHED_VERIFY_MODE"

	// name of the environment variable used to turn on an automatic check for
	// the preservation of the original network namespace after descriptor operations
	checkNetNamespaceEnv = "KVSCHED_CHECK_NET_NS"

	// name of the environment variable used to trigger log messages showing
	// graph traversal
	logGraphWalkEnv = "KVSCHED_LOG_GRAPH_WALK"
)
// Scheduler is a CN-infra plugin implementing KVScheduler.
// Detailed documentation can be found in the "api" and "docs" sub-folders.
type Scheduler struct {
	Deps

	// configuration (defaults overridden by the plugin config file in Init)
	config *Config

	// management of go routines: ctx is cancelled by Close, wg tracks the
	// go routines started by Init
	ctx    context.Context
	cancel context.CancelFunc
	wg     sync.WaitGroup

	// in-memory representation of all created+pending kv-pairs and their dependencies
	graph graph.Graph

	// registry for descriptors
	registry registry.Registry

	// a list of key prefixes covered by registered descriptors
	keyPrefixes []string

	// TXN processing
	txnLock      sync.Mutex // can be used to pause transaction processing; always lock before the graph!
	txnQueue     chan *transaction
	txnSeqNumber uint64
	resyncCount  uint

	// value status
	updatedStates    utils.KeySet      // base values with updated status
	valStateWatchers []valStateWatcher // appended to under txnLock

	// TXN history
	historyLock sync.Mutex
	txnHistory  []*kvs.RecordedTxn // ordered from the oldest to the latest
	startTime   time.Time          // set by Init

	// debugging (enabled via environment variables in Init)
	verifyMode   bool
	logGraphWalk bool
}
// Deps lists dependencies of the scheduler.
type Deps struct {
	// common plugin dependencies; the scheduler uses (at least) Log, Cfg
	// and PluginName from here
	infra.PluginDeps
	// HTTPHandlers is passed to registerHandlers during Init to expose the
	// scheduler's REST API
	HTTPHandlers rest.HTTPHandlers
}
// Config holds the KVScheduler configuration.
// Init fills it with built-in defaults first and then lets the plugin
// configuration file override them.
type Config struct {
	// RecordTransactionHistory enables recording of processed transactions;
	// it also enables recording of old graph revisions and starts the
	// periodic history-trimming go routine.
	RecordTransactionHistory bool `json:"record-transaction-history"`
	// TransactionHistoryAgeLimit is how long recorded transactions are kept.
	TransactionHistoryAgeLimit uint32 `json:"transaction-history-age-limit"` // in minutes
	// PermanentlyRecordedInitPeriod is the initial period of runtime whose
	// transactions are never trimmed from the history.
	PermanentlyRecordedInitPeriod uint32 `json:"permanently-recorded-init-period"` // in minutes
	// EnableTxnSimulation forces simulation for every NB transaction, as if
	// each one was committed with simulation requested in its context.
	EnableTxnSimulation bool `json:"enable-txn-simulation"`
	// PrintTxnSummary enables printing of a concise summary of every
	// processed transaction.
	PrintTxnSummary bool `json:"print-txn-summary"`
}
// SchedulerTxn implements transaction for the KV scheduler.
// Values are only collected by SetValue; nothing is sent to the scheduler
// until Commit is called.
type SchedulerTxn struct {
	// scheduler that the transaction is committed to
	scheduler *Scheduler
	// values to apply, keyed by value key; a nil value requests deletion and
	// a repeated SetValue for the same key overwrites the previous value
	values map[string]proto.Message
}
// valStateWatcher represents one subscription for value state updates.
// Instances are created by WatchValueStatus.
type valStateWatcher struct {
	// channel that status updates for the selected values are delivered to
	channel chan<- *kvs.BaseValueStatus
	// selector choosing the watched keys; nil presumably selects all keys
	// (as documented for WatchValueStatus)
	selector kvs.KeySelector
}
// Init prepares the scheduler for use. It loads the configuration on top of
// the built-in defaults, builds the in-memory graph and the descriptor
// registry, and registers the REST handlers. It then spawns the background
// go routines: one that processes queued transactions one at a time and,
// when transaction history recording is enabled, one that periodically
// trims records that are too old.
func (s *Scheduler) Init() error {
	// start from the built-in defaults; the config file (if any) overrides them
	s.config = &Config{
		RecordTransactionHistory:      defaultRecordTransactionHistory,
		TransactionHistoryAgeLimit:    defaultTransactionHistoryAgeLimit,
		PermanentlyRecordedInitPeriod: defaultPermanentlyRecordedInitPeriod,
		EnableTxnSimulation:           defaultEnableTxnSimulation,
		PrintTxnSummary:               defaultPrintTxnSummary,
	}
	if err := s.loadConfig(s.config); err != nil {
		s.Log.Error(err)
		return err
	}
	s.Log.Debugf("KVScheduler configuration: %+v", *s.config)

	// shared context of all go routines; cancelled by Close()
	s.ctx, s.cancel = context.WithCancel(context.Background())

	// in-memory storage of key-value pairs and their relations
	s.graph = graph.NewGraph(graph.Opts{
		RecordOldRevs:       s.config.RecordTransactionHistory,
		RecordAgeLimit:      s.config.TransactionHistoryAgeLimit,
		PermanentInitPeriod: s.config.PermanentlyRecordedInitPeriod,
		MethodTracker:       trackGraphMethod,
	})

	// key -> descriptor lookups
	s.registry = registry.NewRegistry()

	// buffered queue through which transactions get serialized
	s.txnQueue = make(chan *transaction, 100)

	// REST API
	s.registerHandlers(s.HTTPHandlers)

	// key-set marking values whose status has been updated
	s.updatedStates = utils.NewSliceBasedKeySet()

	s.startTime = time.Now()

	// debugging switches, turned on by any non-empty environment variable value
	s.verifyMode = os.Getenv(verifyModeEnv) != ""
	s.logGraphWalk = os.Getenv(logGraphWalkEnv) != ""

	s.wg.Add(1)
	go s.consumeTransactions()

	if s.config.RecordTransactionHistory {
		s.wg.Add(1)
		go s.transactionHistoryTrimming()
	}
	return nil
}
// loadConfig loads the plugin configuration file into config.
//
// A missing configuration file is not an error. Only a debug message is
// logged, and config presumably keeps the values it had on entry (the
// defaults set by Init). Errors from loading/parsing the file are returned
// unchanged.
func (s *Scheduler) loadConfig(config *Config) error {
	found, err := s.Cfg.LoadValue(config)
	if err != nil {
		return err
	}
	if !found {
		s.Log.Debugf("%v config not found", s.PluginName)
		return nil
	}
	// log the value (not the pointer), consistently with Init
	s.Log.Debugf("%v config found: %+v", s.PluginName, *config)
	return nil
}
// Close stops all the go routines started by Init and waits for them to exit.
//
// It is safe to call even when Init failed before the go routines were
// started (e.g. because the configuration could not be loaded). In that case
// the cancel function was never created and there is nothing to wait for.
func (s *Scheduler) Close() error {
	if s.cancel != nil {
		s.cancel()
	}
	s.wg.Wait()
	return nil
}
// RegisterKVDescriptor registers descriptor(s) for a set of selected
// keys. It should be called in the Init phase of agent plugins.
// Every key-value pair must have at most one descriptor associated with it
// (none for derived values expressing properties).
//
// Descriptors are registered in the given order; registration stops at the
// first failure and that error is returned. Descriptors registered before
// the failure stay registered.
func (s *Scheduler) RegisterKVDescriptor(descriptors ...*kvs.KVDescriptor) error {
	for i := range descriptors {
		if err := s.registerKVDescriptor(descriptors[i]); err != nil {
			return err
		}
	}
	return nil
}
// registerKVDescriptor registers a single descriptor. It returns
// kvs.ErrDescriptorExists if a descriptor with the same name is already
// registered. It records the descriptor's NB key prefix (if set). For a
// descriptor exposing metadata, it also registers a metadata map in the
// graph: the one built by the descriptor's factory, or an in-memory
// named mapping otherwise.
func (s *Scheduler) registerKVDescriptor(descriptor *kvs.KVDescriptor) error {
	// TODO: validate descriptor
	name := descriptor.Name
	if existing := s.registry.GetDescriptor(name); existing != nil {
		return kvs.ErrDescriptorExists
	}
	stats.addDescriptor(name)
	s.registry.RegisterDescriptor(descriptor)

	if prefix := descriptor.NBKeyPrefix; prefix != "" {
		s.keyPrefixes = append(s.keyPrefixes, prefix)
	}

	if !descriptor.WithMetadata {
		return nil
	}

	var metadataMap idxmap.NamedMappingRW
	if factory := descriptor.MetadataMapFactory; factory != nil {
		metadataMap = factory()
	} else {
		metadataMap = mem.NewNamedMapping(s.Log, name, nil)
	}
	writer := s.graph.Write(true, false)
	writer.RegisterMetadataMap(name, metadataMap)
	writer.Release()
	return nil
}
// GetRegisteredNBKeyPrefixes returns a list of key prefixes from NB with values
// described by registered descriptors and therefore managed by the scheduler.
//
// The returned slice is a copy, so callers may modify or append to it
// without affecting the scheduler's internal list. It is nil when no
// descriptor with an NB key prefix has been registered.
func (s *Scheduler) GetRegisteredNBKeyPrefixes() []string {
	if len(s.keyPrefixes) == 0 {
		return nil
	}
	prefixes := make([]string, len(s.keyPrefixes))
	copy(prefixes, s.keyPrefixes)
	return prefixes
}
// StartNBTransaction starts a new transaction from NB to SB plane.
// The returned transaction only collects values; they are scheduled for
// execution once Txn.Commit() is called.
func (s *Scheduler) StartNBTransaction() kvs.Txn {
	return &SchedulerTxn{
		scheduler: s,
		values:    make(map[string]proto.Message),
	}
}
// TransactionBarrier ensures that all notifications received prior to the call
// are associated with transactions that have already finalized.
func (s *Scheduler) TransactionBarrier() {
	// Nothing is done inside the critical section on purpose. Acquiring
	// txnLock is the barrier itself: it blocks for as long as the lock is
	// held (presumably by the go routine processing a transaction).
	s.txnLock.Lock()
	defer s.txnLock.Unlock()
}
// PushSBNotification notifies about a spontaneous value change(s) in the SB
// plane (i.e. not triggered by NB transaction).
// All given values are packed into one SB-notification transaction, each
// marked as originating from SB, and that transaction is enqueued for
// processing. The error (if any) comes from enqueueing.
func (s *Scheduler) PushSBNotification(notif ...kvs.KVWithMetadata) error {
	var values []kvForTxn
	for i := range notif {
		kv := &notif[i]
		values = append(values, kvForTxn{
			key:      kv.Key,
			value:    kv.Value,
			metadata: kv.Metadata,
			origin:   kvs.FromSB,
		})
	}
	return s.enqueueTxn(&transaction{
		txnType: kvs.SBNotification,
		values:  values,
	})
}
// GetMetadataMap returns a (read-only) map that associates value labels with
// the value metadata of the given descriptor. The lookup is done under the
// graph's read access.
// Returns nil if the descriptor does not expose metadata.
func (s *Scheduler) GetMetadataMap(descriptor string) idxmap.NamedMapping {
	reader := s.graph.Read()
	defer reader.Release()
	return reader.GetMetadataMap(descriptor)
}
// GetValueStatus returns the status of a non-derived value with the given
// key, as currently recorded in the in-memory graph.
func (s *Scheduler) GetValueStatus(key string) *kvs.BaseValueStatus {
	reader := s.graph.Read()
	defer reader.Release()
	node := reader.GetNode(key)
	return getValueStatus(node, key)
}
// WatchValueStatus allows to watch for changes in the status of non-derived
// values with keys selected by the selector (all if keySelector==nil).
// The subscription is added under txnLock, i.e. never while a transaction
// holds that lock.
func (s *Scheduler) WatchValueStatus(channel chan<- *kvs.BaseValueStatus, keySelector kvs.KeySelector) {
	watcher := valStateWatcher{channel: channel, selector: keySelector}

	s.txnLock.Lock()
	s.valStateWatchers = append(s.valStateWatchers, watcher)
	s.txnLock.Unlock()
}
// DumpValuesByDescriptor dumps values associated with the given
// descriptor as viewed from either NB (what was requested to be applied),
// SB (what is actually applied) or from the inside (what kvscheduler's
// cached view of SB is).
//
// NBView and CachedView are served from the in-memory graph only.
// Any other view (SBView) calls the descriptor's Retrieve operation, which
// receives the cached values for correlation. For SBView, transaction
// processing is paused for the duration of the call. An error is returned
// if the descriptor is not registered or does not implement Retrieve.
func (s *Scheduler) DumpValuesByDescriptor(descriptor string, view kvs.View) (values []kvs.KVWithMetadata, err error) {
	if view == kvs.SBView {
		// pause transaction processing (presumably so that SB is not changed
		// while being retrieved); txnLock must be taken before the graph
		s.txnLock.Lock()
		defer s.txnLock.Unlock()
	}
	graphR := s.graph.Read()
	defer graphR.Release()
	if view == kvs.NBView {
		// return the intended state
		var kvPairs []kvs.KVWithMetadata
		// nodes of this descriptor, excluding derived values and values in
		// the OBTAINED state
		nbNodes := graphR.GetNodes(nil,
			graph.WithFlags(&DescriptorFlag{descriptor}),
			graph.WithoutFlags(&DerivedFlag{}, &ValueStateFlag{kvs.ValueState_OBTAINED}))
		for _, node := range nbNodes {
			lastUpdate := getNodeLastUpdate(node)
			if lastUpdate == nil || lastUpdate.value == nil {
				// skip nodes without a recorded update (presumably values only
				// found in SB, never requested by NB) and values whose last
				// update requested their deletion (nil value)
				continue
			}
			kvPairs = append(kvPairs, kvs.KVWithMetadata{
				Key:      node.GetKey(),
				Value:    lastUpdate.value,
				Origin:   kvs.FromNB,
				Metadata: node.GetMetadata(),
			})
		}
		return kvPairs, nil
	}
	/* Cached/SB: */
	// retrieve from the in-memory graph first (for Retrieve it is used for correlation)
	// NOTE(review): descrValsSelectors(descriptor, true) is defined elsewhere;
	// presumably it selects the descriptor's non-derived values — confirm
	inMemNodes := nodesToKVPairsWithMetadata(
		graphR.GetNodes(nil, descrValsSelectors(descriptor, true)...))
	if view == kvs.CachedView {
		// return the scheduler's view of SB for the given descriptor
		return inMemNodes, nil
	}
	// obtain Retrieve handler from the descriptor
	kvDescriptor := s.registry.GetDescriptor(descriptor)
	if kvDescriptor == nil {
		err = errors.New("descriptor is not registered")
		return
	}
	if kvDescriptor.Retrieve == nil {
		err = errors.New("descriptor does not support Retrieve operation")
		return
	}
	// retrieve the state directly from SB via descriptor
	values, err = kvDescriptor.Retrieve(inMemNodes)
	return
}
// getDescriptorForKeyPrefix returns the name of the registered descriptor
// whose NB key prefix equals keyPrefix, or "" if there is none. If several
// descriptors share the prefix, the last one returned by the registry wins.
// The registry is scanned while holding txnLock.
func (s *Scheduler) getDescriptorForKeyPrefix(keyPrefix string) string {
	s.txnLock.Lock()
	defer s.txnLock.Unlock()

	name := ""
	for _, d := range s.registry.GetAllDescriptors() {
		if d.NBKeyPrefix != keyPrefix {
			continue
		}
		name = d.Name
	}
	return name
}
// DumpValuesByKeyPrefix like DumpValuesByDescriptor returns a dump of values,
// but the descriptor is selected based on the key prefix.
// An error is returned when no registered descriptor uses the given prefix.
func (s *Scheduler) DumpValuesByKeyPrefix(keyPrefix string, view kvs.View) (values []kvs.KVWithMetadata, err error) {
	if name := s.getDescriptorForKeyPrefix(keyPrefix); name != "" {
		return s.DumpValuesByDescriptor(name, view)
	}
	return nil, errors.New("unknown key prefix")
}
// SetValue schedules a change of a (non-derived) value within the transaction.
// Setting the same key again replaces the previously scheduled value.
// If <value> is nil, the value will get deleted.
// The transaction itself is returned to allow call chaining.
func (txn *SchedulerTxn) SetValue(key string, value proto.Message) kvs.Txn {
	pending := txn.values
	pending[key] = value
	return txn
}
// Commit orders scheduler to execute enqueued operations.
// Operations with unmet dependencies will get postponed and possibly
// executed later.
//
// Transaction options are read from ctx (via the kvs.Is* helpers).
// By default the transaction is blocking. Commit then waits until the
// scheduler reports the result, the scheduler is closed, or ctx is done.
// A transaction marked as non-blocking returns right after being enqueued.
// The returned sequence number is ^uint64(0) whenever the actual one is not
// known: on validation/enqueue failure, for non-blocking transactions, or
// when waiting was interrupted.
func (txn *SchedulerTxn) Commit(ctx context.Context) (txnSeqNum uint64, err error) {
	ctx, task := trace.NewTask(ctx, "scheduler.Commit")
	defer task.End()
	// sentinel returned whenever the real sequence number is not known
	txnSeqNum = ^uint64(0)
	txnData := &transaction{
		ctx:     ctx,
		txnType: kvs.NBTransaction,
		nb:      &nbTxn{},
		values:  make([]kvForTxn, 0, len(txn.values)),
	}
	// collect values (map iteration order is unspecified, so the order of
	// values within the transaction is not deterministic)
	for key, value := range txn.values {
		txnData.values = append(txnData.values, kvForTxn{
			key:    key,
			value:  value,
			origin: kvs.FromNB,
		})
	}
	// parse transaction options
	txnData.nb.isBlocking = !kvs.IsNonBlockingTxn(ctx)
	txnData.nb.resyncType, txnData.nb.verboseRefresh = kvs.IsResync(ctx)
	txnData.nb.retryArgs, txnData.nb.retryEnabled = kvs.IsWithRetry(ctx)
	txnData.nb.revertOnFailure = kvs.IsWithRevert(ctx)
	txnData.nb.description, _ = kvs.IsWithDescription(ctx)
	// simulation is used if enabled globally in the config or requested via ctx
	txnData.nb.withSimulation = txn.scheduler.config.EnableTxnSimulation || kvs.IsWithSimulation(ctx)
	// validate transaction options
	if txnData.nb.resyncType == kvs.DownstreamResync && len(txnData.values) > 0 {
		return txnSeqNum, kvs.NewTransactionError(kvs.ErrCombinedDownstreamResyncWithChange, nil)
	}
	if txnData.nb.revertOnFailure && txnData.nb.resyncType != kvs.NotResync {
		return txnSeqNum, kvs.NewTransactionError(kvs.ErrRevertNotSupportedWithResync, nil)
	}
	// enqueue txn and for blocking Commit wait for the errors
	if txnData.nb.isBlocking {
		// buffered (capacity 1) so that a single result can be sent even if
		// this function has already stopped waiting
		txnData.nb.resultChan = make(chan txnResult, 1)
	}
	err = txn.scheduler.enqueueTxn(txnData)
	if err != nil {
		return txnSeqNum, kvs.NewTransactionError(err, nil)
	}
	if txnData.nb.isBlocking {
		select {
		case <-txn.scheduler.ctx.Done():
			// scheduler is being closed
			return txnSeqNum, kvs.NewTransactionError(kvs.ErrClosedScheduler, nil)
		case <-ctx.Done():
			// caller stopped waiting; the transaction has already been
			// enqueued and may still be executed
			return txnSeqNum, kvs.NewTransactionError(kvs.ErrTxnWaitCanceled, nil)
		case txnResult := <-txnData.nb.resultChan:
			// NOTE(review): the channel is closed by the receiver; this assumes
			// the scheduler sends exactly one result and never sends again
			close(txnData.nb.resultChan)
			trace.Logf(ctx, "txnSeqNum", "%d", txnResult.txnSeqNum)
			return txnResult.txnSeqNum, txnResult.err
		}
	}
	return txnSeqNum, nil
}