MB-55062 Unlock all locked shards at the end of rebalance
During rebalance, shards can be locked during:

a. Transfer to destination node
b. Recovery on destination node

If rebalance fails while the shard is being recovered, then
the shards on the source node have to be unlocked and the recovered
data on the destination node has to be deleted. Since there is only
one token tracking the state, the following race condition is
possible:

a. Destination node has read all transfer tokens from metaKV
b. Destination node has removed all local instances and deleted
   the metaKV token
c. Source node tries to read metaKV tokens for cleanup

As the token is removed at step (b), the source node may not find the
token and will skip unlocking the shards. The issue happens
because the source and destination nodes each take different
actions on a single transfer token while the other node can
potentially delete the token.

To avoid these race conditions, the shard transfer manager keeps track
of all shards that are locked during rebalance. At the end of rebalance,
the rebalance service manager calls the "RestoreAndUnlockShards" API,
which unlocks all shards that are present on the node and are yet to be
unlocked due to rebalance. This ensures that all shards are properly
unlocked at the end of rebalance.

If a shard is destroyed during cleanup, it is skipped during unlocking,
as unlocking is not required for a destroyed shard.
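
A minimal, self-contained sketch of this book-keeping (simplified names; lockTracker,
lockShard, unlockShard and restoreShardDone below are hypothetical stand-ins for
ShardTransferManager's lockedShards map and the plasma.LockShard, plasma.UnlockShard
and plasma.RestoreShardDone calls used in the actual change):

package main

import (
	"fmt"
	"sync"
)

type ShardId uint64

// lockShard, unlockShard and restoreShardDone are hypothetical stubs standing
// in for the plasma shard-lock APIs; they only illustrate the call shape.
func lockShard(id ShardId) error   { return nil }
func unlockShard(id ShardId) error { return nil }
func restoreShardDone(id ShardId)  {}

// lockTracker mirrors the book-keeping described above: each successfully
// locked shard is recorded along with whether it was locked for recovery,
// and the entry is dropped on unlock or destroy.
type lockTracker struct {
	mu           sync.Mutex
	lockedShards map[ShardId]bool // shardId -> lockedForRecovery
}

func newLockTracker() *lockTracker {
	return &lockTracker{lockedShards: make(map[ShardId]bool)}
}

func (t *lockTracker) Lock(ids []ShardId, lockedForRecovery bool) {
	t.mu.Lock()
	defer t.mu.Unlock()
	for _, id := range ids {
		if err := lockShard(id); err == nil {
			t.lockedShards[id] = lockedForRecovery
		}
	}
}

// Destroy drops the entry: a destroyed shard never needs to be unlocked.
func (t *lockTracker) Destroy(id ShardId) {
	t.mu.Lock()
	defer t.mu.Unlock()
	delete(t.lockedShards, id)
}

// RestoreAndUnlock is the end-of-rebalance sweep: shards locked for recovery
// are marked restore-done first, then every tracked shard is unlocked.
func (t *lockTracker) RestoreAndUnlock() {
	t.mu.Lock()
	clone := make(map[ShardId]bool, len(t.lockedShards))
	for id, forRecovery := range t.lockedShards {
		clone[id] = forRecovery
	}
	t.mu.Unlock()

	for id, forRecovery := range clone {
		if forRecovery {
			restoreShardDone(id)
		}
		if err := unlockShard(id); err == nil {
			t.mu.Lock()
			delete(t.lockedShards, id)
			t.mu.Unlock()
		}
	}
}

func main() {
	t := newLockTracker()
	t.Lock([]ShardId{1, 2}, true) // destination side: locked for recovery
	t.Lock([]ShardId{3}, false)   // source side: locked for transfer
	t.Destroy(2)                  // cleanup destroyed this shard; its unlock is skipped
	t.RestoreAndUnlock()          // restores shard 1, unlocks shards 1 and 3
	fmt.Println("shards still tracked:", len(t.lockedShards)) // prints 0
}

In the actual change, this map lives inside ShardTransferManager and is driven by the
LOCK_SHARDS, UNLOCK_SHARDS, DESTROY_LOCAL_SHARD and RESTORE_AND_UNLOCK_LOCKED_SHARDS
messages shown in the diff below.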

Change-Id: Ib43665734e61b68d736efdf7622b8019ad2de5fc
varunv-cb committed Jan 19, 2023
1 parent 62bdf93 commit f0f52af
Showing 8 changed files with 129 additions and 23 deletions.
3 changes: 2 additions & 1 deletion secondary/indexer/indexer.go
@@ -1562,7 +1562,8 @@ func (idx *indexer) handleWorkerMsgs(msg Message) {
DESTROY_LOCAL_SHARD,
LOCK_SHARDS,
UNLOCK_SHARDS,
RESTORE_SHARD_DONE:
RESTORE_SHARD_DONE,
RESTORE_AND_UNLOCK_LOCKED_SHARDS:

idx.storageMgrCmdCh <- msg
<-idx.storageMgrCmdCh
26 changes: 23 additions & 3 deletions secondary/indexer/message.go
@@ -206,6 +206,7 @@ const (
LOCK_SHARDS
UNLOCK_SHARDS
RESTORE_SHARD_DONE
RESTORE_AND_UNLOCK_LOCKED_SHARDS
)

type Message interface {
@@ -2744,9 +2745,10 @@ func (m *MsgUpdateRebalancePhase) GetBucketTransferPhase() map[string]common.Reb
}

type MsgLockUnlockShards struct {
mType MsgType
shardIds []common.ShardId
respCh chan map[common.ShardId]error
mType MsgType
shardIds []common.ShardId
lockedForRecovery bool
respCh chan map[common.ShardId]error
}

func (m *MsgLockUnlockShards) GetMsgType() MsgType {
@@ -2757,6 +2759,10 @@ func (m *MsgLockUnlockShards) GetShardIds() []common.ShardId {
return m.shardIds
}

func (m *MsgLockUnlockShards) IsLockedForRecovery() bool {
return m.lockedForRecovery
}

func (m *MsgLockUnlockShards) GetRespCh() chan map[common.ShardId]error {
return m.respCh
}
@@ -2778,6 +2784,18 @@ func (m *MsgRestoreShardDone) GetRespCh() chan bool {
return m.respCh
}

type MsgRestoreAndUnlockShards struct {
respCh chan bool
}

func (m *MsgRestoreAndUnlockShards) GetMsgType() MsgType {
return RESTORE_AND_UNLOCK_LOCKED_SHARDS
}

func (m *MsgRestoreAndUnlockShards) GetRespCh() chan bool {
return m.respCh
}

// MsgType.String is a helper function to return string for message type.
func (m MsgType) String() string {

@@ -3096,6 +3114,8 @@ func (m MsgType) String() string {
return "UNLOCK_SHARDS"
case RESTORE_SHARD_DONE:
return "RESTORE_SHARD_DONE"
case RESTORE_AND_UNLOCK_LOCKED_SHARDS:
return "RESTORE_AND_UNLOCK_LOCKED_SHARDS"

default:
return "UNKNOWN_MSG_TYPE"
1 change: 1 addition & 0 deletions secondary/indexer/rebalance_provider.go
@@ -2,4 +2,5 @@ package indexer

type RebalanceProvider interface {
Cancel()
RestoreAndUnlockShards()
}
25 changes: 15 additions & 10 deletions secondary/indexer/rebalance_service_manager.go
@@ -928,6 +928,12 @@ func (m *RebalanceServiceManager) runCleanupPhaseLOCKED(path string, isMaster bo
}
}

if m.rebalancer != nil {
m.rebalancer.RestoreAndUnlockShards()
} else if m.rebalancerF != nil {
m.rebalancerF.RestoreAndUnlockShards()
}

err := m.cleanupLocalRToken()
if err != nil {
return err
@@ -1375,9 +1381,6 @@ func (m *RebalanceServiceManager) cleanupShardTokenForSource(ttid string, tt *c.
l.Infof("RebalanceServiceManager::cleanupShardTokenForSource: Deleted ttid: %v, "+
"from metakv", ttid)

case c.ShardTokenRestoreShard, c.ShardTokenRecoverShard:
unlockShards(tt.ShardIds, m.supvMsgch)

case c.ShardTokenReady:
// If this token is in Ready state, check for the presence of
// a transfer token with ShardTokenDropOnSource state. If it
@@ -1397,9 +1400,10 @@ func (m *RebalanceServiceManager) cleanupShardTokenForSource(ttid string, tt *c.
l.Infof("RebalanceServiceManager::cleanupShardTokenForSource Cleaning up token: %v on source "+
"as ShardTokenDropOnSource is posted for this token", ttid)
return m.cleanupLocalIndexInstsAndShardToken(ttid, tt, true)
} else { // Else, cleanup will be invoked on destination node. Unlock shards on source
unlockShards(tt.ShardIds, m.supvMsgch)

} else {
// Else, cleanup on destination will be triggered as rebalance is not complete for this tenant
// Shards will be unlocked for source (by rebalance_service_manager) after cleanup
// is complete
l.Infof("RebalanceServiceManager::cleanupShardTokenForSource Skipping cleaning up token: %v on source "+
"as ShardTokenDropOnSource is not posted for this token", ttid)
}
@@ -1489,9 +1493,10 @@ func (m *RebalanceServiceManager) cleanupShardTokenForDest(ttid string, tt *c.Tr
l.Infof("RebalanceServiceManager::cleanupShardTokenForDest Cleaning up token: %v on dest "+
"as ShardTokenDropOnSource is not posted for this token", ttid)
return m.cleanupLocalIndexInstsAndShardToken(ttid, tt, true)
} else { // Else, cleanup on source will be triggered as rebalance is complete for this tenant
unlockShards(tt.ShardIds, m.supvMsgch)

} else {
// Else, cleanup on source will be triggered as rebalance is complete for this tenant
// Shards will be unlocked for destination (by rebalance_service_manager) after cleanup
// is complete
l.Infof("RebalanceServiceManager::cleanupShardTokenForDest Skipping cleaning up token: %v on dest "+
"as ShardTokenDropOnSource is posted for this token", ttid)
}
@@ -3743,7 +3748,7 @@ func (m *RebalanceServiceManager) handleLockShards(w http.ResponseWriter, r *htt
}

logging.Infof("RebalanceServiceManager::handleLockShards Locking shards: %v as requested by user", shardIds)
err = lockShards(shardIds, m.supvMsgch)
err = lockShards(shardIds, m.supvMsgch, false)
if err != nil {
logging.Infof("RebalanceServiceManager::handleLockShards Error observed when locking shards: %v, err: %v", shardIds, err)
m.writeError(w, err)
4 changes: 4 additions & 0 deletions secondary/indexer/rebalancer.go
@@ -446,6 +446,10 @@ func (r *Rebalancer) Cancel() {
r.wg.Wait()
}

func (r *Rebalancer) RestoreAndUnlockShards() {
// No-op for rebalancer. Only used for shard rebalancer
}

func (r *Rebalancer) finishRebalance(err error) {
if err == nil && r.master && r.topologyChange != nil {
// Note that this function transfers the ownership of only those
24 changes: 18 additions & 6 deletions secondary/indexer/shard_rebalancer.go
@@ -658,7 +658,7 @@ func (sr *ShardRebalancer) startShardTransfer(ttid string, tt *c.TransferToken)
// Unlock of the shard happens:
// (a) after shard transfer is successful & destination node has recovered the shard
// (b) If any error is encountered, clean-up from indexer will unlock the shards
err := lockShards(tt.ShardIds, sr.supvMsgch)
err := lockShards(tt.ShardIds, sr.supvMsgch, false)
if err != nil {
logging.Errorf("ShardRebalancer::startShardTransfer Observed error: %v when locking shards: %v", err, tt.ShardIds)

@@ -1009,7 +1009,7 @@ func (sr *ShardRebalancer) startShardRecovery(ttid string, tt *c.TransferToken)
}
defer sr.wg.Done()

if err := lockShards(tt.ShardIds, sr.supvMsgch); err != nil {
if err := lockShards(tt.ShardIds, sr.supvMsgch, true); err != nil {
logging.Errorf("ShardRebalancer::startShardRecovery, error observed while locking shards: %v, err: %v", tt.ShardIds, err)

unlockShards(tt.ShardIds, sr.supvMsgch)
@@ -2201,6 +2201,17 @@ func (sr *ShardRebalancer) Cancel() {
}
}

func (sr *ShardRebalancer) RestoreAndUnlockShards() {
l.Infof("ShardRebalancer::RestoreAndUnlockShards Initiating restore and shard unlock")

respCh := make(chan bool)
sr.supvMsgch <- &MsgRestoreAndUnlockShards{
respCh: respCh,
}
<-respCh
l.Infof("ShardRebalancer::RestoreAndUnlockShards Exiting")
}

// This function batches a group of transfer tokens
// according to the following rules:
//
@@ -2446,13 +2457,14 @@ func isIndexInAsyncRecovery(errMsg string) bool {
return errMsg == common.ErrIndexInAsyncRecovery.Error()
}

func lockShards(shardIds []common.ShardId, supvMsgch MsgChannel) error {
func lockShards(shardIds []common.ShardId, supvMsgch MsgChannel, lockedForRecovery bool) error {

respCh := make(chan map[common.ShardId]error)
msg := &MsgLockUnlockShards{
mType: LOCK_SHARDS,
shardIds: shardIds,
respCh: respCh,
mType: LOCK_SHARDS,
shardIds: shardIds,
lockedForRecovery: lockedForRecovery,
respCh: respCh,
}

supvMsgch <- msg
66 changes: 64 additions & 2 deletions secondary/indexer/shard_transfer_manager.go
@@ -16,6 +16,14 @@ type ShardTransferManager struct {
config common.Config
cmdCh chan Message

// lockedShards represents the set of shards that are locked
// for rebalance and are yet to be unlocked. Whenever the shard
// rebalancer acquires a lock, this map is updated. The entries
// in this map are cleared either when the shard is destroyed
// (or) when the shard is unlocked
lockedShards map[common.ShardId]bool
mu sync.Mutex

sliceList []Slice
sliceCloseNotifier map[common.ShardId]MsgChannel
}
@@ -24,6 +32,7 @@ func NewShardTransferManager(config common.Config) *ShardTransferManager {
stm := &ShardTransferManager{
config: config,
cmdCh: make(chan Message),
lockedShards: make(map[common.ShardId]bool),
sliceCloseNotifier: make(map[common.ShardId]MsgChannel),
}

@@ -97,7 +106,10 @@ func (stm *ShardTransferManager) handleStorageMgrCommands(cmd Message) {
stm.handleUnlockShardsCommand(cmd)

case RESTORE_SHARD_DONE:
stm.handleRestoreShardDone(cmd)
go stm.handleRestoreShardDone(cmd)

case RESTORE_AND_UNLOCK_LOCKED_SHARDS:
go stm.handleRestoreAndUnlockShards(cmd)
}
}

@@ -478,6 +490,15 @@ func (stm *ShardTransferManager) processDestroyLocalShardMessage(cmd Message, no
if err := plasma.DestroyShardID(plasma.ShardId(shardId)); err != nil {
logging.Errorf("ShardTransferManager::processDestroyLocalShardMessage Error cleaning-up shardId: %v from "+
"local file system, err: %v", shardId, err)
} else {
// Since the shard is being destroyed, delete the shard from book-keeping as
// there is no need to unlock a deleted shard
func() {
stm.mu.Lock()
defer stm.mu.Unlock()

delete(stm.lockedShards, shardId)
}()
}
}

@@ -531,17 +552,23 @@ func (stm *ShardTransferManager) updateSliceStatus() {
func (stm *ShardTransferManager) handleLockShardsCommand(cmd Message) {
lockMsg := cmd.(*MsgLockUnlockShards)

stm.mu.Lock()
defer stm.mu.Unlock()

shardIds := lockMsg.GetShardIds()
respCh := lockMsg.GetRespCh()
isLockedForRecovery := lockMsg.IsLockedForRecovery()

logging.Infof("ShardTransferManager::handleLockShardCommands Initiating shard locking for shards: %v", shardIds)
logging.Infof("ShardTransferManager::handleLockShardCommands Initiating shard locking for shards: %v, isLockedForRecovery: %v", shardIds, isLockedForRecovery)
start := time.Now()

errMap := make(map[common.ShardId]error)
for _, shardId := range shardIds {
err := plasma.LockShard(plasma.ShardId(shardId))
if err != nil {
logging.Errorf("ShardTransferManager::handleLockShardsCommand Error observed while locking shard: %v, err: %v", shardId, err)
} else {
stm.lockedShards[shardId] = isLockedForRecovery
}
errMap[shardId] = err
}
@@ -554,6 +581,9 @@ func (stm *ShardTransferManager) handleLockShardsCommand(cmd Message) {
func (stm *ShardTransferManager) handleUnlockShardsCommand(cmd Message) {
lockMsg := cmd.(*MsgLockUnlockShards)

stm.mu.Lock()
defer stm.mu.Unlock()

shardIds := lockMsg.GetShardIds()
respCh := lockMsg.GetRespCh()

@@ -565,6 +595,8 @@ func (stm *ShardTransferManager) handleUnlockShardsCommand(cmd Message) {
err := plasma.UnlockShard(plasma.ShardId(shardId))
if err != nil {
logging.Errorf("ShardTransferManager::handleUnlockShardsCommand Error observed while unlocking shard: %v, err: %v", shardId, err)
} else {
delete(stm.lockedShards, shardId) // Clear book-keeping as shard is unlocked
}
errMap[shardId] = err
}
@@ -587,3 +619,33 @@ func (stm *ShardTransferManager) handleRestoreShardDone(cmd Message) {
logging.Infof("ShardTransferManager::handleRestoreShardDone Finished RestoreShardDone for shards: %v, elapsed: %v", shardIds, time.Since(start))
respCh <- true
}

func (stm *ShardTransferManager) handleRestoreAndUnlockShards(cmd Message) {
clone := make(map[common.ShardId]bool)

msg := cmd.(*MsgRestoreAndUnlockShards)
respCh := msg.GetRespCh()
func() {
stm.mu.Lock()
defer stm.mu.Unlock()

for shardId, lockedForRecovery := range stm.lockedShards {
clone[shardId] = lockedForRecovery
}
}()

for shardId, lockedForRecovery := range clone {
if lockedForRecovery {
logging.Infof("ShardTransferManager::handleRestoreAndUnlockShards Initiating RestoreShardDone for shardId: %v", shardId)
plasma.RestoreShardDone(plasma.ShardId(shardId))
}
logging.Infof("ShardTransferManager::handleRestoreAndUnlockShards Initiating unlock for shardId: %v", shardId)
if err := plasma.UnlockShard(plasma.ShardId(shardId)); err != nil {
logging.Errorf("ShardTransferManager::handleRestoreAndUnlockShards Error observed while unlocking shard: %v, err: %v", shardId, err)
} else {
delete(stm.lockedShards, shardId) // Clean the book-keeping
}
}

respCh <- true
}
3 changes: 2 additions & 1 deletion secondary/indexer/storage_manager.go
@@ -263,7 +263,8 @@ func (s *storageMgr) handleSupvervisorCommands(cmd Message) {
MONITOR_SLICE_STATUS,
LOCK_SHARDS,
UNLOCK_SHARDS,
RESTORE_SHARD_DONE:
RESTORE_SHARD_DONE,
RESTORE_AND_UNLOCK_LOCKED_SHARDS:
if s.stm != nil {
s.stm.ProcessCommand(cmd)
}
