Skip to content

Commit

Permalink
add comment and rename
Browse files Browse the repository at this point in the history
  • Loading branch information
luluz66 committed May 9, 2024
1 parent 610eeaf commit b80a2ba
Show file tree
Hide file tree
Showing 2 changed files with 66 additions and 49 deletions.
105 changes: 61 additions & 44 deletions enterprise/server/raft/driver/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"flag"
"math"
"slices"
"sort"
"sync"
"time"

Expand All @@ -20,8 +19,8 @@ import (
)

var (
enableDriver = flag.Bool("cache.raft.enable_driver", true, "If true, enable placement driver")
minNumReplicasPerRange = flag.Int("cache.raft.min_num_replicas_per_range", 3, "The minimum number of replicas each range should have")
enableDriver = flag.Bool("cache.raft.enable_driver", true, "If true, enable placement driver")
minReplicasPerRange = flag.Int("cache.raft.min_replicas_per_range", 3, "The minimum number of replicas each range should have")
)

type DriverAction int
Expand All @@ -31,7 +30,7 @@ const (
DriverNoop
DriverRemoveReplica
DriverRemoveDeadReplica
DirverAddReplica
DriverAddReplica
DriverReplaceDeadReplica
DriverConsiderRebalance
)
Expand All @@ -55,7 +54,7 @@ func (a DriverAction) Priority() float64 {
switch a {
case DriverReplaceDeadReplica:
return 400
case DirverAddReplica:
case DriverAddReplica:
return 300
case DriverRemoveDeadReplica:
return 200
Expand All @@ -69,6 +68,25 @@ func (a DriverAction) Priority() float64 {
}
}

// String returns a short, hyphenated, human-readable name for the action.
// Values that do not match any known DriverAction map to "unknown".
func (a DriverAction) String() string {
	// Start from the fallback and overwrite it when the action is recognized.
	name := "unknown"
	switch a {
	case DriverNoop:
		name = "no-op"
	case DriverRemoveReplica:
		name = "remove-replica"
	case DriverRemoveDeadReplica:
		name = "remove-dead-replica"
	case DriverAddReplica:
		name = "add-replica"
	case DriverReplaceDeadReplica:
		name = "replace-dead-replica"
	case DriverConsiderRebalance:
		name = "consider-rebalance"
	}
	return name
}

type IReplica interface {
RangeDescriptor() *rfpb.RangeDescriptor
ReplicaID() uint64
Expand All @@ -82,33 +100,40 @@ type IStore interface {
RemoveReplica(ctx context.Context, req *rfpb.RemoveReplicaRequest) (*rfpb.RemoveReplicaResponse, error)
}

// computeQuorum returns the size of a quorum, which is a majority of the
// members of a peer set containing numNodes nodes.
// In raft, when a quorum of nodes is unavailable, the cluster becomes
// unavailable.
func computeQuorum(numNodes int) int {
	// Integer division truncates, so half+1 is the smallest strict majority.
	half := numNodes / 2
	return half + 1
}

func (rq *ReplicateQueue) computeAction(replicas []*rfpb.ReplicaDescriptor) (DriverAction, float64) {
// computeAction computes the action needed and its priority.
func (rq *Queue) computeAction(replicas []*rfpb.ReplicaDescriptor) (DriverAction, float64) {
if rq.storeMap == nil {
action := DriverNoop
return action, action.Priority()
}
curReplicas := len(replicas)
desiredQuorum := computeQuorum(*minNumReplicasPerRange)
desiredQuorum := computeQuorum(*minReplicasPerRange)
quorum := computeQuorum(curReplicas)

if curReplicas < *minNumReplicasPerRange {
action := DirverAddReplica
if curReplicas < *minReplicasPerRange {
action := DriverAddReplica
adjustedPriority := action.Priority() + float64(desiredQuorum-curReplicas)
return action, adjustedPriority
}
replicasByStatus := rq.storeMap.DivideByStatus(replicas)
numLiveReplicas := len(replicasByStatus.LiveReplicas) + len(replicasByStatus.SuspectReplicas)
numDeadReplicas := len(replicasByStatus.DeadReplicas)

if numLiveReplicas < quorum {
// The cluster is unavailable since we don't have enough live nodes.
// There is no point in doing anything right now.
action := DriverNoop
return action, action.Priority()
}

if curReplicas <= *minNumReplicasPerRange && numDeadReplicas > 0 {
if curReplicas <= *minReplicasPerRange && numDeadReplicas > 0 {
action := DriverReplaceDeadReplica
return action, action.Priority()
}
Expand All @@ -118,7 +143,7 @@ func (rq *ReplicateQueue) computeAction(replicas []*rfpb.ReplicaDescriptor) (Dri
return action, action.Priority()
}

if curReplicas > *minNumReplicasPerRange {
if curReplicas > *minReplicasPerRange {
action := DriverRemoveDeadReplica
ajustedPriority := action.Priority() - float64(curReplicas%2)
return action, ajustedPriority
Expand All @@ -134,7 +159,7 @@ type pqItem struct {

priority float64
insertTime time.Time
// The index is needed by update and is maintained by the heap.Interface methods.

index int // The index of the item in the heap.
processing bool
requeue bool
Expand Down Expand Up @@ -178,7 +203,9 @@ func (pq *priorityQueue) getItemWithMinPriority() *pqItem {
return old[n-1]
}

type ReplicateQueue struct {
// Queue is responsible for up-replicating, down-replicating, and rebalancing
// ranges across the stores.
type Queue struct {
pq *priorityQueue
storeMap *storemap.StoreMap
store IStore
Expand All @@ -191,17 +218,17 @@ type ReplicateQueue struct {
started bool
}

func NewReplicateQueue(store IStore, gossipManager interfaces.GossipService) *ReplicateQueue {
func NewQueue(store IStore, gossipManager interfaces.GossipService) *Queue {
storeMap := storemap.New(gossipManager)
return &ReplicateQueue{
return &Queue{
storeMap: storeMap,
pq: &priorityQueue{},
store: store,
maxSize: 100,
}
}

func (rq *ReplicateQueue) shouldQueue(repl IReplica) (bool, float64) {
func (rq *Queue) shouldQueue(repl IReplica) (bool, float64) {
rd := repl.RangeDescriptor()

if !rq.store.HaveLease(rd.GetRangeId()) {
Expand All @@ -221,7 +248,10 @@ func (rq *ReplicateQueue) shouldQueue(repl IReplica) (bool, float64) {
return false, 0
}

func (rq *ReplicateQueue) MaybeAdd(replica IReplica) {
// MaybeAdd adds a replica to the queue if the store has the lease for the range
// and there is work that needs to be done.
// When the queue is full, it deletes the least important replica from the queue.
func (rq *Queue) MaybeAdd(replica IReplica) {
shouldQueue, priority := rq.shouldQueue(replica)
if !shouldQueue {
return
Expand Down Expand Up @@ -260,7 +290,7 @@ func (rq *ReplicateQueue) MaybeAdd(replica IReplica) {
}
}

func (rq *ReplicateQueue) remove(item *pqItem) {
func (rq *Queue) remove(item *pqItem) {
if item.processing {
item.requeue = false
return
Expand All @@ -271,7 +301,7 @@ func (rq *ReplicateQueue) remove(item *pqItem) {
rq.pqItemMap.Delete(item.shardID)
}

func (rq *ReplicateQueue) Start() {
func (rq *Queue) Start() {
rq.mu.Lock()
started := rq.started
rq.mu.Unlock()
Expand Down Expand Up @@ -316,7 +346,7 @@ func (rq *ReplicateQueue) Start() {
}
}

func (rq *ReplicateQueue) Stop() {
func (rq *Queue) Stop() {
rq.mu.Lock()
defer rq.mu.Unlock()
if !rq.started {
Expand All @@ -326,6 +356,7 @@ func (rq *ReplicateQueue) Stop() {
rq.started = false
}

// findReplacingReplica finds a dead replica to be removed.
func findReplacingReplica(replicas []*rfpb.ReplicaDescriptor, replicaByStatus *storemap.ReplicasByStatus) *rfpb.ReplicaDescriptor {
if len(replicaByStatus.DeadReplicas) == 0 {
// nothing to be removed
Expand All @@ -349,13 +380,12 @@ func storeHasReplica(node *rfpb.NodeDescriptor, existing []*rfpb.ReplicaDescript
return false
}

func (rq *ReplicateQueue) allocateTarget(existing []*rfpb.ReplicaDescriptor) *rfpb.NodeDescriptor {
// allocateTarget finds a target node for rd to up-replicate.
func (rq *Queue) allocateTarget(rd *rfpb.RangeDescriptor) *rfpb.NodeDescriptor {
storesWithStats := rq.storeMap.GetStoresWithStats()
// Rank stores
// check free bytes
var candidates []*candidate
for _, su := range storesWithStats.Usages {
if storeHasReplica(su.GetNode(), existing) {
if storeHasReplica(su.GetNode(), rd.GetReplicas()) {
continue
}
candidates = append(candidates, &candidate{
Expand All @@ -368,7 +398,6 @@ func (rq *ReplicateQueue) allocateTarget(existing []*rfpb.ReplicaDescriptor) *rf
if len(candidates) == 0 {
return nil
}
sort.Sort(sort.Reverse(byScore(candidates)))
slices.SortFunc(candidates, func(a, b *candidate) int {
if n := a.compare(b); n != 0 {
if n < 0 {
Expand All @@ -388,8 +417,8 @@ type change struct {
removeOp *rfpb.RemoveReplicaRequest
}

func (rq *ReplicateQueue) addReplica(rd *rfpb.RangeDescriptor) *change {
target := rq.allocateTarget(rd.GetReplicas())
func (rq *Queue) addReplica(rd *rfpb.RangeDescriptor) *change {
target := rq.allocateTarget(rd)
if target == nil {
log.Debugf("cannot find targets for range descriptor:%+v", rd)
return nil
Expand All @@ -403,7 +432,7 @@ func (rq *ReplicateQueue) addReplica(rd *rfpb.RangeDescriptor) *change {
}
}

func (rq *ReplicateQueue) replaceDeadReplica(rd *rfpb.RangeDescriptor, replicasByStatus *storemap.ReplicasByStatus) *change {
func (rq *Queue) replaceDeadReplica(rd *rfpb.RangeDescriptor, replicasByStatus *storemap.ReplicasByStatus) *change {
replacing := findReplacingReplica(rd.GetReplicas(), replicasByStatus)
if replacing == nil {
// nothing to remove
Expand All @@ -419,7 +448,7 @@ func (rq *ReplicateQueue) replaceDeadReplica(rd *rfpb.RangeDescriptor, replicasB
return change
}

func (rq *ReplicateQueue) applyChange(ctx context.Context, change *change) error {
func (rq *Queue) applyChange(ctx context.Context, change *change) error {
var rd *rfpb.RangeDescriptor
if change.addOp != nil {
rsp, err := rq.store.AddReplica(ctx, change.addOp)
Expand All @@ -445,7 +474,7 @@ func (rq *ReplicateQueue) applyChange(ctx context.Context, change *change) error

}

func (rq *ReplicateQueue) processReplica(repl IReplica) (bool, error) {
func (rq *Queue) processReplica(repl IReplica) (bool, error) {
rd := repl.RangeDescriptor()
if !rq.store.HaveLease(rd.GetRangeId()) {
// the store doesn't have the lease of this range.
Expand All @@ -460,7 +489,7 @@ func (rq *ReplicateQueue) processReplica(repl IReplica) (bool, error) {

switch action {
case DriverNoop:
case DirverAddReplica:
case DriverAddReplica:
log.Debugf("add replica: %d", repl.ShardID())
change = rq.addReplica(rd)
case DriverReplaceDeadReplica:
Expand Down Expand Up @@ -493,7 +522,7 @@ func (rq *ReplicateQueue) processReplica(repl IReplica) (bool, error) {
return true, err
}

func (rq *ReplicateQueue) postProcess(repl IReplica) {
func (rq *Queue) postProcess(repl IReplica) {
itemIface, ok := rq.pqItemMap.Load(repl.ShardID())
if !ok {
alert.UnexpectedEvent("unexpected_pq_item_not_found")
Expand Down Expand Up @@ -552,18 +581,6 @@ func (c *candidate) compare(o *candidate) float64 {
return 0
}

// byScore is a slice of candidates that provides the Len, Less and Swap
// methods needed to order candidates with the sort package.
type byScore []*candidate

// Len reports the number of candidates in the slice.
func (x byScore) Len() int {
	return len(x)
}

// Less reports whether the candidate at index i orders before the one at
// index j. The result of compare decides the order; when compare reports a
// tie (zero), the candidates are ordered by the Nhid of their nodes.
func (x byScore) Less(i, j int) bool {
	left, right := x[i], x[j]
	if diff := left.compare(right); diff != 0 {
		return diff < 0
	}
	return left.usage.GetNode().GetNhid() < right.usage.GetNode().GetNhid()
}

// Swap exchanges the candidates at indexes i and j.
func (x byScore) Swap(i, j int) {
	x[i], x[j] = x[j], x[i]
}

func replicaCountMeanLevel(storesWithStats *storemap.StoresWithStats, su *rfpb.StoreUsage) meanLevel {
maxReplicaCount := aboveMeanReplicaCountThreshold(storesWithStats.ReplicaCount.Mean)
minReplicaCount := belowMeanReplicaCountThreshold(storesWithStats.ReplicaCount.Mean)
Expand Down
10 changes: 5 additions & 5 deletions enterprise/server/raft/store/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ type Store struct {
updateTagsWorker *updateTagsWorker
txnCoordinator *txn.Coordinator

replicateQueue *driver.ReplicateQueue
driverQueue *driver.Queue
}

// registryHolder implements NodeRegistryFactory. When nodeHost is created, it
Expand Down Expand Up @@ -208,7 +208,7 @@ func NewWithArgs(env environment.Env, rootDir string, nodeHost *dragonboat.NodeH

usages, err := usagetracker.New(s, gossipManager, s.NodeDescriptor(), partitions, s.AddEventListener())

s.replicateQueue = driver.NewReplicateQueue(s, gossipManager)
s.driverQueue = driver.NewQueue(s, gossipManager)

if err != nil {
return nil, err
Expand Down Expand Up @@ -436,7 +436,7 @@ func (s *Store) Start() error {
return nil
})
eg.Go(func() error {
s.replicateQueue.Start()
s.driverQueue.Start()
return nil
})

Expand All @@ -454,7 +454,7 @@ func (s *Store) Stop(ctx context.Context) error {
s.egCancel()
s.leaseKeeper.Stop()
s.liveness.Release()
s.replicateQueue.Stop()
s.driverQueue.Stop()
s.eg.Wait()
}
s.updateTagsWorker.Stop()
Expand Down Expand Up @@ -1855,7 +1855,7 @@ func (store *Store) scan(ctx context.Context) {
log.Debug("scan started")
replicas := store.getLeasedReplicas()
for _, repl := range replicas {
store.replicateQueue.MaybeAdd(repl)
store.driverQueue.MaybeAdd(repl)
}
}
}

0 comments on commit b80a2ba

Please sign in to comment.