Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add timeout for update cluster info #14511

Merged
merged 2 commits into from
Aug 17, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,7 @@ import (
argosettings "github.com/argoproj/argo-cd/v2/util/settings"
)

// TODO: load this using Cobra.
func getSubmoduleEnabled() bool {
return env.ParseBoolFromEnv(common.EnvGitSubmoduleEnabled, true)
}
var gitSubmoduleEnabled = env.ParseBoolFromEnv(common.EnvGitSubmoduleEnabled, true)

func NewCommand() *cobra.Command {
var (
Expand Down Expand Up @@ -156,7 +153,7 @@ func NewCommand() *cobra.Command {
}

repoClientset := apiclient.NewRepoServerClientset(argocdRepoServer, repoServerTimeoutSeconds, tlsConfig)
argoCDService, err := services.NewArgoCDService(argoCDDB, getSubmoduleEnabled(), repoClientset, enableNewGitFileGlobbing)
argoCDService, err := services.NewArgoCDService(argoCDDB, gitSubmoduleEnabled, repoClientset, enableNewGitFileGlobbing)
errors.CheckError(err)

terminalGenerators := map[string]generators.Generator{
Expand Down
47 changes: 15 additions & 32 deletions cmd/argocd-repo-server/commands/argocd_repo_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,33 +35,16 @@ import (

const (
// CLIName is the name of the CLI
cliName = "argocd-repo-server"
gnuPGSourcePath = "/app/config/gpg/source"

defaultPauseGenerationAfterFailedGenerationAttempts = 3
defaultPauseGenerationOnFailureForMinutes = 60
defaultPauseGenerationOnFailureForRequests = 0
cliName = "argocd-repo-server"
)

func getGnuPGSourcePath() string {
return env.StringFromEnv(common.EnvGPGDataPath, gnuPGSourcePath)
}

func getPauseGenerationAfterFailedGenerationAttempts() int {
return env.ParseNumFromEnv(common.EnvPauseGenerationAfterFailedAttempts, defaultPauseGenerationAfterFailedGenerationAttempts, 0, math.MaxInt32)
}

func getPauseGenerationOnFailureForMinutes() int {
return env.ParseNumFromEnv(common.EnvPauseGenerationMinutes, defaultPauseGenerationOnFailureForMinutes, 0, math.MaxInt32)
}

func getPauseGenerationOnFailureForRequests() int {
return env.ParseNumFromEnv(common.EnvPauseGenerationRequests, defaultPauseGenerationOnFailureForRequests, 0, math.MaxInt32)
}

func getSubmoduleEnabled() bool {
return env.ParseBoolFromEnv(common.EnvGitSubmoduleEnabled, true)
}
var (
gnuPGSourcePath = env.StringFromEnv(common.EnvGPGDataPath, "/app/config/gpg/source")
pauseGenerationAfterFailedGenerationAttempts = env.ParseNumFromEnv(common.EnvPauseGenerationAfterFailedAttempts, 3, 0, math.MaxInt32)
pauseGenerationOnFailureForMinutes = env.ParseNumFromEnv(common.EnvPauseGenerationMinutes, 60, 0, math.MaxInt32)
pauseGenerationOnFailureForRequests = env.ParseNumFromEnv(common.EnvPauseGenerationRequests, 0, 0, math.MaxInt32)
gitSubmoduleEnabled = env.ParseBoolFromEnv(common.EnvGitSubmoduleEnabled, true)
)

func NewCommand() *cobra.Command {
var (
Expand Down Expand Up @@ -124,10 +107,10 @@ func NewCommand() *cobra.Command {
cacheutil.CollectMetrics(redisClient, metricsServer)
server, err := reposerver.NewServer(metricsServer, cache, tlsConfigCustomizer, repository.RepoServerInitConstants{
ParallelismLimit: parallelismLimit,
PauseGenerationAfterFailedGenerationAttempts: getPauseGenerationAfterFailedGenerationAttempts(),
PauseGenerationOnFailureForMinutes: getPauseGenerationOnFailureForMinutes(),
PauseGenerationOnFailureForRequests: getPauseGenerationOnFailureForRequests(),
SubmoduleEnabled: getSubmoduleEnabled(),
PauseGenerationAfterFailedGenerationAttempts: pauseGenerationAfterFailedGenerationAttempts,
PauseGenerationOnFailureForMinutes: pauseGenerationOnFailureForMinutes,
PauseGenerationOnFailureForRequests: pauseGenerationOnFailureForRequests,
SubmoduleEnabled: gitSubmoduleEnabled,
MaxCombinedDirectoryManifestsSize: maxCombinedDirectoryManifestsQuantity,
CMPTarExcludedGlobs: cmpTarExcludedGlobs,
AllowOutOfBoundsSymlinks: allowOutOfBoundsSymlinks,
Expand Down Expand Up @@ -181,12 +164,12 @@ func NewCommand() *cobra.Command {
err = gpg.InitializeGnuPG()
errors.CheckError(err)

log.Infof("Populating GnuPG keyring with keys from %s", getGnuPGSourcePath())
added, removed, err := gpg.SyncKeyRingFromDirectory(getGnuPGSourcePath())
log.Infof("Populating GnuPG keyring with keys from %s", gnuPGSourcePath)
added, removed, err := gpg.SyncKeyRingFromDirectory(gnuPGSourcePath)
errors.CheckError(err)
log.Infof("Loaded %d (and removed %d) keys from keyring", len(added), len(removed))

go func() { errors.CheckError(reposerver.StartGPGWatcher(getGnuPGSourcePath())) }()
go func() { errors.CheckError(reposerver.StartGPGWatcher(gnuPGSourcePath)) }()
}

log.Infof("argocd-repo-server is listening on %s", listener.Addr())
Expand Down
9 changes: 2 additions & 7 deletions cmd/argocd-server/commands/argocd_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,15 +35,10 @@ const (
)

var (
failureRetryCount = 0
failureRetryPeriodMilliSeconds = 100
failureRetryCount = env.ParseNumFromEnv(failureRetryCountEnv, 0, 0, 10)
failureRetryPeriodMilliSeconds = env.ParseNumFromEnv(failureRetryPeriodMilliSecondsEnv, 100, 0, 1000)
)

func init() {
failureRetryCount = env.ParseNumFromEnv(failureRetryCountEnv, failureRetryCount, 0, 10)
failureRetryPeriodMilliSeconds = env.ParseNumFromEnv(failureRetryPeriodMilliSecondsEnv, failureRetryPeriodMilliSeconds, 0, 1000)
}

// NewCommand returns a new instance of an argocd command
func NewCommand() *cobra.Command {
var (
Expand Down
35 changes: 27 additions & 8 deletions controller/clusterinfoupdater.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,14 @@ package controller
import (
"context"
"fmt"
"time"

"github.com/argoproj/argo-cd/v2/util/env"
"github.com/argoproj/gitops-engine/pkg/cache"
"github.com/argoproj/gitops-engine/pkg/utils/kube"
log "github.com/sirupsen/logrus"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"time"

"github.com/argoproj/argo-cd/v2/controller/metrics"
appv1 "github.com/argoproj/argo-cd/v2/pkg/apis/application/v1alpha1"
Expand All @@ -19,7 +21,13 @@ import (
)

const (
secretUpdateInterval = 10 * time.Second
defaultSecretUpdateInterval = 10 * time.Second

EnvClusterInfoTimeout = "ARGO_CD_UPDATE_CLUSTER_INFO_TIMEOUT"
yyzxw marked this conversation as resolved.
Show resolved Hide resolved
)

var (
clusterInfoTimeout = env.ParseDurationFromEnv(EnvClusterInfoTimeout, defaultSecretUpdateInterval, defaultSecretUpdateInterval, 1*time.Minute)
)

type clusterInfoUpdater struct {
Expand All @@ -30,6 +38,7 @@ type clusterInfoUpdater struct {
clusterFilter func(cluster *appv1.Cluster) bool
projGetter func(app *appv1.Application) (*appv1.AppProject, error)
namespace string
lastUpdated time.Time
}

func NewClusterInfoUpdater(
Expand All @@ -41,12 +50,12 @@ func NewClusterInfoUpdater(
projGetter func(app *appv1.Application) (*appv1.AppProject, error),
namespace string) *clusterInfoUpdater {

return &clusterInfoUpdater{infoSource, db, appLister, cache, clusterFilter, projGetter, namespace}
return &clusterInfoUpdater{infoSource, db, appLister, cache, clusterFilter, projGetter, namespace, time.Time{}}
}

func (c *clusterInfoUpdater) Run(ctx context.Context) {
c.updateClusters()
ticker := time.NewTicker(secretUpdateInterval)
ticker := time.NewTicker(clusterInfoTimeout)
for {
select {
case <-ctx.Done():
Expand All @@ -59,13 +68,23 @@ func (c *clusterInfoUpdater) Run(ctx context.Context) {
}

func (c *clusterInfoUpdater) updateClusters() {
if time.Since(c.lastUpdated) < clusterInfoTimeout {
return
}

ctx, cancel := context.WithTimeout(context.Background(), clusterInfoTimeout)
defer func() {
cancel()
c.lastUpdated = time.Now()
}()

infoByServer := make(map[string]*cache.ClusterInfo)
clustersInfo := c.infoSource.GetClustersInfo()
for i := range clustersInfo {
info := clustersInfo[i]
infoByServer[info.Server] = &info
}
clusters, err := c.db.ListClusters(context.Background())
clusters, err := c.db.ListClusters(ctx)
if err != nil {
log.Warnf("Failed to save clusters info: %v", err)
return
Expand All @@ -82,15 +101,15 @@ func (c *clusterInfoUpdater) updateClusters() {
}
_ = kube.RunAllAsync(len(clustersFiltered), func(i int) error {
cluster := clustersFiltered[i]
if err := c.updateClusterInfo(cluster, infoByServer[cluster.Server]); err != nil {
if err := c.updateClusterInfo(ctx, cluster, infoByServer[cluster.Server]); err != nil {
log.Warnf("Failed to save clusters info: %v", err)
}
return nil
})
log.Debugf("Successfully saved info of %d clusters", len(clustersFiltered))
}

func (c *clusterInfoUpdater) updateClusterInfo(cluster appv1.Cluster, info *cache.ClusterInfo) error {
func (c *clusterInfoUpdater) updateClusterInfo(ctx context.Context, cluster appv1.Cluster, info *cache.ClusterInfo) error {
apps, err := c.appLister.List(labels.Everything())
if err != nil {
return fmt.Errorf("error while fetching the apps list: %w", err)
Expand All @@ -103,7 +122,7 @@ func (c *clusterInfoUpdater) updateClusterInfo(cluster appv1.Cluster, info *cach
continue
}
}
if err := argo.ValidateDestination(context.Background(), &a.Spec.Destination, c.db); err != nil {
if err := argo.ValidateDestination(ctx, &a.Spec.Destination, c.db); err != nil {
continue
}
if a.Spec.Destination.Server == cluster.Server {
Expand Down
2 changes: 1 addition & 1 deletion controller/clusterinfoupdater_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ func TestClusterSecretUpdater(t *testing.T) {
lister := applisters.NewApplicationLister(appInformer.GetIndexer()).Applications(fakeNamespace)
updater := NewClusterInfoUpdater(nil, argoDB, lister, appCache, nil, nil, fakeNamespace)

err = updater.updateClusterInfo(*cluster, info)
err = updater.updateClusterInfo(context.Background(), *cluster, info)
assert.NoError(t, err, "Invoking updateClusterInfo failed.")

var clusterInfo v1alpha1.ClusterInfo
Expand Down
2 changes: 2 additions & 0 deletions docs/operator-manual/high_availability.md
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,8 @@ controller replicas. To enable sharding increase the number of replicas in `argo
and repeat the number of replicas in the `ARGOCD_CONTROLLER_REPLICAS` environment variable. The strategic merge patch below
demonstrates changes required to configure two controller replicas.

* By default, the controller will update the cluster information every 10 seconds. If there is a problem with your cluster network environment that is causing the update time to take a long time, you can try modifying the environment variable `ARGO_CD_UPDATE_CLUSTER_INFO_TIMEOUT` to increase the timeout (the unit is seconds).

```yaml
apiVersion: apps/v1
kind: StatefulSet
Expand Down
8 changes: 1 addition & 7 deletions util/config/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,7 @@ func unmarshalObject(data []byte, obj interface{}) error {
if err != nil {
return err
}

err = json.Unmarshal(jsonData, &obj)
if err != nil {
return err
}

return err
return json.Unmarshal(jsonData, &obj)
}

// MarshalLocalYAMLFile writes JSON or YAML to a file on disk.
Expand Down