-
Notifications
You must be signed in to change notification settings - Fork 2.8k
/
kvstore_watchdog.go
103 lines (93 loc) · 3.09 KB
/
kvstore_watchdog.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
// SPDX-License-Identifier: Apache-2.0
// Copyright Authors of Cilium
package main
import (
"context"
"strings"
"time"
"github.com/cilium/cilium/pkg/allocator"
"github.com/cilium/cilium/pkg/defaults"
"github.com/cilium/cilium/pkg/identity"
"github.com/cilium/cilium/pkg/identity/cache"
"github.com/cilium/cilium/pkg/idpool"
"github.com/cilium/cilium/pkg/inctimer"
"github.com/cilium/cilium/pkg/kvstore"
kvstoreallocator "github.com/cilium/cilium/pkg/kvstore/allocator"
"github.com/cilium/cilium/pkg/logging/logfields"
)
// keyPathFromLockPath strips the final '/'-separated segment from the lock
// path k and returns what precedes it. The last '/' itself is dropped as well.
// When k contains no '/', it is returned unchanged.
//
// NOTE(review): the trailing segment is presumably the lease ID that the etcd
// concurrency mutex appends to the key it locks; see
// vendor/go.etcd.io/etcd/clientv3/concurrency/mutex.go:L46.
func keyPathFromLockPath(k string) string {
	sep := strings.LastIndexByte(k, '/')
	if sep < 0 {
		// No separator: there is nothing to strip.
		return k
	}
	return k[:sep]
}
// getOldestLeases groups the given lock paths by their key path (the lock
// path with its last segment removed, see keyPathFromLockPath) and keeps, for
// each group, only the entry with the smallest ModRevision. The returned map
// is keyed by the full lock path of each surviving entry and is never nil.
//
// When several entries of one group share the smallest ModRevision, which of
// them is kept depends on map iteration order.
func getOldestLeases(lockPaths map[string]kvstore.Value) map[string]kvstore.Value {
	// candidate remembers the full lock path alongside its value so that the
	// result can be keyed by lock path again after grouping by key path.
	type candidate struct {
		lockPath string
		value    kvstore.Value
	}

	oldestByKey := make(map[string]candidate)
	for lockPath, value := range lockPaths {
		keyPath := keyPathFromLockPath(lockPath)
		// Skip this entry unless it is the first one seen for keyPath or
		// strictly older than the current candidate.
		if current, seen := oldestByKey[keyPath]; seen && value.ModRevision >= current.value.ModRevision {
			continue
		}
		oldestByKey[keyPath] = candidate{lockPath: lockPath, value: value}
	}

	// Re-key the winners by their original lock path.
	result := make(map[string]kvstore.Value, len(oldestByKey))
	for _, c := range oldestByKey {
		result[c.lockPath] = c.value
	}
	return result
}
// startKvstoreWatchdog starts two background goroutines and returns
// immediately. Neither goroutine has an exit condition.
//
//   - The first one repeatedly calls RunLocksGC on an allocator built over the
//     identities path, then waits defaults.LockLeaseTTL. The set of candidate
//     locks is narrowed with getOldestLeases before each run and replaced by
//     the map RunLocksGC returns, unless the run failed.
//   - The second one repeatedly writes the current time (RFC 3339) to
//     kvstore.HeartbeatPath, then waits kvstore.HeartbeatWriteInterval.
//
// Each kvstore call is bounded by a context with a defaults.LockLeaseTTL
// timeout. Failing to create the kvstore backend is logged at Fatal level;
// failures inside the loops are only logged as warnings.
func startKvstoreWatchdog() {
	log.WithField(logfields.Interval, defaults.LockLeaseTTL).Infof("Starting kvstore watchdog")

	backend, err := kvstoreallocator.NewKVStoreBackend(cache.IdentitiesPath, "", nil, kvstore.Client())
	if err != nil {
		log.WithError(err).Fatal("Unable to initialize kvstore backend for identity garbage collection")
	}

	gcAllocator := allocator.NewAllocatorForGC(
		backend,
		allocator.WithMin(idpool.ID(identity.MinimalAllocationIdentity)),
		allocator.WithMax(idpool.ID(identity.MaximumAllocationIdentity)),
	)

	// Lock garbage collection loop.
	go func() {
		timer, stopTimer := inctimer.New()
		defer stopTimer()

		// Locks carried over from the previous run; only this goroutine
		// reads or writes the map.
		staleLocks := map[string]kvstore.Value{}

		// collect performs one GC run under its own timeout.
		collect := func() {
			ctx, cancel := context.WithTimeout(context.Background(), defaults.LockLeaseTTL)
			defer cancel()

			next, err := gcAllocator.RunLocksGC(ctx, staleLocks)
			if err != nil {
				// Keep the previous candidates so the next run retries them.
				log.WithError(err).Warning("Unable to run security identity garbage collector")
				return
			}
			staleLocks = next
		}

		for {
			staleLocks = getOldestLeases(staleLocks)
			collect()
			<-timer.After(defaults.LockLeaseTTL)
		}
	}()

	// Heartbeat loop.
	go func() {
		timer, stopTimer := inctimer.New()
		defer stopTimer()

		// beat writes one heartbeat timestamp under its own timeout.
		beat := func() {
			ctx, cancel := context.WithTimeout(context.Background(), defaults.LockLeaseTTL)
			defer cancel()

			if err := kvstore.Client().Update(ctx, kvstore.HeartbeatPath, []byte(time.Now().Format(time.RFC3339)), true); err != nil {
				log.WithError(err).Warning("Unable to update heartbeat key")
			}
		}

		for {
			beat()
			<-timer.After(kvstore.HeartbeatWriteInterval)
		}
	}()
}