Adds consistent hashing with bounded loads sharding algorithm
Signed-off-by: Akram Ben Aissi <akram.benaissi@gmail.com>
akram committed Jan 25, 2024
1 parent 8932036 commit 5cf46ad
Showing 6 changed files with 459 additions and 5 deletions.
7 changes: 6 additions & 1 deletion common/common.go
@@ -117,7 +117,12 @@ const (
RoundRobinShardingAlgorithm = "round-robin"
// AppControllerHeartbeatUpdateRetryCount is the retry count for updating the Shard Mapping to the Shard Mapping ConfigMap used by Application Controller
AppControllerHeartbeatUpdateRetryCount = 3
// ConsistentHashingWithBoundedLoadsAlgorithm uses an algorithm that tries to achieve an equal distribution across
// all shards, but is optimized to handle shard and/or cluster additions and removals. When shards or clusters
// change, this algorithm minimizes the reassignments between shards and clusters.
ConsistentHashingWithBoundedLoadsAlgorithm = "consistent-hashing"

DefaultShardingAlgorithm = LegacyShardingAlgorithm
)

// Dex related constants
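For context, a minimal sketch of how a controller process might select the algorithm at startup. The ARGOCD_CONTROLLER_SHARDING_ALGORITHM variable name is an assumption for illustration, not something introduced by this diff:

package main

import (
	"fmt"
	"os"

	"github.com/argoproj/argo-cd/v2/common"
)

func main() {
	// Assumed environment variable; shown for illustration only.
	algorithm := os.Getenv("ARGOCD_CONTROLLER_SHARDING_ALGORITHM")
	if algorithm == "" {
		algorithm = common.DefaultShardingAlgorithm // legacy by default
	}
	fmt.Println("using sharding algorithm:", algorithm)
}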
288 changes: 288 additions & 0 deletions controller/sharding/consistent/consistent.go
@@ -0,0 +1,288 @@
// Package consistent provides an implementation of Consistent Hashing and
// Consistent Hashing With Bounded Loads.
//
// https://en.wikipedia.org/wiki/Consistent_hashing
//
// https://research.googleblog.com/2017/04/consistent-hashing-with-bounded-loads.html
package consistent

import (
"encoding/binary"
"errors"
"fmt"
"math"
"sync"
"sync/atomic"

"github.com/google/btree"

blake2b "github.com/minio/blake2b-simd"
)

var ErrNoHosts = errors.New("no hosts added")

type Host struct {
Name string
Load int64
}

type Consistent struct {
servers map[uint64]string
clients *btree.BTree
loadMap map[string]*Host
totalLoad int64
replicationFactor int

sync.RWMutex
}

type item struct {
value uint64
}

func (i item) Less(than btree.Item) bool {
return i.value < than.(item).value
}

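// New creates a ring with the default replication factor of 1000
// virtual nodes per server, which evens out the key distribution.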
func New() *Consistent {
return &Consistent{
servers: map[uint64]string{},
clients: btree.New(2),
loadMap: map[string]*Host{},
replicationFactor: 1000,
}
}

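// NewWithReplicationFactor creates a ring with the given number of
// virtual nodes per server.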
func NewWithReplicationFactor(replicationFactor int) *Consistent {
return &Consistent{
servers: map[uint64]string{},
clients: btree.New(2),
loadMap: map[string]*Host{},
replicationFactor: replicationFactor,
}
}

// Add adds a server to the ring if it is not already present.
func (c *Consistent) Add(server string) {
c.Lock()
defer c.Unlock()

if _, ok := c.loadMap[server]; ok {
return
}

c.loadMap[server] = &Host{Name: server, Load: 0}
for i := 0; i < c.replicationFactor; i++ {
h := c.hash(fmt.Sprintf("%s%d", server, i))
c.servers[h] = server
c.clients.ReplaceOrInsert(item{h})
}
}

// Get returns the server that owns the given client.
// As described in https://en.wikipedia.org/wiki/Consistent_hashing
// It returns ErrNoHosts if the ring has no servers in it.
func (c *Consistent) Get(client string) (string, error) {
c.RLock()
defer c.RUnlock()

if c.clients.Len() == 0 {
return "", ErrNoHosts
}

h := c.hash(client)
var foundItem btree.Item
c.clients.AscendGreaterOrEqual(item{h}, func(i btree.Item) bool {
foundItem = i
return false // stop the iteration
})

if foundItem == nil {
// If no host found, wrap around to the first one.
foundItem = c.clients.Min()
}

host := c.servers[foundItem.(item).value]

return host, nil
}

// GetLeast returns the least loaded host that can serve the key.
// It uses Consistent Hashing With Bounded Loads.
// https://research.googleblog.com/2017/04/consistent-hashing-with-bounded-loads.html
// It returns ErrNoHosts if the ring has no hosts in it.
func (c *Consistent) GetLeast(client string) (string, error) {
c.RLock()
defer c.RUnlock()

if c.clients.Len() == 0 {
return "", ErrNoHosts
}

// Start at the ring position the client hashes to, then walk the ring
// clockwise until a host whose load is below the bound is found.
// loadOK always admits at least one host, so the loop terminates.
h := c.hash(client)
for {
var foundItem btree.Item
c.clients.AscendGreaterOrEqual(item{h}, func(i btree.Item) bool {
foundItem = i
return false // stop the iteration
})

if foundItem == nil {
// If no host found, wrap around to the first one.
foundItem = c.clients.Min()
}

host := c.servers[foundItem.(item).value]
if c.loadOK(host) {
return host, nil
}
// Move just past the current virtual node and keep walking.
h = foundItem.(item).value + 1
}
}

// UpdateLoad sets the load of `server` to the given `load`.
func (c *Consistent) UpdateLoad(server string, load int64) {
c.Lock()
defer c.Unlock()

if _, ok := c.loadMap[server]; !ok {
return
}
c.totalLoad -= c.loadMap[server].Load
c.loadMap[server].Load = load
c.totalLoad += load
}

// Inc increments the load of the given host by 1.
//
// It should only be used if you obtained the host from GetLeast.
func (c *Consistent) Inc(server string) {
c.Lock()
defer c.Unlock()

if _, ok := c.loadMap[server]; !ok {
return
}
atomic.AddInt64(&c.loadMap[server].Load, 1)
atomic.AddInt64(&c.totalLoad, 1)
}

// Done decrements the load of the given host by 1.
//
// It should only be used if you obtained the host from GetLeast.
func (c *Consistent) Done(server string) {
c.Lock()
defer c.Unlock()

if _, ok := c.loadMap[server]; !ok {
return
}
atomic.AddInt64(&c.loadMap[server].Load, -1)
atomic.AddInt64(&c.totalLoad, -1)
}

// Remove deletes host from the ring. It always returns true.
func (c *Consistent) Remove(server string) bool {
c.Lock()
defer c.Unlock()

for i := 0; i < c.replicationFactor; i++ {
h := c.hash(fmt.Sprintf("%s%d", server, i))
delete(c.servers, h)
c.delSlice(h)
}
delete(c.loadMap, server)
return true
}

// Servers returns the list of servers in the ring.
func (c *Consistent) Servers() (servers []string) {
c.RLock()
defer c.RUnlock()
for k := range c.loadMap {
servers = append(servers, k)
}
return servers
}

// GetLoads returns the loads of all the hosts.
func (c *Consistent) GetLoads() map[string]int64 {
c.RLock()
defer c.RUnlock()

loads := map[string]int64{}
for k, v := range c.loadMap {
loads[k] = v.Load
}
return loads
}

// MaxLoad returns the maximum load a single host is allowed to carry,
// which is:
// ceil((total_load/number_of_hosts) * 1.25)
// where total_load is the total number of active requests served by hosts.
// For more info:
// https://research.googleblog.com/2017/04/consistent-hashing-with-bounded-loads.html
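// Example: with a total load of 8 spread across 3 hosts, the average load
// is 8/3 ≈ 2.67, so each host may carry at most ceil(2.67 * 1.25) = 4.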
func (c *Consistent) MaxLoad() int64 {
c.RLock()
defer c.RUnlock()

if len(c.loadMap) == 0 {
return 0
}
totalLoad := c.totalLoad
if totalLoad == 0 {
totalLoad = 1
}
avgLoadPerNode := float64(totalLoad) / float64(len(c.loadMap))
return int64(math.Ceil(avgLoadPerNode * 1.25))
}

func (c *Consistent) loadOK(server string) bool {
// a safety check if someone performed c.Done more than needed
if c.totalLoad < 0 {
c.totalLoad = 0
}

avgLoadPerNode := float64(c.totalLoad+1) / float64(len(c.loadMap))
if avgLoadPerNode == 0 {
avgLoadPerNode = 1
}
avgLoadPerNode = math.Ceil(avgLoadPerNode * 1.25)

bserver, ok := c.loadMap[server]
if !ok {
panic(fmt.Sprintf("given host(%s) not in loadsMap", server))
}

return float64(bserver.Load)+1 <= avgLoadPerNode
}

func (c *Consistent) delSlice(val uint64) {
c.clients.Delete(item{val})
}

func (c *Consistent) hash(key string) uint64 {
out := blake2b.Sum512([]byte(key))
return binary.LittleEndian.Uint64(out[:])
}
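For illustration, a minimal usage sketch of the ring API above, assuming the package is imported from its path in this repository; host names and the client loop are made up:

package main

import (
	"fmt"

	"github.com/argoproj/argo-cd/v2/controller/sharding/consistent"
)

func main() {
	ring := consistent.New()
	ring.Add("shard-0") // register hosts (virtual nodes are created internally)
	ring.Add("shard-1")

	// Route a few illustrative clients while bounding per-host load.
	for i := 0; i < 10; i++ {
		client := fmt.Sprintf("cluster-%d", i)
		host, err := ring.GetLeast(client)
		if err != nil {
			panic(err) // only ErrNoHosts is possible here
		}
		ring.Inc(host) // count the assignment against the chosen host
		fmt.Printf("%s -> %s\n", client, host)
	}
	fmt.Println("loads:", ring.GetLoads(), "max per host:", ring.MaxLoad())
}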
57 changes: 57 additions & 0 deletions controller/sharding/sharding.go
@@ -13,7 +13,9 @@ import (
"encoding/json"

"github.com/argoproj/argo-cd/v2/common"
"github.com/argoproj/argo-cd/v2/controller/sharding/consistent"
"github.com/argoproj/argo-cd/v2/pkg/apis/application/v1alpha1"
slices "golang.org/x/exp/slices"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
@@ -81,6 +83,8 @@ func GetDistributionFunction(clusters clusterAccessor, shardingAlgorithm string,
distributionFunction = RoundRobinDistributionFunction(clusters, replicasCount)
case common.LegacyShardingAlgorithm:
distributionFunction = LegacyDistributionFunction(replicasCount)
case common.ConsistentHashingWithBoundedLoadsAlgorithm:
distributionFunction = ConsistentHashingWithBoundedLoadsDistributionFunction(clusters, replicasCount)
default:
log.Warnf("distribution type %s is not supported, defaulting to %s", shardingAlgorithm, common.DefaultShardingAlgorithm)
}
@@ -155,6 +159,59 @@ func RoundRobinDistributionFunction(clusters clusterAccessor, replicas int) Dist
}
}

// ConsistentHashingWithBoundedLoadsDistributionFunction returns a DistributionFunction using an almost homogeneous distribution algorithm:
// for a given cluster the function will return the shard number based on a consistent hashing with bounded loads algorithm.
// This function ensures an almost homogeneous distribution: each shard is assigned a fairly similar number of
// clusters (within roughly +/-10%), while remaining resilient to changes in sharding and/or the number of clusters.
func ConsistentHashingWithBoundedLoadsDistributionFunction(clusters clusterAccessor, replicas int) DistributionFunction {
return func(c *v1alpha1.Cluster) int {
if replicas > 0 {
if c == nil { // in-cluster does not necessarily have a secret assigned, so we may receive a nil cluster here
return 0
}

// if Shard is manually set and the assigned value is lower than the number of replicas,
// then its value is returned; otherwise the default calculated value is used
if c.Shard != nil && int(*c.Shard) < replicas {
return int(*c.Shard)
} else {
// if the cluster is not in the clusters list anymore, we should unassign it from any shard, so we
// return the reserved value of -1
if !slices.Contains(clusters(), c) {
log.Warnf("Cluster with id=%s not found in cluster map.", c.ID)
return -1
}
consistentHashing := createConsistentHashingWithBoundLoads(replicas)
clusterIndex, err := consistentHashing.Get(c.ID)
if err != nil {
log.Warnf("Cluster with id=%s not found in cluster map.", c.ID)
return -1
}
shard, err := strconv.Atoi(clusterIndex)
if err != nil {
log.Errorf("Consistent Hashing was supposed to return a shard index but it returned %d", err)
return -1
}
log.Debugf("Cluster with id=%s will be processed by shard %d", c.ID, shard)
return shard
}
}
log.Warnf("The number of replicas (%d) is lower than 1", replicas)
return -1
}
}

func createConsistentHashingWithBoundLoads(replicas int) *consistent.Consistent {
consistentHashing := consistent.New()
// Adding a shard with id "-1" as a reserved value for clusters that do not have an assigned shard;
// this happens for clusters that were removed from the clusters list
// consistentHashing.Add("-1")
for i := 0; i < replicas; i++ {
consistentHashing.Add(strconv.Itoa(i))
}
return consistentHashing
}

// NoShardingDistributionFunction returns a DistributionFunction that will process all clusters by shard 0
// the function is created for API compatibility purposes and is not supposed to be activated.
func NoShardingDistributionFunction() DistributionFunction {
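A hedged sketch of how the new distribution function could be exercised end to end; the clusterAccessor signature (func() []*v1alpha1.Cluster) is inferred from its usage in this diff, and the cluster fixtures are illustrative:

package main

import (
	"fmt"

	"github.com/argoproj/argo-cd/v2/controller/sharding"
	"github.com/argoproj/argo-cd/v2/pkg/apis/application/v1alpha1"
)

func main() {
	// Illustrative fixtures: two clusters distributed over three shards.
	clusters := []*v1alpha1.Cluster{
		{ID: "cluster-a", Server: "https://a.example.com"},
		{ID: "cluster-b", Server: "https://b.example.com"},
	}
	accessor := func() []*v1alpha1.Cluster { return clusters }

	distribute := sharding.ConsistentHashingWithBoundedLoadsDistributionFunction(accessor, 3)
	for _, c := range clusters {
		fmt.Printf("cluster %s -> shard %d\n", c.ID, distribute(c))
	}
}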
