feat(loadbalance): do not cache all the keys for Consistent Hash (#1370)
DMwangnima committed Jun 7, 2024
1 parent b9d6a55 commit bb385e7
Showing 2 changed files with 187 additions and 197 deletions.
220 changes: 70 additions & 150 deletions pkg/loadbalance/consist.go
@@ -19,16 +19,13 @@ package loadbalance
import (
"context"
"sort"
"strconv"
"sync"
"sync/atomic"
"time"

"github.com/bytedance/gopkg/util/xxhash3"
"golang.org/x/sync/singleflight"

"github.com/cloudwego/kitex/pkg/discovery"
"github.com/cloudwego/kitex/pkg/gofunc"
"github.com/cloudwego/kitex/pkg/utils"
)

@@ -76,6 +73,7 @@ type ConsistentHashOption struct {
// It is recommended to set it to true, but be careful to reduce the VirtualFactor appropriately
Weighted bool

// Deprecated: This implementation no longer caches all the keys, so ExpireDuration will not take effect
// Whether or not to perform expiration processing
// The implementation will cache all the keys
// If never expired it may cause memory to keep growing and eventually OOM
@@ -88,20 +86,14 @@ type ConsistentHashOption struct {
// NewConsistentHashOption creates a default ConsistentHashOption.
func NewConsistentHashOption(f KeyFunc) ConsistentHashOption {
return ConsistentHashOption{
GetKey: f,
Replica: 0,
VirtualFactor: 100,
Weighted: true,
ExpireDuration: 2 * time.Minute,
GetKey: f,
Replica: 0,
VirtualFactor: 100,
Weighted: true,
}
}
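
A minimal usage sketch of the updated defaults (MyRequest and its UserID field are placeholders, not part of this patch). The string returned by the KeyFunc is hashed to choose an instance on the ring; after this change there is no per-key result cache, so ExpireDuration no longer needs to be set.

package main

import (
	"context"

	"github.com/cloudwego/kitex/pkg/loadbalance"
)

// MyRequest is a placeholder request type used only for illustration.
type MyRequest struct {
	UserID string
}

func main() {
	opt := loadbalance.NewConsistentHashOption(func(ctx context.Context, request interface{}) string {
		if r, ok := request.(*MyRequest); ok {
			return r.UserID // requests with the same UserID map to the same instance
		}
		return "" // an empty key makes the picker return nil
	})
	lb := loadbalance.NewConsistBalancer(opt)
	_ = lb
}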

var (
consistPickerPool sync.Pool
consistBalancers []*consistBalancer
consistBalancersLock sync.RWMutex
consistBalancerDaemonOnce sync.Once
)
var consistPickerPool sync.Pool

func init() {
consistPickerPool.New = newConsistPicker
@@ -119,13 +111,9 @@ type realNode struct {
type consistResult struct {
Primary discovery.Instance
Replicas []discovery.Instance
Touch atomic.Value
}

type consistInfo struct {
cachedConsistResult sync.Map
sfg singleflight.Group // To prevent multiple builds on the first request for the same key

realNodes []realNode
virtualNodes []virtualNode
}
@@ -147,10 +135,10 @@ func (v *vNodeType) Swap(i, j int) {
}

type consistPicker struct {
cb *consistBalancer
info *consistInfo
index int
result *consistResult
cb *consistBalancer
info *consistInfo
// index int
// result *consistResult
}

func newConsistPicker() interface{} {
@@ -160,8 +148,8 @@ func newConsistPicker() interface{} {
func (cp *consistPicker) zero() {
cp.info = nil
cp.cb = nil
cp.index = 0
cp.result = nil
// cp.index = 0
// cp.result = nil
}

func (cp *consistPicker) Recycle() {
@@ -174,43 +162,33 @@ func (cp *consistPicker) Next(ctx context.Context, request interface{}) discover
if len(cp.info.realNodes) == 0 {
return nil
}
if cp.result == nil {
key := cp.cb.opt.GetKey(ctx, request)
if key == "" {
return nil
}
cp.result = cp.getConsistResult(xxhash3.HashString(key))
cp.index = 0
return cp.result.Primary
}
if cp.index < len(cp.result.Replicas) {
cp.index++
return cp.result.Replicas[cp.index-1]
}
return nil
}

func (cp *consistPicker) getConsistResult(key uint64) *consistResult {
var cr *consistResult
cri, ok := cp.info.cachedConsistResult.Load(key)
if !ok {
cri, _, _ = cp.info.sfg.Do(strconv.FormatUint(key, 10), func() (interface{}, error) {
cr := buildConsistResult(cp.cb, cp.info, key)
if cp.cb.opt.ExpireDuration > 0 {
cr.Touch.Store(time.Now())
}
return cr, nil
})
cp.info.cachedConsistResult.Store(key, cri)
}
cr = cri.(*consistResult)
if cp.cb.opt.ExpireDuration > 0 {
cr.Touch.Store(time.Now())
key := cp.cb.opt.GetKey(ctx, request)
if key == "" {
return nil
}
return cr
res := buildConsistResult(cp.info, xxhash3.HashString(key))
return res.Primary
// Todo(DMwangnima): Optimise Replica-related logic
// The commented-out code below is the previous implementation, which supported connecting to Replicas.
// Since a new picker is created for each request, the Replica logic is unreachable, so it is commented out for now

//if cp.result == nil {
// key := cp.cb.opt.GetKey(ctx, request)
// if key == "" {
// return nil
// }
// cp.result = buildConsistResult(cp.cb, cp.info, xxhash3.HashString(key))
// //cp.index = 0
// return cp.result.Primary
//}
//if cp.index < len(cp.result.Replicas) {
// cp.index++
// return cp.result.Replicas[cp.index-1]
//}
//return nil
}

func buildConsistResult(cb *consistBalancer, info *consistInfo, key uint64) *consistResult {
func buildConsistResult(info *consistInfo, key uint64) *consistResult {
cr := &consistResult{}
index := sort.Search(len(info.virtualNodes), func(i int) bool {
return info.virtualNodes[i].hash > key
@@ -220,42 +198,45 @@ func buildConsistResult(cb *consistBalancer, info *consistInfo, key uint64) *con
index = 0
}
cr.Primary = info.virtualNodes[index].RealNode.Ins
replicas := int(cb.opt.Replica)
// remove the primary node
if len(info.realNodes)-1 < replicas {
replicas = len(info.realNodes) - 1
}
if replicas > 0 {
used := make(map[discovery.Instance]struct{}, replicas) // should be 1 + replicas - 1
used[cr.Primary] = struct{}{}
cr.Replicas = make([]discovery.Instance, replicas)
for i := 0; i < replicas; i++ {
// find the next instance which is not used
// replicas are adjusted before so we can guarantee that we can find one
for {
index++
if index == len(info.virtualNodes) {
index = 0
}
ins := info.virtualNodes[index].RealNode.Ins
if _, ok := used[ins]; !ok {
used[ins] = struct{}{}
cr.Replicas[i] = ins
break
}
}
}
}
return cr
// Todo(DMwangnima): Optimise Replica-related logic
// The commented-out code below is the previous implementation, which supported connecting to Replicas.
// Since a new picker is created for each request, the Replica logic is unreachable, so it is commented out
// for better performance

//replicas := int(cb.opt.Replica)
//// remove the primary node
//if len(info.realNodes)-1 < replicas {
// replicas = len(info.realNodes) - 1
//}
//if replicas > 0 {
// used := make(map[discovery.Instance]struct{}, replicas) // should be 1 + replicas - 1
// used[cr.Primary] = struct{}{}
// cr.Replicas = make([]discovery.Instance, replicas)
// for i := 0; i < replicas; i++ {
// // find the next instance which is not used
// // replicas are adjusted before so we can guarantee that we can find one
// for {
// index++
// if index == len(info.virtualNodes) {
// index = 0
// }
// ins := info.virtualNodes[index].RealNode.Ins
// if _, ok := used[ins]; !ok {
// used[ins] = struct{}{}
// cr.Replicas[i] = ins
// break
// }
// }
// }
//}
//return cr
}
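
Since results are no longer cached, every pick is one hash plus a binary search over the sorted virtual nodes. A self-contained sketch of that lookup, using a simplified ring of raw hashes for illustration only:

package main

import (
	"fmt"
	"sort"

	"github.com/bytedance/gopkg/util/xxhash3"
)

// pick returns the index of the first virtual node whose hash is strictly greater
// than the key's hash, wrapping to 0 when the key hashes past the last node,
// mirroring the sort.Search logic in buildConsistResult above.
func pick(ring []uint64, key string) int {
	h := xxhash3.HashString(key)
	i := sort.Search(len(ring), func(i int) bool { return ring[i] > h })
	if i == len(ring) {
		i = 0 // wrap around the ring
	}
	return i
}

func main() {
	ring := []uint64{100, 2000, 30000, 400000} // sorted virtual-node hashes (illustrative values)
	fmt.Println(pick(ring, "user-42"))
}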

type consistBalancer struct {
cachedConsistInfo sync.Map
// The main purpose of this lock is to improve performance and prevent Change from being performed while expire
// which may cause Change to do a lot of extra computation and memory allocation
updateLock sync.Mutex
opt ConsistentHashOption
sfg singleflight.Group
opt ConsistentHashOption
sfg singleflight.Group
}
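
The sfg field stays on the balancer, presumably serving the GetPicker code collapsed above. As a generic illustration only (not the exact Kitex code), singleflight collapses concurrent builds for the same cache key into a single one:

package main

import (
	"fmt"
	"sync"

	"golang.org/x/sync/singleflight"
)

func main() {
	var sfg singleflight.Group
	var wg sync.WaitGroup
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			// All goroutines asking for the same key share one build.
			v, _, shared := sfg.Do("service-cache-key", func() (interface{}, error) {
				return "expensive consistInfo build", nil
			})
			fmt.Println(v, shared)
		}()
	}
	wg.Wait()
}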

// NewConsistBalancer creates a new consist balancer with the given option.
@@ -269,47 +250,9 @@ func NewConsistBalancer(opt ConsistentHashOption) Loadbalancer {
cb := &consistBalancer{
opt: opt,
}
if cb.opt.ExpireDuration > 0 {
cb.AddToDaemon()
}
return cb
}

// AddToDaemon adds a balancer to the daemon expire routine.
func (cb *consistBalancer) AddToDaemon() {
// do delete func
consistBalancerDaemonOnce.Do(func() {
gofunc.GoFunc(context.Background(), func() {
for range time.Tick(2 * time.Minute) {
consistBalancersLock.RLock()
now := time.Now()
for _, lb := range consistBalancers {
if lb.opt.ExpireDuration > 0 {
lb.updateLock.Lock()
lb.cachedConsistInfo.Range(func(key, value interface{}) bool {
ci := value.(*consistInfo)
ci.cachedConsistResult.Range(func(key, value interface{}) bool {
t := value.(*consistResult).Touch.Load().(time.Time)
if now.After(t.Add(cb.opt.ExpireDuration)) {
ci.cachedConsistResult.Delete(key)
}
return true
})
return true
})
lb.updateLock.Unlock()
}
}
consistBalancersLock.RUnlock()
}
})
})

consistBalancersLock.Lock()
consistBalancers = append(consistBalancers, cb)
consistBalancersLock.Unlock()
}

// GetPicker implements the Loadbalancer interface.
func (cb *consistBalancer) GetPicker(e discovery.Result) Picker {
var ci *consistInfo
@@ -413,25 +356,6 @@ func (cb *consistBalancer) getVirtualNodeLen(rNode realNode) int {

func (cb *consistBalancer) updateConsistInfo(e discovery.Result) {
newInfo := cb.newConsistInfo(e)
infoI, loaded := cb.cachedConsistInfo.LoadOrStore(e.CacheKey, newInfo)
if !loaded {
return
}
info := infoI.(*consistInfo)
// Warm up.
// The reason for not modifying info directly is that there is no guarantee of concurrency security.
info.cachedConsistResult.Range(func(key, value interface{}) bool {
cr := buildConsistResult(cb, newInfo, key.(uint64))
if cb.opt.ExpireDuration > 0 {
t := value.(*consistResult).Touch.Load().(time.Time)
if time.Now().After(t.Add(cb.opt.ExpireDuration)) {
return true
}
cr.Touch.Store(t)
}
newInfo.cachedConsistResult.Store(key, cr)
return true
})
cb.cachedConsistInfo.Store(e.CacheKey, newInfo)
}

@@ -442,9 +366,7 @@ func (cb *consistBalancer) Rebalance(change discovery.Change) {
}
// TODO: Use TreeMap to optimize performance when updating.
// Now, due to the lack of a good red-black tree implementation, we can only build the full amount once per update.
cb.updateLock.Lock()
cb.updateConsistInfo(change.Result)
cb.updateLock.Unlock()
}
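
Because there is no incremental structure yet (the TODO above mentions a TreeMap), each Rebalance rebuilds the sorted virtual-node slice from scratch. A simplified, illustrative sketch of such a rebuild — not Kitex's newConsistInfo, whose virtual-node key layout and weighting differ:

package main

import (
	"fmt"
	"sort"

	"github.com/bytedance/gopkg/util/xxhash3"
)

// buildRing regenerates and sorts virtual-node hashes for the given instance
// addresses; virtualFactor plays the role of ConsistentHashOption.VirtualFactor.
func buildRing(addrs []string, virtualFactor int) []uint64 {
	ring := make([]uint64, 0, len(addrs)*virtualFactor)
	for _, addr := range addrs {
		for i := 0; i < virtualFactor; i++ {
			// One hash per (address, virtual-index) pair; the real implementation
			// encodes the virtual-node key differently, this is illustrative only.
			ring = append(ring, xxhash3.HashString(fmt.Sprintf("%s#%d", addr, i)))
		}
	}
	sort.Slice(ring, func(i, j int) bool { return ring[i] < ring[j] })
	return ring
}

func main() {
	ring := buildRing([]string{"10.0.0.1:8888", "10.0.0.2:8888"}, 100)
	fmt.Println(len(ring)) // 200 virtual nodes for 2 instances
}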

// Delete implements the Rebalancer interface.
@@ -454,9 +376,7 @@ func (cb *consistBalancer) Delete(change discovery.Change) {
}
// FIXME: If Delete and Rebalance occur together (Discovery OnDelete and OnChange are triggered at the same time),
// it may cause the delete to fail and eventually lead to a resource leak.
cb.updateLock.Lock()
cb.cachedConsistInfo.Delete(change.Result.CacheKey)
cb.updateLock.Unlock()
}

func (cb *consistBalancer) Name() string {
