From ca2e87a67acd76fe9eca42e765e0b76f35077e09 Mon Sep 17 00:00:00 2001 From: Akram Ben Aissi Date: Thu, 7 Dec 2023 01:07:58 +0100 Subject: [PATCH 1/4] Adds consistent hashing with bound loads sharding algorithm Signed-off-by: Akram Ben Aissi --- common/common.go | 7 +- controller/sharding/consistent/consistent.go | 288 +++++++++++++++++++ controller/sharding/sharding.go | 57 ++++ controller/sharding/sharding_test.go | 103 +++++++ go.mod | 7 +- go.sum | 2 + 6 files changed, 460 insertions(+), 4 deletions(-) create mode 100644 controller/sharding/consistent/consistent.go diff --git a/common/common.go b/common/common.go index 090cd33965e54..9032f6859f2d0 100644 --- a/common/common.go +++ b/common/common.go @@ -117,7 +117,12 @@ const ( RoundRobinShardingAlgorithm = "round-robin" // AppControllerHeartbeatUpdateRetryCount is the retry count for updating the Shard Mapping to the Shard Mapping ConfigMap used by Application Controller AppControllerHeartbeatUpdateRetryCount = 3 - DefaultShardingAlgorithm = LegacyShardingAlgorithm + // ConsistentHashingWithBoundedLoadsAlgorithm uses an algorithm that tries to use an equal distribution accross + // all shards but is optimised to handled sharding and/or cluster addings or removal. In case of sharding or + // cluster changes, this algorithm minimise the changes between shard and clusters assignments. + ConsistentHashingWithBoundedLoadsAlgorithm = "consistent-hashing" + + DefaultShardingAlgorithm = LegacyShardingAlgorithm ) // Dex related constants diff --git a/controller/sharding/consistent/consistent.go b/controller/sharding/consistent/consistent.go new file mode 100644 index 0000000000000..aad5b144155d2 --- /dev/null +++ b/controller/sharding/consistent/consistent.go @@ -0,0 +1,288 @@ +// An implementation of Consistent Hashing and +// Consistent Hashing With Bounded Loads. +// +// https://en.wikipedia.org/wiki/Consistent_hashing +// +// https://research.googleblog.com/2017/04/consistent-hashing-with-bounded-loads.html +package consistent + +import ( + "encoding/binary" + "errors" + "fmt" + "math" + "sync" + "sync/atomic" + + "github.com/google/btree" + + blake2b "github.com/minio/blake2b-simd" +) + +var ErrNoHosts = errors.New("no hosts added") + +type Host struct { + Name string + Load int64 +} + +type Consistent struct { + servers map[uint64]string + clients *btree.BTree + loadMap map[string]*Host + totalLoad int64 + replicationFactor int + + sync.RWMutex +} + +type item struct { + value uint64 +} + +func (i item) Less(than btree.Item) bool { + return i.value < than.(item).value +} + +func New() *Consistent { + return &Consistent{ + servers: map[uint64]string{}, + clients: btree.New(2), + loadMap: map[string]*Host{}, + replicationFactor: 1000, + } +} + +func NewWithReplicationFactor(replicationFactor int) *Consistent { + return &Consistent{ + servers: map[uint64]string{}, + clients: btree.New(2), + loadMap: map[string]*Host{}, + replicationFactor: replicationFactor, + } +} +func (c *Consistent) Add(server string) { + c.Lock() + defer c.Unlock() + + if _, ok := c.loadMap[server]; ok { + return + } + + c.loadMap[server] = &Host{Name: server, Load: 0} + for i := 0; i < c.replicationFactor; i++ { + h := c.hash(fmt.Sprintf("%s%d", server, i)) + c.servers[h] = server + c.clients.ReplaceOrInsert(item{h}) + } +} + +// Get returns the server that owns the given client. +// As described in https://en.wikipedia.org/wiki/Consistent_hashing +// It returns ErrNoHosts if the ring has no servers in it. 
+func (c *Consistent) Get(client string) (string, error) { + c.RLock() + defer c.RUnlock() + + if c.clients.Len() == 0 { + return "", ErrNoHosts + } + + h := c.hash(client) + var foundItem btree.Item + c.clients.AscendGreaterOrEqual(item{h}, func(i btree.Item) bool { + foundItem = i + return false // stop the iteration + }) + + if foundItem == nil { + // If no host found, wrap around to the first one. + foundItem = c.clients.Min() + } + + host := c.servers[foundItem.(item).value] + + return host, nil +} + +// GetLeast returns the least loaded host that can serve the key. +// It uses Consistent Hashing With Bounded loads. +// https://research.googleblog.com/2017/04/consistent-hashing-with-bounded-loads.html +// It returns ErrNoHosts if the ring has no hosts in it. +func (c *Consistent) GetLeast(client string) (string, error) { + c.RLock() + defer c.RUnlock() + + if c.clients.Len() == 0 { + return "", ErrNoHosts + } + + h := c.hash(client) + idx := c.search(h) + + i := idx + for { + x := item{uint64(i)} + key := c.clients.Get(x) + if key != nil { + host := c.servers[key.(*item).value] + if c.loadOK(host) { + return host, nil + } + i++ + if i >= c.clients.Len() { + i = 0 + } + } else { + return client, nil + } + } +} + +func (c *Consistent) search(key uint64) int { + idx := 0 + found := false + + c.clients.Ascend(func(i btree.Item) bool { + if i.(item).value >= key { + found = true + return false // stop the iteration + } + idx++ + return true + }) + + if !found { + idx = 0 + } + + return idx +} + +// Sets the load of `server` to the given `load` +func (c *Consistent) UpdateLoad(server string, load int64) { + c.Lock() + defer c.Unlock() + + if _, ok := c.loadMap[server]; !ok { + return + } + c.totalLoad -= c.loadMap[server].Load + c.loadMap[server].Load = load + c.totalLoad += load +} + +// Increments the load of host by 1 +// +// should only be used with if you obtained a host with GetLeast +func (c *Consistent) Inc(server string) { + c.Lock() + defer c.Unlock() + + if _, ok := c.loadMap[server]; !ok { + return + } + atomic.AddInt64(&c.loadMap[server].Load, 1) + atomic.AddInt64(&c.totalLoad, 1) +} + +// Decrements the load of host by 1 +// +// should only be used with if you obtained a host with GetLeast +func (c *Consistent) Done(server string) { + c.Lock() + defer c.Unlock() + + if _, ok := c.loadMap[server]; !ok { + return + } + atomic.AddInt64(&c.loadMap[server].Load, -1) + atomic.AddInt64(&c.totalLoad, -1) +} + +// Deletes host from the ring +func (c *Consistent) Remove(server string) bool { + c.Lock() + defer c.Unlock() + + for i := 0; i < c.replicationFactor; i++ { + h := c.hash(fmt.Sprintf("%s%d", server, i)) + delete(c.servers, h) + c.delSlice(h) + } + delete(c.loadMap, server) + return true +} + +// Return the list of servers in the ring +func (c *Consistent) Servers() (servers []string) { + c.RLock() + defer c.RUnlock() + for k := range c.loadMap { + servers = append(servers, k) + } + return servers +} + +// Returns the loads of all the hosts +func (c *Consistent) GetLoads() map[string]int64 { + loads := map[string]int64{} + + for k, v := range c.loadMap { + loads[k] = v.Load + } + return loads +} + +// Returns the maximum load of the single host +// which is: +// (total_load/number_of_hosts)*1.25 +// total_load = is the total number of active requests served by hosts +// for more info: +// https://research.googleblog.com/2017/04/consistent-hashing-with-bounded-loads.html +func (c *Consistent) MaxLoad() int64 { + if c.totalLoad == 0 { + c.totalLoad = 1 + } + var avgLoadPerNode 
float64 + avgLoadPerNode = float64(c.totalLoad / int64(len(c.loadMap))) + if avgLoadPerNode == 0 { + avgLoadPerNode = 1 + } + avgLoadPerNode = math.Ceil(avgLoadPerNode * 1.25) + return int64(avgLoadPerNode) +} + +func (c *Consistent) loadOK(server string) bool { + // a safety check if someone performed c.Done more than needed + if c.totalLoad < 0 { + c.totalLoad = 0 + } + + var avgLoadPerNode float64 + avgLoadPerNode = float64((c.totalLoad + 1) / int64(len(c.loadMap))) + if avgLoadPerNode == 0 { + avgLoadPerNode = 1 + } + avgLoadPerNode = math.Ceil(avgLoadPerNode * 1.25) + + bserver, ok := c.loadMap[server] + if !ok { + panic(fmt.Sprintf("given host(%s) not in loadsMap", bserver.Name)) + } + + if float64(bserver.Load)+1 <= avgLoadPerNode { + return true + } + + return false +} + +func (c *Consistent) delSlice(val uint64) { + c.clients.Delete(item{val}) +} + +func (c *Consistent) hash(key string) uint64 { + out := blake2b.Sum512([]byte(key)) + return binary.LittleEndian.Uint64(out[:]) +} diff --git a/controller/sharding/sharding.go b/controller/sharding/sharding.go index c415acf0b8b04..dc329df73dfc1 100644 --- a/controller/sharding/sharding.go +++ b/controller/sharding/sharding.go @@ -14,7 +14,9 @@ import ( "encoding/json" "github.com/argoproj/argo-cd/v2/common" + "github.com/argoproj/argo-cd/v2/controller/sharding/consistent" "github.com/argoproj/argo-cd/v2/pkg/apis/application/v1alpha1" + slices "golang.org/x/exp/slices" v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/kubernetes" @@ -84,6 +86,8 @@ func GetDistributionFunction(clusters clusterAccessor, apps appAccessor, shardin distributionFunction = RoundRobinDistributionFunction(clusters, replicasCount) case common.LegacyShardingAlgorithm: distributionFunction = LegacyDistributionFunction(replicasCount) + case common.ConsistentHashingWithBoundedLoadsAlgorithm: + distributionFunction = ConsistentHashingWithBoundedLoadsDistributionFunction(clusters, replicasCount) default: log.Warnf("distribution type %s is not supported, defaulting to %s", shardingAlgorithm, common.DefaultShardingAlgorithm) } @@ -158,6 +162,59 @@ func RoundRobinDistributionFunction(clusters clusterAccessor, replicas int) Dist } } +// ConsistentHashingWithBoundedLoadsDistributionFunction returns a DistributionFunction using an almost homogeneous distribution algorithm: +// for a given cluster the function will return the shard number based on a consistent hashing with bounded loads algorithm. +// This function ensures an almost homogenous distribution: each shards got assigned the fairly similar number of +// clusters +/-10% , but with it is resilient to sharding and/or number of clusters changes. +func ConsistentHashingWithBoundedLoadsDistributionFunction(clusters clusterAccessor, replicas int) DistributionFunction { + return func(c *v1alpha1.Cluster) int { + if replicas > 0 { + if c == nil { // in-cluster does not necessarly have a secret assigned. So we are receiving a nil cluster here. 
+ return 0 + } + + // if Shard is manually set and the assigned value is lower than the number of replicas, + // then its value is returned otherwise it is the default calculated value + if c.Shard != nil && int(*c.Shard) < replicas { + return int(*c.Shard) + } else { + // if the cluster is not in the clusters list anymore, we should unassign it from any shard, so we + // return the reserved value of -1 + if !slices.Contains(clusters(), c) { + log.Warnf("Cluster with id=%s not found in cluster map.", c.ID) + return -1 + } + consistentHashing := createConsistentHashingWithBoundLoads(replicas) + clusterIndex, err := consistentHashing.Get(c.ID) + if err != nil { + log.Warnf("Cluster with id=%s not found in cluster map.", c.ID) + return -1 + } + shard, err := strconv.Atoi(clusterIndex) + if err != nil { + log.Errorf("Consistent Hashing was supposed to return a shard index but it returned %d", err) + return -1 + } + log.Debugf("Cluster with id=%s will be processed by shard %d", c.ID, shard) + return shard + } + } + log.Warnf("The number of replicas (%d) is lower than 1", replicas) + return -1 + } +} + +func createConsistentHashingWithBoundLoads(replicas int) *consistent.Consistent { + consistentHashing := consistent.New() + // Adding a shard with id "-1" as a reserved value for clusters that does not have an assigned shard + // this happens for clusters that are removed for the clusters list + //consistentHashing.Add("-1") + for i := 0; i < replicas; i++ { + consistentHashing.Add(strconv.Itoa(i)) + } + return consistentHashing +} + // NoShardingDistributionFunction returns a DistributionFunction that will process all cluster by shard 0 // the function is created for API compatibility purposes and is not supposed to be activated. func NoShardingDistributionFunction() DistributionFunction { diff --git a/controller/sharding/sharding_test.go b/controller/sharding/sharding_test.go index 1c338aac5f271..5664ae4579535 100644 --- a/controller/sharding/sharding_test.go +++ b/controller/sharding/sharding_test.go @@ -275,6 +275,109 @@ func TestGetShardByIndexModuloReplicasCountDistributionFunctionWhenClusterIsAdde assert.Equal(t, -1, distributionFunction(&cluster6)) } +func TestConsistentHashingWhenClusterIsAddedAndRemoved(t *testing.T) { + db := dbmocks.ArgoDB{} + clusterCount := 133 + prefix := "cluster" + + clusters := []v1alpha1.Cluster{} + for i := 0; i < clusterCount; i++ { + id := fmt.Sprintf("%06d", i) + cluster := fmt.Sprintf("%s-%s", prefix, id) + clusters = append(clusters, createCluster(cluster, id)) + } + clusterAccessor := getClusterAccessor(clusters) + clusterList := &v1alpha1.ClusterList{Items: clusters} + db.On("ListClusters", mock.Anything).Return(clusterList, nil) + // Test with replicas set to 3 + replicasCount := 3 + db.On("GetApplicationControllerReplicas").Return(replicasCount) + distributionFunction := ConsistentHashingWithBoundedLoadsDistributionFunction(clusterAccessor, replicasCount) + assert.Equal(t, 0, distributionFunction(nil)) + distributionMap := map[int]int{} + assignementMap := map[string]int{} + for i := 0; i < clusterCount; i++ { + assignedShard := distributionFunction(&clusters[i]) + assignementMap[clusters[i].ID] = assignedShard + distributionMap[assignedShard]++ + + } + + // We check that the distribution does not differ for more than 20% + var sum float64 + sum = 0 + for shard, count := range distributionMap { + if shard != -1 { + sum = (sum + float64(count)) + } + } + average := sum / float64(replicasCount) + failedTests := false + for shard, count := range 
distributionMap { + if shard != -1 { + if float64(count) > average*float64(1.1) || float64(count) < average*float64(0.9) { + fmt.Printf("Cluster distribution differs for more than 20%%: %d for shard %d (average: %f)\n", count, shard, average) + failedTests = true + } + if failedTests { + t.Fail() + } + } + } + + // Now we will decrease the number of replicas to 2, and we should see only clusters that were attached to shard 2 to be reassigned + replicasCount = 2 + distributionFunction = ConsistentHashingWithBoundedLoadsDistributionFunction(getClusterAccessor(clusterList.Items), replicasCount) + removedCluster := clusterList.Items[len(clusterList.Items)-1] + for i := 0; i < clusterCount; i++ { + c := &clusters[i] + assignedShard := distributionFunction(c) + prevıouslyAssignedShard := assignementMap[clusters[i].ID] + if prevıouslyAssignedShard != 2 && prevıouslyAssignedShard != assignedShard { + fmt.Printf("Previously assigned %s cluster has moved from replica %d to %d", c.ID, prevıouslyAssignedShard, assignedShard) + t.Fail() + } + } + + // Now, we remove the last added cluster, it should be unassigned + removedCluster = clusterList.Items[len(clusterList.Items)-1] + clusterList.Items = clusterList.Items[:len(clusterList.Items)-1] + distributionFunction = ConsistentHashingWithBoundedLoadsDistributionFunction(getClusterAccessor(clusterList.Items), replicasCount) + assert.Equal(t, -1, distributionFunction(&removedCluster)) +} + +func TestConsistentHashingWhenClusterWithZeroReplicas(t *testing.T) { + db := dbmocks.ArgoDB{} + clusters := []v1alpha1.Cluster{createCluster("cluster-01", "01")} + clusterAccessor := getClusterAccessor(clusters) + clusterList := &v1alpha1.ClusterList{Items: clusters} + db.On("ListClusters", mock.Anything).Return(clusterList, nil) + + // Test with replicas set to 0 + replicasCount := 0 + db.On("GetApplicationControllerReplicas").Return(replicasCount) + distributionFunction := ConsistentHashingWithBoundedLoadsDistributionFunction(clusterAccessor, replicasCount) + assert.Equal(t, -1, distributionFunction(nil)) +} + +func TestConsistentHashingWhenClusterWithFixedShard(t *testing.T) { + db := dbmocks.ArgoDB{} + var fixedShard int64 = 1 + cluster := &v1alpha1.Cluster{ID: "1", Shard: &fixedShard} + clusters := []v1alpha1.Cluster{*cluster} + + clusterAccessor := getClusterAccessor(clusters) + clusterList := &v1alpha1.ClusterList{Items: clusters} + db.On("ListClusters", mock.Anything).Return(clusterList, nil) + + // Test with replicas set to 5 + replicasCount := 5 + db.On("GetApplicationControllerReplicas").Return(replicasCount) + distributionFunction := ConsistentHashingWithBoundedLoadsDistributionFunction(clusterAccessor, replicasCount) + assert.Equal(t, fixedShard, int64(distributionFunction(cluster))) + +} + func TestGetShardByIndexModuloReplicasCountDistributionFunction(t *testing.T) { clusters, db, cluster1, cluster2, _, _, _ := createTestClusters() replicasCount := 2 diff --git a/go.mod b/go.mod index c6e1bb004bf7c..ea33f1a522986 100644 --- a/go.mod +++ b/go.mod @@ -1,8 +1,6 @@ module github.com/argoproj/argo-cd/v2 -go 1.21 - -toolchain go1.21.0 +go 1.21.0 require ( code.gitea.io/sdk/gitea v0.18.0 @@ -42,6 +40,7 @@ require ( github.com/gogo/protobuf v1.3.2 github.com/golang-jwt/jwt/v4 v4.5.0 github.com/golang/protobuf v1.5.4 + github.com/google/btree v1.1.2 github.com/google/go-cmp v0.6.0 github.com/google/go-github/v35 v35.3.0 github.com/google/go-jsonnet v0.20.0 @@ -63,6 +62,7 @@ require ( github.com/mattn/go-isatty v0.0.19 github.com/mattn/go-zglob v0.0.4 
github.com/microsoft/azure-devops-go-api/azuredevops v1.0.0-b5 + github.com/minio/blake2b-simd v0.0.0-20160723061019-3f5f724cb5b1 github.com/olekukonko/tablewriter v0.0.5 github.com/patrickmn/go-cache v2.1.0+incompatible github.com/prometheus/client_golang v1.18.0 @@ -210,6 +210,7 @@ require ( github.com/golang/glog v1.1.2 // indirect github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect github.com/google/btree v1.1.2 // indirect + github.com/google/gnostic v0.6.9 // indirect github.com/google/go-github/v41 v41.0.0 // indirect github.com/google/go-github/v53 v53.2.0 // indirect github.com/google/go-querystring v1.1.0 // indirect diff --git a/go.sum b/go.sum index c9209abedde49..f14ed4cdd8d58 100644 --- a/go.sum +++ b/go.sum @@ -1404,6 +1404,8 @@ github.com/microsoft/azure-devops-go-api/azuredevops v1.0.0-b5 h1:YH424zrwLTlyHS github.com/microsoft/azure-devops-go-api/azuredevops v1.0.0-b5/go.mod h1:PoGiBqKSQK1vIfQ+yVaFcGjDySHvym6FM1cNYnwzbrY= github.com/miekg/dns v1.0.14/go.mod h1:W1PPwlIAgtquWBMBEV9nkV9Cazfe8ScdGz/Lj7v3Nrg= github.com/minio/asm2plan9s v0.0.0-20200509001527-cdd76441f9d8/go.mod h1:mC1jAcsrzbxHt8iiaC+zU4b1ylILSosueou12R++wfY= +github.com/minio/blake2b-simd v0.0.0-20160723061019-3f5f724cb5b1 h1:lYpkrQH5ajf0OXOcUbGjvZxxijuBwbbmlSxLiuofa+g= +github.com/minio/blake2b-simd v0.0.0-20160723061019-3f5f724cb5b1/go.mod h1:pD8RvIylQ358TN4wwqatJ8rNavkEINozVn9DtGI3dfQ= github.com/minio/c2goasm v0.0.0-20190812172519-36a3d3bbc4f3/go.mod h1:RagcQ7I8IeTMnF8JTXieKnO4Z6JCsikNEzj0DwauVzE= github.com/minio/md5-simd v1.1.2/go.mod h1:MzdKDxYpY2BT9XQFocsiZf/NKVtR7nkE4RoEpN+20RM= github.com/minio/minio-go/v7 v7.0.58/go.mod h1:NUDy4A4oXPq1l2yK6LTSvCEzAMeIcoz9lcj5dbzSrRE= From 6c48de6195a8a5ab7370a77f187d93e59722c003 Mon Sep 17 00:00:00 2001 From: Akram Ben Aissi Date: Mon, 15 Apr 2024 20:04:20 +0200 Subject: [PATCH 2/4] Make the assignement consistent accross all clusters - The assignment or running of the algorithm has to be consistent across all the clusters. Changed the function to return a map where the consistent hash will be used to build the map - Modifications to the createConsistentHashsingWithBoundLoads function. This will create the map for cluster to shard. Note that the list must be consistent across all shards so that is why the cluster list must be sorted before going through the consistent hash algorithm Signed-off-by: Akram Ben Aissi --- controller/sharding/consistent/consistent.go | 47 ++++++---------- controller/sharding/sharding.go | 59 +++++++++++++++----- controller/sharding/sharding_test.go | 15 ++--- 3 files changed, 70 insertions(+), 51 deletions(-) diff --git a/controller/sharding/consistent/consistent.go b/controller/sharding/consistent/consistent.go index aad5b144155d2..ab29de814d8aa 100644 --- a/controller/sharding/consistent/consistent.go +++ b/controller/sharding/consistent/consistent.go @@ -116,49 +116,34 @@ func (c *Consistent) GetLeast(client string) (string, error) { if c.clients.Len() == 0 { return "", ErrNoHosts } - h := c.hash(client) - idx := c.search(h) - - i := idx for { - x := item{uint64(i)} - key := c.clients.Get(x) + var foundItem btree.Item + c.clients.AscendGreaterOrEqual(item{h}, func(bItem btree.Item) bool { + if h != bItem.(item).value { + foundItem = bItem + return false // stop the iteration + } + return true + }) + + if foundItem == nil { + // If no host found, wrap around to the first one. 
+ foundItem = c.clients.Min() + } + key := c.clients.Get(foundItem) if key != nil { - host := c.servers[key.(*item).value] + host := c.servers[key.(item).value] if c.loadOK(host) { return host, nil } - i++ - if i >= c.clients.Len() { - i = 0 - } + h = key.(item).value } else { return client, nil } } } -func (c *Consistent) search(key uint64) int { - idx := 0 - found := false - - c.clients.Ascend(func(i btree.Item) bool { - if i.(item).value >= key { - found = true - return false // stop the iteration - } - idx++ - return true - }) - - if !found { - idx = 0 - } - - return idx -} - // Sets the load of `server` to the given `load` func (c *Consistent) UpdateLoad(server string, load int64) { c.Lock() diff --git a/controller/sharding/sharding.go b/controller/sharding/sharding.go index dc329df73dfc1..84f33b9ba1035 100644 --- a/controller/sharding/sharding.go +++ b/controller/sharding/sharding.go @@ -87,7 +87,7 @@ func GetDistributionFunction(clusters clusterAccessor, apps appAccessor, shardin case common.LegacyShardingAlgorithm: distributionFunction = LegacyDistributionFunction(replicasCount) case common.ConsistentHashingWithBoundedLoadsAlgorithm: - distributionFunction = ConsistentHashingWithBoundedLoadsDistributionFunction(clusters, replicasCount) + distributionFunction = ConsistentHashingWithBoundedLoadsDistributionFunction(clusters, apps, replicasCount) default: log.Warnf("distribution type %s is not supported, defaulting to %s", shardingAlgorithm, common.DefaultShardingAlgorithm) } @@ -166,7 +166,7 @@ func RoundRobinDistributionFunction(clusters clusterAccessor, replicas int) Dist // for a given cluster the function will return the shard number based on a consistent hashing with bounded loads algorithm. // This function ensures an almost homogenous distribution: each shards got assigned the fairly similar number of // clusters +/-10% , but with it is resilient to sharding and/or number of clusters changes. -func ConsistentHashingWithBoundedLoadsDistributionFunction(clusters clusterAccessor, replicas int) DistributionFunction { +func ConsistentHashingWithBoundedLoadsDistributionFunction(clusters clusterAccessor, apps appAccessor, replicas int) DistributionFunction { return func(c *v1alpha1.Cluster) int { if replicas > 0 { if c == nil { // in-cluster does not necessarly have a secret assigned. So we are receiving a nil cluster here. 
@@ -184,17 +184,12 @@ func ConsistentHashingWithBoundedLoadsDistributionFunction(clusters clusterAcces log.Warnf("Cluster with id=%s not found in cluster map.", c.ID) return -1 } - consistentHashing := createConsistentHashingWithBoundLoads(replicas) - clusterIndex, err := consistentHashing.Get(c.ID) - if err != nil { + shardIndexedByCluster := createConsistentHashingWithBoundLoads(replicas, clusters, apps) + shard, ok := shardIndexedByCluster[c.ID] + if !ok { log.Warnf("Cluster with id=%s not found in cluster map.", c.ID) return -1 } - shard, err := strconv.Atoi(clusterIndex) - if err != nil { - log.Errorf("Consistent Hashing was supposed to return a shard index but it returned %d", err) - return -1 - } log.Debugf("Cluster with id=%s will be processed by shard %d", c.ID, shard) return shard } @@ -204,15 +199,53 @@ func ConsistentHashingWithBoundedLoadsDistributionFunction(clusters clusterAcces } } -func createConsistentHashingWithBoundLoads(replicas int) *consistent.Consistent { +func createConsistentHashingWithBoundLoads(replicas int, getCluster clusterAccessor, getApp appAccessor) map[string]int { + clusters := getSortedClustersList(getCluster) + appDistribution := getAppDistribution(getCluster, getApp) + shardIndexedByCluster := make(map[string]int) + appsIndexedByShard := make(map[string]int64) consistentHashing := consistent.New() // Adding a shard with id "-1" as a reserved value for clusters that does not have an assigned shard // this happens for clusters that are removed for the clusters list //consistentHashing.Add("-1") for i := 0; i < replicas; i++ { - consistentHashing.Add(strconv.Itoa(i)) + shard := strconv.Itoa(i) + consistentHashing.Add(shard) + appsIndexedByShard[shard] = 0 + } + + for _, c := range clusters { + clusterIndex, err := consistentHashing.GetLeast(c.ID) + if err != nil { + log.Warnf("Cluster with id=%s not found in cluster map.", c.ID) + } + shardIndexedByCluster[c.ID], err = strconv.Atoi(clusterIndex) + if err != nil { + log.Errorf("Consistent Hashing was supposed to return a shard index but it returned %d", err) + } + numApps, ok := appDistribution[c.Server] + if !ok { + numApps = 0 + } + appsIndexedByShard[clusterIndex] += numApps + consistentHashing.UpdateLoad(clusterIndex, appsIndexedByShard[clusterIndex]) + } + + return shardIndexedByCluster +} + +func getAppDistribution(getCluster clusterAccessor, getApps appAccessor) map[string]int64 { + apps := getApps() + clusters := getCluster() + appDistribution := make(map[string]int64, len(clusters)) + + for _, a := range apps { + if _, ok := appDistribution[a.Spec.Destination.Server]; !ok { + appDistribution[a.Spec.Destination.Server] = 0 + } + appDistribution[a.Spec.Destination.Server]++ } - return consistentHashing + return appDistribution } // NoShardingDistributionFunction returns a DistributionFunction that will process all cluster by shard 0 diff --git a/controller/sharding/sharding_test.go b/controller/sharding/sharding_test.go index 5664ae4579535..c4eead67eed04 100644 --- a/controller/sharding/sharding_test.go +++ b/controller/sharding/sharding_test.go @@ -287,12 +287,13 @@ func TestConsistentHashingWhenClusterIsAddedAndRemoved(t *testing.T) { clusters = append(clusters, createCluster(cluster, id)) } clusterAccessor := getClusterAccessor(clusters) + appAccessor, _, _, _, _, _ := createTestApps() clusterList := &v1alpha1.ClusterList{Items: clusters} db.On("ListClusters", mock.Anything).Return(clusterList, nil) // Test with replicas set to 3 replicasCount := 3 
db.On("GetApplicationControllerReplicas").Return(replicasCount) - distributionFunction := ConsistentHashingWithBoundedLoadsDistributionFunction(clusterAccessor, replicasCount) + distributionFunction := ConsistentHashingWithBoundedLoadsDistributionFunction(clusterAccessor, appAccessor, replicasCount) assert.Equal(t, 0, distributionFunction(nil)) distributionMap := map[int]int{} assignementMap := map[string]int{} @@ -327,7 +328,7 @@ func TestConsistentHashingWhenClusterIsAddedAndRemoved(t *testing.T) { // Now we will decrease the number of replicas to 2, and we should see only clusters that were attached to shard 2 to be reassigned replicasCount = 2 - distributionFunction = ConsistentHashingWithBoundedLoadsDistributionFunction(getClusterAccessor(clusterList.Items), replicasCount) + distributionFunction = ConsistentHashingWithBoundedLoadsDistributionFunction(getClusterAccessor(clusterList.Items), appAccessor, replicasCount) removedCluster := clusterList.Items[len(clusterList.Items)-1] for i := 0; i < clusterCount; i++ { c := &clusters[i] @@ -338,11 +339,10 @@ func TestConsistentHashingWhenClusterIsAddedAndRemoved(t *testing.T) { t.Fail() } } - // Now, we remove the last added cluster, it should be unassigned removedCluster = clusterList.Items[len(clusterList.Items)-1] clusterList.Items = clusterList.Items[:len(clusterList.Items)-1] - distributionFunction = ConsistentHashingWithBoundedLoadsDistributionFunction(getClusterAccessor(clusterList.Items), replicasCount) + distributionFunction = ConsistentHashingWithBoundedLoadsDistributionFunction(getClusterAccessor(clusterList.Items), appAccessor, replicasCount) assert.Equal(t, -1, distributionFunction(&removedCluster)) } @@ -352,11 +352,11 @@ func TestConsistentHashingWhenClusterWithZeroReplicas(t *testing.T) { clusterAccessor := getClusterAccessor(clusters) clusterList := &v1alpha1.ClusterList{Items: clusters} db.On("ListClusters", mock.Anything).Return(clusterList, nil) - + appAccessor, _, _, _, _, _ := createTestApps() // Test with replicas set to 0 replicasCount := 0 db.On("GetApplicationControllerReplicas").Return(replicasCount) - distributionFunction := ConsistentHashingWithBoundedLoadsDistributionFunction(clusterAccessor, replicasCount) + distributionFunction := ConsistentHashingWithBoundedLoadsDistributionFunction(clusterAccessor, appAccessor, replicasCount) assert.Equal(t, -1, distributionFunction(nil)) } @@ -373,7 +373,8 @@ func TestConsistentHashingWhenClusterWithFixedShard(t *testing.T) { // Test with replicas set to 5 replicasCount := 5 db.On("GetApplicationControllerReplicas").Return(replicasCount) - distributionFunction := ConsistentHashingWithBoundedLoadsDistributionFunction(clusterAccessor, replicasCount) + appAccessor, _, _, _, _, _ := createTestApps() + distributionFunction := ConsistentHashingWithBoundedLoadsDistributionFunction(clusterAccessor, appAccessor, replicasCount) assert.Equal(t, fixedShard, int64(distributionFunction(cluster))) } From 0cf03cda88a3780803105df41953f937defb0bea Mon Sep 17 00:00:00 2001 From: Akram Ben Aissi Date: Tue, 14 May 2024 09:39:16 +0100 Subject: [PATCH 3/4] Extracting constant and simplifying boolean expression Signed-off-by: Akram Ben Aissi --- common/common.go | 7 ++++--- controller/sharding/consistent/consistent.go | 13 +++++++------ go.mod | 2 -- 3 files changed, 11 insertions(+), 11 deletions(-) diff --git a/common/common.go b/common/common.go index 9032f6859f2d0..4e68391e1c7ac 100644 --- a/common/common.go +++ b/common/common.go @@ -117,9 +117,10 @@ const ( RoundRobinShardingAlgorithm = 
"round-robin" // AppControllerHeartbeatUpdateRetryCount is the retry count for updating the Shard Mapping to the Shard Mapping ConfigMap used by Application Controller AppControllerHeartbeatUpdateRetryCount = 3 - // ConsistentHashingWithBoundedLoadsAlgorithm uses an algorithm that tries to use an equal distribution accross - // all shards but is optimised to handled sharding and/or cluster addings or removal. In case of sharding or - // cluster changes, this algorithm minimise the changes between shard and clusters assignments. + + // ConsistentHashingWithBoundedLoadsAlgorithm uses an algorithm that tries to use an equal distribution across + // all shards but is optimised to handle sharding and/or cluster addition or removal. In case of sharding or + // cluster changes, this algorithm minimises the changes between shard and clusters assignments. ConsistentHashingWithBoundedLoadsAlgorithm = "consistent-hashing" DefaultShardingAlgorithm = LegacyShardingAlgorithm diff --git a/controller/sharding/consistent/consistent.go b/controller/sharding/consistent/consistent.go index ab29de814d8aa..6d717b03917d1 100644 --- a/controller/sharding/consistent/consistent.go +++ b/controller/sharding/consistent/consistent.go @@ -19,6 +19,11 @@ import ( blake2b "github.com/minio/blake2b-simd" ) +// OptimalExtraCapacityFactor extra factor capacity (1 + ε). The ideal balance +// between keeping the shards uniform while also keeping consistency when +// changing shard numbers. +const OptimalExtraCapacityFactor = 1.25 + var ErrNoHosts = errors.New("no hosts added") type Host struct { @@ -234,7 +239,7 @@ func (c *Consistent) MaxLoad() int64 { if avgLoadPerNode == 0 { avgLoadPerNode = 1 } - avgLoadPerNode = math.Ceil(avgLoadPerNode * 1.25) + avgLoadPerNode = math.Ceil(avgLoadPerNode * OptimalExtraCapacityFactor) return int64(avgLoadPerNode) } @@ -256,11 +261,7 @@ func (c *Consistent) loadOK(server string) bool { panic(fmt.Sprintf("given host(%s) not in loadsMap", bserver.Name)) } - if float64(bserver.Load)+1 <= avgLoadPerNode { - return true - } - - return false + return float64(bserver.Load)+1 <= avgLoadPerNode } func (c *Consistent) delSlice(val uint64) { diff --git a/go.mod b/go.mod index ea33f1a522986..ad9422c347f4b 100644 --- a/go.mod +++ b/go.mod @@ -209,8 +209,6 @@ require ( github.com/go-telegram-bot-api/telegram-bot-api/v5 v5.5.1 // indirect github.com/golang/glog v1.1.2 // indirect github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect - github.com/google/btree v1.1.2 // indirect - github.com/google/gnostic v0.6.9 // indirect github.com/google/go-github/v41 v41.0.0 // indirect github.com/google/go-github/v53 v53.2.0 // indirect github.com/google/go-querystring v1.1.0 // indirect From b349f81620faed737a05841057ce56cb9d4a2021 Mon Sep 17 00:00:00 2001 From: Akram Ben Aissi Date: Thu, 30 May 2024 16:09:13 +0100 Subject: [PATCH 4/4] Update docs: consistent-hashing sharding algorithm Signed-off-by: Akram Ben Aissi --- .../commands/argocd_application_controller.go | 2 +- cmd/argocd/commands/admin/cluster.go | 4 ++-- controller/sharding/sharding.go | 4 ++-- docs/operator-manual/high_availability.md | 12 +++++++++--- .../server-commands/argocd-application-controller.md | 2 +- .../commands/argocd_admin_cluster_shards.md | 2 +- .../commands/argocd_admin_cluster_stats.md | 2 +- 7 files changed, 17 insertions(+), 11 deletions(-) diff --git a/cmd/argocd-application-controller/commands/argocd_application_controller.go b/cmd/argocd-application-controller/commands/argocd_application_controller.go index 
a7c7f92fab2a6..86c5721caea6c 100644 --- a/cmd/argocd-application-controller/commands/argocd_application_controller.go +++ b/cmd/argocd-application-controller/commands/argocd_application_controller.go @@ -220,7 +220,7 @@ func NewCommand() *cobra.Command { command.Flags().StringSliceVar(&otlpAttrs, "otlp-attrs", env.StringsFromEnv("ARGOCD_APPLICATION_CONTROLLER_OTLP_ATTRS", []string{}, ","), "List of OpenTelemetry collector extra attrs when send traces, each attribute is separated by a colon(e.g. key:value)") command.Flags().StringSliceVar(&applicationNamespaces, "application-namespaces", env.StringsFromEnv("ARGOCD_APPLICATION_NAMESPACES", []string{}, ","), "List of additional namespaces that applications are allowed to be reconciled from") command.Flags().BoolVar(&persistResourceHealth, "persist-resource-health", env.ParseBoolFromEnv("ARGOCD_APPLICATION_CONTROLLER_PERSIST_RESOURCE_HEALTH", true), "Enables storing the managed resources health in the Application CRD") - command.Flags().StringVar(&shardingAlgorithm, "sharding-method", env.StringFromEnv(common.EnvControllerShardingAlgorithm, common.DefaultShardingAlgorithm), "Enables choice of sharding method. Supported sharding methods are : [legacy, round-robin] ") + command.Flags().StringVar(&shardingAlgorithm, "sharding-method", env.StringFromEnv(common.EnvControllerShardingAlgorithm, common.DefaultShardingAlgorithm), "Enables choice of sharding method. Supported sharding methods are : [legacy, round-robin, consistent-hashing] ") // global queue rate limit config command.Flags().Int64Var(&workqueueRateLimit.BucketSize, "wq-bucket-size", env.ParseInt64FromEnv("WORKQUEUE_BUCKET_SIZE", 500, 1, math.MaxInt64), "Set Workqueue Rate Limiter Bucket Size, default 500") command.Flags().Float64Var(&workqueueRateLimit.BucketQPS, "wq-bucket-qps", env.ParseFloat64FromEnv("WORKQUEUE_BUCKET_QPS", math.MaxFloat64, 1, math.MaxFloat64), "Set Workqueue Rate Limiter Bucket QPS, default set to MaxFloat64 which disables the bucket limiter") diff --git a/cmd/argocd/commands/admin/cluster.go b/cmd/argocd/commands/admin/cluster.go index 9d70ac3f8c778..8c17c8b7bef7d 100644 --- a/cmd/argocd/commands/admin/cluster.go +++ b/cmd/argocd/commands/admin/cluster.go @@ -219,7 +219,7 @@ func NewClusterShardsCommand(clientOpts *argocdclient.ClientOptions) *cobra.Comm clientConfig = cli.AddKubectlFlagsToCmd(&command) command.Flags().IntVar(&shard, "shard", -1, "Cluster shard filter") command.Flags().IntVar(&replicas, "replicas", 0, "Application controller replicas count. Inferred from number of running controller pods if not specified") - command.Flags().StringVar(&shardingAlgorithm, "sharding-method", common.DefaultShardingAlgorithm, "Sharding method. Defaults: legacy. Supported sharding methods are : [legacy, round-robin] ") + command.Flags().StringVar(&shardingAlgorithm, "sharding-method", common.DefaultShardingAlgorithm, "Sharding method. Defaults: legacy. Supported sharding methods are : [legacy, round-robin, consistent-hashing] ") command.Flags().BoolVar(&portForwardRedis, "port-forward-redis", true, "Automatically port-forward ha proxy redis from current namespace?") cacheSrc = appstatecache.AddCacheFlagsToCmd(&command) @@ -514,7 +514,7 @@ argocd admin cluster stats target-cluster`, clientConfig = cli.AddKubectlFlagsToCmd(&command) command.Flags().IntVar(&shard, "shard", -1, "Cluster shard filter") command.Flags().IntVar(&replicas, "replicas", 0, "Application controller replicas count. 
Inferred from number of running controller pods if not specified") - command.Flags().StringVar(&shardingAlgorithm, "sharding-method", common.DefaultShardingAlgorithm, "Sharding method. Defaults: legacy. Supported sharding methods are : [legacy, round-robin] ") + command.Flags().StringVar(&shardingAlgorithm, "sharding-method", common.DefaultShardingAlgorithm, "Sharding method. Defaults: legacy. Supported sharding methods are : [legacy, round-robin, consistent-hashing] ") command.Flags().BoolVar(&portForwardRedis, "port-forward-redis", true, "Automatically port-forward ha proxy redis from current namespace?") cacheSrc = appstatecache.AddCacheFlagsToCmd(&command) diff --git a/controller/sharding/sharding.go b/controller/sharding/sharding.go index 84f33b9ba1035..fc09c27cc4107 100644 --- a/controller/sharding/sharding.go +++ b/controller/sharding/sharding.go @@ -138,7 +138,7 @@ func LegacyDistributionFunction(replicas int) DistributionFunction { func RoundRobinDistributionFunction(clusters clusterAccessor, replicas int) DistributionFunction { return func(c *v1alpha1.Cluster) int { if replicas > 0 { - if c == nil { // in-cluster does not necessary have a secret assigned. So we are receiving a nil cluster here. + if c == nil { // in-cluster does not necessarily have a secret assigned. So we are receiving a nil cluster here. return 0 } // if Shard is manually set and the assigned value is lower than the number of replicas, @@ -169,7 +169,7 @@ func RoundRobinDistributionFunction(clusters clusterAccessor, replicas int) Dist func ConsistentHashingWithBoundedLoadsDistributionFunction(clusters clusterAccessor, apps appAccessor, replicas int) DistributionFunction { return func(c *v1alpha1.Cluster) int { if replicas > 0 { - if c == nil { // in-cluster does not necessarly have a secret assigned. So we are receiving a nil cluster here. + if c == nil { // in-cluster does not necessarily have a secret assigned. So we are receiving a nil cluster here. return 0 } diff --git a/docs/operator-manual/high_availability.md b/docs/operator-manual/high_availability.md index 60ea048ffca68..632ac2fb1286b 100644 --- a/docs/operator-manual/high_availability.md +++ b/docs/operator-manual/high_availability.md @@ -82,10 +82,16 @@ spec: ``` * In order to manually set the cluster's shard number, specify the optional `shard` property when creating a cluster. If not specified, it will be calculated on the fly by the application controller. -* The shard distribution algorithm of the `argocd-application-controller` can be set by using the `--sharding-method` parameter. Supported sharding methods are : [legacy (default), round-robin]. `legacy` mode uses an `uid` based distribution (non-uniform). `round-robin` uses an equal distribution across all shards. The `--sharding-method` parameter can also be overridden by setting the key `controller.sharding.algorithm` in the `argocd-cmd-params-cm` `configMap` (preferably) or by setting the `ARGOCD_CONTROLLER_SHARDING_ALGORITHM` environment variable and by specifiying the same possible values. +* The shard distribution algorithm of the `argocd-application-controller` can be set by using the `--sharding-method` parameter. Supported sharding methods are : [legacy (default), round-robin, consistent-hashing]: +- `legacy` mode uses an `uid` based distribution (non-uniform). +- `round-robin` uses an equal distribution across all shards. 
+- `consistent-hashing` uses the consistent hashing with bounded loads algorithm which tends to equal distribution and also reduces cluster or application reshuffling in case of additions or removals of shards or clusters. -!!! warning "Alpha Feature" - The `round-robin` shard distribution algorithm is an experimental feature. Reshuffling is known to occur in certain scenarios with cluster removal. If the cluster at rank-0 is removed, reshuffling all clusters across shards will occur and may temporarily have negative performance impacts. +The `--sharding-method` parameter can also be overridden by setting the key `controller.sharding.algorithm` in the `argocd-cmd-params-cm` `configMap` (preferably) or by setting the `ARGOCD_CONTROLLER_SHARDING_ALGORITHM` environment variable and by specifiying the same possible values. + +!!! warning "Alpha Features" + The `round-robin` shard distribution algorithm is an experimental feature. Reshuffling is known to occur in certain scenarios with cluster removal. If the cluster at rank-0 is removed, reshuffling all clusters across shards will occur and may temporarily have negative performance impacts. + The `consistent-hashing` shard distribution algorithm is an experimental feature. Extensive benchmark have been documented on the [CNOE blog](https://cnoe.io/blog/argo-cd-application-scalability) with encouraging results. Community feedback is highly appreciated before moving this feature to a production ready state. * A cluster can be manually assigned and forced to a `shard` by patching the `shard` field in the cluster secret to contain the shard number, e.g. ```yaml diff --git a/docs/operator-manual/server-commands/argocd-application-controller.md b/docs/operator-manual/server-commands/argocd-application-controller.md index caab2770e07aa..930dfa414751c 100644 --- a/docs/operator-manual/server-commands/argocd-application-controller.md +++ b/docs/operator-manual/server-commands/argocd-application-controller.md @@ -70,7 +70,7 @@ argocd-application-controller [flags] --sentinelmaster string Redis sentinel master group name. (default "master") --server string The address and port of the Kubernetes API server --server-side-diff-enabled Feature flag to enable ServerSide diff. Default ("false") - --sharding-method string Enables choice of sharding method. Supported sharding methods are : [legacy, round-robin] (default "legacy") + --sharding-method string Enables choice of sharding method. Supported sharding methods are : [legacy, round-robin, consistent-hashing] (default "legacy") --status-processors int Number of application status processors (default 20) --tls-server-name string If provided, this name will be used to validate server certificate. If this is not provided, hostname used to contact the server is used. --token string Bearer token for authentication to the API server diff --git a/docs/user-guide/commands/argocd_admin_cluster_shards.md b/docs/user-guide/commands/argocd_admin_cluster_shards.md index 48f6138d47b4a..44efa4392b9ac 100644 --- a/docs/user-guide/commands/argocd_admin_cluster_shards.md +++ b/docs/user-guide/commands/argocd_admin_cluster_shards.md @@ -43,7 +43,7 @@ argocd admin cluster shards [flags] --sentinelmaster string Redis sentinel master group name. (default "master") --server string The address and port of the Kubernetes API server --shard int Cluster shard filter (default -1) - --sharding-method string Sharding method. Defaults: legacy. 
Supported sharding methods are : [legacy, round-robin] (default "legacy") + --sharding-method string Sharding method. Defaults: legacy. Supported sharding methods are : [legacy, round-robin, consistent-hashing] (default "legacy") --tls-server-name string If provided, this name will be used to validate server certificate. If this is not provided, hostname used to contact the server is used. --token string Bearer token for authentication to the API server --user string The name of the kubeconfig user to use diff --git a/docs/user-guide/commands/argocd_admin_cluster_stats.md b/docs/user-guide/commands/argocd_admin_cluster_stats.md index c5297ce7e35ed..18aa583f01305 100644 --- a/docs/user-guide/commands/argocd_admin_cluster_stats.md +++ b/docs/user-guide/commands/argocd_admin_cluster_stats.md @@ -57,7 +57,7 @@ argocd admin cluster stats target-cluster --sentinelmaster string Redis sentinel master group name. (default "master") --server string The address and port of the Kubernetes API server --shard int Cluster shard filter (default -1) - --sharding-method string Sharding method. Defaults: legacy. Supported sharding methods are : [legacy, round-robin] (default "legacy") + --sharding-method string Sharding method. Defaults: legacy. Supported sharding methods are : [legacy, round-robin, consistent-hashing] (default "legacy") --tls-server-name string If provided, this name will be used to validate server certificate. If this is not provided, hostname used to contact the server is used. --token string Bearer token for authentication to the API server --user string The name of the kubeconfig user to use
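
---

For anyone who wants to try the new `consistent` package outside the controller, the short Go sketch below mirrors the flow of `createConsistentHashingWithBoundLoads` introduced in this series: one ring member is added per shard, clusters are routed in a deterministic (sorted) order with `GetLeast`, and the cumulative per-shard application count is fed back through `UpdateLoad` so the bounded-loads check stays meaningful. The cluster IDs and application counts are made up for illustration; only the `consistent` package API (added by this patch series) is assumed to exist.

```go
package main

import (
	"fmt"
	"sort"
	"strconv"

	// Package added by this patch series.
	"github.com/argoproj/argo-cd/v2/controller/sharding/consistent"
)

func main() {
	const replicas = 3

	// Hypothetical inventory: cluster ID -> number of applications targeting it.
	apps := map[string]int64{
		"cluster-01": 40, "cluster-02": 5, "cluster-03": 12,
		"cluster-04": 7, "cluster-05": 25, "cluster-06": 3,
	}

	// Register one ring member per shard, as createConsistentHashingWithBoundLoads does.
	ring := consistent.New()
	for i := 0; i < replicas; i++ {
		ring.Add(strconv.Itoa(i))
	}

	// Route clusters in sorted order so every replica computes the same assignment.
	ids := make([]string, 0, len(apps))
	for id := range apps {
		ids = append(ids, id)
	}
	sort.Strings(ids)

	shardLoad := map[string]int64{}
	for _, id := range ids {
		shard, err := ring.GetLeast(id)
		if err != nil {
			panic(err)
		}
		// Feed the shard's cumulative app count back so the bounded-loads
		// check can reject shards that are already above the allowed load.
		shardLoad[shard] += apps[id]
		ring.UpdateLoad(shard, shardLoad[shard])
		fmt.Printf("%s -> shard %s\n", id, shard)
	}
	fmt.Println("per-shard load:", ring.GetLoads(), "max allowed:", ring.MaxLoad())
}
```

Feeding the application counts back through `UpdateLoad` is what turns plain consistent hashing into the bounded-loads variant: `GetLeast` keeps walking the ring past any shard whose load would exceed roughly 1.25 times the average (`OptimalExtraCapacityFactor`), so no shard ends up far above the mean even when a few clusters carry most of the applications.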