Skip to content

Commit

Permalink
feat: add timeout for update cluster info
Browse files Browse the repository at this point in the history
Signed-off-by: yyzxw <1020938856@qq.com>
  • Loading branch information
yyzxw authored and xiaowu.zhu committed Aug 1, 2023
1 parent c7c6219 commit 6827ad5
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 9 deletions.
35 changes: 27 additions & 8 deletions controller/clusterinfoupdater.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,14 @@ package controller
import (
"context"
"fmt"
"time"

"github.com/argoproj/argo-cd/v2/util/env"
"github.com/argoproj/gitops-engine/pkg/cache"
"github.com/argoproj/gitops-engine/pkg/utils/kube"
log "github.com/sirupsen/logrus"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"time"

"github.com/argoproj/argo-cd/v2/controller/metrics"
appv1 "github.com/argoproj/argo-cd/v2/pkg/apis/application/v1alpha1"
Expand All @@ -19,7 +21,13 @@ import (
)

const (
secretUpdateInterval = 10 * time.Second
defaultSecretUpdateInterval = 10 * time.Second

EnvClusterInfoTimeout = "ARGO_CD_UPDATE_CLUSTER_INFO_TIMEOUT"
)

var (
clusterInfoTimeout = env.ParseDurationFromEnv(EnvClusterInfoTimeout, defaultSecretUpdateInterval, defaultSecretUpdateInterval, 1*time.Minute)
)

type clusterInfoUpdater struct {
Expand All @@ -30,6 +38,7 @@ type clusterInfoUpdater struct {
clusterFilter func(cluster *appv1.Cluster) bool
projGetter func(app *appv1.Application) (*appv1.AppProject, error)
namespace string
lastUpdated time.Time
}

func NewClusterInfoUpdater(
Expand All @@ -41,12 +50,12 @@ func NewClusterInfoUpdater(
projGetter func(app *appv1.Application) (*appv1.AppProject, error),
namespace string) *clusterInfoUpdater {

return &clusterInfoUpdater{infoSource, db, appLister, cache, clusterFilter, projGetter, namespace}
return &clusterInfoUpdater{infoSource, db, appLister, cache, clusterFilter, projGetter, namespace, time.Time{}}
}

func (c *clusterInfoUpdater) Run(ctx context.Context) {
c.updateClusters()
ticker := time.NewTicker(secretUpdateInterval)
ticker := time.NewTicker(clusterInfoTimeout)
for {
select {
case <-ctx.Done():
Expand All @@ -59,13 +68,23 @@ func (c *clusterInfoUpdater) Run(ctx context.Context) {
}

func (c *clusterInfoUpdater) updateClusters() {
if time.Since(c.lastUpdated) < clusterInfoTimeout {
return
}

ctx, cancel := context.WithTimeout(context.Background(), clusterInfoTimeout)
defer func() {
cancel()
c.lastUpdated = time.Now()
}()

infoByServer := make(map[string]*cache.ClusterInfo)
clustersInfo := c.infoSource.GetClustersInfo()
for i := range clustersInfo {
info := clustersInfo[i]
infoByServer[info.Server] = &info
}
clusters, err := c.db.ListClusters(context.Background())
clusters, err := c.db.ListClusters(ctx)
if err != nil {
log.Warnf("Failed to save clusters info: %v", err)
return
Expand All @@ -82,15 +101,15 @@ func (c *clusterInfoUpdater) updateClusters() {
}
_ = kube.RunAllAsync(len(clustersFiltered), func(i int) error {
cluster := clustersFiltered[i]
if err := c.updateClusterInfo(cluster, infoByServer[cluster.Server]); err != nil {
if err := c.updateClusterInfo(ctx, cluster, infoByServer[cluster.Server]); err != nil {
log.Warnf("Failed to save clusters info: %v", err)
}
return nil
})
log.Debugf("Successfully saved info of %d clusters", len(clustersFiltered))
}

func (c *clusterInfoUpdater) updateClusterInfo(cluster appv1.Cluster, info *cache.ClusterInfo) error {
func (c *clusterInfoUpdater) updateClusterInfo(ctx context.Context, cluster appv1.Cluster, info *cache.ClusterInfo) error {
apps, err := c.appLister.List(labels.Everything())
if err != nil {
return fmt.Errorf("error while fetching the apps list: %w", err)
Expand All @@ -103,7 +122,7 @@ func (c *clusterInfoUpdater) updateClusterInfo(cluster appv1.Cluster, info *cach
continue
}
}
if err := argo.ValidateDestination(context.Background(), &a.Spec.Destination, c.db); err != nil {
if err := argo.ValidateDestination(ctx, &a.Spec.Destination, c.db); err != nil {
continue
}
if a.Spec.Destination.Server == cluster.Server {
Expand Down
2 changes: 1 addition & 1 deletion controller/clusterinfoupdater_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ func TestClusterSecretUpdater(t *testing.T) {
lister := applisters.NewApplicationLister(appInformer.GetIndexer()).Applications(fakeNamespace)
updater := NewClusterInfoUpdater(nil, argoDB, lister, appCache, nil, nil, fakeNamespace)

err = updater.updateClusterInfo(*cluster, info)
err = updater.updateClusterInfo(context.Background(), *cluster, info)
assert.NoError(t, err, "Invoking updateClusterInfo failed.")

var clusterInfo v1alpha1.ClusterInfo
Expand Down
2 changes: 2 additions & 0 deletions docs/operator-manual/high_availability.md
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,8 @@ controller replicas. To enable sharding increase the number of replicas in `argo
and repeat the number of replicas in the `ARGOCD_CONTROLLER_REPLICAS` environment variable. The strategic merge patch below
demonstrates changes required to configure two controller replicas.

* By default, the controller will update the cluster information every 10 seconds. If there is a problem with your cluster network environment that is causing the update time to take a long time, you can try modifying the environment variable `ARGO_CD_UPDATE_CLUSTER_INFO_TIMEOUT` to increase the timeout (the unit is seconds).

```yaml
apiVersion: apps/v1
kind: StatefulSet
Expand Down

0 comments on commit 6827ad5

Please sign in to comment.