fix: Fixes sharding placement algorithm and allows development of alternative algorithms #13018

Merged · 6 commits · Jun 5, 2023
@@ -23,6 +23,7 @@ import (
cacheutil "github.com/argoproj/argo-cd/v2/util/cache"
appstatecache "github.com/argoproj/argo-cd/v2/util/cache/appstate"
"github.com/argoproj/argo-cd/v2/util/cli"
"github.com/argoproj/argo-cd/v2/util/db"
"github.com/argoproj/argo-cd/v2/util/env"
"github.com/argoproj/argo-cd/v2/util/errors"
kubeutil "github.com/argoproj/argo-cd/v2/util/kube"
@@ -62,6 +63,7 @@ func NewCommand() *cobra.Command {
otlpAddress string
applicationNamespaces []string
persistResourceHealth bool
shardingAlgorithm string
)
var command = cobra.Command{
Use: cliName,
@@ -134,7 +136,7 @@ func NewCommand() *cobra.Command {
appController.InvalidateProjectsCache()
}))
kubectl := kubeutil.NewKubectl()
clusterFilter := getClusterFilter()
clusterFilter := getClusterFilter(kubeClient, settingsMgr, shardingAlgorithm)
appController, err = controller.NewApplicationController(
namespace,
settingsMgr,
@@ -152,7 +154,8 @@ func NewCommand() *cobra.Command {
kubectlParallelismLimit,
persistResourceHealth,
clusterFilter,
applicationNamespaces)
applicationNamespaces,
)
errors.CheckError(err)
cacheutil.CollectMetrics(redisClient, appController.GetMetricsServer())

@@ -195,13 +198,14 @@ func NewCommand() *cobra.Command {
command.Flags().StringVar(&otlpAddress, "otlp-address", env.StringFromEnv("ARGOCD_APPLICATION_CONTROLLER_OTLP_ADDRESS", ""), "OpenTelemetry collector address to send traces to")
command.Flags().StringSliceVar(&applicationNamespaces, "application-namespaces", env.StringsFromEnv("ARGOCD_APPLICATION_NAMESPACES", []string{}, ","), "List of additional namespaces that applications are allowed to be reconciled from")
command.Flags().BoolVar(&persistResourceHealth, "persist-resource-health", env.ParseBoolFromEnv("ARGOCD_APPLICATION_CONTROLLER_PERSIST_RESOURCE_HEALTH", true), "Enables storing the managed resources health in the Application CRD")
command.Flags().StringVar(&shardingAlgorithm, "sharding-method", env.StringFromEnv(common.EnvControllerShardingAlgorithm, common.DefaultShardingAlgorithm), "Enables choice of sharding method. Supported sharding methods are: [legacy, round-robin]")
cacheSrc = appstatecache.AddCacheFlagsToCmd(&command, func(client *redis.Client) {
redisClient = client
})
return &command
}

func getClusterFilter() func(cluster *v1alpha1.Cluster) bool {
func getClusterFilter(kubeClient *kubernetes.Clientset, settingsMgr *settings.SettingsManager, shardingAlgorithm string) sharding.ClusterFilterFunction {
replicas := env.ParseNumFromEnv(common.EnvControllerReplicas, 0, 0, math.MaxInt32)
shard := env.ParseNumFromEnv(common.EnvControllerShard, -1, -math.MaxInt32, math.MaxInt32)
var clusterFilter func(cluster *v1alpha1.Cluster) bool
@@ -212,7 +216,10 @@ func getClusterFilter() func(cluster *v1alpha1.Cluster) bool {
errors.CheckError(err)
}
log.Infof("Processing clusters from shard %d", shard)
clusterFilter = sharding.GetClusterFilter(replicas, shard)
db := db.NewDB(settingsMgr.GetNamespace(), settingsMgr, kubeClient)
log.Infof("Using filter function: %s", shardingAlgorithm)
distributionFunction := sharding.GetDistributionFunction(db, shardingAlgorithm)
clusterFilter = sharding.GetClusterFilter(distributionFunction, shard)
} else {
log.Info("Processing all cluster shards")
}
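Taken together, the hunks above wire the new flag through to the sharding package roughly as follows (a condensed sketch assembled from the diff, not a literal excerpt from the PR):

shardingAlgorithm := env.StringFromEnv(common.EnvControllerShardingAlgorithm, common.DefaultShardingAlgorithm) // overridden by --sharding-method
db := db.NewDB(settingsMgr.GetNamespace(), settingsMgr, kubeClient)
distributionFunction := sharding.GetDistributionFunction(db, shardingAlgorithm) // legacy or round-robin
clusterFilter := sharding.GetClusterFilter(distributionFunction, shard)          // true when this shard owns the cluster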
8 changes: 6 additions & 2 deletions cmd/argocd/commands/admin/cluster.go
@@ -19,6 +19,7 @@ import (
"k8s.io/client-go/kubernetes/fake"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/utils/pointer"

cmdutil "github.com/argoproj/argo-cd/v2/cmd/util"
"github.com/argoproj/argo-cd/v2/common"
@@ -115,10 +116,13 @@ func loadClusters(ctx context.Context, kubeClient *kubernetes.Clientset, appClie
}
batch := clustersList.Items[batchStart:batchEnd]
_ = kube.RunAllAsync(len(batch), func(i int) error {
cluster := batch[i]
clusterShard := 0
cluster := batch[i]
if replicas > 0 {
clusterShard = sharding.GetShardByID(cluster.ID, replicas)
distributionFunction := sharding.GetDistributionFunction(argoDB, common.DefaultShardingAlgorithm)
distributionFunction(&cluster)
Contributor review comment:
Should this line be clusterShard=distributionFunction(&cluster) ?

cluster.Shard = pointer.Int64Ptr(int64(clusterShard))
Contributor review comment:
cluster.Shard should not be updated. That field is meant for users to explicitly set a static shard value so that the application controller instance with that shard value picks it up.

log.Infof("Cluster with uid: %s will be processed by shard %d", cluster.ID, clusterShard)
}

if shard != -1 && clusterShard != shard {
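A fix along the lines of the first review comment above would assign the result of the distribution function, for example (hypothetical sketch, not code from this PR):

if replicas > 0 {
	distributionFunction := sharding.GetDistributionFunction(argoDB, common.DefaultShardingAlgorithm)
	clusterShard = distributionFunction(&cluster) // assign the computed shard, as the reviewer suggests
	log.Infof("Cluster with uid: %s will be processed by shard %d", cluster.ID, clusterShard)
}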
8 changes: 8 additions & 0 deletions common/common.go
@@ -103,6 +103,12 @@ const (

// PasswordPatten is the default password patten
PasswordPatten = `^.{8,32}$`

// LegacyShardingAlgorithm is the default sharding algorithm; it uses a `uid`-based distribution that is not guaranteed to be uniform
LegacyShardingAlgorithm = "legacy"
// RoundRobinShardingAlgorithm is an alternative sharding algorithm that distributes clusters equally across all shards
RoundRobinShardingAlgorithm = "round-robin"
DefaultShardingAlgorithm = LegacyShardingAlgorithm
)

// Dex related constants
@@ -203,6 +209,8 @@ const (
EnvControllerReplicas = "ARGOCD_CONTROLLER_REPLICAS"
// EnvControllerShard is the shard number that should be handled by controller
EnvControllerShard = "ARGOCD_CONTROLLER_SHARD"
// EnvControllerShardingAlgorithm is the distribution sharding algorithm to be used: legacy or round-robin
EnvControllerShardingAlgorithm = "ARGOCD_CONTROLLER_SHARDING_ALGORITHM"
// EnvEnableGRPCTimeHistogramEnv enables gRPC metrics collection
EnvEnableGRPCTimeHistogramEnv = "ARGOCD_ENABLE_GRPC_TIME_HISTOGRAM"
// EnvGithubAppCredsExpirationDuration controls the caching of Github app credentials. This value is in minutes (default: 60)
15 changes: 11 additions & 4 deletions controller/appcontroller.go
@@ -41,14 +41,17 @@ import (
"github.com/argoproj/argo-cd/v2/common"
statecache "github.com/argoproj/argo-cd/v2/controller/cache"
"github.com/argoproj/argo-cd/v2/controller/metrics"
"github.com/argoproj/argo-cd/v2/controller/sharding"
"github.com/argoproj/argo-cd/v2/pkg/apis/application"
appv1 "github.com/argoproj/argo-cd/v2/pkg/apis/application/v1alpha1"
argov1alpha "github.com/argoproj/argo-cd/v2/pkg/apis/application/v1alpha1"
appclientset "github.com/argoproj/argo-cd/v2/pkg/client/clientset/versioned"
"github.com/argoproj/argo-cd/v2/pkg/client/informers/externalversions/application/v1alpha1"
applisters "github.com/argoproj/argo-cd/v2/pkg/client/listers/application/v1alpha1"
"github.com/argoproj/argo-cd/v2/reposerver/apiclient"
"github.com/argoproj/argo-cd/v2/util/argo"
argodiff "github.com/argoproj/argo-cd/v2/util/argo/diff"

appstatecache "github.com/argoproj/argo-cd/v2/util/cache/appstate"
"github.com/argoproj/argo-cd/v2/util/db"
"github.com/argoproj/argo-cd/v2/util/errors"
@@ -229,10 +232,12 @@ func (ctrl *ApplicationController) InvalidateProjectsCache(names ...string) {
ctrl.projByNameCache.Delete(name)
}
} else {
ctrl.projByNameCache.Range(func(key, _ interface{}) bool {
ctrl.projByNameCache.Delete(key)
return true
})
if ctrl != nil {
Contributor review comment:
Is this nil check really needed ?

ctrl.projByNameCache.Range(func(key, _ interface{}) bool {
ctrl.projByNameCache.Delete(key)
return true
})
}
}
}

@@ -2010,3 +2015,5 @@ func (ctrl *ApplicationController) toAppKey(appName string) string {
func (ctrl *ApplicationController) toAppQualifiedName(appName, appNamespace string) string {
return fmt.Sprintf("%s/%s", appNamespace, appName)
}

type ClusterFilterFunction func(c *argov1alpha.Cluster, distributionFunction sharding.DistributionFunction) bool
1 change: 0 additions & 1 deletion controller/appcontroller_test.go
@@ -167,7 +167,6 @@ metadata:
namespace: ` + test.FakeArgoCDNamespace + `
type: Opaque
`

var fakeApp = `
apiVersion: argoproj.io/v1alpha1
kind: Application
155 changes: 133 additions & 22 deletions controller/sharding/sharding.go
@@ -1,17 +1,127 @@
package sharding

import (
"context"
"fmt"
"hash/fnv"
"math"
"os"
"sort"
"strconv"
"strings"

"github.com/argoproj/argo-cd/v2/common"
"github.com/argoproj/argo-cd/v2/pkg/apis/application/v1alpha1"

"github.com/argoproj/argo-cd/v2/util/db"
"github.com/argoproj/argo-cd/v2/util/env"
log "github.com/sirupsen/logrus"
)

// Make it overridable for testing
var osHostnameFunction = os.Hostname

type DistributionFunction func(c *v1alpha1.Cluster) int
type ClusterFilterFunction func(c *v1alpha1.Cluster) bool

// GetClusterFilter returns a ClusterFilterFunction which is a function taking a cluster as a parameter
// and returns whether or not the cluster should be processed by a given shard. It calls the distributionFunction
// to determine which shard will process the cluster, and if the given shard is equal to the calculated shard
// the function will return true.
func GetClusterFilter(distributionFunction DistributionFunction, shard int) ClusterFilterFunction {
replicas := env.ParseNumFromEnv(common.EnvControllerReplicas, 0, 0, math.MaxInt32)
return func(c *v1alpha1.Cluster) bool {
clusterShard := 0
if c != nil && c.Shard != nil {
requestedShard := int(*c.Shard)
if requestedShard < replicas {
clusterShard = requestedShard
} else {
log.Warnf("Specified cluster shard (%d) for cluster: %s is greater than the number of available shard. Assigning automatically.", requestedShard, c.Name)
}
} else {
clusterShard = distributionFunction(c)
}
return clusterShard == shard
}
}
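// Illustrative sketch (not part of this diff): applying the filter returned by
// GetClusterFilter to keep only the clusters owned by a given shard.
func clustersForShard(clusters []v1alpha1.Cluster, distributionFunction DistributionFunction, shard int) []v1alpha1.Cluster {
	filter := GetClusterFilter(distributionFunction, shard)
	owned := []v1alpha1.Cluster{}
	for i := range clusters {
		if filter(&clusters[i]) {
			owned = append(owned, clusters[i])
		}
	}
	return owned
}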

// GetDistributionFunction returns which DistributionFunction should be used based on the passed algorithm and
// the current data.
func GetDistributionFunction(db db.ArgoDB, shardingAlgorithm string) DistributionFunction {
log.Infof("Using filter function: %s", shardingAlgorithm)
distributionFunction := LegacyDistributionFunction()
switch shardingAlgorithm {
case common.RoundRobinShardingAlgorithm:
distributionFunction = RoundRobinDistributionFunction(db)
case common.LegacyShardingAlgorithm:
distributionFunction = LegacyDistributionFunction()
default:
Contributor review comment:
The default behaviour should be legacy, so can't we just have 2 cases (case filterFunctionName == "hash" and default)? Also, it would be better if this logic could be moved to cmd/argocd-application-controller/controllers/argocd_application_control.go, where it would be possible to switch the algorithm using command line flags.

log.Warnf("distribution type %s is not supported, defaulting to %s", shardingAlgorithm, common.DefaultShardingAlgorithm)
}
return distributionFunction
}
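// Illustrative usage (not part of this diff):
//   df := GetDistributionFunction(argoDB, common.RoundRobinShardingAlgorithm)
//   shard := df(cluster) // -1 when no replicas are configured or the cluster id is unknown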

// LegacyDistributionFunction returns a DistributionFunction using a stable distribution algorithm:
// for a given cluster the function will return the shard number based on the cluster id. This function
// is lightweight and can be distributed easily, however, it does not ensure a homogeneous distribution as
// some shards may get assigned more clusters than others. It is the legacy distribution function and is
// kept for compatibility reasons.
func LegacyDistributionFunction() DistributionFunction {
replicas := env.ParseNumFromEnv(common.EnvControllerReplicas, 0, 0, math.MaxInt32)
return func(c *v1alpha1.Cluster) int {
if replicas == 0 {
return -1
}
if c == nil {
return 0
}
id := c.ID
log.Debugf("Calculating cluster shard for cluster id: %s", id)
if id == "" {
return 0
} else {
h := fnv.New32a()
_, _ = h.Write([]byte(id))
shard := int32(h.Sum32() % uint32(replicas))
log.Infof("Cluster with id=%s will be processed by shard %d", id, shard)
return int(shard)
}
}
}
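// Illustration (not part of this diff): with 3 replicas, the legacy mapping is
//   shard = fnv32a(cluster.ID) % 3
// so the shard a cluster lands on depends only on its uid, and nothing prevents
// several uids from hashing to the same shard (the non-uniformity mentioned above).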

// RoundRobinDistributionFunction returns a DistributionFunction using a homogeneous distribution algorithm:
// for a given cluster the function will return the shard number based on the modulo of the cluster's rank in
// the list of clusters sorted by uid and the number of replicas.
// This function ensures a homogeneous distribution: each shard gets assigned the same number of
// clusters (+/-1), but with the drawback that clusters may be reshuffled across shards whenever the
// cluster list changes.
func RoundRobinDistributionFunction(db db.ArgoDB) DistributionFunction {
replicas := env.ParseNumFromEnv(common.EnvControllerReplicas, 0, 0, math.MaxInt32)
return func(c *v1alpha1.Cluster) int {
if replicas > 0 {
if c == nil { // in-cluster does not necessarily have a secret assigned, so we may receive a nil cluster here.
return 0
} else {
clusterIndexdByClusterIdMap := createClusterIndexByClusterIdMap(db)
clusterIndex, ok := clusterIndexdByClusterIdMap[c.ID]
if !ok {
log.Warnf("Cluster with id=%s not found in cluster map.", c.ID)
return -1
}
shard := int(clusterIndex % replicas)
log.Infof("Cluster with id=%s will be processed by shard %d", c.ID, shard)
return shard
}
}
log.Warnf("The number of replicas (%d) is lower than 1", replicas)
return -1
}
}
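// Illustration (not part of this diff): with 2 replicas and clusters whose uids sort
// to [a, b, c, d], the assignment is a->0, b->1, c->0, d->1. Adding or removing a
// cluster shifts the rank of every cluster that sorts after it, which is the
// reshuffling drawback mentioned above.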

// InferShard extracts the shard index based on its hostname.
func InferShard() (int, error) {
hostname, err := os.Hostname()
hostname, err := osHostnameFunction()
if err != nil {
return 0, err
}
@@ -23,31 +133,32 @@ func InferShard() (int, error) {
if err != nil {
return 0, fmt.Errorf("hostname should ends with shard number separated by '-' but got: %s", hostname)
}
return shard, nil
return int(shard), nil
}

// GetShardByID calculates cluster shard as `clusterSecret.UID % replicas count`
func GetShardByID(id string, replicas int) int {
if id == "" {
return 0
} else {
h := fnv.New32a()
_, _ = h.Write([]byte(id))
return int(h.Sum32() % uint32(replicas))
func getSortedClustersList(db db.ArgoDB) []v1alpha1.Cluster {
ctx := context.Background()
clustersList, dbErr := db.ListClusters(ctx)
if dbErr != nil {
log.Warnf("Error while querying clusters list from database: %v", dbErr)
return []v1alpha1.Cluster{}
}
clusters := clustersList.Items
sort.Slice(clusters, func(i, j int) bool {
return clusters[i].ID < clusters[j].ID
})
return clusters
}

func GetClusterFilter(replicas int, shard int) func(c *v1alpha1.Cluster) bool {
return func(c *v1alpha1.Cluster) bool {
clusterShard := 0
// cluster might be nil if app is using invalid cluster URL, assume shard 0 in this case.
if c != nil {
if c.Shard != nil {
clusterShard = int(*c.Shard)
} else {
clusterShard = GetShardByID(c.ID, replicas)
}
}
return clusterShard == shard
func createClusterIndexByClusterIdMap(db db.ArgoDB) map[string]int {
Contributor review comment:
Can we add some comments for the newly added functions ?

clusters := getSortedClustersList(db)
log.Debugf("ClustersList has %d items", len(clusters))
clusterById := make(map[string]v1alpha1.Cluster)
clusterIndexedByClusterId := make(map[string]int)
for i, cluster := range clusters {
log.Debugf("Adding cluster with id=%s and name=%s to cluster's map", cluster.ID, cluster.Name)
clusterById[cluster.ID] = cluster
clusterIndexedByClusterId[cluster.ID] = i
}
return clusterIndexedByClusterId
}
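For readers who want to compare the two algorithms outside the controller, here is a minimal, self-contained sketch of the hashing and ranking logic (hypothetical cluster uids and a fixed replica count; it bypasses the Argo CD database layer entirely):

package main

import (
	"fmt"
	"hash/fnv"
	"sort"
)

// legacyShard mirrors the fnv32a-modulo mapping used by LegacyDistributionFunction.
func legacyShard(id string, replicas int) int {
	if replicas <= 0 || id == "" {
		return 0
	}
	h := fnv.New32a()
	_, _ = h.Write([]byte(id))
	return int(h.Sum32() % uint32(replicas))
}

// roundRobinShard mirrors RoundRobinDistributionFunction: the cluster's rank in the
// uid-sorted list, modulo the number of replicas.
func roundRobinShard(id string, allIDs []string, replicas int) int {
	if replicas <= 0 {
		return -1
	}
	sorted := append([]string{}, allIDs...)
	sort.Strings(sorted)
	for rank, candidate := range sorted {
		if candidate == id {
			return rank % replicas
		}
	}
	return -1 // unknown cluster id
}

func main() {
	ids := []string{"uid-a", "uid-b", "uid-c", "uid-d"} // hypothetical cluster secret uids
	replicas := 3
	for _, id := range ids {
		fmt.Printf("%s -> legacy shard %d, round-robin shard %d\n",
			id, legacyShard(id, replicas), roundRobinShard(id, ids, replicas))
	}
}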