diff --git a/controller/sharding/consistent/consistent.go b/controller/sharding/consistent/consistent.go index aad5b144155d2..ab29de814d8aa 100644 --- a/controller/sharding/consistent/consistent.go +++ b/controller/sharding/consistent/consistent.go @@ -116,49 +116,34 @@ func (c *Consistent) GetLeast(client string) (string, error) { if c.clients.Len() == 0 { return "", ErrNoHosts } - h := c.hash(client) - idx := c.search(h) - - i := idx for { - x := item{uint64(i)} - key := c.clients.Get(x) + var foundItem btree.Item + c.clients.AscendGreaterOrEqual(item{h}, func(bItem btree.Item) bool { + if h != bItem.(item).value { + foundItem = bItem + return false // stop the iteration + } + return true + }) + + if foundItem == nil { + // If no host found, wrap around to the first one. + foundItem = c.clients.Min() + } + key := c.clients.Get(foundItem) if key != nil { - host := c.servers[key.(*item).value] + host := c.servers[key.(item).value] if c.loadOK(host) { return host, nil } - i++ - if i >= c.clients.Len() { - i = 0 - } + h = key.(item).value } else { return client, nil } } } -func (c *Consistent) search(key uint64) int { - idx := 0 - found := false - - c.clients.Ascend(func(i btree.Item) bool { - if i.(item).value >= key { - found = true - return false // stop the iteration - } - idx++ - return true - }) - - if !found { - idx = 0 - } - - return idx -} - // Sets the load of `server` to the given `load` func (c *Consistent) UpdateLoad(server string, load int64) { c.Lock() diff --git a/controller/sharding/sharding.go b/controller/sharding/sharding.go index cf8369bcbf763..3fae87e90337f 100644 --- a/controller/sharding/sharding.go +++ b/controller/sharding/sharding.go @@ -87,7 +87,7 @@ func GetDistributionFunction(clusters clusterAccessor, apps appAccessor, shardin case common.LegacyShardingAlgorithm: distributionFunction = LegacyDistributionFunction(replicasCount) case common.ConsistentHashingWithBoundedLoadsAlgorithm: - distributionFunction = 
ConsistentHashingWithBoundedLoadsDistributionFunction(clusters, replicasCount) + distributionFunction = ConsistentHashingWithBoundedLoadsDistributionFunction(clusters, apps, replicasCount) default: log.Warnf("distribution type %s is not supported, defaulting to %s", shardingAlgorithm, common.DefaultShardingAlgorithm) } @@ -166,7 +166,7 @@ func RoundRobinDistributionFunction(clusters clusterAccessor, replicas int) Dist // for a given cluster the function will return the shard number based on a consistent hashing with bounded loads algorithm. // This function ensures an almost homogenous distribution: each shards got assigned the fairly similar number of // clusters +/-10% , but with it is resilient to sharding and/or number of clusters changes. -func ConsistentHashingWithBoundedLoadsDistributionFunction(clusters clusterAccessor, replicas int) DistributionFunction { +func ConsistentHashingWithBoundedLoadsDistributionFunction(clusters clusterAccessor, apps appAccessor, replicas int) DistributionFunction { return func(c *v1alpha1.Cluster) int { if replicas > 0 { if c == nil { // in-cluster does not necessarly have a secret assigned. So we are receiving a nil cluster here. 
@@ -184,17 +184,12 @@ func ConsistentHashingWithBoundedLoadsDistributionFunction(clusters clusterAcces log.Warnf("Cluster with id=%s not found in cluster map.", c.ID) return -1 } - consistentHashing := createConsistentHashingWithBoundLoads(replicas) - clusterIndex, err := consistentHashing.Get(c.ID) - if err != nil { + shardIndexedByCluster := createConsistentHashingWithBoundLoads(replicas, clusters, apps) + shard, ok := shardIndexedByCluster[c.ID] + if !ok { log.Warnf("Cluster with id=%s not found in cluster map.", c.ID) return -1 } - shard, err := strconv.Atoi(clusterIndex) - if err != nil { - log.Errorf("Consistent Hashing was supposed to return a shard index but it returned %d", err) - return -1 - } log.Debugf("Cluster with id=%s will be processed by shard %d", c.ID, shard) return shard } @@ -204,15 +199,53 @@ func ConsistentHashingWithBoundedLoadsDistributionFunction(clusters clusterAcces } } -func createConsistentHashingWithBoundLoads(replicas int) *consistent.Consistent { +func createConsistentHashingWithBoundLoads(replicas int, getCluster clusterAccessor, getApp appAccessor) map[string]int { + clusters := getSortedClustersList(getCluster) + appDistribution := getAppDistribution(getCluster, getApp) + shardIndexedByCluster := make(map[string]int) + appsIndexedByShard := make(map[string]int64) consistentHashing := consistent.New() // Adding a shard with id "-1" as a reserved value for clusters that does not have an assigned shard // this happens for clusters that are removed for the clusters list //consistentHashing.Add("-1") for i := 0; i < replicas; i++ { - consistentHashing.Add(strconv.Itoa(i)) + shard := strconv.Itoa(i) + consistentHashing.Add(shard) + appsIndexedByShard[shard] = 0 + } + + for _, c := range clusters { + clusterIndex, err := consistentHashing.GetLeast(c.ID) + if err != nil { + log.Warnf("Cluster with id=%s not found in cluster map.", c.ID) + } + shardIndexedByCluster[c.ID], err = strconv.Atoi(clusterIndex) + if err != nil { + 
log.Errorf("Consistent Hashing was supposed to return a shard index but it returned %v", err) + } + numApps, ok := appDistribution[c.Server] + if !ok { + numApps = 0 + } + appsIndexedByShard[clusterIndex] += numApps + consistentHashing.UpdateLoad(clusterIndex, appsIndexedByShard[clusterIndex]) + } + + return shardIndexedByCluster +} + +func getAppDistribution(getCluster clusterAccessor, getApps appAccessor) map[string]int64 { + apps := getApps() + clusters := getCluster() + appDistribution := make(map[string]int64, len(clusters)) + + for _, a := range apps { + if _, ok := appDistribution[a.Spec.Destination.Server]; !ok { + appDistribution[a.Spec.Destination.Server] = 0 + } + appDistribution[a.Spec.Destination.Server]++ } - return consistentHashing + return appDistribution } // NoShardingDistributionFunction returns a DistributionFunction that will process all cluster by shard 0 diff --git a/controller/sharding/sharding_test.go b/controller/sharding/sharding_test.go index 5664ae4579535..c4eead67eed04 100644 --- a/controller/sharding/sharding_test.go +++ b/controller/sharding/sharding_test.go @@ -287,12 +287,13 @@ func TestConsistentHashingWhenClusterIsAddedAndRemoved(t *testing.T) { clusters = append(clusters, createCluster(cluster, id)) } clusterAccessor := getClusterAccessor(clusters) + appAccessor, _, _, _, _, _ := createTestApps() clusterList := &v1alpha1.ClusterList{Items: clusters} db.On("ListClusters", mock.Anything).Return(clusterList, nil) // Test with replicas set to 3 replicasCount := 3 db.On("GetApplicationControllerReplicas").Return(replicasCount) - distributionFunction := ConsistentHashingWithBoundedLoadsDistributionFunction(clusterAccessor, replicasCount) + distributionFunction := ConsistentHashingWithBoundedLoadsDistributionFunction(clusterAccessor, appAccessor, replicasCount) assert.Equal(t, 0, distributionFunction(nil)) distributionMap := map[int]int{} assignementMap := map[string]int{} @@ -327,7 +328,7 @@ func
TestConsistentHashingWhenClusterIsAddedAndRemoved(t *testing.T) { // Now we will decrease the number of replicas to 2, and we should see only clusters that were attached to shard 2 to be reassigned replicasCount = 2 - distributionFunction = ConsistentHashingWithBoundedLoadsDistributionFunction(getClusterAccessor(clusterList.Items), replicasCount) + distributionFunction = ConsistentHashingWithBoundedLoadsDistributionFunction(getClusterAccessor(clusterList.Items), appAccessor, replicasCount) removedCluster := clusterList.Items[len(clusterList.Items)-1] for i := 0; i < clusterCount; i++ { c := &clusters[i] @@ -338,11 +339,10 @@ func TestConsistentHashingWhenClusterIsAddedAndRemoved(t *testing.T) { t.Fail() } } - // Now, we remove the last added cluster, it should be unassigned removedCluster = clusterList.Items[len(clusterList.Items)-1] clusterList.Items = clusterList.Items[:len(clusterList.Items)-1] - distributionFunction = ConsistentHashingWithBoundedLoadsDistributionFunction(getClusterAccessor(clusterList.Items), replicasCount) + distributionFunction = ConsistentHashingWithBoundedLoadsDistributionFunction(getClusterAccessor(clusterList.Items), appAccessor, replicasCount) assert.Equal(t, -1, distributionFunction(&removedCluster)) } @@ -352,11 +352,11 @@ func TestConsistentHashingWhenClusterWithZeroReplicas(t *testing.T) { clusterAccessor := getClusterAccessor(clusters) clusterList := &v1alpha1.ClusterList{Items: clusters} db.On("ListClusters", mock.Anything).Return(clusterList, nil) - + appAccessor, _, _, _, _, _ := createTestApps() // Test with replicas set to 0 replicasCount := 0 db.On("GetApplicationControllerReplicas").Return(replicasCount) - distributionFunction := ConsistentHashingWithBoundedLoadsDistributionFunction(clusterAccessor, replicasCount) + distributionFunction := ConsistentHashingWithBoundedLoadsDistributionFunction(clusterAccessor, appAccessor, replicasCount) assert.Equal(t, -1, distributionFunction(nil)) } @@ -373,7 +373,8 @@ func 
TestConsistentHashingWhenClusterWithFixedShard(t *testing.T) { // Test with replicas set to 5 replicasCount := 5 db.On("GetApplicationControllerReplicas").Return(replicasCount) - distributionFunction := ConsistentHashingWithBoundedLoadsDistributionFunction(clusterAccessor, replicasCount) + appAccessor, _, _, _, _, _ := createTestApps() + distributionFunction := ConsistentHashingWithBoundedLoadsDistributionFunction(clusterAccessor, appAccessor, replicasCount) assert.Equal(t, fixedShard, int64(distributionFunction(cluster))) }