Skip to content

Commit

Permalink
MB-52930 update processTokens logic for shard rebalance
Browse files Browse the repository at this point in the history
The process tokens methods for each of master, source and
destination have been updated. Some utility functions like
updateInMemToken and checkValidNotifyState have been updated

Change-Id: I695c7bdd67ea28fbbd5d02ad90202ce8285c4797
  • Loading branch information
varunv-cb committed Sep 9, 2022
1 parent 922fff1 commit 6fc5b3a
Show file tree
Hide file tree
Showing 2 changed files with 276 additions and 10 deletions.
20 changes: 16 additions & 4 deletions secondary/indexer/rebalance_service_manager.go
Expand Up @@ -2354,10 +2354,22 @@ func (m *RebalanceServiceManager) handleRegisterRebalanceToken(w http.ResponseWr
return
}

m.rebalancerF = NewRebalancer(nil, m.rebalanceToken, string(m.nodeInfo.NodeID),
false, nil, m.rebalanceDoneCallback, m.supvMsgch,
m.localhttp, m.config.Load(), nil, false, &m.p,
m.genericMgr.statsMgr)
// TODO: Add a cluster version check
cfg := m.config.Load()
isShardAwareRebalance := cfg["rebalance.shard_aware_rebalance"].Bool()

if isShardAwareRebalance {
m.rebalancerF = NewShardRebalancer(nil, m.rebalanceToken, string(m.nodeInfo.NodeID),
false, nil, m.rebalanceDoneCallback, m.supvMsgch,
m.localhttp, m.config.Load(), nil, false, &m.p,
m.genericMgr.statsMgr)
} else {
m.rebalancerF = NewRebalancer(nil, m.rebalanceToken, string(m.nodeInfo.NodeID),
false, nil, m.rebalanceDoneCallback, m.supvMsgch,
m.localhttp, m.config.Load(), nil, false, &m.p,
m.genericMgr.statsMgr)
}

m.writeOk(w)
return

Expand Down
266 changes: 260 additions & 6 deletions secondary/indexer/shard_rebalancer.go
@@ -1,6 +1,7 @@
package indexer

import (
"fmt"
"strings"
"sync"
"sync/atomic"
Expand All @@ -23,8 +24,12 @@ type ShardRebalancer struct {
sourceTokens map[string]*c.TransferToken // as maintained by source
acceptedTokens map[string]*c.TransferToken // as maintained by destination

// List of all tokens that have been acknowledged by both source
// and destination nodes
ackedTokens map[string]*c.TransferToken

// lock protecting access to maps like transferTokens, sourceTokens etc.
mu sync.Mutex
mu sync.RWMutex

rebalToken *RebalanceToken
nodeUUID string
Expand Down Expand Up @@ -81,6 +86,7 @@ func NewShardRebalancer(transferTokens map[string]*c.TransferToken, rebalToken *

acceptedTokens: make(map[string]*c.TransferToken),
sourceTokens: make(map[string]*c.TransferToken),
ackedTokens: make(map[string]*c.TransferToken),

cancel: make(chan struct{}),
done: make(chan struct{}),
Expand Down Expand Up @@ -193,6 +199,161 @@ func (sr *ShardRebalancer) processShardTokens(kve metakv.KVEntry) error {
return nil
}

// processShardTransferToken routes one shard transfer token to the
// role-specific handlers (master, source, destination) whose node id on the
// token matches this node's UUID.
//
// The call returns immediately when addToWaitGroup returns false. When
// checkDDLRunning reports a running DDL, or when the token is not a shard
// transfer token, the error text is recorded on the token through
// setTransferTokenError and no handler is invoked.
func (sr *ShardRebalancer) processShardTransferToken(ttid string, tt *c.TransferToken) {
	if !sr.addToWaitGroup() {
		return
	}

	defer sr.wg.Done()

	//TODO (7.2.0): Transfer token can be processed if there
	// are conflicts with rebalance movements. Update this logic
	// under allow DDL during rebalance part
	if ddl, err := sr.runParams.checkDDLRunning("ShardRebalancer"); ddl {
		sr.setTransferTokenError(ttid, tt, err.Error())
		return
	}

	if !tt.IsShardTransferToken() {
		err := fmt.Errorf("ShardRebalancer::processShardTransferToken Transfer token is not for transferring shard. ttid: %v, tt: %v",
			ttid, tt)
		// The error text embeds the formatted token, so it must not be used
		// as the format string itself: any '%' in it would be interpreted as
		// a formatting verb.
		l.Fatalf("%v", err)
		sr.setTransferTokenError(ttid, tt, err.Error())
		return
	}

	// "processed" var ensures only the incoming token state gets processed by this
	// call, as metakv will call parent processTokens again for each TT state change.
	var processed bool

	if tt.MasterId == sr.nodeUUID {
		processed = sr.processShardTransferTokenAsMaster(ttid, tt)
	}

	if tt.SourceId == sr.nodeUUID && !processed {
		processed = sr.processShardTransferTokenAsSource(ttid, tt)
	}

	// Last handler in the chain: its result is not needed afterwards.
	if tt.DestId == sr.nodeUUID && !processed {
		sr.processShardTransferTokenAsDest(ttid, tt)
	}
}

// processShardTransferTokenAsMaster handles a transfer token notification in
// the master role.
//
// It returns true when the notification has been consumed: the token belongs
// to a different rebalance id, checkValidNotifyState rejected it, or the
// ShardTokenScheduleAck state was handled. It returns false for every other
// token state so that the caller can try the next role.
func (sr *ShardRebalancer) processShardTransferTokenAsMaster(ttid string, tt *c.TransferToken) bool {

	if sr.rebalToken.RebalId != tt.RebalId {
		l.Warnf("ShardRebalancer::processShardTransferTokenAsMaster Found TransferToken with Unknown "+
			"RebalanceId. Local RId %v Token %v. Ignored.", sr.rebalToken.RebalId, tt)
		return true
	}

	if valid := sr.checkValidNotifyState(ttid, tt, "master"); !valid {
		return true
	}

	// Only the ScheduleAck state is handled by the master at this point.
	if tt.ShardTransferTokenState != c.ShardTokenScheduleAck {
		return false
	}

	sr.mu.Lock()
	defer sr.mu.Unlock()

	// Record the acknowledgement, and refresh the master's copy of the token
	// only if the master already tracks this token id.
	sr.ackedTokens[ttid] = tt
	if _, tracked := sr.transferTokens[ttid]; tracked {
		sr.transferTokens[ttid] = tt // Update in-memory book-keeping with new state
	}

	// "mu" is held here, as allShardTransferTokensAcked requires.
	if sr.allShardTransferTokensAcked() {
		// TODO: Create batches of transfer tokens
		// and change the state of those transfer tokens
		// for further processing

		// Change the state of the batched tokens to
		// "TransferTokenTransferShard"
	}

	return true
}

// processShardTransferTokenAsSource handles a transfer token notification in
// the source role.
//
// It returns true when the notification has been consumed: the token belongs
// to a different rebalance id, checkValidNotifyState rejected it, or the
// ShardTokenCreated state was handled. It returns false for every other token
// state so that the caller can try the next role.
func (sr *ShardRebalancer) processShardTransferTokenAsSource(ttid string, tt *c.TransferToken) bool {

	if tt.RebalId != sr.rebalToken.RebalId {
		l.Warnf("ShardRebalancer::processShardTransferTokenAsSource Found TransferToken with Unknown "+
			"RebalanceId. Local RId %v Token %v. Ignored.", sr.rebalToken.RebalId, tt)
		return true
	}

	if !sr.checkValidNotifyState(ttid, tt, "source") {
		return true
	}

	// Only the Created state is handled by the source at this point. Other
	// states are left to the remaining role handlers without any logging.
	if tt.ShardTransferTokenState != c.ShardTokenCreated {
		return false
	}

	// TODO: Update in-mem book-keeping with the list of index
	// movements during rebalance. This information will be used
	// for conflict resolution when DDL and rebalance co-exist
	// together
	tt.ShardTransferTokenState = c.ShardTokenScheduledOnSource
	sr.updateInMemToken(ttid, tt, "source")
	setTransferTokenInMetakv(ttid, tt)

	// TODO: It is possible for indexer to crash after updating
	// the transfer token state. Include logic to clean-up rebalance
	// in such case
	return true
}

// processShardTransferTokenAsDest handles a transfer token notification in
// the destination role.
//
// It returns true when the notification has been consumed: the token belongs
// to a different rebalance id, checkValidNotifyState rejected it, or the
// ShardTokenScheduledOnSource state was handled. It returns false for every
// other token state.
func (sr *ShardRebalancer) processShardTransferTokenAsDest(ttid string, tt *c.TransferToken) bool {

	if sr.rebalToken.RebalId != tt.RebalId {
		l.Warnf("ShardRebalancer::processShardTransferTokenAsDest Found TransferToken with Unknown "+
			"RebalanceId. Local RId %v Token %v. Ignored.", sr.rebalToken.RebalId, tt)
		return true
	}

	if valid := sr.checkValidNotifyState(ttid, tt, "dest"); !valid {
		return true
	}

	// Only the ScheduledOnSource state is handled by the destination here.
	if tt.ShardTransferTokenState != c.ShardTokenScheduledOnSource {
		return false
	}

	// TODO: Update in-mem book-keeping with the list of index
	// movements during rebalance. This information will be used
	// for conflict resolution when DDL and rebalance co-exist
	// together

	// Advance the token to ScheduleAck, then update the in-memory copy
	// before handing the token to setTransferTokenInMetakv.
	tt.ShardTransferTokenState = c.ShardTokenScheduleAck
	sr.updateInMemToken(ttid, tt, "dest")
	setTransferTokenInMetakv(ttid, tt)

	// TODO: It is possible for destination node to crash
	// after updating metakv state. Include logic to clean-up
	// rebalance in such case
	return true
}

func (sr *ShardRebalancer) doRebalance() {

if sr.transferTokens == nil {
Expand All @@ -214,10 +375,102 @@ func (sr *ShardRebalancer) doRebalance() {
return

default:
// TODO: Add logic to publish transfer tokens to metaKV and start processing them
// Publish all transfer tokens to metaKV so that rebalance
// source and destination nodes are aware of potential index
// movements during rebalance
sr.publishShardTransferTokens()
close(sr.waitForTokenPublish)
go sr.observeRebalance()
}
}

// publishShardTransferTokens passes every entry of sr.transferTokens to
// setTransferTokenInMetakv and logs the id of each token after that call.
// NOTE(review): sr.transferTokens is iterated without holding "mu" - confirm
// that no other goroutine can modify the map at this point.
func (sr *ShardRebalancer) publishShardTransferTokens() {
	for tokenID, token := range sr.transferTokens {
		setTransferTokenInMetakv(tokenID, token)
		l.Infof("ShardRebalancer::publishShardTransferTokens Published transfer token: %v", tokenID)
	}
}

// allShardTransferTokensAcked reports whether every token id present in
// "transferTokens" is also present in "ackedTokens" and both maps have the
// same number of entries. An info message is logged when that is the case.
//
// The caller must hold "mu": both maps are read here without any locking.
func (sr *ShardRebalancer) allShardTransferTokensAcked() bool {
	// Cheap size comparison first; differing sizes can never match.
	if len(sr.ackedTokens) != len(sr.transferTokens) {
		return false
	}

	for id := range sr.transferTokens {
		_, acked := sr.ackedTokens[id]
		if !acked {
			return false
		}
	}

	l.Infof("ShardRebalancer::allShardTransferTokensAcked All transfer tokens " +
		"moded to ScheduleAck state. Initiating transfer for futher processing")

	return true
}

// updateInMemToken replaces the in-memory copy of token "ttid" with "tt" in
// the map that belongs to "caller": "master" -> transferTokens,
// "source" -> sourceTokens, "dest" -> acceptedTokens. Any other caller value
// is a no-op.
//
// The entry is replaced only when "ttid" is already present in that map; a
// token id that is not yet tracked is never inserted here.
// NOTE(review): nothing visible in this part of the file inserts into
// sourceTokens/acceptedTokens - confirm they are populated elsewhere,
// otherwise this update never takes effect for source and destination.
//
// The method takes the write lock on "mu" itself, so the caller must not
// already hold "mu".
func (sr *ShardRebalancer) updateInMemToken(ttid string, tt *c.TransferToken, caller string) {

	// Map entries are assigned below, so the exclusive lock is required; a
	// read lock would allow concurrent map writes.
	sr.mu.Lock()
	defer sr.mu.Unlock()

	var tokens map[string]*c.TransferToken
	switch caller {
	case "master":
		tokens = sr.transferTokens
	case "source":
		tokens = sr.sourceTokens
	case "dest":
		tokens = sr.acceptedTokens
	default:
		return
	}

	if _, ok := tokens[ttid]; ok {
		tokens[ttid] = tt
	}
}

// checkValidNotifyState decides whether the notification carrying "tt" should
// be processed by the given role ("master", "source" or "dest").
//
// "tt" is the transfer token received through notification from metaKV. It is
// compared against the in-memory copy kept for that role (transferTokens,
// sourceTokens or acceptedTokens respectively).
//
// Often, metaKV can send multiple notifications for the same state change
// (probably due to the eventual consistent nature of metaKV). A notification
// whose state is not strictly greater than the state of the in-memory copy is
// logged as invalid and rejected (false). The notification is accepted (true)
// when the token is in the ShardTokenCreated state, when no in-memory copy
// exists for the role, or when "caller" is not one of the three known roles.
func (sr *ShardRebalancer) checkValidNotifyState(ttid string, tt *c.TransferToken, caller string) bool {

	// As the default state is "ShardTokenCreated"
	// do not check for valid state changes for this state
	if tt.ShardTransferTokenState == c.ShardTokenCreated {
		return true
	}

	sr.mu.RLock()
	defer sr.mu.RUnlock()

	var (
		known *c.TransferToken
		found bool
	)

	switch caller {
	case "master":
		known, found = sr.transferTokens[ttid]
	case "source":
		known, found = sr.sourceTokens[ttid]
	case "dest":
		known, found = sr.acceptedTokens[ttid]
	}

	// Nothing to compare against, or the state moved forward: accept.
	if !found || tt.ShardTransferTokenState > known.ShardTransferTokenState {
		return true
	}

	l.Warnf("ShardRebalancer::checkValidNotifyState Detected Invalid State "+
		"Change Notification for %v. Token Id %v Local State %v Metakv State %v",
		caller, ttid, known.ShardTransferTokenState, tt.ShardTransferTokenState)
	return false
}

// updateProgress runs in a master-node go routine to update the progress of processing
// a single transfer token
func (sr *ShardRebalancer) updateProgress() {
Expand Down Expand Up @@ -256,10 +509,6 @@ func (sr *ShardRebalancer) processDropShard() {
// TODO: Add logic to drop shard
}

// processShardTransferToken is a placeholder with an empty body: it accepts a
// transfer token id and token but performs no work yet.
func (sr *ShardRebalancer) processShardTransferToken(ttid string, tt *c.TransferToken) {
// TODO: Add logic to process the shard transfer token
}

func (sr *ShardRebalancer) finishRebalance(err error) {
// TODO: Add logic to clean-up transfer tokens
sr.retErr = err
Expand Down Expand Up @@ -302,6 +551,11 @@ func (sr *ShardRebalancer) addToWaitGroup() bool {
return false
}

// setTransferTokenError stores the error text "err" in the token's Error
// field and then passes the token to setTransferTokenInMetakv under "ttid".
// The token is modified in place.
func (sr *ShardRebalancer) setTransferTokenError(ttid string, tt *c.TransferToken, err string) {
tt.Error = err
setTransferTokenInMetakv(ttid, tt)
}

func (sr *ShardRebalancer) Cancel() {
l.Infof("ShardRebalancer::Cancel Exiting")

Expand Down

0 comments on commit 6fc5b3a

Please sign in to comment.