/
memberlist_manager.go
151 lines (133 loc) · 4.32 KB
/
memberlist_manager.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
package memberlist_manager
import (
"context"
"errors"
"time"
"github.com/chroma-core/chroma/go/pkg/common"
"github.com/pingcap/log"
"go.uber.org/zap"
"k8s.io/client-go/util/workqueue"
)
// IMemberlistManager manages the memberlist for a coordinator.
//
// A memberlist manager is built from two parts: a store and a watcher. The
// store persists the memberlist. The watcher observes the nodes in the cluster
// and reports which of them are ready. The manager reconciles the two, writing
// the watcher's ready members into the store whenever they differ from what is
// stored. Concretely, the store is backed by a Kubernetes custom resource, and
// the watcher is a Kubernetes watch on pods with a given label.
//
// Lifecycle methods come from the embedded common.Component (presumably
// Start/Stop, matching the methods MemberlistManager implements — confirm).
type IMemberlistManager interface {
	common.Component
}
// MemberlistManager is the IMemberlistManager implementation. Node IPs
// reported by the watcher are pushed onto a rate-limited work queue, and a
// background loop batches them and reconciles the stored memberlist against
// the watcher's ready members.
type MemberlistManager struct {
	workqueue         workqueue.RateLimitingInterface // queue of node IPs enqueued by the watcher callback
	nodeWatcher       IWatcher                        // source of node events and of the current ready members
	memberlistStore   IMemberlistStore                // persistent store holding the memberlist
	reconcileInterval time.Duration                   // once this much time has passed since the last reconcile, the next dequeued update forces one
	reconcileCount    uint                            // number of dequeued updates that forces a reconcile; 0 reconciles on every update
}
// NewMemberlistManager wires a node watcher and a memberlist store together
// behind a work queue that uses client-go's default controller rate limiter.
//
// reconcileInterval and reconcileCount start at their zero values, so until
// SetReconcileInterval/SetReconcileCount are called the run loop reconciles
// on every dequeued update.
func NewMemberlistManager(nodeWatcher IWatcher, memberlistStore IMemberlistStore) *MemberlistManager {
	mgr := &MemberlistManager{
		nodeWatcher:     nodeWatcher,
		memberlistStore: memberlistStore,
	}
	mgr.workqueue = workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())
	return mgr
}
// Start hooks the node watcher up to the work queue, starts the watcher, and
// launches the reconciliation loop in a background goroutine.
//
// Every node IP the watcher reports via its callback is added to the work
// queue. If the watcher fails to start, that error is returned unchanged and
// the loop is not launched. The loop exits when the work queue is shut down.
func (m *MemberlistManager) Start() error {
	log.Info("Starting memberlist manager")
	enqueue := func(nodeIp string) {
		m.workqueue.Add(nodeIp)
	}
	m.nodeWatcher.RegisterCallback(enqueue)

	if err := m.nodeWatcher.Start(); err != nil {
		return err
	}

	go m.run()
	return nil
}
// run is the reconciliation loop launched by Start.
//
// It dequeues node IPs from the work queue and collects them into a batch. A
// batch is reconciled once it holds at least reconcileCount updates, or once
// more than reconcileInterval has elapsed since the last successful
// reconciliation (see reconcileBatch).
//
// After a successful reconciliation, every key in the batch is forgotten by the
// rate limiter and marked done. After a failed one, every key is re-queued with
// rate-limited backoff and then marked done. Without this, the keys would stay
// "processing" and the queue would never hand them out again, so the failure
// would not be retried. lastUpdate only advances on success, so a retried batch
// is not held back by the interval.
//
// The loop returns when the work queue reports shutdown.
//
// NOTE(review): both thresholds are evaluated only when an item is dequeued. A
// batch smaller than reconcileCount therefore waits for the next node event
// even after reconcileInterval has elapsed.
func (m *MemberlistManager) run() {
	count := uint(0)
	lastUpdate := time.Now()
	updates := map[string]struct{}{}
	for {
		item, shutdown := m.workqueue.Get()
		if shutdown {
			log.Info("Shutting down memberlist manager")
			break
		}
		key, ok := item.(string)
		if !ok {
			log.Error("Error while asserting workqueue key to string")
			// Release the item that was actually dequeued. After the failed
			// assertion, key is "", which the queue is not tracking.
			m.workqueue.Forget(item)
			m.workqueue.Done(item)
			continue
		}
		count++
		updates[key] = struct{}{}
		if count < m.reconcileCount && time.Since(lastUpdate) <= m.reconcileInterval {
			continue
		}

		err := m.reconcileBatch()
		for k := range updates {
			if err != nil {
				// Schedule a retry with backoff. It is delivered after Done.
				m.workqueue.AddRateLimited(k)
			} else {
				m.workqueue.Forget(k)
			}
			m.workqueue.Done(k)
		}
		count = 0
		updates = map[string]struct{}{}
		if err == nil {
			lastUpdate = time.Now()
		}
	}
}

// reconcileBatch makes the stored memberlist match the watcher's current
// ready members.
//
// It reads the stored memberlist and its resource version, lists the ready
// members, and writes them back, passing along that resource version, only
// when the two lists differ. Each failure is logged here and returned so that
// run can schedule a retry.
func (m *MemberlistManager) reconcileBatch() error {
	memberlist, resourceVersion, err := m.getOldMemberlist()
	if err != nil {
		log.Error("Error while getting memberlist", zap.Error(err))
		return err
	}
	log.Info("Old Memberlist", zap.Any("memberlist", memberlist))
	newMemberlist, err := m.nodeWatcher.ListReadyMembers()
	if err != nil {
		log.Error("Error while getting ready members", zap.Error(err))
		return err
	}
	// Skip the store write when nothing changed.
	if memberlistSame(memberlist, newMemberlist) {
		return nil
	}
	if err := m.updateMemberlist(newMemberlist, *resourceVersion); err != nil {
		log.Error("Error while updating memberlist", zap.Error(err))
		return err
	}
	return nil
}
// memberlistSame reports whether oldMemberlist and newMemberlist contain the
// same members, ignoring order.
//
// Members are compared as a multiset, so duplicates must match in number. A
// plain set lookup would wrongly treat [a a] and [a b] as equal.
func memberlistSame(oldMemberlist Memberlist, newMemberlist Memberlist) bool {
	if len(oldMemberlist) != len(newMemberlist) {
		return false
	}
	// Count occurrences in the new list, then consume one per old member.
	// Because the lengths are equal, consuming every old member exactly means
	// the multisets are identical.
	remaining := make(map[string]int, len(newMemberlist))
	for _, member := range newMemberlist {
		remaining[member]++
	}
	for _, member := range oldMemberlist {
		if remaining[member] == 0 {
			return false
		}
		remaining[member]--
	}
	return true
}
// getOldMemberlist fetches the memberlist currently held by the store,
// together with the resource version the store returned alongside it.
//
// It returns an error when the store call fails or when the store yields a nil
// memberlist. In both error cases the memberlist and resource version results
// are nil.
func (m *MemberlistManager) getOldMemberlist() (Memberlist, *string, error) {
	memberlist, resourceVersion, err := m.memberlistStore.GetMemberlist(context.Background())
	if err != nil {
		return nil, nil, err
	}
	if memberlist == nil {
		return nil, nil, errors.New("memberlist received is nil")
	}
	return *memberlist, &resourceVersion, nil
}
// updateMemberlist writes memberlist to the store, forwarding resourceVersion
// unchanged. The caller in this file passes the version it read together with
// the old memberlist. Any error from the store is returned as-is.
func (m *MemberlistManager) updateMemberlist(memberlist Memberlist, resourceVersion string) error {
	ctx := context.Background()
	store := m.memberlistStore
	return store.UpdateMemberlist(ctx, &memberlist, resourceVersion)
}
// SetReconcileInterval sets the elapsed time after which the next dequeued
// node update forces a reconciliation, even if fewer than reconcileCount
// updates have been batched.
func (m *MemberlistManager) SetReconcileInterval(interval time.Duration) {
	m.reconcileInterval = interval
}
// SetReconcileCount sets how many dequeued node updates are batched before a
// reconciliation is forced. A value of 0 reconciles on every update.
func (m *MemberlistManager) SetReconcileCount(count uint) {
	m.reconcileCount = count
}
// Stop shuts down the work queue. The run loop exits once Get reports
// shutdown. Stop always returns nil.
//
// NOTE(review): the node watcher started in Start is not stopped here, and Stop
// does not wait for the run goroutine to exit. Confirm both are intended.
func (m *MemberlistManager) Stop() error {
	queue := m.workqueue
	queue.ShutDown()
	return nil
}