-
Notifications
You must be signed in to change notification settings - Fork 5.4k
/
clusterinfoupdater.go
141 lines (131 loc) · 4.35 KB
/
clusterinfoupdater.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
package controller
import (
"context"
"time"
"fmt"
"github.com/argoproj/gitops-engine/pkg/cache"
"github.com/argoproj/gitops-engine/pkg/utils/kube"
log "github.com/sirupsen/logrus"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"github.com/argoproj/argo-cd/v2/controller/metrics"
appv1 "github.com/argoproj/argo-cd/v2/pkg/apis/application/v1alpha1"
"github.com/argoproj/argo-cd/v2/pkg/client/listers/application/v1alpha1"
"github.com/argoproj/argo-cd/v2/util/argo"
appstatecache "github.com/argoproj/argo-cd/v2/util/cache/appstate"
"github.com/argoproj/argo-cd/v2/util/db"
)
const (
secretUpdateInterval = 10 * time.Second
)
type clusterInfoUpdater struct {
infoSource metrics.HasClustersInfo
db db.ArgoDB
appLister v1alpha1.ApplicationNamespaceLister
cache *appstatecache.Cache
clusterFilter func(cluster *appv1.Cluster) bool
projGetter func(app *appv1.Application) (*appv1.AppProject, error)
namespace string
}
func NewClusterInfoUpdater(
infoSource metrics.HasClustersInfo,
db db.ArgoDB,
appLister v1alpha1.ApplicationNamespaceLister,
cache *appstatecache.Cache,
clusterFilter func(cluster *appv1.Cluster) bool,
projGetter func(app *appv1.Application) (*appv1.AppProject, error),
namespace string) *clusterInfoUpdater {
return &clusterInfoUpdater{infoSource, db, appLister, cache, clusterFilter, projGetter, namespace}
}
func (c *clusterInfoUpdater) Run(ctx context.Context) {
c.updateClusters()
ticker := time.NewTicker(secretUpdateInterval)
for {
select {
case <-ctx.Done():
ticker.Stop()
break
case <-ticker.C:
c.updateClusters()
}
}
}
func (c *clusterInfoUpdater) updateClusters() {
infoByServer := make(map[string]*cache.ClusterInfo)
clustersInfo := c.infoSource.GetClustersInfo()
for i := range clustersInfo {
info := clustersInfo[i]
infoByServer[info.Server] = &info
}
clusters, err := c.db.ListClusters(context.Background())
if err != nil {
log.Warnf("Failed to save clusters info: %v", err)
return
}
var clustersFiltered []appv1.Cluster
if c.clusterFilter == nil {
clustersFiltered = clusters.Items
} else {
for i := range clusters.Items {
if c.clusterFilter(&clusters.Items[i]) {
clustersFiltered = append(clustersFiltered, clusters.Items[i])
}
}
}
_ = kube.RunAllAsync(len(clustersFiltered), func(i int) error {
cluster := clustersFiltered[i]
if err := c.updateClusterInfo(cluster, infoByServer[cluster.Server]); err != nil {
log.Warnf("Failed to save clusters info: %v", err)
}
return nil
})
log.Debugf("Successfully saved info of %d clusters", len(clustersFiltered))
}
func (c *clusterInfoUpdater) updateClusterInfo(cluster appv1.Cluster, info *cache.ClusterInfo) error {
apps, err := c.appLister.List(labels.Everything())
if err != nil {
return fmt.Errorf("error while fetching the apps list: %w", err)
}
var appCount int64
for _, a := range apps {
if c.projGetter != nil {
proj, err := c.projGetter(a)
if err != nil || !proj.IsAppNamespacePermitted(a, c.namespace) {
continue
}
}
if err := argo.ValidateDestination(context.Background(), &a.Spec.Destination, c.db); err != nil {
continue
}
if a.Spec.Destination.Server == cluster.Server {
appCount += 1
}
}
now := metav1.Now()
clusterInfo := appv1.ClusterInfo{
ConnectionState: appv1.ConnectionState{ModifiedAt: &now},
ApplicationsCount: appCount,
}
if info != nil {
clusterInfo.ServerVersion = info.K8SVersion
clusterInfo.APIVersions = argo.APIResourcesToStrings(info.APIResources, true)
if info.LastCacheSyncTime == nil {
clusterInfo.ConnectionState.Status = appv1.ConnectionStatusUnknown
} else if info.SyncError == nil {
clusterInfo.ConnectionState.Status = appv1.ConnectionStatusSuccessful
syncTime := metav1.NewTime(*info.LastCacheSyncTime)
clusterInfo.CacheInfo.LastCacheSyncTime = &syncTime
clusterInfo.CacheInfo.APIsCount = int64(info.APIsCount)
clusterInfo.CacheInfo.ResourcesCount = int64(info.ResourcesCount)
} else {
clusterInfo.ConnectionState.Status = appv1.ConnectionStatusFailed
clusterInfo.ConnectionState.Message = info.SyncError.Error()
}
} else {
clusterInfo.ConnectionState.Status = appv1.ConnectionStatusUnknown
if appCount == 0 {
clusterInfo.ConnectionState.Message = "Cluster has no applications and is not being monitored."
}
}
return c.cache.SetClusterInfo(cluster.Server, &clusterInfo)
}