Skip to content

Commit

Permalink
MB-46320 Lock protect writes and reads from clustMgrAgentCmdCh
Browse files Browse the repository at this point in the history
In http://review.couchbase.org/c/indexing/+/156184, the bootstrap
code is made asyncronous. This means that multiple go-routines
can read and write from cluster manager agent. In this patch,
all reads and writes to clustMgrAgentCmdCh are lock protected
to prevent message cross over

Change-Id: I62455ef75f2ca5f02e8cfd32fcde619479cbc5ee
  • Loading branch information
varunv-cb committed Oct 26, 2021
1 parent 0226406 commit 9c5bd8a
Showing 1 changed file with 68 additions and 53 deletions.
121 changes: 68 additions & 53 deletions secondary/indexer/indexer.go
Expand Up @@ -169,8 +169,9 @@ type indexer struct {

config common.Config

kvlock sync.Mutex //fine-grain lock for KVSender
stateLock sync.RWMutex //lock to protect the keyspaceIdStatus map
kvlock sync.Mutex //fine-grain lock for KVSender
clustMgrLock sync.Mutex // lock to protect concurrent reads and writes from clustMgrAgentCmdCh
stateLock sync.RWMutex //lock to protect the keyspaceIdStatus map

stats *IndexerStats

Expand Down Expand Up @@ -606,7 +607,8 @@ func (idx *indexer) handleSecurityChange(msg Message) {
if refreshEncrypt {
// restart lifecyclemgr
logging.Infof("handleSecurityChange: restarting index manager")
if err := idx.sendMsgToWorker(msg, idx.clustMgrAgentCmdCh); err != nil {

if err := idx.sendMsgToClustMgrAndProcessResponse(msg); err != nil {
exitFn(fmt.Sprintf("Fail to restart lifecycle mgr on security change. Error %v", err))
}

Expand Down Expand Up @@ -1361,8 +1363,7 @@ func (idx *indexer) handleWorkerMsgs(msg Message) {
<-idx.tkCmdCh

case INDEX_STATS_DONE, INDEX_STATS_BROADCAST:
idx.clustMgrAgentCmdCh <- msg
<-idx.clustMgrAgentCmdCh
idx.sendMsgToClustMgr(msg)

case INDEXER_KEYSPACE_NOT_FOUND:
idx.handleKeyspaceNotFound(msg)
Expand Down Expand Up @@ -1624,8 +1625,9 @@ func (idx *indexer) handleConfigUpdate(msg Message) {
<-idx.ddlSrvMgrCmdCh
idx.schedIdxCreatorCmdCh <- msg
<-idx.schedIdxCreatorCmdCh
idx.clustMgrAgentCmdCh <- msg
<-idx.clustMgrAgentCmdCh

idx.sendMsgToClustMgr(msg)

idx.storageMgrCmdCh <- msg
<-idx.storageMgrCmdCh
idx.updateSliceWithConfig(newConfig)
Expand Down Expand Up @@ -2419,7 +2421,7 @@ func (idx *indexer) mergePartition(bucket string, streamId common.StreamId, sour
tgtInstVersion: uint64(target.Version),
respch: clustMgrRespch,
}
if err := idx.sendMsgToClusterMgr(msg); err != nil {
if err := idx.sendMsgToClustMgrAndProcessResponse(msg); err != nil {
common.CrashOnError(err)
}

Expand Down Expand Up @@ -3630,7 +3632,7 @@ func (idx *indexer) resetSingleIndexOnRollback(inst *common.IndexInst,
respch: respch,
}

if err := idx.sendMsgToClusterMgr(msg); err != nil {
if err := idx.sendMsgToClustMgrAndProcessResponse(msg); err != nil {
common.CrashOnError(err)
}

Expand Down Expand Up @@ -4524,8 +4526,7 @@ func (idx *indexer) shutdownWorkers() {

if idx.enableManager {
//shutdown cluster manager
idx.clustMgrAgentCmdCh <- &MsgGeneral{mType: CLUST_MGR_AGENT_SHUTDOWN}
<-idx.clustMgrAgentCmdCh
idx.sendMsgToClustMgr(&MsgGeneral{mType: CLUST_MGR_AGENT_SHUTDOWN})
}

//shutdown kv sender
Expand Down Expand Up @@ -4783,6 +4784,16 @@ func (idx *indexer) sendMsgToKVSender(cmd Message) {
<-idx.kvSenderCmdCh
}

func (idx *indexer) sendMsgToClustMgr(cmd Message) (Message, bool) {
idx.clustMgrLock.Lock()
defer idx.clustMgrLock.Unlock()

//send stream update to kv sender
idx.clustMgrAgentCmdCh <- cmd
resp, ok := <-idx.clustMgrAgentCmdCh
return resp, ok
}

// sendStreamUpdateToWorker synchronously sends a message to a worker and awaits the
// reply or channel death, logging and returning an error unless it receives a success message.
func (idx *indexer) sendStreamUpdateToWorker(cmd Message, workerCmdCh MsgChannel,
Expand Down Expand Up @@ -5113,8 +5124,16 @@ func (idx *indexer) sendUpdatedKeyspaceStatsMapToWorker(msgUpdateKeyspaceStatsMa
func (idx *indexer) sendMessageToWorker(msg Message, workerCmdCh chan Message, workerStr string) error {

if msg != nil {
workerCmdCh <- msg
if resp, ok := <-workerCmdCh; ok {
var resp Message
var ok bool
if workerCmdCh == idx.clustMgrAgentCmdCh && workerStr == "clusterMgrAgent" {
resp, ok = idx.sendMsgToClustMgr(msg)
} else {
workerCmdCh <- msg
resp, ok = <-workerCmdCh
}

if ok {
if resp.GetMsgType() == MSG_ERROR {
logging.Errorf("Indexer::sendMessageToWorker - Error received from %v processing "+
"Msg %v Err %v. Aborted.", workerStr, msg, resp)
Expand Down Expand Up @@ -6944,13 +6963,13 @@ func (idx *indexer) handleStorageWarmupDone(msg Message) {
<-idx.scanCoordCmdCh

// Persist node uuid in Metadata store
idx.clustMgrAgentCmdCh <- &MsgClustMgrLocal{
clustMgrMsg := &MsgClustMgrLocal{
mType: CLUST_MGR_SET_LOCAL,
key: INDEXER_NODE_UUID,
value: idx.config["nodeuuid"].String(),
}

respMsg := <-idx.clustMgrAgentCmdCh
respMsg, _ := idx.sendMsgToClustMgr(clustMgrMsg)
resp := respMsg.(*MsgClustMgrLocal)

errMsg := resp.GetError()
Expand Down Expand Up @@ -6985,12 +7004,12 @@ func (idx *indexer) handleReadPersistedStats(msg Message) {
func (idx *indexer) bootstrap2() error {

if common.GetStorageMode() == common.MOI {
idx.clustMgrAgentCmdCh <- &MsgClustMgrLocal{
clustMgrMsg := &MsgClustMgrLocal{
mType: CLUST_MGR_GET_LOCAL,
key: INDEXER_STATE_KEY,
}

respMsg := <-idx.clustMgrAgentCmdCh
respMsg, _ := idx.sendMsgToClustMgr(clustMgrMsg)
resp := respMsg.(*MsgClustMgrLocal)

val := resp.GetValue()
Expand Down Expand Up @@ -7032,7 +7051,7 @@ func (idx *indexer) bootstrap2() error {

// ready to process DDL
msg := &MsgClustMgrUpdate{mType: CLUST_MGR_INDEXER_READY}
if err := idx.sendMsgToClusterMgr(msg); err != nil {
if err := idx.sendMsgToClustMgrAndProcessResponse(msg); err != nil {
return err
}

Expand Down Expand Up @@ -7062,12 +7081,12 @@ func (idx *indexer) bootstrap2() error {

func (idx *indexer) recoverRebalanceState() {

idx.clustMgrAgentCmdCh <- &MsgClustMgrLocal{
clustMgrMsg := &MsgClustMgrLocal{
mType: CLUST_MGR_GET_LOCAL,
key: RebalanceRunning,
}

respMsg := <-idx.clustMgrAgentCmdCh
respMsg, _ := idx.sendMsgToClustMgr(clustMgrMsg)
resp := respMsg.(*MsgClustMgrLocal)

val := resp.GetValue()
Expand All @@ -7083,12 +7102,12 @@ func (idx *indexer) recoverRebalanceState() {
idx.rebalanceRunning = false
}

idx.clustMgrAgentCmdCh <- &MsgClustMgrLocal{
clustMgrMsg = &MsgClustMgrLocal{
mType: CLUST_MGR_GET_LOCAL,
key: RebalanceTokenTag,
}

respMsg = <-idx.clustMgrAgentCmdCh
respMsg, _ = idx.sendMsgToClustMgr(clustMgrMsg)
resp = respMsg.(*MsgClustMgrLocal)

val = resp.GetValue()
Expand Down Expand Up @@ -7154,12 +7173,12 @@ func (idx *indexer) genIndexerId() {
if idx.enableManager {

//try to fetch IndexerId from manager
idx.clustMgrAgentCmdCh <- &MsgClustMgrLocal{
clustMgrMsg := &MsgClustMgrLocal{
mType: CLUST_MGR_GET_LOCAL,
key: INDEXER_ID_KEY,
}

respMsg := <-idx.clustMgrAgentCmdCh
respMsg, _ := idx.sendMsgToClustMgr(clustMgrMsg)
resp := respMsg.(*MsgClustMgrLocal)

val := resp.GetValue()
Expand All @@ -7178,13 +7197,13 @@ func (idx *indexer) genIndexerId() {
//}

idx.id = idx.config["nodeuuid"].String()
idx.clustMgrAgentCmdCh <- &MsgClustMgrLocal{
clustMgrMsg := &MsgClustMgrLocal{
mType: CLUST_MGR_SET_LOCAL,
key: INDEXER_ID_KEY,
value: idx.id,
}

respMsg := <-idx.clustMgrAgentCmdCh
respMsg, _ := idx.sendMsgToClustMgr(clustMgrMsg)
resp := respMsg.(*MsgClustMgrLocal)

errMsg := resp.GetError()
Expand Down Expand Up @@ -7416,7 +7435,7 @@ func (idx *indexer) forceCleanupIndexPartition(indexInst *common.IndexInst,
updateStatusOnly: true,
}

if err := idx.sendMsgToClusterMgr(msg); err != nil {
if err := idx.sendMsgToClustMgrAndProcessResponse(msg); err != nil {
logging.Errorf("Indexer::forceCleanupIndexPartition %v %v Got error %v in marking metadata as deleted",
indexInst.InstId, partnId, err)
common.CrashOnError(err)
Expand Down Expand Up @@ -7456,7 +7475,7 @@ func (idx *indexer) forceCleanupIndexPartition(indexInst *common.IndexInst,
partnId: partnInst.Defn.GetPartitionId(),
}

if err := idx.sendMsgToClusterMgr(msg); err != nil {
if err := idx.sendMsgToClustMgrAndProcessResponse(msg); err != nil {
logging.Errorf("Indexer::forceCleanupIndexPartition %v %v Got error %v in deleting metadata. "+
"Metadata will be deleted on next indexer restart.", indexInst.InstId, partnId, err)
}
Expand All @@ -7474,9 +7493,9 @@ func (idx *indexer) recoverIndexInstMap() error {

func (idx *indexer) recoverInstMapFromManager() error {

idx.clustMgrAgentCmdCh <- &MsgClustMgrTopology{}
clustMgrMsg := &MsgClustMgrTopology{}

resp := <-idx.clustMgrAgentCmdCh
resp, _ := idx.sendMsgToClustMgr(clustMgrMsg)

switch resp.GetMsgType() {

Expand Down Expand Up @@ -7650,7 +7669,7 @@ func (idx *indexer) upgradeSingleIndex(inst *common.IndexInst, storageMode commo
msg := &MsgClustMgrResetIndexOnUpgrade{
inst: *inst,
}
idx.sendMsgToClusterMgr(msg)
idx.sendMsgToClustMgrAndProcessResponse(msg)
}

func (idx *indexer) validateIndexInstMap() {
Expand Down Expand Up @@ -8118,7 +8137,7 @@ func (idx *indexer) updateMetaInfoForIndexList(instIdList []common.IndexInstId,
syncUpdate: syncUpdate,
respCh: respCh}

return idx.sendMsgToClusterMgr(msg)
return idx.sendMsgToClustMgrAndProcessResponse(msg)

}

Expand All @@ -8132,44 +8151,44 @@ func (idx *indexer) updateMetaInfoForDeleteKeyspace(bucket,
collection: collection,
streamId: streamId}

return idx.sendMsgToClusterMgr(msg)
return idx.sendMsgToClustMgrAndProcessResponse(msg)
}

func (idx *indexer) cleanupIndexMetadata(indexInst common.IndexInst) error {

temp := indexInst
temp.Pc = nil
msg := &MsgClustMgrUpdate{mType: CLUST_MGR_CLEANUP_INDEX, indexList: []common.IndexInst{temp}}
return idx.sendMsgToClusterMgr(msg)
return idx.sendMsgToClustMgrAndProcessResponse(msg)
}

func (idx *indexer) sendMsgToClusterMgr(msg Message) error {
func (idx *indexer) sendMsgToClustMgrAndProcessResponse(msg Message) error {

idx.clustMgrAgentCmdCh <- msg
res, ok := idx.sendMsgToClustMgr(msg)

if res, ok := <-idx.clustMgrAgentCmdCh; ok {
if ok {

switch res.GetMsgType() {

case MSG_SUCCESS:
return nil

case MSG_ERROR:
logging.Errorf("Indexer::sendMsgToClusterMgr Error "+
logging.Errorf("Indexer::sendMsgToClustMgrAndProcessResponse Error "+
"from Cluster Manager %v", res)
err := res.(*MsgError).GetError()
return err.cause

default:
logging.Fatalf("Indexer::sendMsgToClusterMgr Unknown Response "+
logging.Fatalf("Indexer::sendMsgToClustMgrAndProcessResponse Unknown Response "+
"from Cluster Manager %v", res)
common.CrashOnError(errors.New("Unknown Response"))

}

} else {

logging.Fatalf("clustMgrAgent::sendMsgToClusterMgr Unexpected Channel Close " +
logging.Fatalf("clustMgrAgent::sendMsgToClustMgrAndProcessResponse Unexpected Channel Close " +
"from Cluster Manager")
common.CrashOnError(errors.New("Unknown Response"))

Expand Down Expand Up @@ -8226,16 +8245,15 @@ func (idx *indexer) handleSetLocalMeta(msg Message) {
}
}

idx.clustMgrAgentCmdCh <- msg
respMsg := <-idx.clustMgrAgentCmdCh
respMsg, _ := idx.sendMsgToClustMgr(msg)

err := respMsg.(*MsgClustMgrLocal).GetError()
if err == nil {
if key == RebalanceRunning {
idx.rebalanceRunning = true

msg := &MsgClustMgrUpdate{mType: CLUST_MGR_REBALANCE_RUNNING}
idx.sendMsgToClusterMgr(msg)
idx.sendMsgToClustMgrAndProcessResponse(msg)

} else if key == RebalanceTokenTag {
var rebalToken RebalanceToken
Expand All @@ -8250,17 +8268,15 @@ func (idx *indexer) handleSetLocalMeta(msg Message) {

func (idx *indexer) handleGetLocalMeta(msg Message) {

idx.clustMgrAgentCmdCh <- msg
respMsg := <-idx.clustMgrAgentCmdCh
respMsg, _ := idx.sendMsgToClustMgr(msg)
respch := msg.(*MsgClustMgrLocal).GetRespCh()
respch <- respMsg

}

func (idx *indexer) handleDelLocalMeta(msg Message) {

idx.clustMgrAgentCmdCh <- msg
respMsg := <-idx.clustMgrAgentCmdCh
respMsg, _ := idx.sendMsgToClustMgr(msg)

key := msg.(*MsgClustMgrLocal).GetKey()

Expand Down Expand Up @@ -8759,8 +8775,7 @@ func (idx *indexer) setProfilerOptions(config common.Config) {

func (idx *indexer) getIndexInstForKeyspaceId(keyspaceId string) ([]common.IndexInstId, error) {

idx.clustMgrAgentCmdCh <- &MsgClustMgrTopology{}
resp := <-idx.clustMgrAgentCmdCh
resp, _ := idx.sendMsgToClustMgr(&MsgClustMgrTopology{})

var result []common.IndexInstId = nil

Expand Down Expand Up @@ -9073,13 +9088,13 @@ func (idx *indexer) handleIndexerPause(msg Message) {
}

//Send message to index manager to update the internal state
idx.clustMgrAgentCmdCh <- &MsgClustMgrLocal{
clustMgrMsg := &MsgClustMgrLocal{
mType: CLUST_MGR_SET_LOCAL,
key: INDEXER_STATE_KEY,
value: fmt.Sprintf("%s", common.INDEXER_PAUSED),
}

respMsg := <-idx.clustMgrAgentCmdCh
respMsg, _ := idx.sendMsgToClustMgr(clustMgrMsg)
resp := respMsg.(*MsgClustMgrLocal)

errMsg := resp.GetError()
Expand Down Expand Up @@ -9159,13 +9174,13 @@ func (idx *indexer) doUnpause() {
//Notify Index Manager
//TODO Need to make sure the DDLs don't start getting
//processed before stream requests
idx.clustMgrAgentCmdCh <- &MsgClustMgrLocal{
clustMgrMsg := &MsgClustMgrLocal{
mType: CLUST_MGR_SET_LOCAL,
key: INDEXER_STATE_KEY,
value: fmt.Sprintf("%s", common.INDEXER_ACTIVE),
}

respMsg := <-idx.clustMgrAgentCmdCh
respMsg, _ := idx.sendMsgToClustMgr(clustMgrMsg)
resp := respMsg.(*MsgClustMgrLocal)

errMsg := resp.GetError()
Expand Down

0 comments on commit 9c5bd8a

Please sign in to comment.