// SPDX-License-Identifier: Apache-2.0
// Copyright Authors of Cilium

package clustermesh

import (
	"context"
	"fmt"

	"github.com/prometheus/client_golang/prometheus"

	"github.com/cilium/cilium/api/v1/models"
	"github.com/cilium/cilium/pkg/allocator"
	"github.com/cilium/cilium/pkg/controller"
	"github.com/cilium/cilium/pkg/ipcache"
	"github.com/cilium/cilium/pkg/kvstore"
	"github.com/cilium/cilium/pkg/kvstore/store"
	"github.com/cilium/cilium/pkg/lock"
	"github.com/cilium/cilium/pkg/metrics"
	nodemanager "github.com/cilium/cilium/pkg/node/manager"
	nodeStore "github.com/cilium/cilium/pkg/node/store"
	"github.com/cilium/cilium/pkg/option"
)
const (
	// configNotificationsChannelSize is the size of the channel used to
	// notify a clustermesh of configuration changes
	configNotificationsChannelSize = 512

	subsystem = "clustermesh"
)
// Configuration is the configuration that must be provided to
// NewClusterMesh()
type Configuration struct {
	// Name is the name of the local cluster. This is used for logging and metrics
	Name string

	// NodeName is the name of the local node. This is used for logging and metrics
	NodeName string

	// ConfigDirectory is the path to the directory that will be watched for etcd
	// configuration files to appear
	ConfigDirectory string

	// NodeKeyCreator is the function used to create node instances as
	// nodes are being discovered in remote clusters
	NodeKeyCreator store.KeyCreator

	// ServiceMerger is the interface responsible for merging services and
	// endpoints into an existing cache
	ServiceMerger ServiceMerger

	// NodeManager is the node manager used to manage all discovered
	// remote nodes
	NodeManager *nodemanager.Manager

	// nodeObserver is the observer of remote cluster node stores; if
	// unset, NodeObserver() falls back to one backed by NodeManager
	nodeObserver store.Observer

	// RemoteIdentityWatcher provides identities that have been allocated on a
	// remote cluster.
	RemoteIdentityWatcher RemoteIdentityWatcher

	// IPCache is the IP cache into which the IPs of remote nodes and
	// endpoints are merged
	IPCache *ipcache.IPCache
}
// RemoteIdentityWatcher is any type which provides identities that have been
// allocated on a remote cluster.
type RemoteIdentityWatcher interface {
	// WatchRemoteIdentities starts watching for identities in another kvstore and
	// syncs all identities to the local identity cache.
	WatchRemoteIdentities(backend kvstore.BackendOperations) (*allocator.RemoteCache, error)

	// Close stops the watcher.
	Close()
}
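
// A minimal type satisfying RemoteIdentityWatcher might look as follows
// (illustrative only, e.g. for tests that do not exercise remote identities;
// it is not part of this package):
//
//	type noopIdentityWatcher struct{}
//
//	func (noopIdentityWatcher) WatchRemoteIdentities(kvstore.BackendOperations) (*allocator.RemoteCache, error) {
//		return nil, nil
//	}
//
//	func (noopIdentityWatcher) Close() {}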
// NodeObserver returns the node store observer of the configuration
func (c *Configuration) NodeObserver() store.Observer {
	if c.nodeObserver != nil {
		return c.nodeObserver
	}

	return nodeStore.NewNodeObserver(c.NodeManager)
}
// ClusterMesh is a cache of multiple remote clusters
type ClusterMesh struct {
	// conf is the configuration, it is immutable after NewClusterMesh()
	conf Configuration

	mutex         lock.RWMutex
	clusters      map[string]*remoteCluster
	controllers   *controller.Manager
	configWatcher *configDirectoryWatcher
	ipcache       *ipcache.IPCache

	// globalServices is a list of all global services. The datastructure
	// is protected by its own mutex inside the structure.
	globalServices *globalServiceCache

	// metricTotalRemoteClusters is a gauge metric keeping track of the
	// total number of remote clusters.
	metricTotalRemoteClusters *prometheus.GaugeVec
}
// NewClusterMesh creates a new remote cluster cache based on the
// provided configuration
func NewClusterMesh(c Configuration) (*ClusterMesh, error) {
	cm := &ClusterMesh{
		conf:           c,
		clusters:       map[string]*remoteCluster{},
		controllers:    controller.NewManager(),
		globalServices: newGlobalServiceCache(c.Name, c.NodeName),
		metricTotalRemoteClusters: prometheus.NewGaugeVec(prometheus.GaugeOpts{
			Namespace: metrics.Namespace,
			Subsystem: subsystem,
			Name:      "remote_clusters",
			Help:      "The total number of remote clusters meshed with the local cluster",
		}, []string{metrics.LabelSourceCluster, metrics.LabelSourceNodeName}),
		ipcache: c.IPCache,
	}

	w, err := createConfigDirectoryWatcher(c.ConfigDirectory, cm)
	if err != nil {
		return nil, fmt.Errorf("unable to create config directory watcher: %w", err)
	}

	cm.configWatcher = w

	if err := cm.configWatcher.watch(); err != nil {
		return nil, err
	}

	_ = metrics.RegisterList([]prometheus.Collector{cm.metricTotalRemoteClusters})

	return cm, nil
}
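
// A construction sketch (field values are illustrative; nodeManager,
// identityAllocator, and ipc stand in for dependencies the caller is
// assumed to have initialized elsewhere, e.g. in the daemon):
//
//	cm, err := NewClusterMesh(Configuration{
//		Name:                  option.Config.ClusterName,
//		NodeName:              "node-1",
//		ConfigDirectory:       "/var/lib/cilium/clustermesh",
//		NodeKeyCreator:        nodeStore.KeyCreator,
//		NodeManager:           nodeManager,
//		RemoteIdentityWatcher: identityAllocator,
//		IPCache:               ipc,
//	})
//	if err != nil {
//		log.WithError(err).Fatal("Unable to initialize ClusterMesh")
//	}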
// Close stops watching for remote cluster configuration files to appear and
// will close all connections to remote clusters
func (cm *ClusterMesh) Close() {
	cm.mutex.Lock()
	defer cm.mutex.Unlock()

	if cm.configWatcher != nil {
		cm.configWatcher.close()
	}

	for name, cluster := range cm.clusters {
		cluster.onRemove()
		delete(cm.clusters, name)
	}

	cm.controllers.RemoveAllAndWait()
	metrics.Unregister(cm.metricTotalRemoteClusters)
}
func (cm *ClusterMesh) newRemoteCluster(name, path string) *remoteCluster {
	rc := &remoteCluster{
		name:        name,
		configPath:  path,
		mesh:        cm,
		changed:     make(chan bool, configNotificationsChannelSize),
		controllers: controller.NewManager(),
		swg:         lock.NewStoppableWaitGroup(),
		metricLastFailureTimestamp: prometheus.NewGaugeVec(prometheus.GaugeOpts{
			Namespace: metrics.Namespace,
			Subsystem: subsystem,
			Name:      "remote_cluster_last_failure_ts",
			Help:      "The timestamp of the last failure of the remote cluster",
		}, []string{metrics.LabelSourceCluster, metrics.LabelSourceNodeName, metrics.LabelTargetCluster}),
		metricReadinessStatus: prometheus.NewGaugeVec(prometheus.GaugeOpts{
			Namespace: metrics.Namespace,
			Subsystem: subsystem,
			Name:      "remote_cluster_readiness_status",
			Help:      "The readiness status of the remote cluster",
		}, []string{metrics.LabelSourceCluster, metrics.LabelSourceNodeName, metrics.LabelTargetCluster}),
		metricTotalFailures: prometheus.NewGaugeVec(prometheus.GaugeOpts{
			Namespace: metrics.Namespace,
			Subsystem: subsystem,
			Name:      "remote_cluster_failures",
			Help:      "The total number of failures related to the remote cluster",
		}, []string{metrics.LabelSourceCluster, metrics.LabelSourceNodeName, metrics.LabelTargetCluster}),
		metricTotalNodes: prometheus.NewGaugeVec(prometheus.GaugeOpts{
			Namespace: metrics.Namespace,
			Subsystem: subsystem,
			Name:      "remote_cluster_nodes",
			Help:      "The total number of nodes in the remote cluster",
		}, []string{metrics.LabelSourceCluster, metrics.LabelSourceNodeName, metrics.LabelTargetCluster}),
	}

	_ = metrics.RegisterList([]prometheus.Collector{rc.metricLastFailureTimestamp, rc.metricReadinessStatus, rc.metricTotalFailures, rc.metricTotalNodes})

	return rc
}
// add initializes a remote cluster for the given configuration file, or
// signals a configuration change if the cluster is already known.
func (cm *ClusterMesh) add(name, path string) {
	if name == option.Config.ClusterName {
		log.WithField(fieldClusterName, name).Debug("Ignoring configuration for own cluster")
		return
	}

	inserted := false
	cm.mutex.Lock()
	cluster, ok := cm.clusters[name]
	if !ok {
		cluster = cm.newRemoteCluster(name, path)
		cm.clusters[name] = cluster
		inserted = true
	}
	cm.metricTotalRemoteClusters.WithLabelValues(cm.conf.Name, cm.conf.NodeName).Set(float64(len(cm.clusters)))
	cm.mutex.Unlock()

	log.WithField(fieldClusterName, name).Debug("Remote cluster configuration added")

	if inserted {
		cluster.onInsert(cm.conf.RemoteIdentityWatcher)
	} else {
		// signal a change in configuration
		cluster.changed <- true
	}
}
// remove tears down the connection to a remote cluster and removes it from
// the cache, typically in response to its configuration file disappearing.
func (cm *ClusterMesh) remove(name string) {
	cm.mutex.Lock()
	if cluster, ok := cm.clusters[name]; ok {
		cluster.onRemove()
		delete(cm.clusters, name)
		cm.metricTotalRemoteClusters.WithLabelValues(cm.conf.Name, cm.conf.NodeName).Set(float64(len(cm.clusters)))

		cm.globalServices.onClusterDelete(name)
	}
	cm.mutex.Unlock()

	log.WithField(fieldClusterName, name).Debug("Remote cluster configuration removed")
}
// NumReadyClusters returns the number of remote clusters to which a connection
// has been established
func (cm *ClusterMesh) NumReadyClusters() int {
	cm.mutex.RLock()
	defer cm.mutex.RUnlock()

	nready := 0
	for _, cluster := range cm.clusters {
		if cluster.isReady() {
			nready++
		}
	}

	return nready
}
// ClustersSynced returns after all clusters were synchronized with the bpf
// datapath.
func (cm *ClusterMesh) ClustersSynced(ctx context.Context) error {
	cm.mutex.RLock()
	swgs := make([]*lock.StoppableWaitGroup, 0, len(cm.clusters))
	for _, cluster := range cm.clusters {
		swgs = append(swgs, cluster.swg)
	}
	cm.mutex.RUnlock()

	for _, swg := range swgs {
		select {
		case <-swg.WaitChannel():
		case <-ctx.Done():
			return ctx.Err()
		}
	}

	return nil
}
// Status returns the status of the ClusterMesh subsystem
func (cm *ClusterMesh) Status() (status *models.ClusterMeshStatus) {
	cm.mutex.RLock()
	defer cm.mutex.RUnlock()

	status = &models.ClusterMeshStatus{
		NumGlobalServices: int64(cm.globalServices.size()),
	}

	for _, cluster := range cm.clusters {
		status.Clusters = append(status.Clusters, cluster.status())
	}

	return
}