diff --git a/secondary/common/cluster_info.go b/secondary/common/cluster_info.go
index fd0a7181c..38659571a 100644
--- a/secondary/common/cluster_info.go
+++ b/secondary/common/cluster_info.go
@@ -150,6 +150,51 @@ func (c *ClusterInfoCache) SetServicePorts(portMap map[string]string) {
 }
 
+func (c *ClusterInfoCache) Connect() (err error) {
+	c.client, err = couchbase.Connect(c.url)
+	if err != nil {
+		return err
+	}
+
+	c.client.SetUserAgent(c.userAgent)
+	return nil
+}
+
+// Note: This function does not fetch BucketMap and Manifest data in c.pool
+func (c *ClusterInfoCache) FetchNodesData() (err error) {
+	c.pool, err = c.client.CallPoolURI(c.poolName)
+	if err != nil {
+		return err
+	}
+
+	c.updateNodesData()
+
+	found := false
+	for _, node := range c.nodes {
+		if node.ThisNode {
+			found = true
+		}
+	}
+
+	if !found {
+		return errors.New("Current node's cluster membership is not active")
+	}
+	return nil
+}
+
+func (c *ClusterInfoCache) FetchNodeSvsData() (err error) {
+	var poolServs couchbase.PoolServices
+
+	poolServs, err = c.client.GetPoolServices(c.poolName)
+	if err != nil {
+		return err
+	}
+
+	c.nodesvs = poolServs.NodesExt
+	c.buildEncryptPortMapping()
+	return nil
+}
+
 // TODO: In many places (e.g. lifecycle manager), cluster info cache
 // refresh is required only for one bucket. It is sub-optimal to update
 // the cluster info for all the buckets. Add a new method
@@ -378,6 +423,54 @@ func (c *ClusterInfoCache) FetchForPoolChange() error {
 	return rh.Run()
 }
 
+func (c *ClusterInfoCache) FetchNodesAndSvsInfoWithLock() (err error) {
+	c.Lock()
+	defer c.Unlock()
+
+	return c.FetchNodesAndSvsInfo()
+}
+
+func (c *ClusterInfoCache) FetchNodesAndSvsInfo() (err error) {
+	fn := func(r int, err error) error {
+		if r > 0 {
+			logging.Infof("%vError occurred during nodes and nodesvs update (%v) .. Retrying(%d)",
+				c.logPrefix, err, r)
+		}
+
+		vretry := 0
+	retry:
+		if err = c.Connect(); err != nil {
+			return err
+		}
+
+		if err = c.FetchNodesData(); err != nil {
+			return err
+		}
+
+		if err = c.FetchNodeSvsData(); err != nil {
+			return err
+		}
+
+		if !c.validateCache(c.client.Info.IsIPv6) {
+			if vretry < CLUSTER_INFO_VALIDATION_RETRIES {
+				vretry++
+				logging.Infof("%vValidation Failed while updating nodes and nodesvs.. Retrying(%d)",
+					c.logPrefix, vretry)
+				goto retry
+			} else {
+				logging.Errorf("%vValidation Failed while updating nodes and nodesvs.. %v",
%v", + c.logPrefix, c) + return ErrValidationFailed + } + } + + return nil + } + + rh := NewRetryHelper(c.retries, time.Second*2, 1, fn) + return rh.Run() +} + func (c *ClusterInfoCache) FetchManifestInfoOnUIDChange(bucketName string, muid string) error { c.Lock() defer c.Unlock() diff --git a/secondary/indexer/cluster_manager_agent.go b/secondary/indexer/cluster_manager_agent.go index 0ce723b86..46330ea3d 100644 --- a/secondary/indexer/cluster_manager_agent.go +++ b/secondary/indexer/cluster_manager_agent.go @@ -407,15 +407,38 @@ func (c *clustMgrAgent) handleGetGlobalTopology(cmd Message) { defer metaIter.Close() indexInstMap := make(common.IndexInstMap) + topoCache := make(map[string]map[string]map[string]*manager.IndexTopology) + + var delTokens map[common.IndexDefnId]*mc.DeleteCommandToken + delTokens, err = mc.FetchIndexDefnToDeleteCommandTokensMap() + if err != nil { + logging.Warnf("ClustMgr:handleGetGlobalTopology: Error in FetchIndexDefnToDeleteCommandTokensMap %v", err) + } + + var dropTokens map[common.IndexDefnId][]*mc.DropInstanceCommandToken + dropTokens, err = mc.FetchIndexDefnToDropInstanceCommandTokenMap() + if err != nil { + logging.Warnf("ClustMgr:handleGetGlobalTopology: Error in FetchIndexDefnToDropInstanceCommandTokenMap %v", err) + } for _, defn, err := metaIter.Next(); err == nil; _, defn, err = metaIter.Next() { var idxDefn common.IndexDefn idxDefn = *defn - t, e := c.mgr.GetTopologyByCollection(idxDefn.Bucket, idxDefn.Scope, idxDefn.Collection) - if e != nil { - common.CrashOnError(e) + t := topoCache[idxDefn.Bucket][idxDefn.Scope][idxDefn.Collection] + if t == nil { + t, err = c.mgr.GetTopologyByCollection(idxDefn.Bucket, idxDefn.Scope, idxDefn.Collection) + if err != nil { + common.CrashOnError(err) + } + if _, ok := topoCache[idxDefn.Bucket]; !ok { + topoCache[idxDefn.Bucket] = make(map[string]map[string]*manager.IndexTopology) + } + if _, ok := topoCache[idxDefn.Bucket][idxDefn.Scope]; !ok { + topoCache[idxDefn.Bucket][idxDefn.Scope] = make(map[string]*manager.IndexTopology) + } + topoCache[idxDefn.Bucket][idxDefn.Scope][idxDefn.Collection] = t } if t == nil { logging.Warnf("ClustMgr:handleGetGlobalTopology Index Instance Not "+ @@ -464,14 +487,31 @@ func (c *clustMgrAgent) handleGetGlobalTopology(cmd Message) { } if idxInst.State != common.INDEX_STATE_DELETED { - exist1, err := mc.DeleteCommandTokenExist(idxDefn.DefnId) - if err != nil { - logging.Warnf("Error when reading delete command token for defn %v", idxDefn.DefnId) + var exist1, exist2 bool + var err error + + if delTokens != nil { + _, exist1 = delTokens[idxDefn.DefnId] + } else { + exist1, err = mc.DeleteCommandTokenExist(idxDefn.DefnId) + if err != nil { + logging.Warnf("Error when reading delete command token for defn %v", idxDefn.DefnId) + } } - exist2, err := mc.DropInstanceCommandTokenExist(idxDefn.DefnId, idxInst.InstId) - if err != nil { - logging.Warnf("Error when reading delete command token for index (%v, %v)", idxDefn.DefnId, idxInst.InstId) + if dropTokens != nil { + dropTokenList := dropTokens[idxDefn.DefnId] + for _, dropToken := range dropTokenList { + if dropToken.InstId == idxDefn.InstId { + exist2 = true + break + } + } + } else { + exist2, err = mc.DropInstanceCommandTokenExist(idxDefn.DefnId, idxInst.InstId) + if err != nil { + logging.Warnf("Error when reading drop command token for index (%v, %v)", idxDefn.DefnId, idxInst.InstId) + } } if exist1 || exist2 { diff --git a/secondary/indexer/indexer.go b/secondary/indexer/indexer.go index a8404df3c..f8220c8eb 100644 --- 
+++ b/secondary/indexer/indexer.go
@@ -120,10 +120,11 @@ type indexer struct {
 	keyspaceIdCreateClientChMap map[string]MsgChannel
 
 	wrkrRecvCh          MsgChannel //channel to receive messages from workers
-	internalRecvCh      MsgChannel //buffered channel to queue worker requests
-	adminRecvCh         MsgChannel //channel to receive admin messages
-	internalAdminRecvCh MsgChannel //internal channel to receive admin messages
-	internalAdminRespCh MsgChannel //internal channel to respond admin messages
+	// (SettingsManager, MutationManager, KVSender, Timekeeper)
+	internalRecvCh      MsgChannel //buffered channel to queue worker requests (most MsgXyz messages)
+	adminRecvCh         MsgChannel //channel to receive admin messages (ClustMgrAgent)
+	internalAdminRecvCh MsgChannel //internal channel to receive admin messages (DDL; unbuffered channel)
+	internalAdminRespCh MsgChannel //internal channel to respond to admin messages (DDL; unbuffered channel)
 	shutdownInitCh      MsgChannel //internal shutdown channel for indexer
 	shutdownCompleteCh  MsgChannel //indicate shutdown completion
 
@@ -859,88 +860,165 @@ func (idx *indexer) releaseStreamRequestLock(req *kvRequest) {
 	}
 }
 
+const MSG_LOOP_MARKER string = "msg_loop" // to grep for all the following logXyz messages
+const MSG_PROCESSING_SLOW time.Duration = time.Minute // threshold for warning of slow message processing
+
+// logProcessingTime logs the time it took to process a message. To avoid log flooding, this logs at levels:
+// Warn  -- if time taken is long, regardless of forceLog flag
+// Info  -- if time taken is short but forceLog flag is true (admin messages = DDL)
+// Debug -- if time taken is short and forceLog flag is false
+// classMethod is a logging prefix of the form "class::method".
+func logProcessingTime(classMethod string, msg Message, channel string, timeTaken time.Duration, forceLog bool) {
+
+	if timeTaken > MSG_PROCESSING_SLOW {
+		logging.Warnf("%v:%v: %v message from %v channel processing took %v > %v",
+			classMethod, MSG_LOOP_MARKER, msg.GetMsgType().String(), channel, timeTaken, MSG_PROCESSING_SLOW)
+		return
+	}
+
+	var loggingFunc func(format string, v ...interface{})
+	if forceLog {
+		loggingFunc = logging.Infof
+	} else if logging.IsEnabled(logging.Debug) {
+		loggingFunc = logging.Debugf
+	}
+	if loggingFunc != nil {
+		loggingFunc("%v:%v: %v message from %v channel processing took %v",
+			classMethod, MSG_LOOP_MARKER, msg.GetMsgType().String(), channel, timeTaken)
+	}
+}
+
+// logStreamRequestLockTime logs the time spent waiting for stream request locks in drop processing.
+// If this is slow it logs at Warn level, else Info.
+// classMethod is a logging prefix of the form "class::method".
+func logStreamRequestLockTime(classMethod string, msg Message, channel string, timeTaken time.Duration) {
+	if timeTaken <= MSG_PROCESSING_SLOW {
+		logging.Infof("%v:%v: %v message from %v channel stream request lock waits took %v",
+			classMethod, MSG_LOOP_MARKER, msg.GetMsgType().String(), channel, timeTaken)
+	} else {
+		logging.Warnf("%v:%v: %v message from %v channel stream request lock waits took %v > %v",
+			classMethod, MSG_LOOP_MARKER, msg.GetMsgType().String(), channel, timeTaken, MSG_PROCESSING_SLOW)
+	}
+}
+
+// logShutdownStart logs a shutdown starting message for a non-trivial message processing loop shutdown.
+// classMethod is a logging prefix of the form "class::method".
+func logShutdownStart(classMethod string) {
+	logging.Infof("%v:%v: shutdown starting",
+		classMethod, MSG_LOOP_MARKER)
+}
+
+// logShutdownComplete logs a standard shutdown complete message for a message processing loop.
+// classMethod is a logging prefix of the form "class::method".
+func logShutdownComplete(classMethod string) {
+	logging.Infof("%v:%v: shutdown complete",
+		classMethod, MSG_LOOP_MARKER)
+}
+
 //run starts the main loop for the indexer
 func (idx *indexer) run() {
+	const classMethod string = "Indexer::run" // for logging
 
-	go idx.listenWorkerMsgs()
-	go idx.listenAdminMsgs()
+	go idx.listenWorkerMsgs() // wrkrRecvCh
+	go idx.listenAdminMsgs()  // adminRecvCh
 
 	for {
+		var msg Message
+		var ok bool               // needed as "msg, ok := <-" shadows msg when creating ok
+		var receiveTime time.Time // time msg was received
+		var channel string        // name of channel msg came from
+		var forceLog bool         // admin messages force logging of processing time
+		// internalRecvCh and internalAdminRecvCh are chosen at random per Go's documented select behavior
 		select {
-		case msg, ok := <-idx.internalRecvCh:
+		case msg, ok = <-idx.internalRecvCh:
 			if ok {
+				receiveTime = time.Now()
+				channel = "internalRecvCh"
+				forceLog = false
 				idx.handleWorkerMsgs(msg)
 			}
-		case msg, ok := <-idx.internalAdminRecvCh:
+		case msg, ok = <-idx.internalAdminRecvCh:
 			if ok {
+				receiveTime = time.Now()
+				channel = "internalAdminRecvCh"
+				forceLog = true
 				resp := idx.handleAdminMsgs(msg)
 				idx.internalAdminRespCh <- resp
 			}
 		case <-idx.shutdownInitCh:
-			//send shutdown to all workers
+			logShutdownStart(classMethod)
+			//send shutdown to all workers
 			idx.shutdownWorkers()
 
 			//close the shutdown complete channel to indicate
 			//all workers are shutdown
 			close(idx.shutdownCompleteCh)
 
-			return
+			logShutdownComplete(classMethod)
+			return
 		}
+		logProcessingTime(classMethod, msg, channel, time.Since(receiveTime), forceLog)
 	}
-
 }
 
-//run starts the main loop for the indexer
+// listenAdminMsgs is the message processing loop for the adminRecvCh channel.
 func (idx *indexer) listenAdminMsgs() {
-
-	waitForStream := true
+	const classMethod string = "Indexer::listenAdminMsgs" // for logging
 
 	for {
+		var msg Message
+		var ok bool               // needed as "msg, ok := <-" shadows msg when creating ok
+		var receiveTime time.Time // time msg was received
+
 		select {
-		case msg, ok := <-idx.adminRecvCh:
+		case msg, ok = <-idx.adminRecvCh:
 			if ok {
+				receiveTime = time.Now()
+
 				// internalAdminRecvCh size is 1. So it will blocked if the previous msg is being
 				// processed.
 				idx.internalAdminRecvCh <- msg
 				resp := <-idx.internalAdminRespCh
 
-				if waitForStream {
-					// now that indexer has processed the message. Let's make sure that
-					// the stream request is finished before processing the next admin
-					// msg. This is done by acquiring a lock on the stream request for each
-					// bucket (on both streams). The lock is FIFO, so if this function
-					// can get a lock, it will mean that previous stream request would have
-					// been cleared.
-
-					//create and build don't need to be checked
-					//create doesn't take stream lock. build only works
-					//on a fresh stream.
-					if msg.GetMsgType() == CLUST_MGR_DROP_INDEX_DDL ||
-						msg.GetMsgType() == CLUST_MGR_PRUNE_PARTITION {
-
-						if resp.GetMsgType() == MSG_SUCCESS_DROP {
-							streamId := resp.(*MsgSuccessDrop).GetStreamId()
-							keyspaceId := resp.(*MsgSuccessDrop).GetKeyspaceId()
-
-							f := func(streamId common.StreamId, keyspaceId string) {
-								lock := idx.acquireStreamRequestLock(keyspaceId, streamId)
-								defer idx.releaseStreamRequestLock(lock)
-								idx.waitStreamRequestLock(lock)
-							}
-
-							f(streamId, keyspaceId)
+				// now that indexer has processed the message. Let's make sure that
+				// the stream request is finished before processing the next admin
+				// msg. This is done by acquiring a lock on the stream request for each
+				// bucket (on both streams). The lock is FIFO, so if this function
+				// can get a lock, it will mean that previous stream request would have
+				// been cleared.
+
+				//create and build don't need to be checked
+				//create doesn't take stream lock. build only works
+				//on a fresh stream.
+				if msg.GetMsgType() == CLUST_MGR_DROP_INDEX_DDL ||
+					msg.GetMsgType() == CLUST_MGR_PRUNE_PARTITION {
+
+					if resp.GetMsgType() == MSG_SUCCESS_DROP {
+						streamId := resp.(*MsgSuccessDrop).GetStreamId()
+						keyspaceId := resp.(*MsgSuccessDrop).GetKeyspaceId()
+
+						f := func(streamId common.StreamId, keyspaceId string) {
+							lock := idx.acquireStreamRequestLock(keyspaceId, streamId)
+							defer idx.releaseStreamRequestLock(lock)
+							idx.waitStreamRequestLock(lock)
 						}
+
+						startStreamRequestLockTime := time.Now()
+						f(streamId, keyspaceId)
+						logStreamRequestLockTime(classMethod, msg, "adminRecvCh", time.Since(startStreamRequestLockTime))
 					}
 				}
 			}
 		case <-idx.shutdownInitCh:
+			logShutdownComplete(classMethod)
 			return
 		}
+		logProcessingTime(classMethod, msg, "adminRecvCh", time.Since(receiveTime), true)
 	}
 }
 
@@ -964,15 +1042,22 @@ func (idx *indexer) getKeyspaceIdForAdminMsg(msg Message) []string {
 	}
 }
 
+// listenWorkerMsgs is the message processing loop for the wrkrRecvCh channel.
 func (idx *indexer) listenWorkerMsgs() {
+	const classMethod string = "Indexer::listenWorkerMsgs" // for logging
 
 	//listen to worker messages
 	for {
+		var msg Message
+		var ok bool               // needed as "msg, ok := <-" shadows msg when creating ok
+		var receiveTime time.Time // time msg was received
 
 		select {
-		case msg, ok := <-idx.wrkrRecvCh:
+		case msg, ok = <-idx.wrkrRecvCh:
 			if ok {
+				receiveTime = time.Now()
+
 				//handle high priority messages
 				switch msg.GetMsgType() {
 				case MSG_ERROR:
@@ -986,10 +1071,11 @@ func (idx *indexer) listenWorkerMsgs() {
 		case <-idx.shutdownInitCh:
 			//exit the loop
+			logShutdownComplete(classMethod)
 			return
 		}
+		logProcessingTime(classMethod, msg, "wrkrRecvCh", time.Since(receiveTime), false)
 	}
-
 }
 
 func (idx *indexer) handleWorkerMsgs(msg Message) {
@@ -6211,13 +6297,14 @@ func (idx *indexer) bootstrap1(snapshotNotifych []chan IndexSnapshot, snapshotRe
 
 	idx.recoverRebalanceState()
 
+	start := time.Now()
 	err := idx.recoverIndexInstMap()
 	if err != nil {
 		logging.Fatalf("Indexer::initFromPersistedState Error Recovering IndexInstMap %v", err)
 		return err
 	}
 
-	logging.Infof("Indexer::initFromPersistedState Recovered IndexInstMap %v", idx.indexInstMap)
+	logging.Infof("Indexer::initFromPersistedState Recovered IndexInstMap %v, elapsed: %v", idx.indexInstMap, time.Since(start))
 
 	idx.validateIndexInstMap()
diff --git a/secondary/indexer/message.go b/secondary/indexer/message.go
index 39bf113ac..5aaa11ea2 100644
--- a/secondary/indexer/message.go
+++ b/secondary/indexer/message.go
@@ -2375,12 +2375,23 @@ func (m MsgType) String() string {
 		return "INDEXER_MERGE_PARTITION"
 	case INDEXER_CANCEL_MERGE_PARTITION:
 		return "INDEXER_CANCEL_MERGE_PARTITION"
+	case INDEXER_MTR_FAIL:
+		return "INDEXER_MTR_FAIL"
 	case INDEXER_STORAGE_WARMUP_DONE:
 		return "INDEXER_STORAGE_WARMUP_DONE"
+	case INDEXER_SECURITY_CHANGE:
+		return "INDEXER_SECURITY_CHANGE"
+	case INDEXER_RESET_INDEX_DONE:
+		return "INDEXER_RESET_INDEX_DONE"
+	case INDEXER_ACTIVE:
+		return "INDEXER_ACTIVE"
 	case SCAN_COORD_SHUTDOWN:
 		return "SCAN_COORD_SHUTDOWN"
+	case COMPACTION_MGR_SHUTDOWN:
+		return "COMPACTION_MGR_SHUTDOWN"
+
 	case UPDATE_INDEX_INSTANCE_MAP:
 		return "UPDATE_INDEX_INSTANCE_MAP"
 	case UPDATE_INDEX_PARTITION_MAP:
@@ -2411,15 +2422,21 @@ func (m MsgType) String() string {
 	case KV_SENDER_RESTART_VBUCKETS:
 		return "KV_SENDER_RESTART_VBUCKETS"
+	case KV_SENDER_RESTART_VBUCKETS_RESPONSE:
+		return "KV_SENDER_RESTART_VBUCKETS_RESPONSE"
 	case KV_SENDER_REPAIR_ENDPOINTS:
 		return "KV_SENDER_REPAIR_ENDPOINTS"
 	case KV_STREAM_REPAIR:
 		return "KV_STREAM_REPAIR"
+	case MSG_SUCCESS_OPEN_STREAM:
+		return "MSG_SUCCESS_OPEN_STREAM"
 	case CLUST_MGR_CREATE_INDEX_DDL:
 		return "CLUST_MGR_CREATE_INDEX_DDL"
 	case CLUST_MGR_BUILD_INDEX_DDL:
 		return "CLUST_MGR_BUILD_INDEX_DDL"
+	case CLUST_MGR_BUILD_INDEX_DDL_RESPONSE:
+		return "CLUST_MGR_BUILD_INDEX_DDL_RESPONSE"
 	case CLUST_MGR_DROP_INDEX_DDL:
 		return "CLUST_MGR_DROP_INDEX_DDL"
 	case CLUST_MGR_UPDATE_TOPOLOGY_FOR_INDEX:
@@ -2474,11 +2491,21 @@ func (m MsgType) String() string {
 	case CONFIG_SETTINGS_UPDATE:
 		return "CONFIG_SETTINGS_UPDATE"
-	case STATS_RESET:
-		return "STATS_RESET"
+	case STORAGE_STATS:
+		return "STORAGE_STATS"
+	case SCAN_STATS:
+		return "SCAN_STATS"
+	case INDEX_PROGRESS_STATS:
+		return "INDEX_PROGRESS_STATS"
+	case INDEXER_STATS:
+		return "INDEXER_STATS"
+	case INDEX_STATS_DONE:
+		return "INDEX_STATS_DONE"
 	case INDEX_STATS_BROADCAST:
 		return "INDEX_STATS_BROADCAST"
+	case STATS_RESET:
+		return "STATS_RESET"
 	case STATS_PERSISTER_START:
 		return "STATS_PERSISTER_START"
 	case STATS_PERSISTER_STOP:
 		return "STATS_PERSISTER_STOP"
@@ -2490,6 +2517,9 @@ func (m MsgType) String() string {
 	case STATS_READ_PERSISTED_STATS:
 		return "STATS_READ_PERSISTED_STATS"
 
+	case INDEXER_DDL_IN_PROGRESS_RESPONSE:
+		return "INDEXER_DDL_IN_PROGRESS_RESPONSE"
+
 	default:
 		return "UNKNOWN_MSG_TYPE"
 	}
diff --git a/secondary/queryport/client/meta_client.go b/secondary/queryport/client/meta_client.go
index b7cd77f53..e956691eb 100644
--- a/secondary/queryport/client/meta_client.go
+++ b/secondary/queryport/client/meta_client.go
@@ -1501,7 +1501,7 @@ func (b *metadataClient) updateIndexerList(discardExisting bool) error {
 		return err
 	}
 	cinfo.SetUserAgent("updateIndexerList")
-	if err := cinfo.Fetch(); err != nil {
+	if err := cinfo.FetchNodesAndSvsInfoWithLock(); err != nil {
 		return err
 	}
 
@@ -2038,14 +2038,16 @@ func (b *metadataClient) watchClusterChanges() {
 	ch := scn.GetNotifyCh()
 	for {
 		select {
-		case _, ok := <-ch:
+		case n, ok := <-ch:
 			if !ok {
 				selfRestart()
 				return
-			} else if err := b.updateIndexerList(false); err != nil {
-				logging.Errorf("updateIndexerList(): %v\n", err)
-				selfRestart()
-				return
+			} else if n.Type != common.CollectionManifestChangeNotification {
+				if err := b.updateIndexerList(false); err != nil {
+					logging.Errorf("updateIndexerList(): %v\n", err)
+					selfRestart()
+					return
+				}
 			}
 		case _, ok := <-b.mdNotifyCh:
 			if ok {
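
Usage note (illustrative sketch, not part of the patch): with this change, callers that only need node and node-services (port) information, such as metadataClient.updateIndexerList above, refresh the cluster info cache via FetchNodesAndSvsInfoWithLock() instead of the heavier Fetch(), which also reads bucket maps and collection manifests. A minimal sketch of the intended call pattern, assuming the caller already holds a *common.ClusterInfoCache obtained the same way updateIndexerList does today; refreshNodeInfo is a hypothetical helper name, not an API in this patch:

    package example

    import "github.com/couchbase/indexing/secondary/common"

    // refreshNodeInfo refreshes only node and node-services data through the new,
    // lighter-weight fetch path added in cluster_info.go
    // (Connect -> FetchNodesData -> FetchNodeSvsData, with validation retries),
    // skipping bucket maps and manifests that this caller does not need.
    func refreshNodeInfo(cinfo *common.ClusterInfoCache) error {
        cinfo.SetUserAgent("updateIndexerList")
        return cinfo.FetchNodesAndSvsInfoWithLock()
    }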