Skip to content

Commit

Permalink
Merge remote-tracking branch 'couchbase/unstable' into HEAD
Browse files Browse the repository at this point in the history
http://ci2i-unstable.northscale.in/gsi-02.04.2022-03.39.pass.html
Change-Id: I38d075489ed19f5235c8bbc59bb0c46f23d7025e
  • Loading branch information
amithk committed Apr 2, 2022
2 parents 12e385c + 9f5c7f1 commit 24dde9b
Show file tree
Hide file tree
Showing 3 changed files with 86 additions and 61 deletions.
50 changes: 48 additions & 2 deletions secondary/common/cluster_info_lite.go
Expand Up @@ -453,6 +453,54 @@ func (bi *bucketInfo) setClusterURL(u string) {
bi.clusterURL = u
}

// String returns a human-readable rendering of the bucketInfo,
// showing the wrapped bucket details and the cluster URL, for logging.
func (bi *bucketInfo) String() string {
	desc := fmt.Sprintf("bucket: %+v cluster: %v", bi.bucket, bi.clusterURL)
	return desc
}

// NewBucketInfo connects to the given cluster address (either a bare
// host:port or a full http(s) URL), fetches the terse bucket details for
// bucketn from pooln, and returns a bucketInfo wrapping the result.
// Authentication goes through cbauth via CbAuthHandler.
func NewBucketInfo(cluster, pooln, bucketn string) (*bucketInfo, error) {

	// Normalize a full URL down to its host:port component.
	if strings.HasPrefix(cluster, "http") {
		u, err := url.Parse(cluster)
		if err != nil {
			return nil, err
		}
		cluster = u.Host
	}

	ah := &CbAuthHandler{
		Hostport: cluster,
		Bucket:   bucketn,
	}

	client, err := couchbase.ConnectWithAuth("http://"+cluster, ah)
	if err != nil {
		return nil, err
	}
	client.SetUserAgent("NewBucketInfo")

	var bucket *couchbase.Bucket
	retryCount := 0
	terseBucketsBase := fmt.Sprintf("/pools/%v/b/", pooln)
	// GetTerseBucket signals retry when ns_server is momentarily out of
	// sync; allow up to 6 attempts with a short pause between them.
	for retry := true; retry && retryCount <= 5; retryCount++ {
		retry, bucket, err = client.GetTerseBucket(terseBucketsBase, bucketn)
		if retry {
			time.Sleep(5 * time.Millisecond)
		}
	}
	if retryCount > 1 {
		logging.Warnf("NewBucketInfo: Retried %v times due to out of sync for"+
			" bucket %s. Final err: %v", retryCount, bucketn, err)
	}
	if err != nil {
		return nil, err
	}
	// Guard against the retry budget being exhausted with retry still true
	// and a nil error: without this, a nil bucket would be handed to
	// newBucketInfo with no indication of failure.
	if bucket == nil {
		return nil, fmt.Errorf("NewBucketInfo: bucket %s not available after %v attempts",
			bucketn, retryCount)
	}

	bi := newBucketInfo(bucket, client.BaseURL.Host)
	bi.setClusterURL(cluster)

	// err is provably nil at this point; return the literal nil for clarity.
	return bi, nil
}

//
// PoolInfo
//
Expand Down Expand Up @@ -1999,7 +2047,6 @@ func (c *ClusterInfoCacheLiteClient) GetLocalNodeUUID() (string, error) {
return "", fmt.Errorf("no node has ThisNode set")
}


func (ni *NodesInfo) GetNodesByIds(nids []NodeId) (nodes []couchbase.Node) {
for _, nid := range nids {
nodes = append(nodes, ni.nodes[nid])
Expand All @@ -2014,7 +2061,6 @@ func (ni *NodesInfo) GetNodesByServiceType(srvc string) (nodes []couchbase.Node)
return ni.GetNodesByIds(nids)
}


// The output of this function comes from querying the /pools/default endpoint
// Thus the function returns list of nodes where "cluster membership of node is active".
// Which is essentially a confirmation of fact that a service is configured on a node and
Expand Down
63 changes: 34 additions & 29 deletions secondary/indexer/kv_sender.go
Expand Up @@ -61,7 +61,9 @@ type kvSender struct {
func NewKVSender(supvCmdch MsgChannel, supvRespch MsgChannel,
config c.Config) (KVSender, Message) {

useCInfoLite := config["use_cinfo_lite"].Bool()
// Disabled due to MB-51636
// useCInfoLite := config["use_cinfo_lite"].Bool()
useCInfoLite := false
cip, err := common.NewClusterInfoProvider(useCInfoLite, config["clusterAddr"].String(),
DEFAULT_POOL, "kvsender", config)
if err != nil {
Expand Down Expand Up @@ -1476,34 +1478,37 @@ func (k *kvSender) handleConfigUpdate(cmd Message) {
cfgUpdate := cmd.(*MsgConfigUpdate)

newConfig := cfgUpdate.GetConfig()
oldConfig := k.config
newUseCInfoLite := newConfig["use_cinfo_lite"].Bool()
oldUseCInfoLite := oldConfig["use_cinfo_lite"].Bool()

if oldUseCInfoLite != newUseCInfoLite {
logging.Infof("KVSender::handleConfigUpdate Updating ClusterInfoProvider in kvsender")

cip, err := common.NewClusterInfoProvider(newUseCInfoLite,
newConfig["clusterAddr"].String(), DEFAULT_POOL, "kvsender", newConfig)
if err != nil {
logging.Errorf("KVSender::handleConfigUpdate Unable to update ClusterInfoProvider in kvsender err: %v, use_cinfo_lite: old %v new %v",
err, oldUseCInfoLite, newUseCInfoLite)
common.CrashOnError(err)
}
cip.SetMaxRetries(MAX_CLUSTER_FETCH_RETRY)

k.cinfoProviderLock.Lock()
oldProvider := k.cinfoProvider
k.cinfoProvider = cip
k.cinfoProviderLock.Unlock()

if oldProvider != nil {
oldProvider.Close()
}

logging.Infof("KVSender::handleConfigUpdate Updated ClusterInfoProvider in Indexer use_cinfo_lite: old %v new %v",
oldUseCInfoLite, newUseCInfoLite)
}

// Disabled due to MB-51636

// oldConfig := k.config
// newUseCInfoLite := newConfig["use_cinfo_lite"].Bool()
// oldUseCInfoLite := oldConfig["use_cinfo_lite"].Bool()

// if oldUseCInfoLite != newUseCInfoLite {
// logging.Infof("KVSender::handleConfigUpdate Updating ClusterInfoProvider in kvsender")

// cip, err := common.NewClusterInfoProvider(newUseCInfoLite,
// newConfig["clusterAddr"].String(), DEFAULT_POOL, "kvsender", newConfig)
// if err != nil {
// logging.Errorf("KVSender::handleConfigUpdate Unable to update ClusterInfoProvider in kvsender err: %v, use_cinfo_lite: old %v new %v",
// err, oldUseCInfoLite, newUseCInfoLite)
// common.CrashOnError(err)
// }
// cip.SetMaxRetries(MAX_CLUSTER_FETCH_RETRY)

// k.cinfoProviderLock.Lock()
// oldProvider := k.cinfoProvider
// k.cinfoProvider = cip
// k.cinfoProviderLock.Unlock()

// if oldProvider != nil {
// oldProvider.Close()
// }

// logging.Infof("KVSender::handleConfigUpdate Updated ClusterInfoProvider in Indexer use_cinfo_lite: old %v new %v",
// oldUseCInfoLite, newUseCInfoLite)
// }

k.config = newConfig

Expand Down
34 changes: 4 additions & 30 deletions secondary/projector/feed.go
Expand Up @@ -1848,45 +1848,19 @@ func (feed *Feed) getLocalKVAddrs(
func (feed *Feed) getLocalVbuckets(pooln, bucketn string, opaque uint16) ([]uint16, error) {

prefix := feed.logPrefix
cluster := feed.cluster

feed.projector.cinfoProviderLock.RLock()
defer feed.projector.cinfoProviderLock.RUnlock()

binfo, err := feed.projector.cinfoProvider.GetBucketInfoProvider(bucketn)
binfo, err := common.NewBucketInfo(cluster, pooln, bucketn)
if err != nil {
fmsg := "%v ##%x cinfo.GetBucketInfoProvider(%s): %v\n"
logging.Warnf(fmsg, prefix, opaque, bucketn, err)
fmsg := "%v ##%x NewBucketInfo(`%v`, `%v`, `%v`): %v\n"
logging.Errorf(fmsg, prefix, opaque, cluster, pooln, bucketn, err)
return nil, projC.ErrorClusterInfo
}

err = binfo.FetchBucketInfo(bucketn)
if err != nil {
// If bucket is deleted GetVBuckets will fail. If err is transient vbmap is refreshed.
fmsg := "%v ##%x cinfo.FetchBucketInfo(%s): %v\n"
logging.Warnf(fmsg, prefix, opaque, bucketn, err)

// Force fetch cluster info cache incase it was not syncronized properly,
// so that next call to this method can succeed
err = binfo.FetchWithLock()
if err != nil {
return nil, projC.ErrorClusterInfo
}
}

binfo.RLock()
defer binfo.RUnlock()

vbnos, err := binfo.GetLocalVBuckets(bucketn)
if err != nil {
fmsg := "%v ##%x cinfo.GetLocalVBuckets(`%v`): %v\n"
logging.Errorf(fmsg, prefix, opaque, bucketn, err)

// Force fetch cluster info cache incase it was not syncronized properly,
// so that next call to this method can succeed
binfo.RUnlock()
binfo.FetchWithLock()
binfo.RLock()

return nil, projC.ErrorClusterInfo
}

Expand Down

0 comments on commit 24dde9b

Please sign in to comment.