From 2e5159624ec55fa886f2c08075f4f281a85764a3 Mon Sep 17 00:00:00 2001
From: Varun Velamuri
Date: Fri, 22 Jan 2021 01:22:57 +0530
Subject: [PATCH 1/2] MB-42220 Filter solution to remove unnecessary replica
 movements

Consider a placement before rebalance: i1:n1, i2:n2, i3:n3

After the planner run, if the placement generated is i1:n2, i2:n3, i3:n4,
then the movements of i2 and i3 are unnecessary as i1, i2, i3 are all
replica instances. Instead, i1 can be moved to n4.

This patch takes care of updating the generated solution to handle such
scenarios.

Change-Id: I208da121892e773343dfeaea8aec85293a057e52
---
 secondary/planner/executor.go | 211 +++++++++++++++++++++-------------
 secondary/planner/planner.go  |   9 +-
 2 files changed, 136 insertions(+), 84 deletions(-)

diff --git a/secondary/planner/executor.go b/secondary/planner/executor.go
index 51af56019..b9e00a481 100644
--- a/secondary/planner/executor.go
+++ b/secondary/planner/executor.go
@@ -194,16 +194,141 @@ func ExecuteRebalanceInternal(clusterUrl string,
 		return nil, err
 	}
 
+	filterSolution(p.Result.Placement)
+
 	transferTokens, err := genTransferToken(p.Result, masterId, topologyChange, deleteNodes)
 	if err != nil {
 		return nil, err
 	}
 
-	// Filter the transfer tokens to avoid swap rebalancing of replica partitions
-	transferTokens = filterTransferTokens(transferTokens)
-
 	return transferTokens, nil
 }
 
+// filterSolution will iterate through the new placement generated
+// by planner and filter out all unnecessary movements.
+//
+// E.g., if for an index there exist 3 replicas on nodes n1 (replica 0),
+// n2 (replica 1), n3 (replica 2) in a 4 node cluster (n1,n2,n3,n4), and
+// planner has generated a placement to move replica 0 from n1->n2,
+// replica 1 from n2->n3 and replica 2 from n3->n4, the movements of replicas
+// from n2 and n3 are unnecessary as replica instances exist on those nodes
+// both before and after the movement. filterSolution will eliminate all such
+// movements and update the solution to have one final movement from n1->n4.
+//
+// Similarly, if there are any cyclic movements, i.e. n1->n2, n2->n3, n3->n1,
+// all such movements will be avoided.
+func filterSolution(placement []*IndexerNode) {
+
+	indexDefnMap := make(map[common.IndexDefnId]map[common.PartitionId][]*IndexUsage)
+	indexerMap := make(map[string]*IndexerNode)
+
+	// Group the indexes by replica and partition. This grouping
+	// helps to identify if multiple replicas are being moved
+	// between nodes
+	for _, indexer := range placement {
+		indexerMap[indexer.NodeId] = indexer
+		for _, index := range indexer.Indexes {
+			// Update destNode for each index as planner has
+			// finished the run and generated a tentative placement.
+			// A transfer token will not be generated if initialNode
+			// and destNode are the same.
+			index.destNode = indexer
+			if _, ok := indexDefnMap[index.DefnId]; !ok {
+				indexDefnMap[index.DefnId] = make(map[common.PartitionId][]*IndexUsage)
+			}
+			indexDefnMap[index.DefnId][index.PartnId] = append(indexDefnMap[index.DefnId][index.PartnId], index)
+		}
+	}
+
+	for _, defnMap := range indexDefnMap {
+		for _, indexes := range defnMap {
+			if len(indexes) == 1 {
+				continue
+			}
+
+			transferMap := make(map[string]string)
+
+			// Generate a map of all transfers between nodes for this replica instance
+			for _, index := range indexes {
+				if index.initialNode != nil && index.initialNode.NodeId != index.destNode.NodeId {
+					transferMap[index.initialNode.NodeId] = index.destNode.NodeId
+				} else if index.initialNode == nil {
+					// Create a dummy source node for replica repair. This is to
+					// address scenarios like lost_replica -> n0, n0 -> n1
+					key := fmt.Sprintf("ReplicaRepair_%v_%v", index.InstId, index.PartnId)
+					transferMap[key] = index.destNode.NodeId
+				}
+			}
+
+			if len(transferMap) == 0 {
+				continue
+			}
+
+		loop:
+			// Search the transferMap in a depth-first fashion and find the appropriate
+			// source and destination
+			for src, dst := range transferMap {
+				if newDest, ok := transferMap[dst]; ok {
+					delete(transferMap, dst)
+					if newDest != src {
+						transferMap[src] = newDest
+					} else { // Delete src to avoid cyclic transfers (n0 -> n1, n1 -> n0)
+						delete(transferMap, src)
+					}
+					goto loop
+				}
+			}
+
+			// Filter out unnecessary movements based on the transferMap
+			// and update the solution to have only valid movements
+			for _, index := range indexes {
+				var initialNodeId string
+				var replicaRepair bool
+				if index.initialNode == nil {
+					initialNodeId = fmt.Sprintf("ReplicaRepair_%v_%v", index.InstId, index.PartnId)
+					replicaRepair = true
+				} else {
+					initialNodeId = index.initialNode.NodeId
+				}
+
+				if destNodeId, ok := transferMap[initialNodeId]; ok {
+					// Inst. is moved to a different node after filtering the solution
+					if destNodeId != index.destNode.NodeId {
+						destIndexer := indexerMap[destNodeId]
+						preFilterDest := index.destNode
+						index.destNode = destIndexer
+
+						fmsg := "Planner::filterSolution - Planner intended to move the inst: %v, " +
+							"partn: %v from node %v to node %v. Instead the inst is moved to node: %v " +
+							"after eliminating the unnecessary replica movements"
+
+						if replicaRepair { // initialNode would be nil in case of replica repair
+							logging.Infof(fmsg, index.InstId, index.PartnId,
+								"", preFilterDest.NodeId, index.destNode.NodeId)
+						} else {
+							logging.Infof(fmsg, index.InstId, index.PartnId,
+								index.initialNode.NodeId, preFilterDest.NodeId, index.destNode.NodeId)
+						}
+					} else {
+						// Initial destination and final destination are the same. No change
+						// in placement required
+					}
+				} else {
+					// Planner initially planned a movement for this index but after filtering the
+					// solution, the movement is deemed unnecessary
+					if index.initialNode != nil && index.destNode.NodeId != index.initialNode.NodeId {
+						logging.Infof("Planner::filterSolution - Planner intended to move the inst: %v, "+
+							"partn: %v from node %v to node %v. This movement is deemed unnecessary as node: %v "+
+							"already has a replica partition", index.InstId, index.PartnId, index.initialNode.NodeId,
+							index.destNode.NodeId, index.destNode.NodeId)
+						index.destNode = index.initialNode
+					}
+				}
+			}
+		}
+	}
+}
+
 func genTransferToken(solution *Solution, masterId string, topologyChange service.TopologyChange, deleteNodes []string) (map[string]*common.TransferToken, error) {
@@ -211,7 +336,7 @@ func genTransferToken(solution *Solution, masterId string, topologyChange servic
 	for _, indexer := range solution.Placement {
 		for _, index := range indexer.Indexes {
-			if index.initialNode != nil && index.initialNode.NodeId != indexer.NodeId && !index.pendingCreate {
+			if index.initialNode != nil && index.initialNode.NodeId != index.destNode.NodeId && !index.pendingCreate {
 				// one token for every index replica between a specific source and destination
 				tokenKey := fmt.Sprintf("%v %v %v %v", index.DefnId, index.Instance.ReplicaId,
 					index.initialNode.NodeUUID, indexer.NodeUUID)
@@ -221,7 +346,7 @@ func genTransferToken(solution *Solution, masterId string, topologyChange servic
 				token = &common.TransferToken{
 					MasterId: masterId,
 					SourceId: index.initialNode.NodeUUID,
-					DestId:   indexer.NodeUUID,
+					DestId:   index.destNode.NodeUUID,
 					RebalId:  topologyChange.ID,
 					State:    common.TransferTokenCreated,
 					InstId:   index.InstId,
@@ -337,84 +462,6 @@ func genTransferToken(solution *Solution, masterId string, topologyChange servic
 	return result, nil
 }
 
-// This method will filter all the replica instances (and partitions)
-// that share the same definition ID and partition ID but are being
-// swapped between nodes. Swapping replicas between nodes will not offer
-// any advantage as GSI replication is master-master in nature.
-
-// If replica instances are not filtered, then it can cause rebalance
-// failures, add un-necessary load to the system by re-building the
-// instances etc.
-func filterTransferTokens(transferTokens map[string]*common.TransferToken) map[string]*common.TransferToken {
-
-	// key -> DefnId:SourceId:DestId:PartnId, value -> Transfer token ID
-	groupedTransferTokens := make(map[string]string)
-	for tid, token := range transferTokens {
-		for _, partnId := range token.IndexInst.Defn.Partitions {
-			groupedKey := fmt.Sprintf("%v:%v:%v:%v", token.IndexInst.Defn.DefnId, token.SourceId, token.DestId, partnId)
-			groupedTransferTokens[groupedKey] = tid
-		}
-	}
-
-	// For each token, this map maintains a list of all partitions that
-	// have to be filtered out if they are being swapped across nodes
-	filteredPartnMap := make(map[string][]common.PartitionId)
-	for tid1, token := range transferTokens {
-		for _, partnId := range token.IndexInst.Defn.Partitions {
-
-			// For this partition, check if there is any replica that
-			// is being swapped i.e. if a token tid2 exists such that
-			// tid1.sourceId == tid2.DestId and tid1.DestId == tid2.sourceId
-			swappedKey := fmt.Sprintf("%v:%v:%v:%v", token.IndexInst.Defn.DefnId, token.DestId, token.SourceId, partnId)
-
-			if tid2, ok := groupedTransferTokens[swappedKey]; ok && tid1 != tid2 {
-				// Replicas are being swapped
-				logging.Infof("Rebalancer::filterTransferTokens Partition: %v for defnId: %v "+
-					"is being swapped through tokens: %v, %v", partnId, token.IndexInst.Defn.DefnId, tid1, tid2)
-				filteredPartnMap[tid1] = append(filteredPartnMap[tid1], partnId)
-				filteredPartnMap[tid2] = append(filteredPartnMap[tid2], partnId)
-			}
-		}
-	}
-
-	if len(filteredPartnMap) == 0 {
-		logging.Infof("Rebalancer::filterTransferTokens Nothing to filter")
-		return transferTokens
-	}
-
-	getValidPartns := func(filteredPartns []common.PartitionId, partnsInToken []common.PartitionId) []common.PartitionId {
-		validPartns := make([]common.PartitionId, 0)
-		partnMap := make(map[common.PartitionId]bool)
-		for _, partnId := range filteredPartns {
-			partnMap[partnId] = true
-		}
-
-		for _, partnId := range partnsInToken {
-			if _, ok := partnMap[partnId]; !ok { // This partition is not being filtered
-				validPartns = append(validPartns, partnId)
-			}
-		}
-		return validPartns
-	}
-
-	// Remove the partitions that are being swapped
-	for tid, swappedPartns := range filteredPartnMap {
-		if token, ok := transferTokens[tid]; ok {
-			validPartns := getValidPartns(swappedPartns, token.IndexInst.Defn.Partitions)
-			// All the partitions in this token are being swapped. Delete the token
-			if len(validPartns) == 0 {
-				delete(transferTokens, tid)
-				logging.Infof("Rebalancer::filterTransferTokens Removing the transfer token: %v "+
-					"as all the partitions are being swapped", tid)
-			} else {
-				token.IndexInst.Defn.Partitions = validPartns
-				logging.Infof("Rebalancer::filterTransferTokens Updated the transfer token: %v after filtering the swapped partitions. Updated token: %v", tid, token)
-			}
-		}
-	}
-	return transferTokens
-}
-
 //////////////////////////////////////////////////////////////
 // Integration with Metadata Provider
 /////////////////////////////////////////////////////////////
diff --git a/secondary/planner/planner.go b/secondary/planner/planner.go
index 99f7ad817..2013444b7 100644
--- a/secondary/planner/planner.go
+++ b/secondary/planner/planner.go
@@ -13,14 +13,15 @@ package planner
 import (
 	"errors"
 	"fmt"
-	"github.com/couchbase/indexing/secondary/common"
-	"github.com/couchbase/indexing/secondary/logging"
 	"math"
 	"math/rand"
 	"sort"
 	"strconv"
 	"sync"
 	"time"
+
+	"github.com/couchbase/indexing/secondary/common"
+	"github.com/couchbase/indexing/secondary/logging"
 )
 
 //TODO
@@ -261,6 +262,10 @@ type IndexUsage struct {
 	// for new indexes to be placed on an existing topology (e.g. live cluster), this must not be set.
 	initialNode *IndexerNode
 
+	// The node to which planner moves this index from initialNode
+	// Equals "initialNode" if planner does not move the index
+	destNode *IndexerNode
+
 	// input: flag to indicate if the index in delete or create token
 	pendingDelete bool // true if there is a delete token associated with this index
 	pendingCreate bool // true if there is a create token associated with this index

From 69c0dd1bc3a8b4bf66617e6e992a2e6dcc4cc324 Mon Sep 17 00:00:00 2001
From: Amit Kulkarni
Date: Mon, 25 Jan 2021 19:59:40 +0530
Subject: [PATCH 2/2] MB-40921: Avoid maintaining server group map in planner
 if not needed

Change-Id: Id45447eff4204581d91813496aeb87bd283b8c8d
---
 secondary/planner/planner.go | 13 +++++++++++++
 1 file changed, 13 insertions(+)

diff --git a/secondary/planner/planner.go b/secondary/planner/planner.go
index 2013444b7..e981a6d5c 100644
--- a/secondary/planner/planner.go
+++ b/secondary/planner/planner.go
@@ -2469,6 +2469,9 @@ func (s *Solution) SatisfyClusterConstraint() bool {
 	return true
 }
 
+//
+// findServerGroup gets called only if solution.numServerGroup > 1
+//
 func (s *Solution) findServerGroup(defnId common.IndexDefnId, partnId common.PartitionId, replicaId int) (string, bool) {
 
 	findSG := func(sgMap ServerGroupMap) (string, bool) {
@@ -2496,6 +2499,8 @@ func (s *Solution) findServerGroup(defnId common.IndexDefnId, partnId common.Par
 //
 // Check if there is any replica (excluding self) in the server group
 //
+// hasReplicaInServerGroup gets called only if solution.numServerGroup > 1
+//
 func (s *Solution) hasReplicaInServerGroup(u *IndexUsage, group string) bool {
 
 	hasReplicaInSG := func(replicaM ReplicaMap) bool {
@@ -2539,6 +2544,8 @@ func (s *Solution) hasReplicaInServerGroup(u *IndexUsage, group string) bool {
 //
 // Get server groups having replicas for this index (excluding self)
 //
+// getServerGroupsWithReplica gets called only if solution.numServerGroup > 1
+//
 func (s *Solution) getServerGroupsWithReplica(u *IndexUsage) map[string]bool {
 
 	getSGWithReplica := func(replicaM ReplicaMap) map[string]bool {
@@ -2580,6 +2587,8 @@ func (s *Solution) getServerGroupsWithReplica(u *IndexUsage) map[string]bool {
 //
 // Check if any server group without this replica (excluding self)
 //
+// hasServerGroupWithNoReplica gets called only if solution.numServerGroup > 1
+//
 func (s *Solution) hasServerGroupWithNoReplica(u *IndexUsage) bool {
 
 	if s.numServerGroup > len(s.getServerGroupsWithReplica(u)) {
@@ -2736,6 +2745,10 @@ func (s *Solution) hasDeletedNodes() bool {
 //
 func (s *Solution) updateServerGroupMap(index *IndexUsage, indexer *IndexerNode) {
 
+	if s.numServerGroup <= 1 {
+		return
+	}
+
 	updateSGMap := func(sgMap ServerGroupMap) {
 		if index.Instance != nil {
 			if indexer != nil {
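Reviewer note (not part of the patch): below is a minimal, standalone Go sketch of the transfer-map collapsing idea that the goto-based loop in filterSolution implements. The helper name collapseTransfers and the plain map[string]string representation are illustrative assumptions for this sketch; the patch itself operates on IndexUsage placements via initialNode/destNode rather than on bare node-id strings.

package main

import "fmt"

// collapseTransfers folds chained movements (n1->n2 plus n2->n3 becomes
// n1->n3) and drops cycles (n1->n2 plus n2->n1 becomes no movement),
// mirroring the chain-following loop in filterSolution.
func collapseTransfers(transfers map[string]string) map[string]string {
	changed := true
	for changed {
		changed = false
		for src, dst := range transfers {
			if newDst, ok := transfers[dst]; ok {
				delete(transfers, dst)
				if newDst != src {
					transfers[src] = newDst
				} else {
					// src -> dst -> src is a cycle; no movement is needed.
					delete(transfers, src)
				}
				changed = true
				break // restart iteration since the map was mutated
			}
		}
	}
	return transfers
}

func main() {
	// Chain: n1->n2, n2->n3, n3->n4 collapses to the single move n1->n4.
	fmt.Println(collapseTransfers(map[string]string{"n1": "n2", "n2": "n3", "n3": "n4"}))
	// Cycle: n1->n2, n2->n3, n3->n1 collapses to no moves at all.
	fmt.Println(collapseTransfers(map[string]string{"n1": "n2", "n2": "n3", "n3": "n1"}))
}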