From 08e729870812f77405040c6aa306708308474d33 Mon Sep 17 00:00:00 2001
From: Akram Ben Aissi
Date: Thu, 7 Dec 2023 01:07:58 +0100
Subject: [PATCH] Add consistent hashing with bounded loads sharding algorithm

Signed-off-by: Akram Ben Aissi
---
 common/common.go                              |   7 +-
 controller/sharding/consistent/consistent.go | 288 +++++++++++++++++++
 controller/sharding/sharding.go               |  57 ++++
 controller/sharding/sharding_test.go          | 103 +++++++
 go.mod                                        |   7 +-
 go.sum                                        |   2 +
 6 files changed, 459 insertions(+), 5 deletions(-)
 create mode 100644 controller/sharding/consistent/consistent.go

diff --git a/common/common.go b/common/common.go
index 2f053d7a28198..782b40d2527db 100644
--- a/common/common.go
+++ b/common/common.go
@@ -117,7 +117,12 @@ const (
 	RoundRobinShardingAlgorithm = "round-robin"
 	// AppControllerHeartbeatUpdateRetryCount is the retry count for updating the Shard Mapping to the Shard Mapping ConfigMap used by Application Controller
 	AppControllerHeartbeatUpdateRetryCount = 3
-	DefaultShardingAlgorithm = LegacyShardingAlgorithm
+	// ConsistentHashingWithBoundedLoadsAlgorithm uses an algorithm that aims for an equal distribution across
+	// all shards and is optimised to handle the addition or removal of shards and/or clusters. When shards or
+	// clusters change, this algorithm minimises the changes to the existing shard/cluster assignments.
+	ConsistentHashingWithBoundedLoadsAlgorithm = "consistent-hashing"
+
+	DefaultShardingAlgorithm = LegacyShardingAlgorithm
 )

 // Dex related constants
diff --git a/controller/sharding/consistent/consistent.go b/controller/sharding/consistent/consistent.go
new file mode 100644
index 0000000000000..aad5b144155d2
--- /dev/null
+++ b/controller/sharding/consistent/consistent.go
@@ -0,0 +1,288 @@
+// An implementation of Consistent Hashing and
+// Consistent Hashing With Bounded Loads.
+//
+// https://en.wikipedia.org/wiki/Consistent_hashing
+//
+// https://research.googleblog.com/2017/04/consistent-hashing-with-bounded-loads.html
+package consistent
+
+import (
+	"encoding/binary"
+	"errors"
+	"fmt"
+	"math"
+	"sync"
+	"sync/atomic"
+
+	"github.com/google/btree"
+
+	blake2b "github.com/minio/blake2b-simd"
+)
+
+var ErrNoHosts = errors.New("no hosts added")
+
+type Host struct {
+	Name string
+	Load int64
+}
+
+type Consistent struct {
+	servers           map[uint64]string
+	clients           *btree.BTree
+	loadMap           map[string]*Host
+	totalLoad         int64
+	replicationFactor int
+
+	sync.RWMutex
+}
+
+type item struct {
+	value uint64
+}
+
+func (i item) Less(than btree.Item) bool {
+	return i.value < than.(item).value
+}
+
+func New() *Consistent {
+	return &Consistent{
+		servers:           map[uint64]string{},
+		clients:           btree.New(2),
+		loadMap:           map[string]*Host{},
+		replicationFactor: 1000,
+	}
+}
+
+func NewWithReplicationFactor(replicationFactor int) *Consistent {
+	return &Consistent{
+		servers:           map[uint64]string{},
+		clients:           btree.New(2),
+		loadMap:           map[string]*Host{},
+		replicationFactor: replicationFactor,
+	}
+}
+
+// Add inserts a server into the ring, using replicationFactor virtual nodes.
+func (c *Consistent) Add(server string) {
+	c.Lock()
+	defer c.Unlock()
+
+	if _, ok := c.loadMap[server]; ok {
+		return
+	}
+
+	c.loadMap[server] = &Host{Name: server, Load: 0}
+	for i := 0; i < c.replicationFactor; i++ {
+		h := c.hash(fmt.Sprintf("%s%d", server, i))
+		c.servers[h] = server
+		c.clients.ReplaceOrInsert(item{h})
+	}
+}
+
+// Get returns the server that owns the given client,
+// as described in https://en.wikipedia.org/wiki/Consistent_hashing.
+// It returns ErrNoHosts if the ring has no servers in it.
+func (c *Consistent) Get(client string) (string, error) {
+	c.RLock()
+	defer c.RUnlock()
+
+	if c.clients.Len() == 0 {
+		return "", ErrNoHosts
+	}
+
+	h := c.hash(client)
+	var foundItem btree.Item
+	c.clients.AscendGreaterOrEqual(item{h}, func(i btree.Item) bool {
+		foundItem = i
+		return false // stop the iteration
+	})
+
+	if foundItem == nil {
+		// If no host found, wrap around to the first one.
+		foundItem = c.clients.Min()
+	}
+
+	host := c.servers[foundItem.(item).value]
+
+	return host, nil
+}
+
+// GetLeast returns the least loaded host that can serve the key.
+// It uses Consistent Hashing With Bounded Loads.
+// https://research.googleblog.com/2017/04/consistent-hashing-with-bounded-loads.html
+// It returns ErrNoHosts if the ring has no hosts in it.
+func (c *Consistent) GetLeast(client string) (string, error) {
+	c.RLock()
+	defer c.RUnlock()
+
+	if c.clients.Len() == 0 {
+		return "", ErrNoHosts
+	}
+
+	// Walk the ring clockwise, starting at the first virtual node owning the
+	// client, until a server below the load bound is found.
+	h := c.hash(client)
+	for i := 0; i < c.clients.Len(); i++ {
+		var candidate btree.Item
+		c.clients.AscendGreaterOrEqual(item{h}, func(it btree.Item) bool {
+			candidate = it
+			return false // stop the iteration
+		})
+		if candidate == nil {
+			// Wrap around to the first virtual node.
+			candidate = c.clients.Min()
+		}
+		host := c.servers[candidate.(item).value]
+		if c.loadOK(host) {
+			return host, nil
+		}
+		// Move past this virtual node and try the next one.
+		h = candidate.(item).value + 1
+	}
+	return "", ErrNoHosts
+}
+
+// search returns the rank of the first virtual node whose hash is >= key,
+// or 0 if no such node exists (i.e. the ring wraps around).
+func (c *Consistent) search(key uint64) int {
+	idx := 0
+	found := false
+
+	c.clients.Ascend(func(i btree.Item) bool {
+		if i.(item).value >= key {
+			found = true
+			return false // stop the iteration
+		}
+		idx++
+		return true
+	})
+
+	if !found {
+		idx = 0
+	}
+
+	return idx
+}
+
+// UpdateLoad sets the load of `server` to the given `load`.
+func (c *Consistent) UpdateLoad(server string, load int64) {
+	c.Lock()
+	defer c.Unlock()
+
+	if _, ok := c.loadMap[server]; !ok {
+		return
+	}
+	c.totalLoad -= c.loadMap[server].Load
+	c.loadMap[server].Load = load
+	c.totalLoad += load
+}
+
+// Inc increments the load of host by 1.
+//
+// It should only be used if you obtained a host with GetLeast.
+func (c *Consistent) Inc(server string) {
+	c.Lock()
+	defer c.Unlock()
+
+	if _, ok := c.loadMap[server]; !ok {
+		return
+	}
+	atomic.AddInt64(&c.loadMap[server].Load, 1)
+	atomic.AddInt64(&c.totalLoad, 1)
+}
+
+// Done decrements the load of host by 1.
+//
+// It should only be used if you obtained a host with GetLeast.
+func (c *Consistent) Done(server string) {
+	c.Lock()
+	defer c.Unlock()
+
+	if _, ok := c.loadMap[server]; !ok {
+		return
+	}
+	atomic.AddInt64(&c.loadMap[server].Load, -1)
+	atomic.AddInt64(&c.totalLoad, -1)
+}
+
+// Remove deletes host from the ring.
+func (c *Consistent) Remove(server string) bool {
+	c.Lock()
+	defer c.Unlock()
+
+	for i := 0; i < c.replicationFactor; i++ {
+		h := c.hash(fmt.Sprintf("%s%d", server, i))
+		delete(c.servers, h)
+		c.delSlice(h)
+	}
+	delete(c.loadMap, server)
+	return true
+}
+
+// Servers returns the list of servers in the ring.
+func (c *Consistent) Servers() (servers []string) {
+	c.RLock()
+	defer c.RUnlock()
+	for k := range c.loadMap {
+		servers = append(servers, k)
+	}
+	return servers
+}
+
+// GetLoads returns the loads of all the hosts.
+func (c *Consistent) GetLoads() map[string]int64 {
+	c.RLock()
+	defer c.RUnlock()
+
+	loads := map[string]int64{}
+	for k, v := range c.loadMap {
+		loads[k] = v.Load
+	}
+	return loads
+}
+
+// MaxLoad returns the maximum load a single host is allowed to take,
+// which is:
+// ceil((total_load/number_of_hosts)*1.25)
+// total_load is the total number of active requests served by hosts.
+// For more info:
+// https://research.googleblog.com/2017/04/consistent-hashing-with-bounded-loads.html
+func (c *Consistent) MaxLoad() int64 {
+	c.RLock()
+	defer c.RUnlock()
+
+	totalLoad := c.totalLoad
+	if totalLoad == 0 {
+		totalLoad = 1
+	}
+	avgLoadPerNode := float64(totalLoad) / float64(len(c.loadMap))
+	if avgLoadPerNode == 0 {
+		avgLoadPerNode = 1
+	}
+	return int64(math.Ceil(avgLoadPerNode * 1.25))
+}
+
+func (c *Consistent) loadOK(server string) bool {
+	// a safety check in case someone performed c.Done more than needed
+	totalLoad := c.totalLoad
+	if totalLoad < 0 {
+		totalLoad = 0
+	}
+
+	avgLoadPerNode := float64(totalLoad+1) / float64(len(c.loadMap))
+	if avgLoadPerNode == 0 {
+		avgLoadPerNode = 1
+	}
+	avgLoadPerNode = math.Ceil(avgLoadPerNode * 1.25)
+
+	bserver, ok := c.loadMap[server]
+	if !ok {
+		panic(fmt.Sprintf("given host(%s) not in loadsMap", server))
+	}
+
+	return float64(bserver.Load)+1 <= avgLoadPerNode
+}
+
+func (c *Consistent) delSlice(val uint64) {
+	c.clients.Delete(item{val})
+}
+
+func (c *Consistent) hash(key string) uint64 {
+	out := blake2b.Sum512([]byte(key))
+	return binary.LittleEndian.Uint64(out[:])
+}
diff --git a/controller/sharding/sharding.go b/controller/sharding/sharding.go
index 2b86ed3f82bc6..eabbf913012f7 100644
--- a/controller/sharding/sharding.go
+++ b/controller/sharding/sharding.go
@@ -13,7 +13,9 @@ import (
 	"encoding/json"
 
 	"github.com/argoproj/argo-cd/v2/common"
+	"github.com/argoproj/argo-cd/v2/controller/sharding/consistent"
 	"github.com/argoproj/argo-cd/v2/pkg/apis/application/v1alpha1"
+	slices "golang.org/x/exp/slices"
 	v1 "k8s.io/api/core/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/client-go/kubernetes"
@@ -81,6 +83,8 @@ func GetDistributionFunction(clusters clusterAccessor, shardingAlgorithm string,
 		distributionFunction = RoundRobinDistributionFunction(clusters, replicasCount)
 	case common.LegacyShardingAlgorithm:
 		distributionFunction = LegacyDistributionFunction(replicasCount)
+	case common.ConsistentHashingWithBoundedLoadsAlgorithm:
+		distributionFunction = ConsistentHashingWithBoundedLoadsDistributionFunction(clusters, replicasCount)
 	default:
 		log.Warnf("distribution type %s is not supported, defaulting to %s", shardingAlgorithm, common.DefaultShardingAlgorithm)
 	}
@@ -155,6 +159,59 @@ func RoundRobinDistributionFunction(clusters clusterAccessor, replicas int) Dist
 	}
 }
 
+// ConsistentHashingWithBoundedLoadsDistributionFunction returns a DistributionFunction that uses a consistent
+// hashing with bounded loads algorithm: for a given cluster it returns the number of the shard the cluster is assigned to.
+// The function ensures an almost homogeneous distribution: each shard is assigned a fairly similar number of
+// clusters (+/-10%), and it is resilient to changes in the number of shards and/or clusters.
+func ConsistentHashingWithBoundedLoadsDistributionFunction(clusters clusterAccessor, replicas int) DistributionFunction {
+	return func(c *v1alpha1.Cluster) int {
+		if replicas > 0 {
+			if c == nil { // the in-cluster cluster does not necessarily have a secret assigned, so we may receive a nil cluster here
+				return 0
+			}
+
+			// if Shard is manually set and the assigned value is lower than the number of replicas,
+			// then its value is returned, otherwise the default calculated value is used
+			if c.Shard != nil && int(*c.Shard) < replicas {
+				return int(*c.Shard)
+			} else {
+				// if the cluster is not in the clusters list anymore, we should unassign it from any shard, so we
+				// return the reserved value of -1
+				if !slices.Contains(clusters(), c) {
+					log.Warnf("Cluster with id=%s not found in cluster map.", c.ID)
+					return -1
+				}
+				consistentHashing := createConsistentHashingWithBoundLoads(replicas)
+				clusterIndex, err := consistentHashing.Get(c.ID)
+				if err != nil {
+					log.Warnf("Cluster with id=%s not found in cluster map.", c.ID)
+					return -1
+				}
+				shard, err := strconv.Atoi(clusterIndex)
+				if err != nil {
+					log.Errorf("Consistent hashing was supposed to return a shard index but returned an error: %v", err)
+					return -1
+				}
+				log.Debugf("Cluster with id=%s will be processed by shard %d", c.ID, shard)
+				return shard
+			}
+		}
+		log.Warnf("The number of replicas (%d) is lower than 1", replicas)
+		return -1
+	}
+}
+
+func createConsistentHashingWithBoundLoads(replicas int) *consistent.Consistent {
+	consistentHashing := consistent.New()
+	// A shard with id "-1" could be added as a reserved value for clusters that do not have an assigned shard;
+	// this happens for clusters that were removed from the clusters list.
+	//consistentHashing.Add("-1")
+	for i := 0; i < replicas; i++ {
+		consistentHashing.Add(strconv.Itoa(i))
+	}
+	return consistentHashing
+}
+
 // NoShardingDistributionFunction returns a DistributionFunction that will process all cluster by shard 0
 // the function is created for API compatibility purposes and is not supposed to be activated.
 func NoShardingDistributionFunction() DistributionFunction {
diff --git a/controller/sharding/sharding_test.go b/controller/sharding/sharding_test.go
index 0992f7a9dfd7f..6930848fd15fd 100644
--- a/controller/sharding/sharding_test.go
+++ b/controller/sharding/sharding_test.go
@@ -266,6 +266,109 @@ func TestGetShardByIndexModuloReplicasCountDistributionFunctionWhenClusterIsAdde
 	assert.Equal(t, -1, distributionFunction(&cluster6))
 }
 
+func TestConsistentHashingWhenClusterIsAddedAndRemoved(t *testing.T) {
+	db := dbmocks.ArgoDB{}
+	clusterCount := 133
+	prefix := "cluster"
+
+	clusters := []v1alpha1.Cluster{}
+	for i := 0; i < clusterCount; i++ {
+		id := fmt.Sprintf("%06d", i)
+		cluster := fmt.Sprintf("%s-%s", prefix, id)
+		clusters = append(clusters, createCluster(cluster, id))
+	}
+	clusterAccessor := getClusterAccessor(clusters)
+	clusterList := &v1alpha1.ClusterList{Items: clusters}
+	db.On("ListClusters", mock.Anything).Return(clusterList, nil)
+	// Test with replicas set to 3
+	replicasCount := 3
+	db.On("GetApplicationControllerReplicas").Return(replicasCount)
+	distributionFunction := ConsistentHashingWithBoundedLoadsDistributionFunction(clusterAccessor, replicasCount)
+	assert.Equal(t, 0, distributionFunction(nil))
+	distributionMap := map[int]int{}
+	assignmentMap := map[string]int{}
+	for i := 0; i < clusterCount; i++ {
+		assignedShard := distributionFunction(&clusters[i])
+		assignmentMap[clusters[i].ID] = assignedShard
+		distributionMap[assignedShard]++
+	}
+
+	// We check that the per-shard count does not differ from the average by more than 10%
+	var sum float64
+	for shard, count := range distributionMap {
+		if shard != -1 {
+			sum += float64(count)
+		}
+	}
+	average := sum / float64(replicasCount)
+	failedTests := false
+	for shard, count := range distributionMap {
+		if shard != -1 {
+			if float64(count) > average*1.1 || float64(count) < average*0.9 {
+				fmt.Printf("Cluster distribution differs from the average by more than 10%%: %d for shard %d (average: %f)\n", count, shard, average)
+				failedTests = true
+			}
+		}
+	}
+	if failedTests {
+		t.Fail()
+	}
+
+	// Now we decrease the number of replicas to 2: only clusters that were attached to shard 2 should be reassigned
+	replicasCount = 2
+	distributionFunction = ConsistentHashingWithBoundedLoadsDistributionFunction(getClusterAccessor(clusterList.Items), replicasCount)
+	for i := 0; i < clusterCount; i++ {
+		c := &clusters[i]
+		assignedShard := distributionFunction(c)
+		previouslyAssignedShard := assignmentMap[clusters[i].ID]
+		if previouslyAssignedShard != 2 && previouslyAssignedShard != assignedShard {
+			fmt.Printf("Previously assigned cluster %s has moved from replica %d to %d\n", c.ID, previouslyAssignedShard, assignedShard)
+			t.Fail()
+		}
+	}
+
+	// Now, we remove the last added cluster, it should be unassigned
+	removedCluster := clusterList.Items[len(clusterList.Items)-1]
+	clusterList.Items = clusterList.Items[:len(clusterList.Items)-1]
+	distributionFunction = ConsistentHashingWithBoundedLoadsDistributionFunction(getClusterAccessor(clusterList.Items), replicasCount)
+	assert.Equal(t, -1, distributionFunction(&removedCluster))
+}
+
+func TestConsistentHashingWhenClusterWithZeroReplicas(t *testing.T) {
+	db := dbmocks.ArgoDB{}
+	clusters := []v1alpha1.Cluster{createCluster("cluster-01", "01")}
+	clusterAccessor := getClusterAccessor(clusters)
+	clusterList := &v1alpha1.ClusterList{Items: clusters}
+	db.On("ListClusters", mock.Anything).Return(clusterList, nil)
+
+	// Test with replicas set to 0
+	replicasCount := 0
+	db.On("GetApplicationControllerReplicas").Return(replicasCount)
+	distributionFunction := ConsistentHashingWithBoundedLoadsDistributionFunction(clusterAccessor, replicasCount)
+	assert.Equal(t, -1, distributionFunction(nil))
+}
+
+func TestConsistentHashingWhenClusterWithFixedShard(t *testing.T) {
+	db := dbmocks.ArgoDB{}
+	var fixedShard int64 = 1
+	cluster := &v1alpha1.Cluster{ID: "1", Shard: &fixedShard}
+	clusters := []v1alpha1.Cluster{*cluster}
+
+	clusterAccessor := getClusterAccessor(clusters)
+	clusterList := &v1alpha1.ClusterList{Items: clusters}
+	db.On("ListClusters", mock.Anything).Return(clusterList, nil)
+
+	// Test with replicas set to 5
+	replicasCount := 5
+	db.On("GetApplicationControllerReplicas").Return(replicasCount)
+	distributionFunction := ConsistentHashingWithBoundedLoadsDistributionFunction(clusterAccessor, replicasCount)
+	assert.Equal(t, fixedShard, int64(distributionFunction(cluster)))
+}
+
 func TestGetShardByIndexModuloReplicasCountDistributionFunction(t *testing.T) {
 	clusters, db, cluster1, cluster2, _, _, _ := createTestClusters()
 	replicasCount := 2
diff --git a/go.mod b/go.mod
index 07dd99e4beff1..514e94af0e2ec 100644
--- a/go.mod
+++ b/go.mod
@@ -1,8 +1,6 @@
 module github.com/argoproj/argo-cd/v2
 
-go 1.21
-
-toolchain go1.21.0
+go 1.21.0
 
 require (
 	code.gitea.io/sdk/gitea v0.15.1
@@ -39,6 +37,7 @@ require (
 	github.com/gogo/protobuf v1.3.2
 	github.com/golang-jwt/jwt/v4 v4.5.0
 	github.com/golang/protobuf v1.5.3
+	github.com/google/btree v1.1.2
 	github.com/google/go-cmp v0.6.0
 	github.com/google/go-github/v35 v35.3.0
 	github.com/google/go-jsonnet v0.20.0
@@ -60,6 +59,7 @@ require (
 	github.com/mattn/go-isatty v0.0.19
 	github.com/mattn/go-zglob v0.0.4
 	github.com/microsoft/azure-devops-go-api/azuredevops v1.0.0-b5
+	github.com/minio/blake2b-simd v0.0.0-20160723061019-3f5f724cb5b1
 	github.com/olekukonko/tablewriter v0.0.5
 	github.com/patrickmn/go-cache v2.1.0+incompatible
 	github.com/prometheus/client_golang v1.16.0
@@ -197,7 +197,6 @@ require (
 	github.com/go-telegram-bot-api/telegram-bot-api/v5 v5.5.1 // indirect
 	github.com/golang/glog v1.1.2 // indirect
 	github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect
-	github.com/google/btree v1.1.2 // indirect
 	github.com/google/gnostic v0.6.9 // indirect
 	github.com/google/go-github/v41 v41.0.0 // indirect
 	github.com/google/go-github/v53 v53.2.0 // indirect
diff --git a/go.sum b/go.sum
index 0c5e889f6bdf6..41ccb1de5bfb0 100644
--- a/go.sum
+++ b/go.sum
@@ -1399,6 +1399,8 @@ github.com/microsoft/azure-devops-go-api/azuredevops v1.0.0-b5 h1:YH424zrwLTlyHS
 github.com/microsoft/azure-devops-go-api/azuredevops v1.0.0-b5/go.mod h1:PoGiBqKSQK1vIfQ+yVaFcGjDySHvym6FM1cNYnwzbrY=
 github.com/miekg/dns v1.0.14/go.mod h1:W1PPwlIAgtquWBMBEV9nkV9Cazfe8ScdGz/Lj7v3Nrg=
 github.com/minio/asm2plan9s v0.0.0-20200509001527-cdd76441f9d8/go.mod h1:mC1jAcsrzbxHt8iiaC+zU4b1ylILSosueou12R++wfY=
+github.com/minio/blake2b-simd v0.0.0-20160723061019-3f5f724cb5b1 h1:lYpkrQH5ajf0OXOcUbGjvZxxijuBwbbmlSxLiuofa+g=
+github.com/minio/blake2b-simd v0.0.0-20160723061019-3f5f724cb5b1/go.mod h1:pD8RvIylQ358TN4wwqatJ8rNavkEINozVn9DtGI3dfQ=
 github.com/minio/c2goasm v0.0.0-20190812172519-36a3d3bbc4f3/go.mod h1:RagcQ7I8IeTMnF8JTXieKnO4Z6JCsikNEzj0DwauVzE=
 github.com/minio/md5-simd v1.1.2/go.mod h1:MzdKDxYpY2BT9XQFocsiZf/NKVtR7nkE4RoEpN+20RM=
 github.com/minio/minio-go/v7 v7.0.58/go.mod h1:NUDy4A4oXPq1l2yK6LTSvCEzAMeIcoz9lcj5dbzSrRE=
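
Reviewer note: the patch itself ships no standalone usage example, so the sketch below (not part of the diff, with made-up cluster IDs) illustrates how the ring added in controller/sharding/consistent is driven the same way createConsistentHashingWithBoundLoads wires it up: one ring member per shard index, Get for plain consistent hashing, and GetLeast/Inc/Done/MaxLoad for the bounded-loads variant.

// Illustrative sketch only (not part of the patch): exercises the consistent
// package the same way the sharding code does, using made-up cluster IDs.
package main

import (
	"fmt"
	"strconv"

	"github.com/argoproj/argo-cd/v2/controller/sharding/consistent"
)

func main() {
	replicas := 3

	// One ring member per application-controller shard, mirroring
	// createConsistentHashingWithBoundLoads.
	ring := consistent.New()
	for i := 0; i < replicas; i++ {
		ring.Add(strconv.Itoa(i))
	}

	// Plain consistent hashing: Get maps a cluster ID to a shard, and most
	// assignments survive adding or removing a shard.
	for _, clusterID := range []string{"cluster-000001", "cluster-000002", "cluster-000003"} {
		shard, err := ring.Get(clusterID)
		if err != nil {
			panic(err)
		}
		fmt.Printf("cluster %s -> shard %s\n", clusterID, shard)
	}

	// Bounded loads: GetLeast skips shards whose load already exceeds
	// ceil(1.25 * average load), the bound reported by MaxLoad; Inc/Done
	// track the load of the returned shard.
	shard, err := ring.GetLeast("cluster-000004")
	if err != nil {
		panic(err)
	}
	ring.Inc(shard)
	fmt.Printf("least-loaded shard for cluster-000004: %s (load bound per shard: %d)\n", shard, ring.MaxLoad())
	ring.Done(shard)
}

Because each ring member is the string form of a shard index, the value returned by Get can be parsed back with strconv.Atoi, which is exactly what ConsistentHashingWithBoundedLoadsDistributionFunction does to turn the ring lookup into a shard number.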