-
Notifications
You must be signed in to change notification settings - Fork 5.5k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Adds consistent hashing with bound loads sharding algorithm
Signed-off-by: Akram Ben Aissi <akram.benaissi@gmail.com>
- Loading branch information
Showing
6 changed files
with
459 additions
and
5 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,288 @@ | ||
// An implementation of Consistent Hashing and | ||
// Consistent Hashing With Bounded Loads. | ||
// | ||
// https://en.wikipedia.org/wiki/Consistent_hashing | ||
// | ||
// https://research.googleblog.com/2017/04/consistent-hashing-with-bounded-loads.html | ||
package consistent | ||
|
||
import ( | ||
"encoding/binary" | ||
"errors" | ||
"fmt" | ||
"math" | ||
"sync" | ||
"sync/atomic" | ||
|
||
"github.com/google/btree" | ||
|
||
blake2b "github.com/minio/blake2b-simd" | ||
) | ||
|
||
var ErrNoHosts = errors.New("no hosts added") | ||
|
||
type Host struct { | ||
Name string | ||
Load int64 | ||
} | ||
|
||
type Consistent struct { | ||
servers map[uint64]string | ||
clients *btree.BTree | ||
loadMap map[string]*Host | ||
totalLoad int64 | ||
replicationFactor int | ||
|
||
sync.RWMutex | ||
} | ||
|
||
type item struct { | ||
value uint64 | ||
} | ||
|
||
func (i item) Less(than btree.Item) bool { | ||
return i.value < than.(item).value | ||
} | ||
|
||
func New() *Consistent { | ||
return &Consistent{ | ||
servers: map[uint64]string{}, | ||
clients: btree.New(2), | ||
loadMap: map[string]*Host{}, | ||
replicationFactor: 1000, | ||
} | ||
} | ||
|
||
func NewWithReplicationFactor(replicationFactor int) *Consistent { | ||
return &Consistent{ | ||
servers: map[uint64]string{}, | ||
clients: btree.New(2), | ||
loadMap: map[string]*Host{}, | ||
replicationFactor: replicationFactor, | ||
} | ||
} | ||
func (c *Consistent) Add(server string) { | ||
c.Lock() | ||
defer c.Unlock() | ||
|
||
if _, ok := c.loadMap[server]; ok { | ||
return | ||
} | ||
|
||
c.loadMap[server] = &Host{Name: server, Load: 0} | ||
for i := 0; i < c.replicationFactor; i++ { | ||
h := c.hash(fmt.Sprintf("%s%d", server, i)) | ||
c.servers[h] = server | ||
c.clients.ReplaceOrInsert(item{h}) | ||
} | ||
} | ||
|
||
// Get returns the server that owns the given client. | ||
// As described in https://en.wikipedia.org/wiki/Consistent_hashing | ||
// It returns ErrNoHosts if the ring has no servers in it. | ||
func (c *Consistent) Get(client string) (string, error) { | ||
c.RLock() | ||
defer c.RUnlock() | ||
|
||
if c.clients.Len() == 0 { | ||
return "", ErrNoHosts | ||
} | ||
|
||
h := c.hash(client) | ||
var foundItem btree.Item | ||
c.clients.AscendGreaterOrEqual(item{h}, func(i btree.Item) bool { | ||
foundItem = i | ||
return false // stop the iteration | ||
}) | ||
|
||
if foundItem == nil { | ||
// If no host found, wrap around to the first one. | ||
foundItem = c.clients.Min() | ||
} | ||
|
||
host := c.servers[foundItem.(item).value] | ||
|
||
return host, nil | ||
} | ||
|
||
// GetLeast returns the least loaded host that can serve the key. | ||
// It uses Consistent Hashing With Bounded loads. | ||
// https://research.googleblog.com/2017/04/consistent-hashing-with-bounded-loads.html | ||
// It returns ErrNoHosts if the ring has no hosts in it. | ||
func (c *Consistent) GetLeast(client string) (string, error) { | ||
c.RLock() | ||
defer c.RUnlock() | ||
|
||
if c.clients.Len() == 0 { | ||
return "", ErrNoHosts | ||
} | ||
|
||
h := c.hash(client) | ||
idx := c.search(h) | ||
|
||
i := idx | ||
for { | ||
x := item{uint64(i)} | ||
key := c.clients.Get(x) | ||
if key != nil { | ||
host := c.servers[key.(*item).value] | ||
if c.loadOK(host) { | ||
return host, nil | ||
} | ||
i++ | ||
if i >= c.clients.Len() { | ||
i = 0 | ||
} | ||
} else { | ||
return client, nil | ||
} | ||
} | ||
} | ||
|
||
func (c *Consistent) search(key uint64) int { | ||
idx := 0 | ||
found := false | ||
|
||
c.clients.Ascend(func(i btree.Item) bool { | ||
if i.(item).value >= key { | ||
found = true | ||
return false // stop the iteration | ||
} | ||
idx++ | ||
return true | ||
}) | ||
|
||
if !found { | ||
idx = 0 | ||
} | ||
|
||
return idx | ||
} | ||
|
||
// Sets the load of `server` to the given `load` | ||
func (c *Consistent) UpdateLoad(server string, load int64) { | ||
c.Lock() | ||
defer c.Unlock() | ||
|
||
if _, ok := c.loadMap[server]; !ok { | ||
return | ||
} | ||
c.totalLoad -= c.loadMap[server].Load | ||
c.loadMap[server].Load = load | ||
c.totalLoad += load | ||
} | ||
|
||
// Increments the load of host by 1 | ||
// | ||
// should only be used with if you obtained a host with GetLeast | ||
func (c *Consistent) Inc(server string) { | ||
c.Lock() | ||
defer c.Unlock() | ||
|
||
if _, ok := c.loadMap[server]; !ok { | ||
return | ||
} | ||
atomic.AddInt64(&c.loadMap[server].Load, 1) | ||
atomic.AddInt64(&c.totalLoad, 1) | ||
} | ||
|
||
// Decrements the load of host by 1 | ||
// | ||
// should only be used with if you obtained a host with GetLeast | ||
func (c *Consistent) Done(server string) { | ||
c.Lock() | ||
defer c.Unlock() | ||
|
||
if _, ok := c.loadMap[server]; !ok { | ||
return | ||
} | ||
atomic.AddInt64(&c.loadMap[server].Load, -1) | ||
atomic.AddInt64(&c.totalLoad, -1) | ||
} | ||
|
||
// Deletes host from the ring | ||
func (c *Consistent) Remove(server string) bool { | ||
c.Lock() | ||
defer c.Unlock() | ||
|
||
for i := 0; i < c.replicationFactor; i++ { | ||
h := c.hash(fmt.Sprintf("%s%d", server, i)) | ||
delete(c.servers, h) | ||
c.delSlice(h) | ||
} | ||
delete(c.loadMap, server) | ||
return true | ||
} | ||
|
||
// Return the list of servers in the ring | ||
func (c *Consistent) Servers() (servers []string) { | ||
c.RLock() | ||
defer c.RUnlock() | ||
for k := range c.loadMap { | ||
servers = append(servers, k) | ||
} | ||
return servers | ||
} | ||
|
||
// Returns the loads of all the hosts | ||
func (c *Consistent) GetLoads() map[string]int64 { | ||
loads := map[string]int64{} | ||
|
||
for k, v := range c.loadMap { | ||
loads[k] = v.Load | ||
} | ||
return loads | ||
} | ||
|
||
// Returns the maximum load of the single host | ||
// which is: | ||
// (total_load/number_of_hosts)*1.25 | ||
// total_load = is the total number of active requests served by hosts | ||
// for more info: | ||
// https://research.googleblog.com/2017/04/consistent-hashing-with-bounded-loads.html | ||
func (c *Consistent) MaxLoad() int64 { | ||
if c.totalLoad == 0 { | ||
c.totalLoad = 1 | ||
} | ||
var avgLoadPerNode float64 | ||
avgLoadPerNode = float64(c.totalLoad / int64(len(c.loadMap))) | ||
if avgLoadPerNode == 0 { | ||
avgLoadPerNode = 1 | ||
} | ||
avgLoadPerNode = math.Ceil(avgLoadPerNode * 1.25) | ||
return int64(avgLoadPerNode) | ||
} | ||
|
||
func (c *Consistent) loadOK(server string) bool { | ||
// a safety check if someone performed c.Done more than needed | ||
if c.totalLoad < 0 { | ||
c.totalLoad = 0 | ||
} | ||
|
||
var avgLoadPerNode float64 | ||
avgLoadPerNode = float64((c.totalLoad + 1) / int64(len(c.loadMap))) | ||
if avgLoadPerNode == 0 { | ||
avgLoadPerNode = 1 | ||
} | ||
avgLoadPerNode = math.Ceil(avgLoadPerNode * 1.25) | ||
|
||
bserver, ok := c.loadMap[server] | ||
if !ok { | ||
panic(fmt.Sprintf("given host(%s) not in loadsMap", bserver.Name)) | ||
} | ||
|
||
if float64(bserver.Load)+1 <= avgLoadPerNode { | ||
return true | ||
} | ||
|
||
return false | ||
} | ||
|
||
func (c *Consistent) delSlice(val uint64) { | ||
c.clients.Delete(item{val}) | ||
} | ||
|
||
func (c *Consistent) hash(key string) uint64 { | ||
out := blake2b.Sum512([]byte(key)) | ||
return binary.LittleEndian.Uint64(out[:]) | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.