From e8a5098bb89bc12cfb3aeb4214f482e9b042a9f6 Mon Sep 17 00:00:00 2001
From: Akram Ben Aissi
Date: Mon, 15 Apr 2024 20:04:20 +0200
Subject: [PATCH] Make the assignment consistent across all clusters

- The assignment produced by the algorithm has to be consistent across all
  clusters, so the distribution function now returns a map built by running
  the consistent hash over the full cluster list.
- Modified the createConsistentHashingWithBoundLoads function to build the
  cluster-to-shard map. The assignment must be identical on every shard,
  which is why the cluster list is sorted before it is run through the
  consistent hashing algorithm.

Signed-off-by: Akram Ben Aissi
---
 controller/sharding/consistent/consistent.go | 47 ++++++----------
 controller/sharding/sharding.go              | 59 +++++++++++++++-----
 controller/sharding/sharding_test.go         | 15 ++---
 3 files changed, 70 insertions(+), 51 deletions(-)
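Note for reviewers: the heart of this change is that every application
controller replica must derive exactly the same cluster-to-shard map, so the
assignment is recomputed from scratch over a sorted cluster list instead of
being answered one lookup at a time. Below is a self-contained, illustrative
Go sketch of that flow; the FNV hash, the ring layout and the 1.25 bound
factor are assumptions made for the example, not the actual
controller/sharding/consistent implementation.

// sketch.go: simplified model of the deterministic shard assignment
// introduced by this patch.
package main

import (
	"fmt"
	"hash/fnv"
	"math"
	"sort"
)

func hash64(s string) uint64 {
	h := fnv.New64a()
	h.Write([]byte(s))
	return h.Sum64()
}

// assignShards hashes every shard onto a ring, then walks the *sorted*
// cluster list through it, skipping shards whose load already sits at the
// bounded-loads ceiling. Because the input order and the load bookkeeping
// are identical on every replica, so is the resulting map.
func assignShards(clusterIDs []string, appsPerCluster map[string]int64, replicas int) map[string]int {
	sort.Strings(clusterIDs)

	type point struct {
		pos   uint64
		shard int
	}
	ring := make([]point, 0, replicas)
	for i := 0; i < replicas; i++ {
		ring = append(ring, point{hash64(fmt.Sprintf("shard-%d", i)), i})
	}
	sort.Slice(ring, func(a, b int) bool { return ring[a].pos < ring[b].pos })

	loads := make(map[int]int64, replicas)
	var total int64
	shardFor := make(map[string]int, len(clusterIDs))
	for _, id := range clusterIDs {
		hv := hash64(id)
		// First candidate: the shard clockwise from the cluster's hash.
		i := sort.Search(len(ring), func(j int) bool { return ring[j].pos >= hv })
		for n := 0; n < len(ring); n++ {
			cand := ring[(i+n)%len(ring)].shard
			// Bounded loads: accept only while the shard stays within
			// ceil(average * 1.25). This bound always admits some shard.
			limit := int64(math.Ceil((float64(total) + 1) / float64(replicas) * 1.25))
			if loads[cand]+1 <= limit {
				shardFor[id] = cand
				loads[cand] += appsPerCluster[id]
				total += appsPerCluster[id]
				break
			}
		}
	}
	return shardFor
}

func main() {
	apps := map[string]int64{"c1": 5, "c2": 3, "c3": 3, "c4": 2}
	fmt.Println(assignShards([]string{"c4", "c2", "c1", "c3"}, apps, 2))
	fmt.Println(assignShards([]string{"c1", "c2", "c3", "c4"}, apps, 2)) // identical map
}

Both calls print the same assignment even though the inputs arrive in
different orders; sorting first is what makes the bounded-load bookkeeping,
and therefore the map, evolve identically on every replica.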
diff --git a/controller/sharding/consistent/consistent.go b/controller/sharding/consistent/consistent.go
index aad5b144155d2..ab29de814d8aa 100644
--- a/controller/sharding/consistent/consistent.go
+++ b/controller/sharding/consistent/consistent.go
@@ -116,49 +116,34 @@ func (c *Consistent) GetLeast(client string) (string, error) {
 	if c.clients.Len() == 0 {
 		return "", ErrNoHosts
 	}
-
 	h := c.hash(client)
-	idx := c.search(h)
-
-	i := idx
 	for {
-		x := item{uint64(i)}
-		key := c.clients.Get(x)
+		var foundItem btree.Item
+		c.clients.AscendGreaterOrEqual(item{h}, func(bItem btree.Item) bool {
+			if h != bItem.(item).value {
+				foundItem = bItem
+				return false // stop the iteration
+			}
+			return true
+		})
+
+		if foundItem == nil {
+			// If no host found, wrap around to the first one.
+			foundItem = c.clients.Min()
+		}
+		key := c.clients.Get(foundItem)
 		if key != nil {
-			host := c.servers[key.(*item).value]
+			host := c.servers[key.(item).value]
 			if c.loadOK(host) {
 				return host, nil
 			}
-			i++
-			if i >= c.clients.Len() {
-				i = 0
-			}
+			h = key.(item).value
 		} else {
 			return client, nil
 		}
 	}
 }
 
-func (c *Consistent) search(key uint64) int {
-	idx := 0
-	found := false
-
-	c.clients.Ascend(func(i btree.Item) bool {
-		if i.(item).value >= key {
-			found = true
-			return false // stop the iteration
-		}
-		idx++
-		return true
-	})
-
-	if !found {
-		idx = 0
-	}
-
-	return idx
-}
-
 // Sets the load of `server` to the given `load`
 func (c *Consistent) UpdateLoad(server string, load int64) {
 	c.Lock()
diff --git a/controller/sharding/sharding.go b/controller/sharding/sharding.go
index cf8369bcbf763..3fae87e90337f 100644
--- a/controller/sharding/sharding.go
+++ b/controller/sharding/sharding.go
@@ -87,7 +87,7 @@ func GetDistributionFunction(clusters clusterAccessor, apps appAccessor, shardin
 	case common.LegacyShardingAlgorithm:
 		distributionFunction = LegacyDistributionFunction(replicasCount)
 	case common.ConsistentHashingWithBoundedLoadsAlgorithm:
-		distributionFunction = ConsistentHashingWithBoundedLoadsDistributionFunction(clusters, replicasCount)
+		distributionFunction = ConsistentHashingWithBoundedLoadsDistributionFunction(clusters, apps, replicasCount)
 	default:
 		log.Warnf("distribution type %s is not supported, defaulting to %s", shardingAlgorithm, common.DefaultShardingAlgorithm)
 	}
@@ -166,7 +166,7 @@ func RoundRobinDistributionFunction(clusters clusterAccessor, replicas int) Dist
 // for a given cluster the function will return the shard number based on a consistent hashing with bounded loads algorithm.
 // This function ensures an almost homogenous distribution: each shards got assigned the fairly similar number of
 // clusters +/-10% , but with it is resilient to sharding and/or number of clusters changes.
-func ConsistentHashingWithBoundedLoadsDistributionFunction(clusters clusterAccessor, replicas int) DistributionFunction {
+func ConsistentHashingWithBoundedLoadsDistributionFunction(clusters clusterAccessor, apps appAccessor, replicas int) DistributionFunction {
 	return func(c *v1alpha1.Cluster) int {
 		if replicas > 0 {
 			if c == nil { // in-cluster does not necessarly have a secret assigned. So we are receiving a nil cluster here.
@@ -184,17 +184,12 @@ func ConsistentHashingWithBoundedLoadsDistributionFunction(clusters clusterAcces
 				log.Warnf("Cluster with id=%s not found in cluster map.", c.ID)
 				return -1
 			}
-			consistentHashing := createConsistentHashingWithBoundLoads(replicas)
-			clusterIndex, err := consistentHashing.Get(c.ID)
-			if err != nil {
+			shardIndexedByCluster := createConsistentHashingWithBoundLoads(replicas, clusters, apps)
+			shard, ok := shardIndexedByCluster[c.ID]
+			if !ok {
 				log.Warnf("Cluster with id=%s not found in cluster map.", c.ID)
 				return -1
 			}
-			shard, err := strconv.Atoi(clusterIndex)
-			if err != nil {
-				log.Errorf("Consistent Hashing was supposed to return a shard index but it returned %d", err)
-				return -1
-			}
 			log.Debugf("Cluster with id=%s will be processed by shard %d", c.ID, shard)
 			return shard
 		}
@@ -204,15 +199,53 @@ func ConsistentHashingWithBoundedLoadsDistributionFunction(clusters clusterAcces
 	}
 }
 
-func createConsistentHashingWithBoundLoads(replicas int) *consistent.Consistent {
+func createConsistentHashingWithBoundLoads(replicas int, getCluster clusterAccessor, getApp appAccessor) map[string]int {
+	clusters := getSortedClustersList(getCluster)
+	appDistribution := getAppDistribution(getCluster, getApp)
+	shardIndexedByCluster := make(map[string]int)
+	appsIndexedByShard := make(map[string]int64)
 	consistentHashing := consistent.New()
 	// Adding a shard with id "-1" as a reserved value for clusters that does not have an assigned shard
 	// this happens for clusters that are removed for the clusters list
 	//consistentHashing.Add("-1")
 	for i := 0; i < replicas; i++ {
-		consistentHashing.Add(strconv.Itoa(i))
+		shard := strconv.Itoa(i)
+		consistentHashing.Add(shard)
+		appsIndexedByShard[shard] = 0
+	}
+
+	for _, c := range clusters {
+		clusterIndex, err := consistentHashing.GetLeast(c.ID)
+		if err != nil {
+			log.Warnf("Cluster with id=%s not found in cluster map.", c.ID)
+		}
+		shardIndexedByCluster[c.ID], err = strconv.Atoi(clusterIndex)
+		if err != nil {
+			log.Errorf("Consistent Hashing was supposed to return a shard index but it returned %v", err)
+		}
+		numApps, ok := appDistribution[c.Server]
+		if !ok {
+			numApps = 0
+		}
+		appsIndexedByShard[clusterIndex] += numApps
+		consistentHashing.UpdateLoad(clusterIndex, appsIndexedByShard[clusterIndex])
+	}
+
+	return shardIndexedByCluster
+}
+
+func getAppDistribution(getCluster clusterAccessor, getApps appAccessor) map[string]int64 {
+	apps := getApps()
+	clusters := getCluster()
+	appDistribution := make(map[string]int64, len(clusters))
+
+	for _, a := range apps {
+		if _, ok := appDistribution[a.Spec.Destination.Server]; !ok {
+			appDistribution[a.Spec.Destination.Server] = 0
+		}
+		appDistribution[a.Spec.Destination.Server]++
 	}
-	return consistentHashing
+	return appDistribution
 }
 
 // NoShardingDistributionFunction returns a DistributionFunction that will process all cluster by shard 0
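Note: GetLeast above keeps walking the ring until loadOK accepts a shard,
and loadOK itself is not touched by this patch. For context only, the
acceptance test in the consistent-hashing-with-bounded-loads approach that
this package follows has roughly the shape below (assuming math is imported;
the exact rounding and load factor in consistent.go may differ):

// Illustrative only; assumed shape of the bounded-loads acceptance test.
// A host can take one more key while its load stays within
// ceil(averageLoad * factor); otherwise GetLeast walks clockwise to the
// next host on the ring.
func loadOK(hostLoad, totalLoad int64, numHosts int, factor float64) bool {
	avg := (float64(totalLoad) + 1) / float64(numHosts)
	return float64(hostLoad)+1 <= math.Ceil(avg*factor)
}

Because the ceiling tracks the ring-wide average load, no shard can drift
far above its fair share of applications, which is what "bounded loads"
refers to.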
diff --git a/controller/sharding/sharding_test.go b/controller/sharding/sharding_test.go
index 5664ae4579535..c4eead67eed04 100644
--- a/controller/sharding/sharding_test.go
+++ b/controller/sharding/sharding_test.go
@@ -287,12 +287,13 @@ func TestConsistentHashingWhenClusterIsAddedAndRemoved(t *testing.T) {
 		clusters = append(clusters, createCluster(cluster, id))
 	}
 	clusterAccessor := getClusterAccessor(clusters)
+	appAccessor, _, _, _, _, _ := createTestApps()
 	clusterList := &v1alpha1.ClusterList{Items: clusters}
 	db.On("ListClusters", mock.Anything).Return(clusterList, nil)
 	// Test with replicas set to 3
 	replicasCount := 3
 	db.On("GetApplicationControllerReplicas").Return(replicasCount)
-	distributionFunction := ConsistentHashingWithBoundedLoadsDistributionFunction(clusterAccessor, replicasCount)
+	distributionFunction := ConsistentHashingWithBoundedLoadsDistributionFunction(clusterAccessor, appAccessor, replicasCount)
 	assert.Equal(t, 0, distributionFunction(nil))
 	distributionMap := map[int]int{}
 	assignementMap := map[string]int{}
@@ -327,7 +328,7 @@ func TestConsistentHashingWhenClusterIsAddedAndRemoved(t *testing.T) {
 
 	// Now we will decrease the number of replicas to 2, and we should see only clusters that were attached to shard 2 to be reassigned
 	replicasCount = 2
-	distributionFunction = ConsistentHashingWithBoundedLoadsDistributionFunction(getClusterAccessor(clusterList.Items), replicasCount)
+	distributionFunction = ConsistentHashingWithBoundedLoadsDistributionFunction(getClusterAccessor(clusterList.Items), appAccessor, replicasCount)
 	removedCluster := clusterList.Items[len(clusterList.Items)-1]
 	for i := 0; i < clusterCount; i++ {
 		c := &clusters[i]
@@ -338,11 +339,10 @@ func TestConsistentHashingWhenClusterIsAddedAndRemoved(t *testing.T) {
 			t.Fail()
 		}
 	}
-
 	// Now, we remove the last added cluster, it should be unassigned
 	removedCluster = clusterList.Items[len(clusterList.Items)-1]
 	clusterList.Items = clusterList.Items[:len(clusterList.Items)-1]
-	distributionFunction = ConsistentHashingWithBoundedLoadsDistributionFunction(getClusterAccessor(clusterList.Items), replicasCount)
+	distributionFunction = ConsistentHashingWithBoundedLoadsDistributionFunction(getClusterAccessor(clusterList.Items), appAccessor, replicasCount)
 	assert.Equal(t, -1, distributionFunction(&removedCluster))
 }
 
@@ -352,11 +352,11 @@ func TestConsistentHashingWhenClusterWithZeroReplicas(t *testing.T) {
 	clusterAccessor := getClusterAccessor(clusters)
 	clusterList := &v1alpha1.ClusterList{Items: clusters}
 	db.On("ListClusters", mock.Anything).Return(clusterList, nil)
-
+	appAccessor, _, _, _, _, _ := createTestApps()
 	// Test with replicas set to 0
 	replicasCount := 0
 	db.On("GetApplicationControllerReplicas").Return(replicasCount)
-	distributionFunction := ConsistentHashingWithBoundedLoadsDistributionFunction(clusterAccessor, replicasCount)
+	distributionFunction := ConsistentHashingWithBoundedLoadsDistributionFunction(clusterAccessor, appAccessor, replicasCount)
 	assert.Equal(t, -1, distributionFunction(nil))
 }
 
@@ -373,7 +373,8 @@ func TestConsistentHashingWhenClusterWithFixedShard(t *testing.T) {
 	// Test with replicas set to 5
 	replicasCount := 5
 	db.On("GetApplicationControllerReplicas").Return(replicasCount)
-	distributionFunction := ConsistentHashingWithBoundedLoadsDistributionFunction(clusterAccessor, replicasCount)
+	appAccessor, _, _, _, _, _ := createTestApps()
+	distributionFunction := ConsistentHashingWithBoundedLoadsDistributionFunction(clusterAccessor, appAccessor, replicasCount)
 	assert.Equal(t, fixedShard, int64(distributionFunction(cluster)))
 }
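For completeness, here is a minimal usage sketch of the new three-argument
signature, reusing helpers that already exist in sharding_test.go
(createCluster, getClusterAccessor, createTestApps). The test name and
assertions are hypothetical, not part of this patch:

func TestConsistentHashingSketch(t *testing.T) {
	clusters := []v1alpha1.Cluster{createCluster("cluster-01", "01"), createCluster("cluster-02", "02")}
	clusterAccessor := getClusterAccessor(clusters)
	appAccessor, _, _, _, _, _ := createTestApps()

	replicasCount := 3
	distributionFunction := ConsistentHashingWithBoundedLoadsDistributionFunction(clusterAccessor, appAccessor, replicasCount)

	// Every known cluster must land on a shard in [0, replicasCount);
	// unknown or removed clusters map to the reserved value -1.
	for i := range clusters {
		shard := distributionFunction(&clusters[i])
		assert.True(t, shard >= 0 && shard < replicasCount)
	}
}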