Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Consistent hashing with bounded loads algorithm for sharding #16564

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,7 @@ func NewCommand() *cobra.Command {
command.Flags().StringSliceVar(&otlpAttrs, "otlp-attrs", env.StringsFromEnv("ARGOCD_APPLICATION_CONTROLLER_OTLP_ATTRS", []string{}, ","), "List of OpenTelemetry collector extra attrs when send traces, each attribute is separated by a colon(e.g. key:value)")
command.Flags().StringSliceVar(&applicationNamespaces, "application-namespaces", env.StringsFromEnv("ARGOCD_APPLICATION_NAMESPACES", []string{}, ","), "List of additional namespaces that applications are allowed to be reconciled from")
command.Flags().BoolVar(&persistResourceHealth, "persist-resource-health", env.ParseBoolFromEnv("ARGOCD_APPLICATION_CONTROLLER_PERSIST_RESOURCE_HEALTH", true), "Enables storing the managed resources health in the Application CRD")
command.Flags().StringVar(&shardingAlgorithm, "sharding-method", env.StringFromEnv(common.EnvControllerShardingAlgorithm, common.DefaultShardingAlgorithm), "Enables choice of sharding method. Supported sharding methods are : [legacy, round-robin] ")
command.Flags().StringVar(&shardingAlgorithm, "sharding-method", env.StringFromEnv(common.EnvControllerShardingAlgorithm, common.DefaultShardingAlgorithm), "Enables choice of sharding method. Supported sharding methods are : [legacy, round-robin, consistent-hashing] ")
// global queue rate limit config
command.Flags().Int64Var(&workqueueRateLimit.BucketSize, "wq-bucket-size", env.ParseInt64FromEnv("WORKQUEUE_BUCKET_SIZE", 500, 1, math.MaxInt64), "Set Workqueue Rate Limiter Bucket Size, default 500")
command.Flags().Float64Var(&workqueueRateLimit.BucketQPS, "wq-bucket-qps", env.ParseFloat64FromEnv("WORKQUEUE_BUCKET_QPS", math.MaxFloat64, 1, math.MaxFloat64), "Set Workqueue Rate Limiter Bucket QPS, default set to MaxFloat64 which disables the bucket limiter")
Expand Down
4 changes: 2 additions & 2 deletions cmd/argocd/commands/admin/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,7 @@ func NewClusterShardsCommand(clientOpts *argocdclient.ClientOptions) *cobra.Comm
clientConfig = cli.AddKubectlFlagsToCmd(&command)
command.Flags().IntVar(&shard, "shard", -1, "Cluster shard filter")
command.Flags().IntVar(&replicas, "replicas", 0, "Application controller replicas count. Inferred from number of running controller pods if not specified")
command.Flags().StringVar(&shardingAlgorithm, "sharding-method", common.DefaultShardingAlgorithm, "Sharding method. Defaults: legacy. Supported sharding methods are : [legacy, round-robin] ")
command.Flags().StringVar(&shardingAlgorithm, "sharding-method", common.DefaultShardingAlgorithm, "Sharding method. Defaults: legacy. Supported sharding methods are : [legacy, round-robin, consistent-hashing] ")
command.Flags().BoolVar(&portForwardRedis, "port-forward-redis", true, "Automatically port-forward ha proxy redis from current namespace?")

cacheSrc = appstatecache.AddCacheFlagsToCmd(&command)
Expand Down Expand Up @@ -514,7 +514,7 @@ argocd admin cluster stats target-cluster`,
clientConfig = cli.AddKubectlFlagsToCmd(&command)
command.Flags().IntVar(&shard, "shard", -1, "Cluster shard filter")
command.Flags().IntVar(&replicas, "replicas", 0, "Application controller replicas count. Inferred from number of running controller pods if not specified")
command.Flags().StringVar(&shardingAlgorithm, "sharding-method", common.DefaultShardingAlgorithm, "Sharding method. Defaults: legacy. Supported sharding methods are : [legacy, round-robin] ")
command.Flags().StringVar(&shardingAlgorithm, "sharding-method", common.DefaultShardingAlgorithm, "Sharding method. Defaults: legacy. Supported sharding methods are : [legacy, round-robin, consistent-hashing] ")
command.Flags().BoolVar(&portForwardRedis, "port-forward-redis", true, "Automatically port-forward ha proxy redis from current namespace?")
cacheSrc = appstatecache.AddCacheFlagsToCmd(&command)

Expand Down
8 changes: 7 additions & 1 deletion common/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,13 @@ const (
RoundRobinShardingAlgorithm = "round-robin"
// AppControllerHeartbeatUpdateRetryCount is the retry count for updating the Shard Mapping to the Shard Mapping ConfigMap used by Application Controller
AppControllerHeartbeatUpdateRetryCount = 3
DefaultShardingAlgorithm = LegacyShardingAlgorithm

// ConsistentHashingWithBoundedLoadsAlgorithm uses an algorithm that tries to use an equal distribution across
// all shards but is optimised to handle sharding and/or cluster addition or removal. In case of sharding or
// cluster changes, this algorithm minimises the changes between shard and clusters assignments.
ConsistentHashingWithBoundedLoadsAlgorithm = "consistent-hashing"

DefaultShardingAlgorithm = LegacyShardingAlgorithm
)

// Dex related constants
Expand Down
274 changes: 274 additions & 0 deletions controller/sharding/consistent/consistent.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,274 @@
// An implementation of Consistent Hashing and
// Consistent Hashing With Bounded Loads.
//
// https://en.wikipedia.org/wiki/Consistent_hashing
//
// https://research.googleblog.com/2017/04/consistent-hashing-with-bounded-loads.html
package consistent

import (
"encoding/binary"
"errors"
"fmt"
"math"
"sync"
"sync/atomic"

"github.com/google/btree"

blake2b "github.com/minio/blake2b-simd"
)

// OptimalExtraCapacityFactor extra factor capacity (1 + ε). The ideal balance
// between keeping the shards uniform while also keeping consistency when
// changing shard numbers.
const OptimalExtraCapacityFactor = 1.25

var ErrNoHosts = errors.New("no hosts added")

type Host struct {
Name string
Load int64
}

type Consistent struct {
servers map[uint64]string
clients *btree.BTree
loadMap map[string]*Host
totalLoad int64
replicationFactor int

sync.RWMutex
}

type item struct {
value uint64
}

func (i item) Less(than btree.Item) bool {
return i.value < than.(item).value
}

func New() *Consistent {
return &Consistent{
servers: map[uint64]string{},
clients: btree.New(2),
loadMap: map[string]*Host{},
replicationFactor: 1000,
}
}

func NewWithReplicationFactor(replicationFactor int) *Consistent {
return &Consistent{
servers: map[uint64]string{},
clients: btree.New(2),
loadMap: map[string]*Host{},
replicationFactor: replicationFactor,
}
}
func (c *Consistent) Add(server string) {
c.Lock()
defer c.Unlock()

if _, ok := c.loadMap[server]; ok {
return
}

c.loadMap[server] = &Host{Name: server, Load: 0}
for i := 0; i < c.replicationFactor; i++ {
h := c.hash(fmt.Sprintf("%s%d", server, i))
c.servers[h] = server
c.clients.ReplaceOrInsert(item{h})
}
}

// Get returns the server that owns the given client.
// As described in https://en.wikipedia.org/wiki/Consistent_hashing
// It returns ErrNoHosts if the ring has no servers in it.
func (c *Consistent) Get(client string) (string, error) {
c.RLock()
defer c.RUnlock()

if c.clients.Len() == 0 {
return "", ErrNoHosts
}

h := c.hash(client)
var foundItem btree.Item
c.clients.AscendGreaterOrEqual(item{h}, func(i btree.Item) bool {
foundItem = i
return false // stop the iteration
})

if foundItem == nil {
// If no host found, wrap around to the first one.
foundItem = c.clients.Min()
}

host := c.servers[foundItem.(item).value]

return host, nil
}

// GetLeast returns the least loaded host that can serve the key.
// It uses Consistent Hashing With Bounded loads.
// https://research.googleblog.com/2017/04/consistent-hashing-with-bounded-loads.html
// It returns ErrNoHosts if the ring has no hosts in it.
func (c *Consistent) GetLeast(client string) (string, error) {
c.RLock()
defer c.RUnlock()

if c.clients.Len() == 0 {
return "", ErrNoHosts
}
h := c.hash(client)
for {
var foundItem btree.Item
c.clients.AscendGreaterOrEqual(item{h}, func(bItem btree.Item) bool {
if h != bItem.(item).value {
foundItem = bItem
return false // stop the iteration
}
return true
})

if foundItem == nil {
// If no host found, wrap around to the first one.
foundItem = c.clients.Min()
}
key := c.clients.Get(foundItem)
if key != nil {
host := c.servers[key.(item).value]
if c.loadOK(host) {
return host, nil
}
h = key.(item).value
} else {
return client, nil
}
}
}

// Sets the load of `server` to the given `load`
func (c *Consistent) UpdateLoad(server string, load int64) {
c.Lock()
defer c.Unlock()

if _, ok := c.loadMap[server]; !ok {
return
}
c.totalLoad -= c.loadMap[server].Load
c.loadMap[server].Load = load
c.totalLoad += load
}

// Increments the load of host by 1
//
// should only be used with if you obtained a host with GetLeast
func (c *Consistent) Inc(server string) {
c.Lock()
defer c.Unlock()

if _, ok := c.loadMap[server]; !ok {
return
}
atomic.AddInt64(&c.loadMap[server].Load, 1)
atomic.AddInt64(&c.totalLoad, 1)
}

// Decrements the load of host by 1
//
// should only be used with if you obtained a host with GetLeast
func (c *Consistent) Done(server string) {
c.Lock()
defer c.Unlock()

if _, ok := c.loadMap[server]; !ok {
return
}
atomic.AddInt64(&c.loadMap[server].Load, -1)
atomic.AddInt64(&c.totalLoad, -1)
}

// Deletes host from the ring
func (c *Consistent) Remove(server string) bool {
c.Lock()
defer c.Unlock()

for i := 0; i < c.replicationFactor; i++ {
h := c.hash(fmt.Sprintf("%s%d", server, i))
delete(c.servers, h)
c.delSlice(h)
}
delete(c.loadMap, server)
return true
}

// Return the list of servers in the ring
func (c *Consistent) Servers() (servers []string) {
c.RLock()
defer c.RUnlock()
for k := range c.loadMap {
servers = append(servers, k)
}
return servers
}

// Returns the loads of all the hosts
func (c *Consistent) GetLoads() map[string]int64 {
loads := map[string]int64{}

for k, v := range c.loadMap {
loads[k] = v.Load
}
return loads
}

// Returns the maximum load of the single host
// which is:
// (total_load/number_of_hosts)*1.25
// total_load = is the total number of active requests served by hosts
// for more info:
// https://research.googleblog.com/2017/04/consistent-hashing-with-bounded-loads.html
func (c *Consistent) MaxLoad() int64 {
if c.totalLoad == 0 {
c.totalLoad = 1
}
var avgLoadPerNode float64
avgLoadPerNode = float64(c.totalLoad / int64(len(c.loadMap)))
if avgLoadPerNode == 0 {
avgLoadPerNode = 1
}
avgLoadPerNode = math.Ceil(avgLoadPerNode * OptimalExtraCapacityFactor)
return int64(avgLoadPerNode)
}

func (c *Consistent) loadOK(server string) bool {
// a safety check if someone performed c.Done more than needed
if c.totalLoad < 0 {
c.totalLoad = 0
}

var avgLoadPerNode float64
avgLoadPerNode = float64((c.totalLoad + 1) / int64(len(c.loadMap)))
if avgLoadPerNode == 0 {
avgLoadPerNode = 1
}
avgLoadPerNode = math.Ceil(avgLoadPerNode * 1.25)

bserver, ok := c.loadMap[server]
if !ok {
panic(fmt.Sprintf("given host(%s) not in loadsMap", bserver.Name))
}

return float64(bserver.Load)+1 <= avgLoadPerNode
}

func (c *Consistent) delSlice(val uint64) {
c.clients.Delete(item{val})
}

func (c *Consistent) hash(key string) uint64 {
out := blake2b.Sum512([]byte(key))
return binary.LittleEndian.Uint64(out[:])
}
Loading