MB-54678 Unlock shards on source after replica repair in rebalance
During replica repair, indexes on the source node should not be
dropped. However, all of the data transferred to S3 must be cleaned
up, and the corresponding shards have to be unlocked. Otherwise, new
index creation after replica repair will fail because the shards
remain locked.

This patch addresses the issue by cleaning up the transferred data
and unlocking the shards in the replica repair case.

Change-Id: Id5b27c111d4701b4a0b7c578c1cd727889027e3f
varunv-cb committed Dec 2, 2022
1 parent c188907 commit fceb0cd
Showing 2 changed files with 31 additions and 19 deletions.
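
For orientation, the source-side decision logic this patch introduces can be condensed as follows. This is an illustrative sketch, not the committed code; the types, constants, and handler names are simplified stand-ins for the real TransferToken machinery in secondary/indexer.

// Illustrative sketch (not the committed code): simplified stand-ins for
// the TransferToken handling on the source node after this patch.
package main

import "fmt"

type TransferMode int

const (
	TransferModeMove TransferMode = iota // indexes move off the source node
	TransferModeCopy                     // replica repair: source keeps its indexes
)

type TransferToken struct {
	Mode      TransferMode
	SiblingId string // empty when no sibling token exists
}

// handleDropOnSource mirrors the branching added in checkAndQueueTokenForDrop.
func handleDropOnSource(tt *TransferToken) {
	switch {
	case tt.Mode == TransferModeCopy && tt.SiblingId == "":
		// Replica repair: keep the indexes, but clean up the data transferred
		// to S3 and unlock the shards so later index creation does not fail
		// on locked shards.
		fmt.Println("cleanup transferred data, unlock shards, commit token")
	case tt.Mode == TransferModeMove:
		// Swap rebalance: the indexes leave this node, so queue the shard drop.
		fmt.Println("queue shards for drop")
	default:
		fmt.Println("skip drop for token")
	}
}

func main() {
	handleDropOnSource(&TransferToken{Mode: TransferModeCopy}) // replica repair path
	handleDropOnSource(&TransferToken{Mode: TransferModeMove}) // swap rebalance path
}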
secondary/indexer/shard_rebalancer.go (48 changes: 30 additions & 18 deletions)
@@ -388,17 +388,15 @@ func (sr *ShardRebalancer) processShardTransferTokenAsMaster(ttid string, tt *c.
 			setTransferTokenInMetakv(dropOnSourceTokenId, dropOnSourceToken)
 		}
 	} else {
-		// If sibling does not exist, this could be replica repair or swap rebalance.
-		if tt.TransferMode == common.TokenTransferModeMove { // single node swap rebalance
-			// Go ahead and post a dropOnSource token signalling completion of movements
-			dropOnSourceTokenId, dropOnSourceToken := genShardTokenDropOnSource(tt.RebalId, ttid, "")
-			setTransferTokenInMetakv(dropOnSourceTokenId, dropOnSourceToken)
-		} else { // replica repair
-			// Replica repair case. Go ahead and move the token to the Commit phase,
-			// as we should not drop indexes on the source node during replica repair.
-			tt.ShardTransferTokenState = common.ShardTokenCommit
-			setTransferTokenInMetakv(ttid, tt)
-		}
+		// If the sibling does not exist, this could be replica repair or a
+		// single node swap rebalance.
+		// For the replica repair case, the source node should clear the
+		// transferred data and also unlock the shards. For single node swap
+		// rebalance, the indexes are also expected to be dropped on the source
+		// node. Therefore, post a DropOnSource token in either case. The source
+		// node will act based on the transferMode of the token.
+		dropOnSourceTokenId, dropOnSourceToken := genShardTokenDropOnSource(tt.RebalId, ttid, "")
+		setTransferTokenInMetakv(dropOnSourceTokenId, dropOnSourceToken)
 	}
 
 	return true
@@ -541,7 +539,18 @@ func (sr *ShardRebalancer) processShardTransferTokenAsSource(ttid string, tt *c.
 
 func (sr *ShardRebalancer) checkAndQueueTokenForDrop(token *c.TransferToken, sourceId, siblingId string) bool {
 	sourceToken := sr.getTokenById(sourceId)
-	if sourceToken != nil && sourceToken.TransferMode == common.TokenTransferModeMove {
+	if sourceToken != nil && sourceToken.TransferMode == common.TokenTransferModeCopy && siblingId == "" { // only the replica repair case
+		// Since this is replica repair, do not drop the shard data. Only clean
+		// up the transferred data and unlock the shards.
+		l.Infof("ShardRebalancer::processShardTransferTokenAsSource Initiating cleanup of transfer & shard unlocking for token: %v", sourceId)
+		sr.initiateShardTransferCleanup(sourceToken.ShardPaths, sourceToken.Destination, sourceId, sourceToken, nil)
+
+		sourceToken.ShardTransferTokenState = common.ShardTokenCommit
+		setTransferTokenInMetakv(sourceId, sourceToken)
+		return true
+
+	} else if sourceToken != nil && sourceToken.TransferMode == common.TokenTransferModeMove { // single node swap rebalance, or single node swap + replica repair in the same rebalance
+
 		l.Infof("ShardRebalancer::processShardTransferTokenAsSource Queuing token: %v for drop", sourceId)
 		sr.queueDropShardRequests(sourceId)
 
@@ -551,6 +560,7 @@ func (sr *ShardRebalancer) checkAndQueueTokenForDrop(token *c.TransferToken, sou
 			setTransferTokenInMetakv(siblingId, siblingToken)
 		}
 		return true
+
 	} else {
 		l.Infof("ShardRebalancer::processShardTransferTokenAsSource Skipping token: %v for drop", sourceId)
 	}
@@ -561,8 +571,8 @@ func (sr *ShardRebalancer) getTokenById(ttid string) *c.TransferToken {
 	sr.mu.Lock()
 	defer sr.mu.Unlock()
 
-	if siblingToken, ok := sr.sourceTokens[ttid]; ok && siblingToken != nil {
-		return siblingToken
+	if token, ok := sr.sourceTokens[ttid]; ok && token != nil {
+		return token.Clone()
 	}
 	return nil // Return nil as the default state
 }
@@ -672,10 +682,12 @@ func (sr *ShardRebalancer) initiateShardTransferCleanup(shardPaths map[common.Sh
 	l.Infof("ShardRebalancer::initiateShardTransferCleanup Done clean-up for ttid: %v, "+
 		"shard paths: %v, destination: %v, elapsed(sec): %v", ttid, shardPaths, destination, elapsed)
 
-	// Update error in transfer token so that rebalance master
-	// will finish the rebalance and clean-up can be invoked for
-	// other transfer tokens in the batch depending on their state
-	sr.setTransferTokenError(ttid, tt, err.Error())
+	if err != nil {
+		// Update the error in the transfer token so that the rebalance master
+		// will finish the rebalance, and clean-up can be invoked for other
+		// transfer tokens in the batch depending on their state.
+		sr.setTransferTokenError(ttid, tt, err.Error())
+	}
 
 }
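
The last hunk above also fixes a latent panic: initiateShardTransferCleanup is now invoked with a nil error from the replica repair path, and calling err.Error() on a nil error crashes. A minimal standalone reproduction of the guarded pattern:

package main

import (
	"errors"
	"fmt"
)

func report(err error) {
	// Before the patch, err.Error() was called unconditionally; with a nil
	// error (successful cleanup) that is a nil dereference panic.
	if err != nil {
		fmt.Println("cleanup failed:", err.Error())
	}
}

func main() {
	report(nil)                          // safe no-op after the patch
	report(errors.New("upload missing")) // error is recorded as before
}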

secondary/indexer/shard_transfer_manager.go (2 changes: 1 addition & 1 deletion)
@@ -272,7 +272,7 @@ func (stm *ShardTransferManager) processTransferCleanupMessage(cmd Message) {
 	meta := make(map[string]interface{})
 	meta[plasma.GSIRebalanceId] = rebalanceId
 	meta[plasma.GSIRebalanceTransferToken] = ttid
-	meta[plasma.GSIShardID] = int64(shardId)
+	meta[plasma.GSIShardID] = uint64(shardId)
 	meta[plasma.GSIShardUploadPath] = shardPath
 
 	plasma.UnlockShard(plasma.ShardId(shardId))
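
The plasma.UnlockShard call above is what actually releases the shard during cleanup. The standalone sketch below illustrates the invariant this patch restores (every shard locked for a transfer is unlocked once its transferred artifacts are removed); the lock registry here is hypothetical, and the real locking lives inside plasma.

package main

import "fmt"

type ShardId uint64

// lockedShards is a hypothetical stand-in for plasma's internal lock state.
var lockedShards = map[ShardId]bool{}

func lockShard(id ShardId)   { lockedShards[id] = true }
func unlockShard(id ShardId) { delete(lockedShards, id) }

// cleanupTransfer mirrors the shape of processTransferCleanupMessage: delete
// the transferred artifacts, then unlock the shard.
func cleanupTransfer(id ShardId, uploadPath string) {
	fmt.Printf("deleting transferred data at %s\n", uploadPath)
	unlockShard(id)
}

func main() {
	lockShard(7)
	cleanupTransfer(7, "bucket/rebalance/shard-7") // hypothetical upload path
	fmt.Println("still locked:", lockedShards[7])  // false: index creation can proceed
}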