diff --git a/enterprise/server/raft/driver/driver.go b/enterprise/server/raft/driver/driver.go
index 61073340ac3..65d0693462f 100644
--- a/enterprise/server/raft/driver/driver.go
+++ b/enterprise/server/raft/driver/driver.go
@@ -7,7 +7,6 @@ import (
 	"flag"
 	"math"
 	"slices"
-	"sort"
 	"sync"
 	"time"

@@ -20,8 +19,8 @@ import (
 )

 var (
-	enableDriver           = flag.Bool("cache.raft.enable_driver", true, "If true, enable placement driver")
-	minNumReplicasPerRange = flag.Int("cache.raft.min_num_replicas_per_range", 3, "The minimum number of replicas each range should have")
+	enableDriver        = flag.Bool("cache.raft.enable_driver", true, "If true, enable placement driver")
+	minReplicasPerRange = flag.Int("cache.raft.min_replicas_per_range", 3, "The minimum number of replicas each range should have")
 )

 type DriverAction int
@@ -31,7 +30,7 @@ const (
 	DriverNoop
 	DriverRemoveReplica
 	DriverRemoveDeadReplica
-	DirverAddReplica
+	DriverAddReplica
 	DriverReplaceDeadReplica
 	DriverConsiderRebalance
 )
@@ -55,7 +54,7 @@ func (a DriverAction) Priority() float64 {
 	switch a {
 	case DriverReplaceDeadReplica:
 		return 400
-	case DirverAddReplica:
+	case DriverAddReplica:
 		return 300
 	case DriverRemoveDeadReplica:
 		return 200
@@ -69,6 +68,25 @@ func (a DriverAction) Priority() float64 {
 	}
 }

+func (a DriverAction) String() string {
+	switch a {
+	case DriverNoop:
+		return "no-op"
+	case DriverRemoveReplica:
+		return "remove-replica"
+	case DriverRemoveDeadReplica:
+		return "remove-dead-replica"
+	case DriverAddReplica:
+		return "add-replica"
+	case DriverReplaceDeadReplica:
+		return "replace-dead-replica"
+	case DriverConsiderRebalance:
+		return "consider-rebalance"
+	default:
+		return "unknown"
+	}
+}
+
 type IReplica interface {
 	RangeDescriptor() *rfpb.RangeDescriptor
 	ReplicaID() uint64
@@ -82,33 +100,40 @@ type IStore interface {
 	RemoveReplica(ctx context.Context, req *rfpb.RemoveReplicaRequest) (*rfpb.RemoveReplicaResponse, error)
 }

+// computeQuorum computes a quorum, which is a majority of members from a peer
+// set. In raft, when a quorum of nodes is unavailable, the cluster becomes
+// unavailable.
 func computeQuorum(numNodes int) int {
 	return (numNodes / 2) + 1
 }

-func (rq *ReplicateQueue) computeAction(replicas []*rfpb.ReplicaDescriptor) (DriverAction, float64) {
+// computeAction computes the action needed and its priority.
+func (rq *Queue) computeAction(replicas []*rfpb.ReplicaDescriptor) (DriverAction, float64) {
 	if rq.storeMap == nil {
 		action := DriverNoop
 		return action, action.Priority()
 	}
 	curReplicas := len(replicas)
-	desiredQuorum := computeQuorum(*minNumReplicasPerRange)
+	desiredQuorum := computeQuorum(*minReplicasPerRange)
 	quorum := computeQuorum(curReplicas)
-	if curReplicas < *minNumReplicasPerRange {
-		action := DirverAddReplica
+	if curReplicas < *minReplicasPerRange {
+		action := DriverAddReplica
 		adjustedPriority := action.Priority() + float64(desiredQuorum-curReplicas)
 		return action, adjustedPriority
 	}
 	replicasByStatus := rq.storeMap.DivideByStatus(replicas)
 	numLiveReplicas := len(replicasByStatus.LiveReplicas) + len(replicasByStatus.SuspectReplicas)
 	numDeadReplicas := len(replicasByStatus.DeadReplicas)
+	if numLiveReplicas < quorum {
+		// The cluster is unavailable since we don't have enough live nodes.
+		// There is no point in doing anything right now.
 		action := DriverNoop
 		return action, action.Priority()
 	}
-	if curReplicas <= *minNumReplicasPerRange && numDeadReplicas > 0 {
+	if curReplicas <= *minReplicasPerRange && numDeadReplicas > 0 {
 		action := DriverReplaceDeadReplica
 		return action, action.Priority()
 	}
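
A note on the quorum math above: computeQuorum returns the majority size, so a three-replica range keeps a quorum of two and tolerates one failure, while a four-replica range needs three live members and still tolerates only one. The sketch below is a standalone illustration of that arithmetic; it re-declares computeQuorum locally rather than importing anything from this package.

package main

import "fmt"

// computeQuorum mirrors the helper above: a majority of the peer set.
func computeQuorum(numNodes int) int {
	return (numNodes / 2) + 1
}

func main() {
	for _, n := range []int{3, 4, 5} {
		q := computeQuorum(n)
		// A range with n replicas stays available while at least q are live.
		fmt.Printf("replicas=%d quorum=%d tolerated failures=%d\n", n, q, n-q)
	}
}

Running it prints quorum=2 for three replicas but quorum=3 for four: an even-sized replica set buys no extra fault tolerance, which is why computeAction nudges priorities with the curReplicas%2 adjustment further down.
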
@@ -118,7 +143,7 @@ func (rq *ReplicateQueue) computeAction(replicas []*rfpb.ReplicaDescriptor) (Dri
 		return action, action.Priority()
 	}
-	if curReplicas > *minNumReplicasPerRange {
+	if curReplicas > *minReplicasPerRange {
 		action := DriverRemoveDeadReplica
 		ajustedPriority := action.Priority() - float64(curReplicas%2)
 		return action, ajustedPriority
 	}
@@ -134,7 +159,7 @@ type pqItem struct {
 	priority   float64
 	insertTime time.Time
-	// The index is needed by update and is maintained by the heap.Interface methods.
+	index int // The index of the item in the heap.
 	processing bool
 	requeue    bool
 }
@@ -178,7 +203,9 @@ func (pq *priorityQueue) getItemWithMinPriority() *pqItem {
 	return old[n-1]
 }

-type ReplicateQueue struct {
+// The Queue is responsible for up-replicating, down-replicating, and
+// rebalancing ranges across the stores.
+type Queue struct {
 	pq       *priorityQueue
 	storeMap *storemap.StoreMap
 	store    IStore
@@ -191,9 +218,9 @@ type ReplicateQueue struct {
 	started bool
 }

-func NewReplicateQueue(store IStore, gossipManager interfaces.GossipService) *ReplicateQueue {
+func NewQueue(store IStore, gossipManager interfaces.GossipService) *Queue {
 	storeMap := storemap.New(gossipManager)
-	return &ReplicateQueue{
+	return &Queue{
 		storeMap: storeMap,
 		pq:       &priorityQueue{},
 		store:    store,
@@ -201,7 +228,7 @@ func NewReplicateQueue(store IStore, gossipManager interfaces.GossipService) *Re
 	}
 }

-func (rq *ReplicateQueue) shouldQueue(repl IReplica) (bool, float64) {
+func (rq *Queue) shouldQueue(repl IReplica) (bool, float64) {
 	rd := repl.RangeDescriptor()

 	if !rq.store.HaveLease(rd.GetRangeId()) {
@@ -221,7 +248,10 @@ func (rq *ReplicateQueue) shouldQueue(repl IReplica) (bool, float64) {
 	return false, 0
 }

-func (rq *ReplicateQueue) MaybeAdd(replica IReplica) {
+// MaybeAdd adds a replica to the queue if the store has the lease for the
+// range and there is work that needs to be done.
+// When the queue is full, it deletes the least important replica from the queue.
+func (rq *Queue) MaybeAdd(replica IReplica) {
 	shouldQueue, priority := rq.shouldQueue(replica)
 	if !shouldQueue {
 		return
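
MaybeAdd's full-queue behavior relies on the priority queue exposing its least important entry (getItemWithMinPriority above): a new replica only displaces an existing one when it has something more urgent to do. Below is a minimal container/heap sketch of that eviction pattern, not the queue's exact policy; the item type and capacity are invented for the example, and the real pqItem also tracks insertTime, index, and processing state.

package main

import (
	"container/heap"
	"fmt"
)

type item struct {
	shardID  uint64
	priority float64
}

// minHeap keeps the lowest-priority item at the root so a full queue can
// evict its least important entry cheaply.
type minHeap []*item

func (h minHeap) Len() int           { return len(h) }
func (h minHeap) Less(i, j int) bool { return h[i].priority < h[j].priority }
func (h minHeap) Swap(i, j int)      { h[i], h[j] = h[j], h[i] }
func (h *minHeap) Push(x any)        { *h = append(*h, x.(*item)) }
func (h *minHeap) Pop() any {
	old := *h
	it := old[len(old)-1]
	*h = old[:len(old)-1]
	return it
}

func main() {
	const capacity = 3
	h := &minHeap{}
	for _, it := range []*item{{1, 300}, {2, 100}, {3, 400}, {4, 200}} {
		// Queue full: drop the least important entry if the newcomer outranks it.
		if h.Len() == capacity && (*h)[0].priority < it.priority {
			evicted := heap.Pop(h).(*item)
			fmt.Printf("evicted shard %d (priority %.0f)\n", evicted.shardID, evicted.priority)
		}
		if h.Len() < capacity {
			heap.Push(h, it)
		}
	}
}
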
@@ -260,7 +290,7 @@ func (rq *ReplicateQueue) MaybeAdd(replica IReplica) {
 	}
 }

-func (rq *ReplicateQueue) remove(item *pqItem) {
+func (rq *Queue) remove(item *pqItem) {
 	if item.processing {
 		item.requeue = false
 		return
@@ -271,7 +301,7 @@ func (rq *ReplicateQueue) remove(item *pqItem) {
 	rq.pqItemMap.Delete(item.shardID)
 }

-func (rq *ReplicateQueue) Start() {
+func (rq *Queue) Start() {
 	rq.mu.Lock()
 	started := rq.started
 	rq.mu.Unlock()
@@ -316,7 +346,7 @@ func (rq *ReplicateQueue) Start() {
 	}
 }

-func (rq *ReplicateQueue) Stop() {
+func (rq *Queue) Stop() {
 	rq.mu.Lock()
 	defer rq.mu.Unlock()
 	if !rq.started {
@@ -326,6 +356,7 @@ func (rq *ReplicateQueue) Stop() {
 	rq.started = false
 }

+// findReplacingReplica finds a dead replica to be removed.
 func findReplacingReplica(replicas []*rfpb.ReplicaDescriptor, replicaByStatus *storemap.ReplicasByStatus) *rfpb.ReplicaDescriptor {
 	if len(replicaByStatus.DeadReplicas) == 0 {
 		// nothing to be removed
@@ -349,13 +380,12 @@ func storeHasReplica(node *rfpb.NodeDescriptor, existing []*rfpb.ReplicaDescript
 	return false
 }

-func (rq *ReplicateQueue) allocateTarget(existing []*rfpb.ReplicaDescriptor) *rfpb.NodeDescriptor {
+// allocateTarget finds a target node to which rd can be up-replicated.
+func (rq *Queue) allocateTarget(rd *rfpb.RangeDescriptor) *rfpb.NodeDescriptor {
 	storesWithStats := rq.storeMap.GetStoresWithStats()
-	// Rank stores
-	// check free bytes
 	var candidates []*candidate
 	for _, su := range storesWithStats.Usages {
-		if storeHasReplica(su.GetNode(), existing) {
+		if storeHasReplica(su.GetNode(), rd.GetReplicas()) {
 			continue
 		}
 		candidates = append(candidates, &candidate{
@@ -368,7 +398,6 @@ func (rq *ReplicateQueue) allocateTarget(existing []*rfpb.ReplicaDescriptor) *rf
 	if len(candidates) == 0 {
 		return nil
 	}
-	sort.Sort(sort.Reverse(byScore(candidates)))
 	slices.SortFunc(candidates, func(a, b *candidate) int {
 		if n := a.compare(b); n != 0 {
 			if n < 0 {
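
The deleted byScore type (its removal appears near the end of this file's diff) implemented sort.Interface so candidates could be sorted with sort.Sort(sort.Reverse(...)); the replacement expresses the same ordering directly as a slices.SortFunc comparator: best score first, NHID as a deterministic tiebreak. A standalone sketch of that comparator shape, with a simplified candidate type invented for the example:

package main

import (
	"fmt"
	"slices"
	"strings"
)

type candidate struct {
	nhid  string
	score float64
}

// compare is positive when c ranks above o, matching the polarity of
// (*candidate).compare in this file.
func (c *candidate) compare(o *candidate) float64 {
	return c.score - o.score
}

func main() {
	candidates := []*candidate{
		{nhid: "nhid-2", score: 0.5},
		{nhid: "nhid-1", score: 0.9},
		{nhid: "nhid-3", score: 0.5},
	}
	slices.SortFunc(candidates, func(a, b *candidate) int {
		if n := a.compare(b); n != 0 {
			if n < 0 {
				return 1 // a scores lower, so it sorts after b
			}
			return -1 // a scores higher, so it sorts before b
		}
		return strings.Compare(a.nhid, b.nhid) // deterministic tiebreak
	})
	for _, c := range candidates {
		fmt.Printf("%s score=%.1f\n", c.nhid, c.score)
	}
}

After the sort the first element is the highest-scoring store, which allocateTarget can then return as the up-replication target.
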
@@ -388,8 +417,8 @@ type change struct {
 	removeOp *rfpb.RemoveReplicaRequest
 }

-func (rq *ReplicateQueue) addReplica(rd *rfpb.RangeDescriptor) *change {
-	target := rq.allocateTarget(rd.GetReplicas())
+func (rq *Queue) addReplica(rd *rfpb.RangeDescriptor) *change {
+	target := rq.allocateTarget(rd)
 	if target == nil {
 		log.Debugf("cannot find targets for range descriptor:%+v", rd)
 		return nil
@@ -403,7 +432,7 @@ func (rq *ReplicateQueue) addReplica(rd *rfpb.RangeDescriptor) *change {
 	}
 }

-func (rq *ReplicateQueue) replaceDeadReplica(rd *rfpb.RangeDescriptor, replicasByStatus *storemap.ReplicasByStatus) *change {
+func (rq *Queue) replaceDeadReplica(rd *rfpb.RangeDescriptor, replicasByStatus *storemap.ReplicasByStatus) *change {
 	replacing := findReplacingReplica(rd.GetReplicas(), replicasByStatus)
 	if replacing == nil {
 		// nothing to remove
@@ -419,7 +448,7 @@ func (rq *ReplicateQueue) replaceDeadReplica(rd *rfpb.RangeDescriptor, replicasB
 	return change
 }

-func (rq *ReplicateQueue) applyChange(ctx context.Context, change *change) error {
+func (rq *Queue) applyChange(ctx context.Context, change *change) error {
 	var rd *rfpb.RangeDescriptor
 	if change.addOp != nil {
 		rsp, err := rq.store.AddReplica(ctx, change.addOp)
@@ -445,7 +474,7 @@ func (rq *ReplicateQueue) applyChange(ctx context.Context, change *change) error

 }

-func (rq *ReplicateQueue) processReplica(repl IReplica) (bool, error) {
+func (rq *Queue) processReplica(repl IReplica) (bool, error) {
 	rd := repl.RangeDescriptor()
 	if !rq.store.HaveLease(rd.GetRangeId()) {
 		// the store doesn't have the lease of this range.
@@ -460,7 +489,7 @@ func (rq *ReplicateQueue) processReplica(repl IReplica) (bool, error) {

 	switch action {
 	case DriverNoop:
-	case DirverAddReplica:
+	case DriverAddReplica:
 		log.Debugf("add replica: %d", repl.ShardID())
 		change = rq.addReplica(rd)
 	case DriverReplaceDeadReplica:
@@ -493,7 +522,7 @@ func (rq *ReplicateQueue) processReplica(repl IReplica) (bool, error) {
 	return true, err
 }

-func (rq *ReplicateQueue) postProcess(repl IReplica) {
+func (rq *Queue) postProcess(repl IReplica) {
 	itemIface, ok := rq.pqItemMap.Load(repl.ShardID())
 	if !ok {
 		alert.UnexpectedEvent("unexpected_pq_item_not_found")
@@ -552,18 +581,6 @@ func (c *candidate) compare(o *candidate) float64 {
 	return 0
 }

-type byScore []*candidate
-
-func (x byScore) Len() int { return len(x) }
-func (x byScore) Less(i, j int) bool {
-	res := x[i].compare(x[j])
-	if res != 0 {
-		return res < 0
-	}
-	return x[i].usage.GetNode().GetNhid() < x[j].usage.GetNode().GetNhid()
-}
-func (x byScore) Swap(i, j int) { x[i], x[j] = x[j], x[i] }
-
 func replicaCountMeanLevel(storesWithStats *storemap.StoresWithStats, su *rfpb.StoreUsage) meanLevel {
 	maxReplicaCount := aboveMeanReplicaCountThreshold(storesWithStats.ReplicaCount.Mean)
 	minReplicaCount := belowMeanReplicaCountThreshold(storesWithStats.ReplicaCount.Mean)
diff --git a/enterprise/server/raft/store/store.go b/enterprise/server/raft/store/store.go
index 2a9c433a396..d94dfe09d2a 100644
--- a/enterprise/server/raft/store/store.go
+++ b/enterprise/server/raft/store/store.go
@@ -107,7 +107,7 @@ type Store struct {
 	updateTagsWorker *updateTagsWorker
 	txnCoordinator   *txn.Coordinator

-	replicateQueue *driver.ReplicateQueue
+	driverQueue *driver.Queue
 }

 // registryHolder implements NodeRegistryFactory. When nodeHost is created, it
@@ -208,7 +208,7 @@ func NewWithArgs(env environment.Env, rootDir string, nodeHost *dragonboat.NodeH
 	usages, err := usagetracker.New(s, gossipManager, s.NodeDescriptor(), partitions, s.AddEventListener())

-	s.replicateQueue = driver.NewReplicateQueue(s, gossipManager)
+	s.driverQueue = driver.NewQueue(s, gossipManager)

 	if err != nil {
 		return nil, err
 	}
@@ -436,7 +436,7 @@ func (s *Store) Start() error {
 		return nil
 	})
 	eg.Go(func() error {
-		s.replicateQueue.Start()
+		s.driverQueue.Start()
 		return nil
 	})

@@ -454,7 +454,7 @@ func (s *Store) Stop(ctx context.Context) error {
 		s.egCancel()
 		s.leaseKeeper.Stop()
 		s.liveness.Release()
-		s.replicateQueue.Stop()
+		s.driverQueue.Stop()
 		s.eg.Wait()
 	}
 	s.updateTagsWorker.Stop()
@@ -1855,7 +1855,7 @@ func (store *Store) scan(ctx context.Context) {
 		log.Debug("scan started")
 		replicas := store.getLeasedReplicas()
 		for _, repl := range replicas {
-			store.replicateQueue.MaybeAdd(repl)
+			store.driverQueue.MaybeAdd(repl)
 		}
 	}
 }
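
Since the rename touches every DriverAction call site, a small table-driven test is a cheap guard against the constants drifting again. The sketch below is a hypothetical companion test, not part of this diff; it only pins down the String names added above and the one priority ordering the diff shows explicitly (400 > 300 > 200).

package driver

import "testing"

func TestDriverActionMetadata(t *testing.T) {
	names := map[DriverAction]string{
		DriverNoop:               "no-op",
		DriverRemoveReplica:      "remove-replica",
		DriverRemoveDeadReplica:  "remove-dead-replica",
		DriverAddReplica:         "add-replica",
		DriverReplaceDeadReplica: "replace-dead-replica",
		DriverConsiderRebalance:  "consider-rebalance",
	}
	for action, want := range names {
		if got := action.String(); got != want {
			t.Errorf("String() = %q, want %q", got, want)
		}
	}
	// The diff defines replace-dead-replica (400) > add-replica (300) >
	// remove-dead-replica (200); the queue's ordering depends on it.
	if !(DriverReplaceDeadReplica.Priority() > DriverAddReplica.Priority() &&
		DriverAddReplica.Priority() > DriverRemoveDeadReplica.Priority()) {
		t.Error("priority order should be replace-dead > add > remove-dead")
	}
}
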