Commit

Merge remote-tracking branch 'couchbase/unstable' into HEAD
http://ci2i-unstable.northscale.in/gsi-29.01.2021-05.30.pass.html
Change-Id: I8651946999099cc813282d9e3941635bbc7925e3
jeelanp2003 committed Jan 29, 2021
2 parents 6e749ed + 69c0dd1 commit f7c8878
Showing 2 changed files with 149 additions and 84 deletions.
211 changes: 129 additions & 82 deletions secondary/planner/executor.go
@@ -194,24 +194,149 @@ func ExecuteRebalanceInternal(clusterUrl string,
return nil, err
}

filterSolution(p.Result.Placement)

transferTokens, err := genTransferToken(p.Result, masterId, topologyChange, deleteNodes)
if err != nil {
return nil, err
}

// Filter the transfer tokens to avoid swap rebalancing of replica partitions
transferTokens = filterTransferTokens(transferTokens)
return transferTokens, nil
}

// filterSolution will iterate through the new placement generated
// by planner and filter out all unnecessary movements.
//
// E.g., if for an index there exist 3 replicas on nodes n1 (replica 0),
// n2 (replica 1), n3 (replica 2) in a 4 node cluster (n1,n2,n3,n4), and
// planner has generated a placement to move replica 0 from n1->n2,
// replica 1 from n2->n3 and replica 2 from n3->n4, then the movements
// from n2 and n3 are unnecessary, as replica instances exist on those
// nodes both before and after the movement. filterSolution will eliminate
// all such movements and update the solution to have one final movement
// from n1->n4.
//
// Similarly, any cyclic movements, i.e. n1->n2, n2->n3, n3->n1,
// will be avoided.
func filterSolution(placement []*IndexerNode) {

indexDefnMap := make(map[common.IndexDefnId]map[common.PartitionId][]*IndexUsage)
indexerMap := make(map[string]*IndexerNode)

// Group the indexes by definition and partition. This grouping
// will help to identify if multiple replicas are being moved
// between nodes
for _, indexer := range placement {
indexerMap[indexer.NodeId] = indexer
for _, index := range indexer.Indexes {
// Update destNode for each index, as planner has
// finished the run and generated a tentative placement.
// A transfer token will not be generated if initialNode
// and destNode are the same.
index.destNode = indexer
if _, ok := indexDefnMap[index.DefnId]; !ok {
indexDefnMap[index.DefnId] = make(map[common.PartitionId][]*IndexUsage)
}
indexDefnMap[index.DefnId][index.PartnId] = append(indexDefnMap[index.DefnId][index.PartnId], index)
}
}

for _, defnMap := range indexDefnMap {
for _, indexes := range defnMap {
if len(indexes) == 1 {
continue
}

transferMap := make(map[string]string)

// Generate a map of all transfers between nodes for this replica instance
for _, index := range indexes {
if index.initialNode != nil && index.initialNode.NodeId != index.destNode.NodeId {
transferMap[index.initialNode.NodeId] = index.destNode.NodeId
} else if index.initialNode == nil {
// Create a dummy source node for replica repair. This is to
// address scenarios like lost_replica -> n0, n0 -> n1
key := fmt.Sprintf("ReplicaRepair_%v_%v", index.InstId, index.PartnId)
transferMap[key] = index.destNode.NodeId
}
}

if len(transferMap) == 0 {
continue
}

loop:
// Search the transferMap in a depth-first fashion and find the
// appropriate source and destination
for src, dst := range transferMap {
if newDest, ok := transferMap[dst]; ok {
delete(transferMap, dst)
if newDest != src {
transferMap[src] = newDest
} else { // Delete src to avoid cyclic transfers (n0 -> n1, n1 -> n0)
delete(transferMap, src)
}
goto loop
}
}

// Filter out unnecessary movements based on the transferMap
// and update the solution to have only valid movements
for _, index := range indexes {
var initialNodeId string
var replicaRepair bool
if index.initialNode == nil {
initialNodeId = fmt.Sprintf("ReplicaRepair_%v_%v", index.InstId, index.PartnId)
replicaRepair = true
} else {
initialNodeId = index.initialNode.NodeId
}

if destNodeId, ok := transferMap[initialNodeId]; ok {
// Inst. is moved to a different node after filtering the solution
if destNodeId != index.destNode.NodeId {
destIndexer := indexerMap[destNodeId]
preFilterDest := index.destNode
index.destNode = destIndexer

fmsg := "Planner::filterSolution - Planner intended to move the inst: %v, " +
"partn: %v from node %v to node %v. Instead the inst is moved to node: %v " +
"after eliminating the un-necessary replica movements"

if replicaRepair { // initialNode would be nil in case of replica repair
logging.Infof(fmsg, index.InstId, index.PartnId,
"", preFilterDest.NodeId, index.destNode.NodeId)
} else {
logging.Infof(fmsg, index.InstId, index.PartnId,
index.initialNode.NodeId, preFilterDest.NodeId, index.destNode.NodeId)
}
} else {
// Initial destination and final destination are the same. No change
// in placement required
}
} else {
// Planner initially planned a movement for this index, but after filtering
// the solution the movement is deemed unnecessary
if index.initialNode != nil && index.destNode.NodeId != index.initialNode.NodeId {
logging.Infof("Planner::filterSolution - Planner intended to move the inst: %v, "+
"partn: %v from node %v to node %v. This movement is deemed un-necessary as node: %v "+
"already has a replica partition", index.InstId, index.PartnId, index.initialNode.NodeId,
index.destNode.NodeId, index.destNode.NodeId)
index.destNode = index.initialNode
}
}
}
}
}
}
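
// A minimal standalone sketch (illustration only, not part of this change):
// the depth-first collapse performed above, extracted as a helper so its
// behaviour is easy to see in isolation.
func collapseTransfers(transferMap map[string]string) map[string]string {
loop:
	for src, dst := range transferMap {
		if newDest, ok := transferMap[dst]; ok {
			delete(transferMap, dst)
			if newDest != src {
				// Chain (src -> dst, dst -> newDest) becomes src -> newDest
				transferMap[src] = newDest
			} else {
				// Cycle (src -> dst, dst -> src): both moves cancel out
				delete(transferMap, src)
			}
			goto loop
		}
	}
	return transferMap
}

// Example collapses (hypothetical node names):
//   {n1: n2, n2: n3, n3: n4}      -> {n1: n4}  (chain reduced to one move)
//   {n1: n2, n2: n1}              -> {}        (cyclic swap eliminated)
//   {ReplicaRepair_x: n0, n0: n1} -> {ReplicaRepair_x: n1}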

func genTransferToken(solution *Solution, masterId string, topologyChange service.TopologyChange,
deleteNodes []string) (map[string]*common.TransferToken, error) {

tokens := make(map[string]*common.TransferToken)

for _, indexer := range solution.Placement {
for _, index := range indexer.Indexes {
if index.initialNode != nil && index.initialNode.NodeId != indexer.NodeId && !index.pendingCreate {
if index.initialNode != nil && index.initialNode.NodeId != index.destNode.NodeId && !index.pendingCreate {

// one token for every index replica between a specific source and destination
tokenKey := fmt.Sprintf("%v %v %v %v", index.DefnId, index.Instance.ReplicaId, index.initialNode.NodeUUID, indexer.NodeUUID)
@@ -221,7 +346,7 @@ func genTransferToken(solution *Solution, masterId string, topologyChange servic
token = &common.TransferToken{
MasterId: masterId,
SourceId: index.initialNode.NodeUUID,
DestId: indexer.NodeUUID,
DestId: index.destNode.NodeUUID,
RebalId: topologyChange.ID,
State: common.TransferTokenCreated,
InstId: index.InstId,
@@ -337,84 +462,6 @@ func genTransferToken(solution *Solution, masterId string, topologyChange servic
return result, nil
}
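
// Note on the DestId change above: filterSolution only rewrites
// index.destNode; it does not move the IndexUsage between the
// IndexerNode.Indexes lists in the placement. So when iterating
// solution.Placement here, the enclosing indexer may no longer be the
// real destination, and the token must take its destination (and the
// movement check) from index.destNode instead.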

// filterTransferTokens filters out all the replica instances (and
// partitions) that share the same definition ID and partition ID but are
// being swapped between nodes. Swapping replicas between nodes offers no
// advantage, as GSI replication is master-master in nature.
//
// If replica instances are not filtered, the swap can cause rebalance
// failures, add unnecessary load to the system by re-building the
// instances, etc.
func filterTransferTokens(transferTokens map[string]*common.TransferToken) map[string]*common.TransferToken {

// key -> DefnId:SourceId:DestId:PartnId, value -> Transfer token ID
groupedTransferTokens := make(map[string]string)
for tid, token := range transferTokens {
for _, partnId := range token.IndexInst.Defn.Partitions {
groupedKey := fmt.Sprintf("%v:%v:%v:%v", token.IndexInst.Defn.DefnId, token.SourceId, token.DestId, partnId)
groupedTransferTokens[groupedKey] = tid
}
}

// For each token, this map maintains a list of all partitions that
// have to be filtered out if they are being swapped across nodes
filteredPartnMap := make(map[string][]common.PartitionId)
for tid1, token := range transferTokens {
for _, partnId := range token.IndexInst.Defn.Partitions {

// For this partition, check if there is any replica that
// is being swapped, i.e. if a token tid2 exists such that
// tid1.SourceId == tid2.DestId and tid1.DestId == tid2.SourceId
swappedKey := fmt.Sprintf("%v:%v:%v:%v", token.IndexInst.Defn.DefnId, token.DestId, token.SourceId, partnId)

if tid2, ok := groupedTransferTokens[swappedKey]; ok && tid1 != tid2 {
// Replicas are being swapped
logging.Infof("Rebalancer::filterTransferTokens Partition: %v for defnId: %v "+
"is being swapped through tokens: %v, %v", partnId, token.IndexInst.Defn.DefnId, tid1, tid2)
filteredPartnMap[tid1] = append(filteredPartnMap[tid1], partnId)
filteredPartnMap[tid2] = append(filteredPartnMap[tid2], partnId)
}
}
}

if len(filteredPartnMap) == 0 {
logging.Infof("Rebalancer::filterTransferTokens Nothing to filter")
return transferTokens
}

getValidPartns := func(filteredPartns []common.PartitionId, partnsInToken []common.PartitionId) []common.PartitionId {
validPartns := make([]common.PartitionId, 0)
partnMap := make(map[common.PartitionId]bool)
for _, partnId := range filteredPartns {
partnMap[partnId] = true
}

for _, partnId := range partnsInToken {
if _, ok := partnMap[partnId]; !ok { // This partition is not being filtered
validPartns = append(validPartns, partnId)
}
}
return validPartns
}

// Remove the partitions that are being swapped
for tid, swappedPartns := range filteredPartnMap {
if token, ok := transferTokens[tid]; ok {
validPartns := getValidPartns(swappedPartns, token.IndexInst.Defn.Partitions)
// All the partitions in this token are being swapped. Delete the token
if len(validPartns) == 0 {
delete(transferTokens, tid)
logging.Infof("Rebalancer::filterTransferTokens Removing the transfer token: %v "+
"as all the partitions are being swapped", tid)
} else {
token.IndexInst.Defn.Partitions = validPartns
logging.Infof("Rebalancer::filterTransferTokens Updated the transfer token: %v after filtering the swapped partitions. Updated token: %v", tid, token)
}
}
}
return transferTokens
}
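
// Worked example (hypothetical IDs): with defnId 100 and partition 5, a
// token T1 moving the partition a -> b gets groupedKey "100:a:b:5", and a
// replica token T2 moving b -> a gets groupedKey "100:b:a:5". T1's
// swappedKey is also "100:b:a:5", so the lookup finds T2 and partition 5
// is filtered from both tokens; a token whose partitions are all filtered
// is deleted outright.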

//////////////////////////////////////////////////////////////
// Integration with Metadata Provider
/////////////////////////////////////////////////////////////
22 changes: 20 additions & 2 deletions secondary/planner/planner.go
@@ -13,14 +13,15 @@ package planner
import (
"errors"
"fmt"
"github.com/couchbase/indexing/secondary/common"
"github.com/couchbase/indexing/secondary/logging"
"math"
"math/rand"
"sort"
"strconv"
"sync"
"time"

"github.com/couchbase/indexing/secondary/common"
"github.com/couchbase/indexing/secondary/logging"
)

//TODO
@@ -261,6 +262,10 @@ type IndexUsage struct
// for new indexes to be placed on an existing topology (e.g. live cluster), this must not be set.
initialNode *IndexerNode

// The node to which planner moves this index from initialNode
// Equals "initialNode" if planner does not move the index
destNode *IndexerNode

// input: flag to indicate if the index in delete or create token
pendingDelete bool // true if there is a delete token associated with this index
pendingCreate bool // true if there is a create token associated with this index
@@ -2464,6 +2469,9 @@ func (s *Solution) SatisfyClusterConstraint() bool
return true
}

//
// findServerGroup gets called only if solution.numServerGroup > 1
//
func (s *Solution) findServerGroup(defnId common.IndexDefnId, partnId common.PartitionId, replicaId int) (string, bool) {

findSG := func(sgMap ServerGroupMap) (string, bool) {
@@ -2491,6 +2499,8 @@ func (s *Solution) findServerGroup(defnId common.IndexDefnId, partnId common.Par
//
// Check if there is any replica (excluding self) in the server group
//
// hasReplicaInServerGroup gets called only if solution.numServerGroup > 1
//
func (s *Solution) hasReplicaInServerGroup(u *IndexUsage, group string) bool {

hasReplicaInSG := func(replicaM ReplicaMap) bool {
@@ -2534,6 +2544,8 @@ func (s *Solution) hasReplicaInServerGroup(u *IndexUsage, group string) bool
//
// Get server groups having replicas for this index (excluding self)
//
// getServerGroupsWithReplica gets called only if solution.numServerGroup > 1
//
func (s *Solution) getServerGroupsWithReplica(u *IndexUsage) map[string]bool {

getSGWithReplica := func(replicaM ReplicaMap) map[string]bool {
@@ -2575,6 +2587,8 @@ func (s *Solution) getServerGroupsWithReplica(u *IndexUsage) map[string]bool
//
// Check if there is any server group without this replica (excluding self)
//
// hasServerGroupWithNoReplica gets called only if solution.numServerGroup > 1
//
func (s *Solution) hasServerGroupWithNoReplica(u *IndexUsage) bool {

if s.numServerGroup > len(s.getServerGroupsWithReplica(u)) {
@@ -2731,6 +2745,10 @@ func (s *Solution) hasDeletedNodes() bool
//
func (s *Solution) updateServerGroupMap(index *IndexUsage, indexer *IndexerNode) {

if s.numServerGroup <= 1 {
return
}

updateSGMap := func(sgMap ServerGroupMap) {
if index.Instance != nil {
if indexer != nil {
