
Commit

Merge remote-tracking branch 'couchbase/unstable' into HEAD
Change-Id: Icb89844cdb6150bedee19a8934680169b5f99483
deepkaran committed Apr 30, 2021
2 parents 17dbe2a + dd45317 commit ed003d3
Showing 5 changed files with 312 additions and 60 deletions.
93 changes: 93 additions & 0 deletions secondary/common/cluster_info.go
@@ -150,6 +150,51 @@ func (c *ClusterInfoCache) SetServicePorts(portMap map[string]string) {

 }

+func (c *ClusterInfoCache) Connect() (err error) {
+	c.client, err = couchbase.Connect(c.url)
+	if err != nil {
+		return err
+	}
+
+	c.client.SetUserAgent(c.userAgent)
+	return nil
+}
+
+// Note: This function does not fetch BucketMap and Manifest data in c.pool
+func (c *ClusterInfoCache) FetchNodesData() (err error) {
+	c.pool, err = c.client.CallPoolURI(c.poolName)
+	if err != nil {
+		return err
+	}
+
+	c.updateNodesData()
+
+	found := false
+	for _, node := range c.nodes {
+		if node.ThisNode {
+			found = true
+		}
+	}
+
+	if !found {
+		return errors.New("Current node's cluster membership is not active")
+	}
+	return nil
+}
+
+func (c *ClusterInfoCache) FetchNodeSvsData() (err error) {
+	var poolServs couchbase.PoolServices
+
+	poolServs, err = c.client.GetPoolServices(c.poolName)
+	if err != nil {
+		return err
+	}
+
+	c.nodesvs = poolServs.NodesExt
+	c.buildEncryptPortMapping()
+	return nil
+}

 // TODO: In many places (e.g. lifecycle manager), cluster info cache
 // refresh is required only for one bucket. It is sub-optimal to update
 // the cluster info for all the buckets. Add a new method
@@ -378,6 +423,54 @@ func (c *ClusterInfoCache) FetchForPoolChange() error {
 	return rh.Run()
 }

+func (c *ClusterInfoCache) FetchNodesAndSvsInfoWithLock() (err error) {
+	c.Lock()
+	defer c.Unlock()
+
+	return c.FetchNodesAndSvsInfo()
+}
+
+func (c *ClusterInfoCache) FetchNodesAndSvsInfo() (err error) {
+	fn := func(r int, err error) error {
+		if r > 0 {
+			logging.Infof("%vError occurred during nodes and nodesvs update (%v) .. Retrying(%d)",
+				c.logPrefix, err, r)
+		}
+
+		vretry := 0
+	retry:
+		if err = c.Connect(); err != nil {
+			return err
+		}
+
+		if err = c.FetchNodesData(); err != nil {
+			return err
+		}
+
+		if err = c.FetchNodeSvsData(); err != nil {
+			return err
+		}
+
+		if !c.validateCache(c.client.Info.IsIPv6) {
+			if vretry < CLUSTER_INFO_VALIDATION_RETRIES {
+				vretry++
+				logging.Infof("%vValidation Failed while updating nodes and nodesvs.. Retrying(%d)",
+					c.logPrefix, vretry)
+				goto retry
+			} else {
+				logging.Errorf("%vValidation Failed while updating nodes and nodesvs.. %v",
+					c.logPrefix, c)
+				return ErrValidationFailed
+			}
+		}
+
+		return nil
+	}
+
+	rh := NewRetryHelper(c.retries, time.Second*2, 1, fn)
+	return rh.Run()
+}
+
 func (c *ClusterInfoCache) FetchManifestInfoOnUIDChange(bucketName string, muid string) error {
 	c.Lock()
 	defer c.Unlock()
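Taken together, the helpers added above split the old monolithic pool fetch into Connect, FetchNodesData and FetchNodeSvsData, and FetchNodesAndSvsInfo wraps them with retry and validation. A minimal caller sketch of the locked variant follows; the constructor common.NewClusterInfoCache and its arguments are assumptions used only for illustration and are not part of this commit.

// refreshNodesAndServices is an illustrative sketch: it refreshes only the
// nodes and nodesvs sections of the cluster info cache rather than the full
// pool data. common.NewClusterInfoCache and its arguments are assumed here;
// the constructor lives elsewhere in secondary/common, not in this diff.
func refreshNodesAndServices(clusterURL string) error {
	cinfo, err := common.NewClusterInfoCache(clusterURL, "default")
	if err != nil {
		return err
	}

	// Added in this commit: connects, fetches nodes and nodesvs data,
	// validates the cache, and retries internally via NewRetryHelper.
	return cinfo.FetchNodesAndSvsInfoWithLock()
}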
58 changes: 49 additions & 9 deletions secondary/indexer/cluster_manager_agent.go
@@ -407,15 +407,38 @@ func (c *clustMgrAgent) handleGetGlobalTopology(cmd Message) {
 	defer metaIter.Close()

 	indexInstMap := make(common.IndexInstMap)
+	topoCache := make(map[string]map[string]map[string]*manager.IndexTopology)

+	var delTokens map[common.IndexDefnId]*mc.DeleteCommandToken
+	delTokens, err = mc.FetchIndexDefnToDeleteCommandTokensMap()
+	if err != nil {
+		logging.Warnf("ClustMgr:handleGetGlobalTopology: Error in FetchIndexDefnToDeleteCommandTokensMap %v", err)
+	}
+
+	var dropTokens map[common.IndexDefnId][]*mc.DropInstanceCommandToken
+	dropTokens, err = mc.FetchIndexDefnToDropInstanceCommandTokenMap()
+	if err != nil {
+		logging.Warnf("ClustMgr:handleGetGlobalTopology: Error in FetchIndexDefnToDropInstanceCommandTokenMap %v", err)
+	}

 	for _, defn, err := metaIter.Next(); err == nil; _, defn, err = metaIter.Next() {

 		var idxDefn common.IndexDefn
 		idxDefn = *defn

-		t, e := c.mgr.GetTopologyByCollection(idxDefn.Bucket, idxDefn.Scope, idxDefn.Collection)
-		if e != nil {
-			common.CrashOnError(e)
+		t := topoCache[idxDefn.Bucket][idxDefn.Scope][idxDefn.Collection]
+		if t == nil {
+			t, err = c.mgr.GetTopologyByCollection(idxDefn.Bucket, idxDefn.Scope, idxDefn.Collection)
+			if err != nil {
+				common.CrashOnError(err)
+			}
+			if _, ok := topoCache[idxDefn.Bucket]; !ok {
+				topoCache[idxDefn.Bucket] = make(map[string]map[string]*manager.IndexTopology)
+			}
+			if _, ok := topoCache[idxDefn.Bucket][idxDefn.Scope]; !ok {
+				topoCache[idxDefn.Bucket][idxDefn.Scope] = make(map[string]*manager.IndexTopology)
+			}
+			topoCache[idxDefn.Bucket][idxDefn.Scope][idxDefn.Collection] = t
+		}
 		if t == nil {
 			logging.Warnf("ClustMgr:handleGetGlobalTopology Index Instance Not "+
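The three-level topoCache map above memoizes one *manager.IndexTopology per (bucket, scope, collection), so GetTopologyByCollection is called at most once per collection while iterating all index definitions rather than once per definition. The same lazy-fill pattern, pulled into a standalone helper purely for illustration (the helper name and the *manager.IndexManager parameter type are assumptions; the commit inlines this logic in handleGetGlobalTopology):

// getCachedTopology returns the topology for a collection, fetching and
// caching it on first use. Reads from missing keys of a nested map are safe
// in Go and simply return the zero value (nil).
func getCachedTopology(cache map[string]map[string]map[string]*manager.IndexTopology,
	mgr *manager.IndexManager, bucket, scope, collection string) (*manager.IndexTopology, error) {

	if t := cache[bucket][scope][collection]; t != nil {
		return t, nil
	}

	t, err := mgr.GetTopologyByCollection(bucket, scope, collection)
	if err != nil {
		return nil, err
	}
	if _, ok := cache[bucket]; !ok {
		cache[bucket] = make(map[string]map[string]*manager.IndexTopology)
	}
	if _, ok := cache[bucket][scope]; !ok {
		cache[bucket][scope] = make(map[string]*manager.IndexTopology)
	}
	cache[bucket][scope][collection] = t
	return t, nil
}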
@@ -464,14 +487,31 @@ }
 			}

 			if idxInst.State != common.INDEX_STATE_DELETED {
-				exist1, err := mc.DeleteCommandTokenExist(idxDefn.DefnId)
-				if err != nil {
-					logging.Warnf("Error when reading delete command token for defn %v", idxDefn.DefnId)
+				var exist1, exist2 bool
+				var err error
+
+				if delTokens != nil {
+					_, exist1 = delTokens[idxDefn.DefnId]
+				} else {
+					exist1, err = mc.DeleteCommandTokenExist(idxDefn.DefnId)
+					if err != nil {
+						logging.Warnf("Error when reading delete command token for defn %v", idxDefn.DefnId)
+					}
+				}

-				exist2, err := mc.DropInstanceCommandTokenExist(idxDefn.DefnId, idxInst.InstId)
-				if err != nil {
-					logging.Warnf("Error when reading delete command token for index (%v, %v)", idxDefn.DefnId, idxInst.InstId)
+				if dropTokens != nil {
+					dropTokenList := dropTokens[idxDefn.DefnId]
+					for _, dropToken := range dropTokenList {
+						if dropToken.InstId == idxDefn.InstId {
+							exist2 = true
+							break
+						}
+					}
+				} else {
+					exist2, err = mc.DropInstanceCommandTokenExist(idxDefn.DefnId, idxInst.InstId)
+					if err != nil {
+						logging.Warnf("Error when reading drop command token for index (%v, %v)", idxDefn.DefnId, idxInst.InstId)
+					}
+				}

 				if exist1 || exist2 {
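The hunk above replaces per-instance metakv reads (DeleteCommandTokenExist, DropInstanceCommandTokenExist) with lookups into the maps prefetched before the loop, keeping the old calls only as a fallback when the bulk fetch failed and the maps are nil. A condensed sketch of that fallback for the delete-token case, wrapped in a hypothetical helper that is not part of the commit:

// tokenExists prefers the bulk-fetched token map and falls back to a
// per-definition metakv read when the prefetch failed (tokens == nil).
func tokenExists(tokens map[common.IndexDefnId]*mc.DeleteCommandToken,
	defnId common.IndexDefnId) (bool, error) {

	if tokens != nil {
		_, ok := tokens[defnId]
		return ok, nil
	}
	return mc.DeleteCommandTokenExist(defnId)
}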
