From 629cf2a99e7bd42e751c6597c8bc758977095143 Mon Sep 17 00:00:00 2001 From: Akram Ben Aissi Date: Thu, 7 Dec 2023 01:07:58 +0100 Subject: [PATCH] Adds consistent hashing with bound loads sharding algorithm Signed-off-by: Akram Ben Aissi --- common/common.go | 7 +- controller/sharding/sharding.go | 57 +++++++++++++++ controller/sharding/sharding_test.go | 103 +++++++++++++++++++++++++++ go.mod | 7 +- go.sum | 6 ++ 5 files changed, 176 insertions(+), 4 deletions(-) diff --git a/common/common.go b/common/common.go index 7375a468fb3c3..ae384a57f8c6a 100644 --- a/common/common.go +++ b/common/common.go @@ -118,7 +118,12 @@ const ( // AppControllerHeartbeatUpdateRetryCount is the retry count for updating the Shard Mapping to the Shard Mapping ConfigMap used by Application Controller AppControllerHeartbeatUpdateRetryCount = 3 // NoShardingAlgorithm uses an algorithm that does not perform shard but insteads always returns the same shard. - NoShardingAlgorithm = "no-sharding" + NoShardingAlgorithm = "no-sharding" + // ConsistentHashingWithBoundedLoadsAlgorithm uses an algorithm that tries to use an equal distribution across + // all shards but is optimised to handle shard and/or cluster additions or removals. In case of shard or + // cluster changes, this algorithm minimises the changes between shard and cluster assignments. 
+ ConsistentHashingWithBoundedLoadsAlgorithm = "consistent-hashing" + DefaultShardingAlgorithm = LegacyShardingAlgorithm ) diff --git a/controller/sharding/sharding.go b/controller/sharding/sharding.go index 5650e3b8d2c30..d42f11d8e3225 100644 --- a/controller/sharding/sharding.go +++ b/controller/sharding/sharding.go @@ -14,10 +14,12 @@ import ( "github.com/argoproj/argo-cd/v2/common" "github.com/argoproj/argo-cd/v2/pkg/apis/application/v1alpha1" + slices "golang.org/x/exp/slices" v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/kubernetes" + consistent "github.com/akram/go-consistent-hashing-with-bounded-loads" "github.com/argoproj/argo-cd/v2/util/db" "github.com/argoproj/argo-cd/v2/util/env" "github.com/argoproj/argo-cd/v2/util/settings" @@ -83,6 +85,8 @@ func GetDistributionFunction(clusters clusterAccessor, shardingAlgorithm string, distributionFunction = RoundRobinDistributionFunction(clusters, replicasCount) case common.LegacyShardingAlgorithm: distributionFunction = LegacyDistributionFunction(replicasCount) + case common.ConsistentHashingWithBoundedLoadsAlgorithm: + distributionFunction = ConsistentHashingWithBoundedLoadsDistributionFunction(clusters, replicasCount) default: log.Warnf("distribution type %s is not supported, defaulting to %s", shardingAlgorithm, common.DefaultShardingAlgorithm) } @@ -157,6 +161,59 @@ func RoundRobinDistributionFunction(clusters clusterAccessor, replicas int) Dist } } +// ConsistentHashingWithBoundedLoadsDistributionFunction returns a DistributionFunction using an almost homogeneous distribution algorithm: +// for a given cluster the function will return the shard number based on a consistent hashing with bounded loads algorithm. +// This function ensures an almost homogeneous distribution: each shard gets assigned a fairly similar number of +// clusters (+/-10%), and it is resilient to changes in the number of shards and/or clusters. 
+func ConsistentHashingWithBoundedLoadsDistributionFunction(clusters clusterAccessor, replicas int) DistributionFunction {
+	return func(c *v1alpha1.Cluster) int {
+		if replicas <= 0 {
+			log.Warnf("The number of replicas (%d) is lower than 1", replicas)
+			return -1
+		}
+		if c == nil { // in-cluster does not necessarily have a secret assigned. So we are receiving a nil cluster here.
+			return 0
+		}
+		// if Shard is manually set and the assigned value is lower than the number of replicas,
+		// then its value is returned otherwise it is the default calculated value
+		if c.Shard != nil && int(*c.Shard) < replicas {
+			return int(*c.Shard)
+		}
+		// if the cluster is not in the clusters list anymore, we should unassign it from any shard, so we
+		// return the reserved value of -1. Membership is decided by cluster ID, not by pointer identity,
+		// so an equal cluster reached through a different pointer is still recognised.
+		sameID := func(known *v1alpha1.Cluster) bool { return known != nil && known.ID == c.ID }
+		if slices.IndexFunc(clusters(), sameID) < 0 {
+			log.Warnf("Cluster with id=%s not found in cluster map.", c.ID)
+			return -1
+		}
+		// NOTE(review): the ring depends only on replicas yet is rebuilt on every call; hoisting it out of
+		// the closure needs confirmation that consistent.Consistent tolerates concurrent Get calls.
+		clusterIndex, err := createConsistentHashingWithBoundLoads(replicas).Get(c.ID)
+		if err != nil {
+			log.Warnf("Could not get a shard for cluster with id=%s from consistent hashing: %v", c.ID, err)
+			return -1
+		}
+		shard, err := strconv.Atoi(clusterIndex)
+		if err != nil {
+			log.Errorf("Consistent Hashing was supposed to return a shard index but it returned %q: %v", clusterIndex, err)
+			return -1
+		}
+		log.Debugf("Cluster with id=%s will be processed by shard %d", c.ID, shard)
+		return shard
+	}
+}
+
+// createConsistentHashingWithBoundLoads returns a ring holding one member per shard, named "0" to
+// strconv.Itoa(replicas-1), so that the member returned by a lookup converts back to a shard number.
+func createConsistentHashingWithBoundLoads(replicas int) *consistent.Consistent {
+	consistentHashing := consistent.New()
+	for i := 0; i < replicas; i++ {
+		consistentHashing.Add(strconv.Itoa(i))
+	}
+	return consistentHashing
+}
+
 // NoShardingDistributionFunction returns a DistributionFunction that will process all shards
 // the function is created for API compatibility 
purposes.
 func NoShardingDistributionFunction() DistributionFunction {
diff --git a/controller/sharding/sharding_test.go b/controller/sharding/sharding_test.go
index 0992f7a9dfd7f..6930848fd15fd 100644
--- a/controller/sharding/sharding_test.go
+++ b/controller/sharding/sharding_test.go
@@ -266,6 +266,109 @@ func TestGetShardByIndexModuloReplicasCountDistributionFunctionWhenClusterIsAdde
 	assert.Equal(t, -1, distributionFunction(&cluster6))
 }
 
+// TestConsistentHashingWhenClusterIsAddedAndRemoved checks three things on 133 generated clusters:
+// the spread of the clusters over 3 shards, that going down to 2 shards only moves the clusters
+// that were on shard 2, and that a cluster dropped from the clusters list gets the reserved
+// shard -1.
+func TestConsistentHashingWhenClusterIsAddedAndRemoved(t *testing.T) {
+	db := dbmocks.ArgoDB{}
+	clusterCount := 133
+	prefix := "cluster"
+
+	clusters := make([]v1alpha1.Cluster, 0, clusterCount)
+	for i := 0; i < clusterCount; i++ {
+		id := fmt.Sprintf("%06d", i)
+		cluster := fmt.Sprintf("%s-%s", prefix, id)
+		clusters = append(clusters, createCluster(cluster, id))
+	}
+	clusterAccessor := getClusterAccessor(clusters)
+	clusterList := &v1alpha1.ClusterList{Items: clusters}
+	db.On("ListClusters", mock.Anything).Return(clusterList, nil)
+	// Test with replicas set to 3
+	replicasCount := 3
+	db.On("GetApplicationControllerReplicas").Return(replicasCount)
+	distributionFunction := ConsistentHashingWithBoundedLoadsDistributionFunction(clusterAccessor, replicasCount)
+	assert.Equal(t, 0, distributionFunction(nil))
+	distributionMap := map[int]int{}
+	assignmentMap := map[string]int{}
+	for i := range clusters {
+		assignedShard := distributionFunction(&clusters[i])
+		assignmentMap[clusters[i].ID] = assignedShard
+		distributionMap[assignedShard]++
+	}
+
+	// We check that no shard deviates from the average load by more than 10%. Unassigned clusters
+	// (shard -1) are left out of the average, and every shard index is visited so that a shard
+	// that received no cluster at all is reported too.
+	var sum float64
+	for shard, count := range distributionMap {
+		if shard != -1 {
+			sum += float64(count)
+		}
+	}
+	average := sum / float64(replicasCount)
+	for shard := 0; shard < replicasCount; shard++ {
+		count := distributionMap[shard]
+		if float64(count) > average*1.1 || float64(count) < average*0.9 {
+			t.Errorf("Cluster distribution differs for more than 10%%: %d for shard %d (average: %f)", count, shard, average)
+		}
+	}
+
+	// Now we will decrease the number of replicas to 2, and we should see only clusters that were attached to shard 2 to be reassigned
+	replicasCount = 2
+	distributionFunction = ConsistentHashingWithBoundedLoadsDistributionFunction(getClusterAccessor(clusterList.Items), replicasCount)
+	for i := range clusters {
+		c := &clusters[i]
+		assignedShard := distributionFunction(c)
+		previouslyAssignedShard := assignmentMap[c.ID]
+		if previouslyAssignedShard != 2 && previouslyAssignedShard != assignedShard {
+			t.Errorf("Previously assigned %s cluster has moved from replica %d to %d", c.ID, previouslyAssignedShard, assignedShard)
+		}
+	}
+
+	// Now, we remove the last added cluster, it should be unassigned
+	removedCluster := clusterList.Items[len(clusterList.Items)-1]
+	clusterList.Items = clusterList.Items[:len(clusterList.Items)-1]
+	distributionFunction = ConsistentHashingWithBoundedLoadsDistributionFunction(getClusterAccessor(clusterList.Items), replicasCount)
+	assert.Equal(t, -1, distributionFunction(&removedCluster))
+}
+
+// TestConsistentHashingWhenClusterWithZeroReplicas checks that with zero replicas the distribution
+// function returns the reserved shard -1, even for the nil (in-cluster) cluster.
+func TestConsistentHashingWhenClusterWithZeroReplicas(t *testing.T) {
+	db := dbmocks.ArgoDB{}
+	clusters := []v1alpha1.Cluster{createCluster("cluster-01", "01")}
+	clusterAccessor := getClusterAccessor(clusters)
+	clusterList := &v1alpha1.ClusterList{Items: clusters}
+	db.On("ListClusters", mock.Anything).Return(clusterList, nil)
+
+	// Test with replicas set to 0
+	replicasCount := 0
+	db.On("GetApplicationControllerReplicas").Return(replicasCount)
+	distributionFunction := ConsistentHashingWithBoundedLoadsDistributionFunction(clusterAccessor, replicasCount)
+	assert.Equal(t, -1, distributionFunction(nil))
+}
+
+// TestConsistentHashingWhenClusterWithFixedShard checks that a cluster whose Shard field is set
+// to a value lower than the number of replicas is assigned to exactly that shard instead of a
+// computed one.
+func TestConsistentHashingWhenClusterWithFixedShard(t *testing.T) {
+	db := dbmocks.ArgoDB{}
+	var fixedShard int64 = 1
+	cluster := &v1alpha1.Cluster{ID: "1", Shard: &fixedShard}
+	clusters := []v1alpha1.Cluster{*cluster}
+
+	clusterAccessor := getClusterAccessor(clusters)
+	clusterList := &v1alpha1.ClusterList{Items: clusters}
+	db.On("ListClusters", mock.Anything).Return(clusterList, nil)
+
+	// Test with replicas set to 5
+	replicasCount := 5
+	db.On("GetApplicationControllerReplicas").Return(replicasCount)
+	distributionFunction := ConsistentHashingWithBoundedLoadsDistributionFunction(clusterAccessor, replicasCount)
+	assert.Equal(t, fixedShard, int64(distributionFunction(cluster)))
+}
+
 func TestGetShardByIndexModuloReplicasCountDistributionFunction(t *testing.T) {
 	clusters, db, cluster1, cluster2, _, _, _ := createTestClusters()
 	replicasCount := 2
diff --git a/go.mod b/go.mod
index 8b91fe20eef1b..f12c40f0d3d2b 100644
--- a/go.mod
+++ b/go.mod
@@ -1,8 +1,6 @@
 module github.com/argoproj/argo-cd/v2
 
-go 1.21
-
-toolchain go1.21.0
+go 1.21.0
 
 require (
 	code.gitea.io/sdk/gitea v0.15.1
@@ -114,6 +112,7 @@ require (
 	github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.1.0 // indirect
 	github.com/Azure/azure-sdk-for-go/sdk/internal v1.0.0 // indirect
 	github.com/AzureAD/microsoft-authentication-library-for-go v0.5.2 // indirect
+	github.com/akram/go-consistent-hashing-with-bounded-loads v0.0.0-20231206173235-2eb81718078d // indirect
 	github.com/aws/aws-sdk-go-v2 v1.17.3 // indirect
 	github.com/aws/aws-sdk-go-v2/config v1.18.8 // indirect
 	github.com/aws/aws-sdk-go-v2/credentials v1.13.8 // indirect
@@ -132,6 +131,7 @@ require (
 	github.com/googleapis/enterprise-certificate-proxy v0.2.5 // indirect
 	github.com/googleapis/gax-go/v2 v2.12.0 // indirect
 	github.com/kylelemons/godebug v1.1.0 // indirect
+	github.com/minio/blake2b-simd v0.0.0-20160723061019-3f5f724cb5b1 // indirect
 	github.com/pkg/browser v0.0.0-20210911075715-681adbf594b8 // indirect
 	github.com/tidwall/gjson v1.14.4 // indirect
 	github.com/tidwall/match v1.1.1 // indirect
@@ -234,6 +234,7 @@ require (
 	github.com/modern-go/concurrent 
v0.0.0-20180306012644-bacd9c7ef1dd // indirect github.com/modern-go/reflect2 v1.0.2 // indirect github.com/monochromegane/go-gitignore v0.0.0-20200626010858-205db1a8cc00 // indirect + github.com/montanaflynn/stats v0.7.1 github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect github.com/oklog/ulid v1.3.1 // indirect github.com/opencontainers/go-digest v1.0.0 // indirect diff --git a/go.sum b/go.sum index dbe7107f59cba..c82849c92db8e 100644 --- a/go.sum +++ b/go.sum @@ -675,6 +675,8 @@ github.com/ajstarks/deck v0.0.0-20200831202436-30c9fc6549a9/go.mod h1:JynElWSGnm github.com/ajstarks/deck/generate v0.0.0-20210309230005-c3f852c02e19/go.mod h1:T13YZdzov6OU0A1+RfKZiZN9ca6VeKdBdyDV+BY97Tk= github.com/ajstarks/svgo v0.0.0-20180226025133-644b8db467af/go.mod h1:K08gAheRH3/J6wwsYMMT4xOr94bZjxIelGM0+d/wbFw= github.com/ajstarks/svgo v0.0.0-20211024235047-1546f124cd8b/go.mod h1:1KcenG0jGWcpt8ov532z81sp/kMMUG485J2InIOyADM= +github.com/akram/go-consistent-hashing-with-bounded-loads v0.0.0-20231206173235-2eb81718078d h1:cqzzANYt5GRtd6m82Fky33Nyy9SbrTtBvsMeFVwHAMM= +github.com/akram/go-consistent-hashing-with-bounded-loads v0.0.0-20231206173235-2eb81718078d/go.mod h1:pN0s7kPxwU7qa0awc0WP6VUMSHzDJodPatu3B14/ids= github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc= github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc= github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0= @@ -1403,6 +1405,8 @@ github.com/microsoft/azure-devops-go-api/azuredevops v1.0.0-b5 h1:YH424zrwLTlyHS github.com/microsoft/azure-devops-go-api/azuredevops v1.0.0-b5/go.mod h1:PoGiBqKSQK1vIfQ+yVaFcGjDySHvym6FM1cNYnwzbrY= github.com/miekg/dns v1.0.14/go.mod h1:W1PPwlIAgtquWBMBEV9nkV9Cazfe8ScdGz/Lj7v3Nrg= github.com/minio/asm2plan9s v0.0.0-20200509001527-cdd76441f9d8/go.mod 
h1:mC1jAcsrzbxHt8iiaC+zU4b1ylILSosueou12R++wfY= +github.com/minio/blake2b-simd v0.0.0-20160723061019-3f5f724cb5b1 h1:lYpkrQH5ajf0OXOcUbGjvZxxijuBwbbmlSxLiuofa+g= +github.com/minio/blake2b-simd v0.0.0-20160723061019-3f5f724cb5b1/go.mod h1:pD8RvIylQ358TN4wwqatJ8rNavkEINozVn9DtGI3dfQ= github.com/minio/c2goasm v0.0.0-20190812172519-36a3d3bbc4f3/go.mod h1:RagcQ7I8IeTMnF8JTXieKnO4Z6JCsikNEzj0DwauVzE= github.com/minio/md5-simd v1.1.2/go.mod h1:MzdKDxYpY2BT9XQFocsiZf/NKVtR7nkE4RoEpN+20RM= github.com/minio/minio-go/v7 v7.0.58/go.mod h1:NUDy4A4oXPq1l2yK6LTSvCEzAMeIcoz9lcj5dbzSrRE= @@ -1439,6 +1443,8 @@ github.com/monochromegane/go-gitignore v0.0.0-20200626010858-205db1a8cc00 h1:n6/ github.com/monochromegane/go-gitignore v0.0.0-20200626010858-205db1a8cc00/go.mod h1:Pm3mSP3c5uWn86xMLZ5Sa7JB9GsEZySvHYXCTK4E9q4= github.com/montanaflynn/stats v0.0.0-20171201202039-1bf9dbcd8cbe/go.mod h1:wL8QJuTMNUDYhXwkmfOly8iTdp5TEcJFWZD2D7SIkUc= github.com/montanaflynn/stats v0.6.6/go.mod h1:etXPPgVO6n31NxCd9KQUMvCM+ve0ruNzt6R8Bnaayow= +github.com/montanaflynn/stats v0.7.1 h1:etflOAAHORrCC44V+aR6Ftzort912ZU+YLiSTuV8eaE= +github.com/montanaflynn/stats v0.7.1/go.mod h1:etXPPgVO6n31NxCd9KQUMvCM+ve0ruNzt6R8Bnaayow= github.com/moul/http2curl v1.0.0/go.mod h1:8UbvGypXm98wA/IqH45anm5Y2Z6ep6O31QGOAZ3H0fQ= github.com/munnerz/goautoneg v0.0.0-20120707110453-a547fc61f48d/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ= github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq1c1nUAm88MOHcQC9l5mIlSMApZMrHA=