From 9f5c7f17d1d32dc8fc64ae0288fad10c1caf7d16 Mon Sep 17 00:00:00 2001
From: Sai Krishna Teja Kommaraju
Date: Fri, 1 Apr 2022 18:39:34 +0530
Subject: [PATCH] MB-51636: Make vbucket fetch in kv_sender and projector
 synchronous

* kv_sender and projector need an accurate vbmap when there is a
  connection error.

* When there is a connection error, kv_sender tries to connect to all the
  projectors owning the vbuckets marked as conn_err, sends a vBucket
  shutdown, and the vbuckets are restarted after that.

* If the indexer gets two StreamBegins without a StreamEnd in between, it
  treats this as a connection error, since it cannot determine the owner
  of the vbucket. This can happen during rebalance.

* The indexer sends a Shutdown for the new owner of the vbucket to correct
  the bookkeeping for the vbucket owner.

* If the cache is stale at this point, the shutdown is sent to the old
  owner. The shutdown logic works like a broadcast: the indexer sends the
  conn_err vbucket list to all the owners in the list, and each projector
  shuts down the part that it owns. The old node ignores a vbucket that has
  moved during rebalance, so that shutdown never happens.

* The projector also needs an accurate vbucket list, since getLocalVbuckets
  selects the vbuckets that it owns from the list of vbuckets sent by the
  indexer.

Change-Id: Ia9a16ea0472629183fa1a561886ce9209c7a60a8
---
 secondary/common/cluster_info_lite.go | 50 ++++++++++++++++++++-
 secondary/indexer/kv_sender.go        | 63 +++++++++++++++------------
 secondary/projector/feed.go           | 34 ++-------------
 3 files changed, 86 insertions(+), 61 deletions(-)

diff --git a/secondary/common/cluster_info_lite.go b/secondary/common/cluster_info_lite.go
index e70077f57..252632573 100644
--- a/secondary/common/cluster_info_lite.go
+++ b/secondary/common/cluster_info_lite.go
@@ -453,6 +453,54 @@ func (bi *bucketInfo) setClusterURL(u string) {
 	bi.clusterURL = u
 }
 
+func (bi *bucketInfo) String() string {
+	return fmt.Sprintf("bucket: %+v cluster: %v", bi.bucket, bi.clusterURL)
+}
+
+func NewBucketInfo(cluster, pooln, bucketn string) (*bucketInfo, error) {
+
+	if strings.HasPrefix(cluster, "http") {
+		u, err := url.Parse(cluster)
+		if err != nil {
+			return nil, err
+		}
+		cluster = u.Host
+	}
+
+	ah := &CbAuthHandler{
+		Hostport: cluster,
+		Bucket:   bucketn,
+	}
+
+	client, err := couchbase.ConnectWithAuth("http://"+cluster, ah)
+	if err != nil {
+		return nil, err
+	}
+	client.SetUserAgent("NewBucketInfo")
+
+	var bucket *couchbase.Bucket
+	retryCount := 0
+	terseBucketsBase := fmt.Sprintf("/pools/%v/b/", pooln)
+	for retry := true; retry && retryCount <= 5; retryCount++ {
+		retry, bucket, err = client.GetTerseBucket(terseBucketsBase, bucketn)
+		if retry {
+			time.Sleep(5 * time.Millisecond)
+		}
+	}
+	if retryCount > 1 {
+		logging.Warnf("NewBucketInfo: Retried %v times due to out of sync for"+
+			" bucket %s. Final err: %v", retryCount, bucketn, err)
+	}
+	if err != nil {
+		return nil, err
+	}
+
+	bi := newBucketInfo(bucket, client.BaseURL.Host)
+	bi.setClusterURL(cluster)
+
+	return bi, err
+}
+
 //
 // PoolInfo
 //
@@ -1999,7 +2047,6 @@ func (c *ClusterInfoCacheLiteClient) GetLocalNodeUUID() (string, error) {
 	return "", fmt.Errorf("no node has ThisNode set")
 }
 
-
 func (ni *NodesInfo) GetNodesByIds(nids []NodeId) (nodes []couchbase.Node) {
 	for _, nid := range nids {
 		nodes = append(nodes, ni.nodes[nid])
@@ -2014,7 +2061,6 @@ func (ni *NodesInfo) GetNodesByServiceType(srvc string) (nodes []couchbase.Node) {
 	return ni.GetNodesByIds(nids)
 }
 
-
 // The output of this function comes from querying the /pools/default endpoint
 // Thus the function returns list of nodes where "cluster membership of node is active".
 // Which is essentially a confirmation of fact that a service is configured on a node and
diff --git a/secondary/indexer/kv_sender.go b/secondary/indexer/kv_sender.go
index d8f499b94..52a60124a 100644
--- a/secondary/indexer/kv_sender.go
+++ b/secondary/indexer/kv_sender.go
@@ -61,7 +61,9 @@ type kvSender struct {
 func NewKVSender(supvCmdch MsgChannel, supvRespch MsgChannel,
 	config c.Config) (KVSender, Message) {
 
-	useCInfoLite := config["use_cinfo_lite"].Bool()
+	// Disabled due to MB-51636
+	// useCInfoLite := config["use_cinfo_lite"].Bool()
+	useCInfoLite := false
 	cip, err := common.NewClusterInfoProvider(useCInfoLite,
 		config["clusterAddr"].String(), DEFAULT_POOL, "kvsender", config)
 	if err != nil {
@@ -1476,34 +1478,37 @@ func (k *kvSender) handleConfigUpdate(cmd Message) {
 	cfgUpdate := cmd.(*MsgConfigUpdate)
 	newConfig := cfgUpdate.GetConfig()
 
-	oldConfig := k.config
-	newUseCInfoLite := newConfig["use_cinfo_lite"].Bool()
-	oldUseCInfoLite := oldConfig["use_cinfo_lite"].Bool()
-
-	if oldUseCInfoLite != newUseCInfoLite {
-		logging.Infof("KVSender::handleConfigUpdate Updating ClusterInfoProvider in kvsender")
-
-		cip, err := common.NewClusterInfoProvider(newUseCInfoLite,
-			newConfig["clusterAddr"].String(), DEFAULT_POOL, "kvsender", newConfig)
-		if err != nil {
-			logging.Errorf("KVSender::handleConfigUpdate Unable to update ClusterInfoProvider in kvsender err: %v, use_cinfo_lite: old %v new %v",
-				err, oldUseCInfoLite, newUseCInfoLite)
-			common.CrashOnError(err)
-		}
-		cip.SetMaxRetries(MAX_CLUSTER_FETCH_RETRY)
-
-		k.cinfoProviderLock.Lock()
-		oldProvider := k.cinfoProvider
-		k.cinfoProvider = cip
-		k.cinfoProviderLock.Unlock()
-
-		if oldProvider != nil {
-			oldProvider.Close()
-		}
-
-		logging.Infof("KVSender::handleConfigUpdate Updated ClusterInfoProvider in Indexer use_cinfo_lite: old %v new %v",
-			oldUseCInfoLite, newUseCInfoLite)
-	}
+
+	// Disabled due to MB-51636
+
+	// oldConfig := k.config
+	// newUseCInfoLite := newConfig["use_cinfo_lite"].Bool()
+	// oldUseCInfoLite := oldConfig["use_cinfo_lite"].Bool()
+
+	// if oldUseCInfoLite != newUseCInfoLite {
+	// logging.Infof("KVSender::handleConfigUpdate Updating ClusterInfoProvider in kvsender")
+
+	// cip, err := common.NewClusterInfoProvider(newUseCInfoLite,
+	// 	newConfig["clusterAddr"].String(), DEFAULT_POOL, "kvsender", newConfig)
+	// if err != nil {
+	// 	logging.Errorf("KVSender::handleConfigUpdate Unable to update ClusterInfoProvider in kvsender err: %v, use_cinfo_lite: old %v new %v",
+	// 		err, oldUseCInfoLite, newUseCInfoLite)
+	// 	common.CrashOnError(err)
+	// }
+	// cip.SetMaxRetries(MAX_CLUSTER_FETCH_RETRY)
+
+	// k.cinfoProviderLock.Lock()
+	// oldProvider := k.cinfoProvider
+	// k.cinfoProvider = cip
+	// k.cinfoProviderLock.Unlock()
+
+	// if oldProvider != nil {
+	// 	oldProvider.Close()
+	// }
+
+	// logging.Infof("KVSender::handleConfigUpdate Updated ClusterInfoProvider in Indexer use_cinfo_lite: old %v new %v",
+	// 	oldUseCInfoLite, newUseCInfoLite)
+	// }
 
 	k.config = newConfig
 
diff --git a/secondary/projector/feed.go b/secondary/projector/feed.go
index ffe18e2fb..8185f52e2 100644
--- a/secondary/projector/feed.go
+++ b/secondary/projector/feed.go
@@ -1848,45 +1848,19 @@ func (feed *Feed) getLocalKVAddrs(
 
 func (feed *Feed) getLocalVbuckets(pooln, bucketn string, opaque uint16) ([]uint16, error) {
 	prefix := feed.logPrefix
+	cluster := feed.cluster
 
-	feed.projector.cinfoProviderLock.RLock()
-	defer feed.projector.cinfoProviderLock.RUnlock()
-
-	binfo, err := feed.projector.cinfoProvider.GetBucketInfoProvider(bucketn)
+	binfo, err := common.NewBucketInfo(cluster, pooln, bucketn)
 	if err != nil {
-		fmsg := "%v ##%x cinfo.GetBucketInfoProvider(%s): %v\n"
-		logging.Warnf(fmsg, prefix, opaque, bucketn, err)
+		fmsg := "%v ##%x NewBucketInfo(`%v`, `%v`, `%v`): %v\n"
+		logging.Errorf(fmsg, prefix, opaque, cluster, pooln, bucketn, err)
 		return nil, projC.ErrorClusterInfo
 	}
 
-	err = binfo.FetchBucketInfo(bucketn)
-	if err != nil {
-		// If bucket is deleted GetVBuckets will fail. If err is transient vbmap is refreshed.
-		fmsg := "%v ##%x cinfo.FetchBucketInfo(%s): %v\n"
-		logging.Warnf(fmsg, prefix, opaque, bucketn, err)
-
-		// Force fetch cluster info cache incase it was not syncronized properly,
-		// so that next call to this method can succeed
-		err = binfo.FetchWithLock()
-		if err != nil {
-			return nil, projC.ErrorClusterInfo
-		}
-	}
-
-	binfo.RLock()
-	defer binfo.RUnlock()
-
 	vbnos, err := binfo.GetLocalVBuckets(bucketn)
 	if err != nil {
 		fmsg := "%v ##%x cinfo.GetLocalVBuckets(`%v`): %v\n"
 		logging.Errorf(fmsg, prefix, opaque, bucketn, err)
-
-		// Force fetch cluster info cache incase it was not syncronized properly,
-		// so that next call to this method can succeed
-		binfo.RUnlock()
-		binfo.FetchWithLock()
-		binfo.RLock()
-
 		return nil, projC.ErrorClusterInfo
 	}
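
Note (not part of the patch): a minimal sketch of how a caller can exercise the synchronous fetch path introduced above. common.NewBucketInfo and GetLocalVBuckets are the calls used in the feed.go hunk; the import path, wrapper function, and the cluster/pool/bucket values below are assumptions for illustration only.

// Sketch only, assuming the standard import path of this repository.
package main

import (
	"fmt"

	"github.com/couchbase/indexing/secondary/common"
)

// localVbuckets fetches a fresh terse bucket map directly from the cluster
// (instead of a possibly stale cluster-info cache) and returns the vbuckets
// owned by this node, mirroring the new getLocalVbuckets flow.
func localVbuckets(cluster, pooln, bucketn string) ([]uint16, error) {
	binfo, err := common.NewBucketInfo(cluster, pooln, bucketn)
	if err != nil {
		return nil, err
	}
	return binfo.GetLocalVBuckets(bucketn)
}

func main() {
	// Placeholder values; a real caller (e.g. the projector feed) passes its
	// own cluster address, pool and bucket.
	vbnos, err := localVbuckets("127.0.0.1:9000", "default", "default")
	if err != nil {
		fmt.Println("localVbuckets failed:", err)
		return
	}
	fmt.Println("local vbuckets:", vbnos)
}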