Skip to content

Commit

Permalink
Adds consistent hashing with bound loads sharding algorithm
Browse files Browse the repository at this point in the history
Signed-off-by: Akram Ben Aissi <akram.benaissi@gmail.com>
  • Loading branch information
akram committed Dec 7, 2023
1 parent 3fa89b0 commit a2714e0
Show file tree
Hide file tree
Showing 5 changed files with 173 additions and 4 deletions.
7 changes: 6 additions & 1 deletion common/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,12 @@ const (
// AppControllerHeartbeatUpdateRetryCount is the retry count for updating the Shard Mapping to the Shard Mapping ConfigMap used by Application Controller
AppControllerHeartbeatUpdateRetryCount = 3
// NoShardingAlgorithm uses an algorithm that does not perform sharding but instead always returns the same shard.
NoShardingAlgorithm = "no-sharding"
NoShardingAlgorithm = "no-sharding"
// ConsistentHashingWithBoundedLoadsAlgorithm uses an algorithm that tries to use an equal distribution across
// all shards but is optimised to handle shard and/or cluster additions or removals. In case of shard or
// cluster changes, this algorithm minimises the changes between shard and cluster assignments.
ConsistentHashingWithBoundedLoadsAlgorithm = "consistent-hashing"

DefaultShardingAlgorithm = LegacyShardingAlgorithm
)

Expand Down
57 changes: 57 additions & 0 deletions controller/sharding/sharding.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,12 @@ import (

"github.com/argoproj/argo-cd/v2/common"
"github.com/argoproj/argo-cd/v2/pkg/apis/application/v1alpha1"
slices "golang.org/x/exp/slices"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"

consistent "github.com/akram/go-consistent-hashing-with-bounded-loads"
"github.com/argoproj/argo-cd/v2/util/db"
"github.com/argoproj/argo-cd/v2/util/env"
"github.com/argoproj/argo-cd/v2/util/settings"
Expand Down Expand Up @@ -83,6 +85,8 @@ func GetDistributionFunction(clusters clusterAccessor, shardingAlgorithm string,
distributionFunction = RoundRobinDistributionFunction(clusters, replicasCount)
case common.LegacyShardingAlgorithm:
distributionFunction = LegacyDistributionFunction(replicasCount)
case common.ConsistentHashingWithBoundedLoadsAlgorithm:
distributionFunction = ConsistentHashingWithBoundedLoadsDistributionFunction(clusters, replicasCount)
default:
log.Warnf("distribution type %s is not supported, defaulting to %s", shardingAlgorithm, common.DefaultShardingAlgorithm)
}
Expand Down Expand Up @@ -157,6 +161,59 @@ func RoundRobinDistributionFunction(clusters clusterAccessor, replicas int) Dist
}
}

// ConsistentHashingWithBoundedLoadsDistributionFunction returns a DistributionFunction using an almost homogeneous distribution algorithm:
// for a given cluster the function will return the shard number based on a consistent hashing with bounded loads algorithm.
// This function ensures an almost homogeneous distribution: each shard gets assigned a fairly similar number of
// clusters (+/-10%), while being resilient to changes in the number of shards and/or clusters.
func ConsistentHashingWithBoundedLoadsDistributionFunction(clusters clusterAccessor, replicas int) DistributionFunction {
	// The hash ring only depends on the (fixed) replica count, so build it once
	// instead of rebuilding it on every invocation of the returned function.
	consistentHashing := createConsistentHashingWithBoundLoads(replicas)
	return func(c *v1alpha1.Cluster) int {
		if replicas <= 0 {
			log.Warnf("The number of replicas (%d) is lower than 1", replicas)
			return -1
		}
		// The in-cluster cluster does not necessarily have a secret assigned,
		// so a nil cluster may be received here; it is always handled by shard 0.
		if c == nil {
			return 0
		}
		// If Shard is manually set and the assigned value is lower than the number of replicas,
		// then its value is returned; otherwise the default calculated value is used.
		if c.Shard != nil && int(*c.Shard) < replicas {
			return int(*c.Shard)
		}
		// If the cluster is not in the clusters list anymore, it should be unassigned
		// from any shard, so the reserved value of -1 is returned.
		if !slices.Contains(clusters(), c) {
			log.Warnf("Cluster with id=%s not found in cluster map.", c.ID)
			return -1
		}
		clusterIndex, err := consistentHashing.Get(c.ID)
		if err != nil {
			log.Warnf("Cluster with id=%s not found in cluster map.", c.ID)
			return -1
		}
		shard, err := strconv.Atoi(clusterIndex)
		if err != nil {
			// Was %d on an error value, which go vet flags and which prints garbage.
			log.Errorf("Consistent Hashing was supposed to return a shard index but it returned %v", err)
			return -1
		}
		log.Debugf("Cluster with id=%s will be processed by shard %d", c.ID, shard)
		return shard
	}
}

// createConsistentHashingWithBoundLoads builds a consistent-hashing ring containing
// one member per shard, identified by the shard index rendered as a string ("0", "1", ...).
func createConsistentHashingWithBoundLoads(replicas int) *consistent.Consistent {
	ring := consistent.New()
	for shard := 0; shard < replicas; shard++ {
		ring.Add(strconv.Itoa(shard))
	}
	return ring
}

// NoShardingDistributionFunction returns a DistributionFunction that will process all shards
// the function is created for API compatibility purposes.
func NoShardingDistributionFunction() DistributionFunction {
Expand Down
103 changes: 103 additions & 0 deletions controller/sharding/sharding_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -266,6 +266,109 @@ func TestGetShardByIndexModuloReplicasCountDistributionFunctionWhenClusterIsAdde
assert.Equal(t, -1, distributionFunction(&cluster6))
}

// TestConsistentHashingWhenClusterIsAddedAndRemoved verifies three properties of the
// consistent-hashing distribution: the assignment is roughly balanced (+/-10% of the
// average per shard), shrinking the replica count only moves clusters off the removed
// shard, and a cluster removed from the cluster list becomes unassigned (-1).
func TestConsistentHashingWhenClusterIsAddedAndRemoved(t *testing.T) {
	db := dbmocks.ArgoDB{}
	clusterCount := 133
	prefix := "cluster"

	clusters := []v1alpha1.Cluster{}
	for i := 0; i < clusterCount; i++ {
		id := fmt.Sprintf("%06d", i)
		cluster := fmt.Sprintf("%s-%s", prefix, id)
		clusters = append(clusters, createCluster(cluster, id))
	}
	clusterAccessor := getClusterAccessor(clusters)
	clusterList := &v1alpha1.ClusterList{Items: clusters}
	db.On("ListClusters", mock.Anything).Return(clusterList, nil)
	// Test with replicas set to 3
	replicasCount := 3
	db.On("GetApplicationControllerReplicas").Return(replicasCount)
	distributionFunction := ConsistentHashingWithBoundedLoadsDistributionFunction(clusterAccessor, replicasCount)
	assert.Equal(t, 0, distributionFunction(nil))
	distributionMap := map[int]int{}
	assignementMap := map[string]int{}
	for i := 0; i < clusterCount; i++ {
		assignedShard := distributionFunction(&clusters[i])
		assignementMap[clusters[i].ID] = assignedShard
		distributionMap[assignedShard]++
	}

	// We check that no shard's cluster count differs from the average by more than 10%.
	var sum float64
	for shard, count := range distributionMap {
		if shard != -1 {
			sum += float64(count)
		}
	}
	average := sum / float64(replicasCount)
	for shard, count := range distributionMap {
		if shard == -1 {
			continue
		}
		if float64(count) > average*1.1 || float64(count) < average*0.9 {
			t.Errorf("Cluster distribution differs from the average by more than 10%%: %d for shard %d (average: %f)", count, shard, average)
		}
	}

	// Now we decrease the number of replicas to 2; only clusters that were attached
	// to the removed shard (2) should be reassigned.
	replicasCount = 2
	distributionFunction = ConsistentHashingWithBoundedLoadsDistributionFunction(getClusterAccessor(clusterList.Items), replicasCount)
	for i := 0; i < clusterCount; i++ {
		c := &clusters[i]
		assignedShard := distributionFunction(c)
		previouslyAssignedShard := assignementMap[clusters[i].ID]
		if previouslyAssignedShard != 2 && previouslyAssignedShard != assignedShard {
			t.Errorf("Previously assigned %s cluster has moved from replica %d to %d", c.ID, previouslyAssignedShard, assignedShard)
		}
	}

	// Now, we remove the last added cluster; it should become unassigned.
	removedCluster := clusterList.Items[len(clusterList.Items)-1]
	clusterList.Items = clusterList.Items[:len(clusterList.Items)-1]
	distributionFunction = ConsistentHashingWithBoundedLoadsDistributionFunction(getClusterAccessor(clusterList.Items), replicasCount)
	assert.Equal(t, -1, distributionFunction(&removedCluster))
}

// TestConsistentHashingWhenClusterWithZeroReplicas verifies that the distribution
// function refuses all assignments when no replicas are configured.
func TestConsistentHashingWhenClusterWithZeroReplicas(t *testing.T) {
	db := dbmocks.ArgoDB{}
	clusters := []v1alpha1.Cluster{createCluster("cluster-01", "01")}
	clusterAccessor := getClusterAccessor(clusters)
	clusterList := &v1alpha1.ClusterList{Items: clusters}
	db.On("ListClusters", mock.Anything).Return(clusterList, nil)

	// With zero replicas, every lookup must yield the reserved "unassigned" value (-1).
	replicasCount := 0
	db.On("GetApplicationControllerReplicas").Return(replicasCount)
	distributionFunction := ConsistentHashingWithBoundedLoadsDistributionFunction(clusterAccessor, replicasCount)
	assert.Equal(t, -1, distributionFunction(nil))
}

// TestConsistentHashingWhenClusterWithFixedShard verifies that a manually pinned
// shard takes precedence over the computed consistent-hashing assignment.
func TestConsistentHashingWhenClusterWithFixedShard(t *testing.T) {
	db := dbmocks.ArgoDB{}
	var fixedShard int64 = 1
	cluster := &v1alpha1.Cluster{ID: "1", Shard: &fixedShard}
	clusters := []v1alpha1.Cluster{*cluster}

	clusterAccessor := getClusterAccessor(clusters)
	clusterList := &v1alpha1.ClusterList{Items: clusters}
	db.On("ListClusters", mock.Anything).Return(clusterList, nil)

	// Five replicas: the pinned shard (1) is below the replica count, so it must win.
	replicasCount := 5
	db.On("GetApplicationControllerReplicas").Return(replicasCount)
	distributionFunction := ConsistentHashingWithBoundedLoadsDistributionFunction(clusterAccessor, replicasCount)
	assert.Equal(t, fixedShard, int64(distributionFunction(cluster)))
}

func TestGetShardByIndexModuloReplicasCountDistributionFunction(t *testing.T) {
clusters, db, cluster1, cluster2, _, _, _ := createTestClusters()
replicasCount := 2
Expand Down
6 changes: 3 additions & 3 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
module github.com/argoproj/argo-cd/v2

go 1.21

toolchain go1.21.0
go 1.21.0

require (
code.gitea.io/sdk/gitea v0.15.1
Expand All @@ -11,6 +9,7 @@ require (
github.com/Masterminds/semver/v3 v3.2.1
github.com/Masterminds/sprig/v3 v3.2.3
github.com/TomOnTime/utfutil v0.0.0-20180511104225-09c41003ee1d
github.com/akram/go-consistent-hashing-with-bounded-loads v0.0.0-20231206173235-2eb81718078d
github.com/alicebob/miniredis/v2 v2.30.4
github.com/antonmedv/expr v1.15.2
github.com/argoproj/gitops-engine v0.7.1-0.20231102154024-c0c2dd1f6f48
Expand Down Expand Up @@ -132,6 +131,7 @@ require (
github.com/googleapis/enterprise-certificate-proxy v0.2.5 // indirect
github.com/googleapis/gax-go/v2 v2.12.0 // indirect
github.com/kylelemons/godebug v1.1.0 // indirect
github.com/minio/blake2b-simd v0.0.0-20160723061019-3f5f724cb5b1 // indirect
github.com/pkg/browser v0.0.0-20210911075715-681adbf594b8 // indirect
github.com/tidwall/gjson v1.14.4 // indirect
github.com/tidwall/match v1.1.1 // indirect
Expand Down
4 changes: 4 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -675,6 +675,8 @@ github.com/ajstarks/deck v0.0.0-20200831202436-30c9fc6549a9/go.mod h1:JynElWSGnm
github.com/ajstarks/deck/generate v0.0.0-20210309230005-c3f852c02e19/go.mod h1:T13YZdzov6OU0A1+RfKZiZN9ca6VeKdBdyDV+BY97Tk=
github.com/ajstarks/svgo v0.0.0-20180226025133-644b8db467af/go.mod h1:K08gAheRH3/J6wwsYMMT4xOr94bZjxIelGM0+d/wbFw=
github.com/ajstarks/svgo v0.0.0-20211024235047-1546f124cd8b/go.mod h1:1KcenG0jGWcpt8ov532z81sp/kMMUG485J2InIOyADM=
github.com/akram/go-consistent-hashing-with-bounded-loads v0.0.0-20231206173235-2eb81718078d h1:cqzzANYt5GRtd6m82Fky33Nyy9SbrTtBvsMeFVwHAMM=
github.com/akram/go-consistent-hashing-with-bounded-loads v0.0.0-20231206173235-2eb81718078d/go.mod h1:pN0s7kPxwU7qa0awc0WP6VUMSHzDJodPatu3B14/ids=
github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0=
Expand Down Expand Up @@ -1403,6 +1405,8 @@ github.com/microsoft/azure-devops-go-api/azuredevops v1.0.0-b5 h1:YH424zrwLTlyHS
github.com/microsoft/azure-devops-go-api/azuredevops v1.0.0-b5/go.mod h1:PoGiBqKSQK1vIfQ+yVaFcGjDySHvym6FM1cNYnwzbrY=
github.com/miekg/dns v1.0.14/go.mod h1:W1PPwlIAgtquWBMBEV9nkV9Cazfe8ScdGz/Lj7v3Nrg=
github.com/minio/asm2plan9s v0.0.0-20200509001527-cdd76441f9d8/go.mod h1:mC1jAcsrzbxHt8iiaC+zU4b1ylILSosueou12R++wfY=
github.com/minio/blake2b-simd v0.0.0-20160723061019-3f5f724cb5b1 h1:lYpkrQH5ajf0OXOcUbGjvZxxijuBwbbmlSxLiuofa+g=
github.com/minio/blake2b-simd v0.0.0-20160723061019-3f5f724cb5b1/go.mod h1:pD8RvIylQ358TN4wwqatJ8rNavkEINozVn9DtGI3dfQ=
github.com/minio/c2goasm v0.0.0-20190812172519-36a3d3bbc4f3/go.mod h1:RagcQ7I8IeTMnF8JTXieKnO4Z6JCsikNEzj0DwauVzE=
github.com/minio/md5-simd v1.1.2/go.mod h1:MzdKDxYpY2BT9XQFocsiZf/NKVtR7nkE4RoEpN+20RM=
github.com/minio/minio-go/v7 v7.0.58/go.mod h1:NUDy4A4oXPq1l2yK6LTSvCEzAMeIcoz9lcj5dbzSrRE=
Expand Down

0 comments on commit a2714e0

Please sign in to comment.