From 057246842eac576a5098ee33ffb00e3efb6ebf5f Mon Sep 17 00:00:00 2001
From: Sai Krishna Teja Kommaraju
Date: Tue, 27 Apr 2021 19:11:42 +0530
Subject: [PATCH 1/4] MB-45741: Optimize ns_server calls in updateIndexerList,
 query client

Change-Id: I9bf63ffba33dc4e17b670e8ec17757b236162b81
---
 secondary/common/cluster_info.go          | 93 +++++++++++++++++++++++
 secondary/queryport/client/meta_client.go |  2 +-
 2 files changed, 94 insertions(+), 1 deletion(-)

diff --git a/secondary/common/cluster_info.go b/secondary/common/cluster_info.go
index fd0a7181c..38659571a 100644
--- a/secondary/common/cluster_info.go
+++ b/secondary/common/cluster_info.go
@@ -150,6 +150,51 @@ func (c *ClusterInfoCache) SetServicePorts(portMap map[string]string) {
 }
 
+func (c *ClusterInfoCache) Connect() (err error) {
+    c.client, err = couchbase.Connect(c.url)
+    if err != nil {
+        return err
+    }
+
+    c.client.SetUserAgent(c.userAgent)
+    return nil
+}
+
+// Note: This function does not fetch BucketMap and Manifest data in c.pool
+func (c *ClusterInfoCache) FetchNodesData() (err error) {
+    c.pool, err = c.client.CallPoolURI(c.poolName)
+    if err != nil {
+        return err
+    }
+
+    c.updateNodesData()
+
+    found := false
+    for _, node := range c.nodes {
+        if node.ThisNode {
+            found = true
+        }
+    }
+
+    if !found {
+        return errors.New("Current node's cluster membership is not active")
+    }
+    return nil
+}
+
+func (c *ClusterInfoCache) FetchNodeSvsData() (err error) {
+    var poolServs couchbase.PoolServices
+
+    poolServs, err = c.client.GetPoolServices(c.poolName)
+    if err != nil {
+        return err
+    }
+
+    c.nodesvs = poolServs.NodesExt
+    c.buildEncryptPortMapping()
+    return nil
+}
+
 // TODO: In many places (e.g. lifecycle manager), cluster info cache
 // refresh is required only for one bucket. It is sub-optimal to update
 // the cluster info for all the buckets. Add a new method
@@ -378,6 +423,54 @@ func (c *ClusterInfoCache) FetchForPoolChange() error {
     return rh.Run()
 }
 
+func (c *ClusterInfoCache) FetchNodesAndSvsInfoWithLock() (err error) {
+    c.Lock()
+    defer c.Unlock()
+
+    return c.FetchNodesAndSvsInfo()
+}
+
+func (c *ClusterInfoCache) FetchNodesAndSvsInfo() (err error) {
+    fn := func(r int, err error) error {
+        if r > 0 {
+            logging.Infof("%vError occurred during nodes and nodesvs update (%v) .. Retrying(%d)",
+                c.logPrefix, err, r)
+        }
+
+        vretry := 0
+    retry:
+        if err = c.Connect(); err != nil {
+            return err
+        }
+
+        if err = c.FetchNodesData(); err != nil {
+            return err
+        }
+
+        if err = c.FetchNodeSvsData(); err != nil {
+            return err
+        }
+
+        if !c.validateCache(c.client.Info.IsIPv6) {
+            if vretry < CLUSTER_INFO_VALIDATION_RETRIES {
+                vretry++
+                logging.Infof("%vValidation Failed while updating nodes and nodesvs.. Retrying(%d)",
+                    c.logPrefix, vretry)
+                goto retry
+            } else {
+                logging.Errorf("%vValidation Failed while updating nodes and nodesvs.. %v",
+                    c.logPrefix, c)
+                return ErrValidationFailed
+            }
+        }
+
+        return nil
+    }
+
+    rh := NewRetryHelper(c.retries, time.Second*2, 1, fn)
+    return rh.Run()
+}
+
 func (c *ClusterInfoCache) FetchManifestInfoOnUIDChange(bucketName string, muid string) error {
     c.Lock()
     defer c.Unlock()
diff --git a/secondary/queryport/client/meta_client.go b/secondary/queryport/client/meta_client.go
index b7cd77f53..35e6d6017 100644
--- a/secondary/queryport/client/meta_client.go
+++ b/secondary/queryport/client/meta_client.go
@@ -1501,7 +1501,7 @@ func (b *metadataClient) updateIndexerList(discardExisting bool) error {
         return err
     }
     cinfo.SetUserAgent("updateIndexerList")
-    if err := cinfo.Fetch(); err != nil {
+    if err := cinfo.FetchNodesAndSvsInfoWithLock(); err != nil {
         return err
     }
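Patch 1's FetchNodesAndSvsInfo nests a bounded, goto-based validation retry inside an outer retry loop: transport errors back off and reconnect, while a snapshot that merely fails validation is re-read immediately without re-entering the backoff. The following self-contained Go sketch isolates that control-flow shape; fetchWithValidation, fetch, validate, and the retry counts are illustrative stand-ins, not the actual ClusterInfoCache or RetryHelper API.

package main

import (
    "errors"
    "fmt"
    "time"
)

var errValidationFailed = errors.New("validation failed after retries")

// fetchWithValidation retries fetch on error up to outerRetries times and,
// when fetch succeeds but the snapshot fails validation, re-fetches up to
// maxValidationRetries times before giving up.
func fetchWithValidation(fetch func() error, validate func() bool,
    outerRetries, maxValidationRetries int, backoff time.Duration) error {

    var err error
    for r := 0; r <= outerRetries; r++ {
        if r > 0 {
            fmt.Printf("retry %d after error: %v\n", r, err)
            time.Sleep(backoff)
        }

        vretry := 0
    retry:
        if err = fetch(); err != nil {
            continue // transport error: outer loop retries with backoff
        }
        if !validate() {
            if vretry < maxValidationRetries {
                vretry++
                goto retry // inconsistent snapshot: re-fetch immediately
            }
            err = errValidationFailed
            continue
        }
        return nil // fetched and validated
    }
    return err
}

func main() {
    calls := 0
    err := fetchWithValidation(
        func() error { calls++; return nil },
        func() bool { return calls >= 2 }, // first snapshot is "stale"
        2, 3, 10*time.Millisecond,
    )
    fmt.Println("calls:", calls, "err:", err) // calls: 2 err: <nil>
}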
%v", + c.logPrefix, c) + return ErrValidationFailed + } + } + + return nil + } + + rh := NewRetryHelper(c.retries, time.Second*2, 1, fn) + return rh.Run() +} + func (c *ClusterInfoCache) FetchManifestInfoOnUIDChange(bucketName string, muid string) error { c.Lock() defer c.Unlock() diff --git a/secondary/queryport/client/meta_client.go b/secondary/queryport/client/meta_client.go index b7cd77f53..35e6d6017 100644 --- a/secondary/queryport/client/meta_client.go +++ b/secondary/queryport/client/meta_client.go @@ -1501,7 +1501,7 @@ func (b *metadataClient) updateIndexerList(discardExisting bool) error { return err } cinfo.SetUserAgent("updateIndexerList") - if err := cinfo.Fetch(); err != nil { + if err := cinfo.FetchNodesAndSvsInfoWithLock(); err != nil { return err } From 300c7018b5d6ed9b2f43df0b95adbe8e2d1d96c1 Mon Sep 17 00:00:00 2001 From: Sai Krishna Teja Kommaraju Date: Wed, 28 Apr 2021 01:05:16 +0530 Subject: [PATCH 2/4] MB-45741 : Do not updateIndexerList on CollectionManifestChangeNotification Change-Id: I088bac87333cfd99eff3cff1ba20fa1891cd7088 --- secondary/queryport/client/meta_client.go | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/secondary/queryport/client/meta_client.go b/secondary/queryport/client/meta_client.go index 35e6d6017..e956691eb 100644 --- a/secondary/queryport/client/meta_client.go +++ b/secondary/queryport/client/meta_client.go @@ -2038,14 +2038,16 @@ func (b *metadataClient) watchClusterChanges() { ch := scn.GetNotifyCh() for { select { - case _, ok := <-ch: + case n, ok := <-ch: if !ok { selfRestart() return - } else if err := b.updateIndexerList(false); err != nil { - logging.Errorf("updateIndexerList(): %v\n", err) - selfRestart() - return + } else if n.Type != common.CollectionManifestChangeNotification { + if err := b.updateIndexerList(false); err != nil { + logging.Errorf("updateIndexerList(): %v\n", err) + selfRestart() + return + } } case _, ok := <-b.mdNotifyCh: if ok { From 5a8560d5dd5fc4e76a519c2a7d427e38b873bd1c Mon Sep 17 00:00:00 2001 From: Varun Velamuri Date: Thu, 29 Apr 2021 15:53:02 +0530 Subject: [PATCH 3/4] MB-45788 Reduce the time spent in recoverIndexInstMap during bootstrap a. Cache the topology of a definition to avoid multiple gometa calls b. 
From 5a8560d5dd5fc4e76a519c2a7d427e38b873bd1c Mon Sep 17 00:00:00 2001
From: Varun Velamuri
Date: Thu, 29 Apr 2021 15:53:02 +0530
Subject: [PATCH 3/4] MB-45788 Reduce the time spent in recoverIndexInstMap
 during bootstrap

a. Cache the topology of a definition to avoid multiple gometa calls
b. Fetch all drop and delete tokens beforehand to avoid multiple
   metakv calls

Change-Id: Ia93104a5d047409513231518fd10809d6ccaca0d
---
 secondary/indexer/cluster_manager_agent.go | 58 ++++++++++++++++++----
 secondary/indexer/indexer.go               |  3 +-
 2 files changed, 51 insertions(+), 10 deletions(-)

diff --git a/secondary/indexer/cluster_manager_agent.go b/secondary/indexer/cluster_manager_agent.go
index 0ce723b86..46330ea3d 100644
--- a/secondary/indexer/cluster_manager_agent.go
+++ b/secondary/indexer/cluster_manager_agent.go
@@ -407,15 +407,38 @@ func (c *clustMgrAgent) handleGetGlobalTopology(cmd Message) {
     defer metaIter.Close()
 
     indexInstMap := make(common.IndexInstMap)
+    topoCache := make(map[string]map[string]map[string]*manager.IndexTopology)
+
+    var delTokens map[common.IndexDefnId]*mc.DeleteCommandToken
+    delTokens, err = mc.FetchIndexDefnToDeleteCommandTokensMap()
+    if err != nil {
+        logging.Warnf("ClustMgr:handleGetGlobalTopology: Error in FetchIndexDefnToDeleteCommandTokensMap %v", err)
+    }
+
+    var dropTokens map[common.IndexDefnId][]*mc.DropInstanceCommandToken
+    dropTokens, err = mc.FetchIndexDefnToDropInstanceCommandTokenMap()
+    if err != nil {
+        logging.Warnf("ClustMgr:handleGetGlobalTopology: Error in FetchIndexDefnToDropInstanceCommandTokenMap %v", err)
+    }
 
     for _, defn, err := metaIter.Next(); err == nil; _, defn, err = metaIter.Next() {
         var idxDefn common.IndexDefn
         idxDefn = *defn
 
-        t, e := c.mgr.GetTopologyByCollection(idxDefn.Bucket, idxDefn.Scope, idxDefn.Collection)
-        if e != nil {
-            common.CrashOnError(e)
+        t := topoCache[idxDefn.Bucket][idxDefn.Scope][idxDefn.Collection]
+        if t == nil {
+            t, err = c.mgr.GetTopologyByCollection(idxDefn.Bucket, idxDefn.Scope, idxDefn.Collection)
+            if err != nil {
+                common.CrashOnError(err)
+            }
+            if _, ok := topoCache[idxDefn.Bucket]; !ok {
+                topoCache[idxDefn.Bucket] = make(map[string]map[string]*manager.IndexTopology)
+            }
+            if _, ok := topoCache[idxDefn.Bucket][idxDefn.Scope]; !ok {
+                topoCache[idxDefn.Bucket][idxDefn.Scope] = make(map[string]*manager.IndexTopology)
+            }
+            topoCache[idxDefn.Bucket][idxDefn.Scope][idxDefn.Collection] = t
         }
         if t == nil {
             logging.Warnf("ClustMgr:handleGetGlobalTopology Index Instance Not "+
@@ -464,14 +487,31 @@
         }
 
         if idxInst.State != common.INDEX_STATE_DELETED {
-            exist1, err := mc.DeleteCommandTokenExist(idxDefn.DefnId)
-            if err != nil {
-                logging.Warnf("Error when reading delete command token for defn %v", idxDefn.DefnId)
+            var exist1, exist2 bool
+            var err error
+
+            if delTokens != nil {
+                _, exist1 = delTokens[idxDefn.DefnId]
+            } else {
+                exist1, err = mc.DeleteCommandTokenExist(idxDefn.DefnId)
+                if err != nil {
+                    logging.Warnf("Error when reading delete command token for defn %v", idxDefn.DefnId)
+                }
             }
 
-            exist2, err := mc.DropInstanceCommandTokenExist(idxDefn.DefnId, idxInst.InstId)
-            if err != nil {
-                logging.Warnf("Error when reading delete command token for index (%v, %v)", idxDefn.DefnId, idxInst.InstId)
+            if dropTokens != nil {
+                dropTokenList := dropTokens[idxDefn.DefnId]
+                for _, dropToken := range dropTokenList {
+                    if dropToken.InstId == idxDefn.InstId {
+                        exist2 = true
+                        break
+                    }
+                }
+            } else {
+                exist2, err = mc.DropInstanceCommandTokenExist(idxDefn.DefnId, idxInst.InstId)
+                if err != nil {
+                    logging.Warnf("Error when reading drop command token for index (%v, %v)", idxDefn.DefnId, idxInst.InstId)
+                }
             }
 
             if exist1 || exist2 {
diff --git a/secondary/indexer/indexer.go b/secondary/indexer/indexer.go
index a8404df3c..288cdea65 100644
--- a/secondary/indexer/indexer.go
+++ b/secondary/indexer/indexer.go
@@ -6211,13 +6211,14 @@ func (idx *indexer) bootstrap1(snapshotNotifych []chan IndexSnapshot, snapshotRe
 
     idx.recoverRebalanceState()
 
+    start := time.Now()
     err := idx.recoverIndexInstMap()
     if err != nil {
         logging.Fatalf("Indexer::initFromPersistedState Error Recovering IndexInstMap %v", err)
         return err
     }
 
-    logging.Infof("Indexer::initFromPersistedState Recovered IndexInstMap %v", idx.indexInstMap)
+    logging.Infof("Indexer::initFromPersistedState Recovered IndexInstMap %v, elapsed: %v", idx.indexInstMap, time.Since(start))
 
     idx.validateIndexInstMap()
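Patch 3 memoizes GetTopologyByCollection results in maps nested bucket -> scope -> collection, which lets the lookup topoCache[b][s][c] run without existence checks, since indexing a nil inner map simply yields the zero value. Here is a sketch of the same memoization using a flat map with a comparable struct key instead; Topology and the load callback are stand-ins for manager.IndexTopology and the gometa-backed lookup.

package main

import "fmt"

type collKey struct{ bucket, scope, collection string }

type Topology struct{ name string }

type topoCache struct {
    load  func(bucket, scope, collection string) (*Topology, error)
    cache map[collKey]*Topology
}

// get returns the cached topology for a keyspace, loading it at most once.
func (tc *topoCache) get(bucket, scope, collection string) (*Topology, error) {
    k := collKey{bucket, scope, collection}
    if t, ok := tc.cache[k]; ok {
        return t, nil // hit: no gometa call
    }
    t, err := tc.load(bucket, scope, collection)
    if err != nil {
        return nil, err
    }
    tc.cache[k] = t
    return t, nil
}

func main() {
    calls := 0
    tc := &topoCache{
        load: func(b, s, c string) (*Topology, error) {
            calls++
            return &Topology{name: b + "/" + s + "/" + c}, nil
        },
        cache: map[collKey]*Topology{},
    }
    tc.get("default", "_default", "_default")
    tc.get("default", "_default", "_default") // served from cache
    fmt.Println("loads:", calls)              // loads: 1
}

Either shape works: the nested form in the patch trades some insertion bookkeeping for nil-safe chained reads, while the flat struct key needs no nesting setup at all.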
From dd4531740195c74a6917b314b7a8d8e5c0a9c748 Mon Sep 17 00:00:00 2001
From: Kevin Cherkauer
Date: Wed, 28 Apr 2021 16:12:00 -0700
Subject: [PATCH 4/4] MB-45919 Part 1: Add timing and logging of slow indexer
 msg processing

This patch adds tracking of the time it takes to process each indexer
message and logs information about it at:

1. Warn level for all slow messages, whatever their type
2. Info level for non-slow admin (DDL) messages
3. Debug level for non-slow non-admin messages

Slow is defined as > 1 minute to process. The vast majority of messages
fall into category 3, so they will not be logged unless the Debug or
higher logging level is enabled. This avoids flooding indexer.log.

This new logging will make it easier to troubleshoot cases where the
indexer becomes unresponsive.

Change-Id: Ibc375bbd46016db0a9936a1fbf3a82a5f693af6d
---
 secondary/indexer/indexer.go | 170 ++++++++++++++++++++++++++---------
 secondary/indexer/message.go |  34 ++++++-
 2 files changed, 160 insertions(+), 44 deletions(-)

diff --git a/secondary/indexer/indexer.go b/secondary/indexer/indexer.go
index 288cdea65..f8220c8eb 100644
--- a/secondary/indexer/indexer.go
+++ b/secondary/indexer/indexer.go
@@ -120,10 +120,11 @@ type indexer struct {
     keyspaceIdCreateClientChMap map[string]MsgChannel
 
     wrkrRecvCh          MsgChannel //channel to receive messages from workers
+    // (SettingsManager, MutationManager, KVSender, Timekeeper)
     internalRecvCh      MsgChannel //buffered channel to queue worker requests (most MsgXyz messages)
     adminRecvCh         MsgChannel //channel to receive admin messages (ClustMgrAgent)
     internalAdminRecvCh MsgChannel //internal channel to receive admin messages (DDL; unbuffered channel)
     internalAdminRespCh MsgChannel //internal channel to respond admin messages (DDL; unbuffered channel)
     shutdownInitCh      MsgChannel //internal shutdown channel for indexer
     shutdownCompleteCh  MsgChannel //indicate shutdown completion
@@ -859,88 +860,165 @@ func (idx *indexer) releaseStreamRequestLock(req *kvRequest) {
     }
 }
 
+const MSG_LOOP_MARKER string = "msg_loop"             // to grep for all the following logXyz messages
+const MSG_PROCESSING_SLOW time.Duration = time.Minute // threshold for warning of slow message processing
+
+// logProcessingTime logs the time it took to process a message. To avoid log flooding, this logs at levels
+//   Warn  -- if time taken is long, regardless of forceLog flag
+//   Info  -- if time taken is short but forceLog flag is true (admin messages = DDL)
+//   Debug -- if time taken is short and forceLog flag is false
+// classMethod is a logging prefix of the form "class::method".
+func logProcessingTime(classMethod string, msg Message, channel string, timeTaken time.Duration, forceLog bool) {
+
+    if timeTaken > MSG_PROCESSING_SLOW {
+        logging.Warnf("%v:%v: %v message from %v channel processing took %v > %v",
+            classMethod, MSG_LOOP_MARKER, msg.GetMsgType().String(), channel, timeTaken, MSG_PROCESSING_SLOW)
+        return
+    }
+
+    var loggingFunc func(format string, v ...interface{})
+    if forceLog {
+        loggingFunc = logging.Infof
+    } else if logging.IsEnabled(logging.Debug) {
+        loggingFunc = logging.Debugf
+    }
+    if loggingFunc != nil {
+        loggingFunc("%v:%v: %v message from %v channel processing took %v",
+            classMethod, MSG_LOOP_MARKER, msg.GetMsgType().String(), channel, timeTaken)
+    }
+}
+
+// logStreamRequestLockTime logs the time consumed waiting for stream request locks in drop processing.
+// If this is slow it logs at Warn level, else Info.
+// classMethod is a logging prefix of the form "class::method".
+func logStreamRequestLockTime(classMethod string, msg Message, channel string, timeTaken time.Duration) {
+    if timeTaken <= MSG_PROCESSING_SLOW {
+        logging.Infof("%v:%v: %v message from %v channel stream request lock waits took %v",
+            classMethod, MSG_LOOP_MARKER, msg.GetMsgType().String(), channel, timeTaken)
+    } else {
+        logging.Warnf("%v:%v: %v message from %v channel stream request lock waits took %v > %v",
+            classMethod, MSG_LOOP_MARKER, msg.GetMsgType().String(), channel, timeTaken, MSG_PROCESSING_SLOW)
+    }
+}
+
+// logShutdownStart logs a shutdown starting message for a non-trivial message processing loop shutdown.
+// classMethod is a logging prefix of the form "class::method".
+func logShutdownStart(classMethod string) {
+    logging.Infof("%v:%v: shutdown starting",
+        classMethod, MSG_LOOP_MARKER)
+}
+
+// logShutdownComplete logs a standard shutdown complete message for a message processing loop.
+// classMethod is a logging prefix of the form "class::method".
+func logShutdownComplete(classMethod string) {
+    logging.Infof("%v:%v: shutdown complete",
+        classMethod, MSG_LOOP_MARKER)
+}
+
 //run starts the main loop for the indexer
 func (idx *indexer) run() {
+    const classMethod string = "Indexer::run" // for logging
 
-    go idx.listenWorkerMsgs()
-    go idx.listenAdminMsgs()
+    go idx.listenWorkerMsgs() // wrkrRecvCh
+    go idx.listenAdminMsgs()  // adminRecvCh
 
     for {
+        var msg Message
+        var ok bool               // needed as "msg, ok := <-" shadows msg when creating ok
+        var receiveTime time.Time // time msg was received
+        var channel string        // name of channel msg came from
+        var forceLog bool         // admin messages force logging of processing time
+
+        // internalRecvCh and internalAdminRecvCh are chosen at random when both are ready, per Go select semantics
        select {
-        case msg, ok := <-idx.internalRecvCh:
+        case msg, ok = <-idx.internalRecvCh:
             if ok {
+                receiveTime = time.Now()
+                channel = "internalRecvCh"
+                forceLog = false
                 idx.handleWorkerMsgs(msg)
             }
-        case msg, ok := <-idx.internalAdminRecvCh:
+        case msg, ok = <-idx.internalAdminRecvCh:
             if ok {
+                receiveTime = time.Now()
+                channel = "internalAdminRecvCh"
+                forceLog = true
                 resp := idx.handleAdminMsgs(msg)
                 idx.internalAdminRespCh <- resp
             }
         case <-idx.shutdownInitCh:
+            logShutdownStart(classMethod)
+
             //send shutdown to all workers
             idx.shutdownWorkers()
 
             //close the shutdown complete channel to indicate
             //all workers are shutdown
             close(idx.shutdownCompleteCh)
 
+            logShutdownComplete(classMethod)
             return
         }
+        logProcessingTime(classMethod, msg, channel, time.Since(receiveTime), forceLog)
     }
-
 }
 
-//run starts the main loop for the indexer
+// listenAdminMsgs is the message processing loop for the adminRecvCh channel.
 func (idx *indexer) listenAdminMsgs() {
-
-    waitForStream := true
+    const classMethod string = "Indexer::listenAdminMsgs" // for logging
 
     for {
+        var msg Message
+        var ok bool               // needed as "msg, ok := <-" shadows msg when creating ok
+        var receiveTime time.Time // time msg was received
+
         select {
-        case msg, ok := <-idx.adminRecvCh:
+        case msg, ok = <-idx.adminRecvCh:
             if ok {
+                receiveTime = time.Now()
+
                 // internalAdminRecvCh size is 1, so it will block if the previous msg is being
                 // processed.
                 idx.internalAdminRecvCh <- msg
                 resp := <-idx.internalAdminRespCh
 
-                if waitForStream {
-                    // now that indexer has processed the message. Let's make sure that
-                    // the stream request is finished before processing the next admin
-                    // msg. This is done by acquiring a lock on the stream request for each
-                    // bucket (on both streams). The lock is FIFO, so if this function
-                    // can get a lock, it will mean that previous stream request would have
-                    // been cleared.
-
-                    //create and build don't need to be checked
-                    //create doesn't take stream lock. build only works
-                    //on a fresh stream.
-                    if msg.GetMsgType() == CLUST_MGR_DROP_INDEX_DDL ||
-                        msg.GetMsgType() == CLUST_MGR_PRUNE_PARTITION {
-
-                        if resp.GetMsgType() == MSG_SUCCESS_DROP {
-                            streamId := resp.(*MsgSuccessDrop).GetStreamId()
-                            keyspaceId := resp.(*MsgSuccessDrop).GetKeyspaceId()
-
-                            f := func(streamId common.StreamId, keyspaceId string) {
-                                lock := idx.acquireStreamRequestLock(keyspaceId, streamId)
-                                defer idx.releaseStreamRequestLock(lock)
-                                idx.waitStreamRequestLock(lock)
-                            }
-
-                            f(streamId, keyspaceId)
+                // Now that the indexer has processed the message, make sure that
+                // the stream request is finished before processing the next admin
+                // msg. This is done by acquiring a lock on the stream request for each
+                // bucket (on both streams). The lock is FIFO, so if this function
+                // can get a lock, it means the previous stream request has
+                // been cleared.
+
+                //create and build don't need to be checked
+                //create doesn't take stream lock. build only works
+                //on a fresh stream.
+                if msg.GetMsgType() == CLUST_MGR_DROP_INDEX_DDL ||
+                    msg.GetMsgType() == CLUST_MGR_PRUNE_PARTITION {
+
+                    if resp.GetMsgType() == MSG_SUCCESS_DROP {
+                        streamId := resp.(*MsgSuccessDrop).GetStreamId()
+                        keyspaceId := resp.(*MsgSuccessDrop).GetKeyspaceId()
+
+                        f := func(streamId common.StreamId, keyspaceId string) {
+                            lock := idx.acquireStreamRequestLock(keyspaceId, streamId)
+                            defer idx.releaseStreamRequestLock(lock)
+                            idx.waitStreamRequestLock(lock)
+                        }
+
+                        startStreamRequestLockTime := time.Now()
+                        f(streamId, keyspaceId)
+                        logStreamRequestLockTime(classMethod, msg, "adminRecvCh", time.Since(startStreamRequestLockTime))
                     }
                 }
             }
         case <-idx.shutdownInitCh:
+            logShutdownComplete(classMethod)
             return
         }
+        logProcessingTime(classMethod, msg, "adminRecvCh", time.Since(receiveTime), true)
     }
 }
@@ -964,15 +1042,22 @@ func (idx *indexer) getKeyspaceIdForAdminMsg(msg Message) []string {
     }
 }
 
+// listenWorkerMsgs is the message processing loop for the wrkrRecvCh channel.
 func (idx *indexer) listenWorkerMsgs() {
+    const classMethod string = "Indexer::listenWorkerMsgs" // for logging
 
     //listen to worker messages
     for {
+        var msg Message
+        var ok bool               // needed as "msg, ok := <-" shadows msg when creating ok
+        var receiveTime time.Time // time msg was received
 
         select {
-        case msg, ok := <-idx.wrkrRecvCh:
+        case msg, ok = <-idx.wrkrRecvCh:
             if ok {
+                receiveTime = time.Now()
+
                 //handle high priority messages
                 switch msg.GetMsgType() {
                 case MSG_ERROR:
@@ -986,10 +1071,11 @@
 
         case <-idx.shutdownInitCh:
             //exit the loop
+            logShutdownComplete(classMethod)
             return
         }
+        logProcessingTime(classMethod, msg, "wrkrRecvCh", time.Since(receiveTime), false)
     }
-
 }
 
 func (idx *indexer) handleWorkerMsgs(msg Message) {
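The "shadows msg" comment on the pre-declared variables is the subtle point in these loops: with ":=" each select case would declare a fresh msg scoped to that case alone, so the loop-level logProcessingTime call after the select would never see it. A minimal demonstration, with a stand-in string channel instead of MsgChannel:

package main

import "fmt"

func main() {
    ch := make(chan string, 1)
    ch <- "MSG_ERROR"

    var msg string
    var ok bool
    select {
    case msg, ok = <-ch: // assignment, not ":=", so the outer msg is set
        _ = ok
    }
    fmt.Println(msg) // prints MSG_ERROR; with ":=" this would print ""
}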
"CLUST_MGR_DROP_INDEX_DDL" case CLUST_MGR_UPDATE_TOPOLOGY_FOR_INDEX: @@ -2474,11 +2491,21 @@ func (m MsgType) String() string { case CONFIG_SETTINGS_UPDATE: return "CONFIG_SETTINGS_UPDATE" - case STATS_RESET: - return "STATS_RESET" + case STORAGE_STATS: + return "STORAGE_STATS" + case SCAN_STATS: + return "SCAN_STATS" + case INDEX_PROGRESS_STATS: + return "INDEX_PROGRESS_STATS" + case INDEXER_STATS: + return "INDEXER_STATS" + case INDEX_STATS_DONE: + return "INDEX_STATS_DONE" case INDEX_STATS_BROADCAST: return "INDEX_STATS_BROADCAST" + case STATS_RESET: + return "STATS_RESET" case STATS_PERSISTER_START: return "STATS_PERSISTER_START" case STATS_PERSISTER_STOP: @@ -2490,6 +2517,9 @@ func (m MsgType) String() string { case STATS_READ_PERSISTED_STATS: return "STATS_READ_PERSISTED_STATS" + case INDEXER_DDL_IN_PROGRESS_RESPONSE: + return "INDEXER_DDL_IN_PROGRESS_RESPONSE" + default: return "UNKNOWN_MSG_TYPE" }