Skip to content

Commit

Permalink
MB-38119 Use clusterInfoClient in projector
Browse files Browse the repository at this point in the history
Change-Id: Ic82fab56d6e4469fa061ba0a2d9ab90446c46bc2
  • Loading branch information
varunv-cb committed Apr 27, 2020
1 parent 052a535 commit ce5323a
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 48 deletions.
44 changes: 11 additions & 33 deletions secondary/projector/feed.go
Original file line number Diff line number Diff line change
Expand Up @@ -1711,23 +1711,11 @@ func (feed *Feed) getLocalKVAddrs(
pooln, bucketn string, opaque uint16) (string, error) {

prefix := feed.logPrefix
url, err := c.ClusterAuthUrl(feed.config["clusterAddr"].String())
if err != nil {
fmsg := "%v ##%x ClusterAuthUrl(): %v\n"
logging.Errorf(fmsg, prefix, opaque, err)
return "", projC.ErrorClusterInfo
}
cinfo, err := c.NewClusterInfoCache(url, pooln)
if err != nil {
fmsg := "%v ##%x ClusterInfoCache(`%v`): %v\n"
logging.Errorf(fmsg, prefix, opaque, bucketn, err)
return "", projC.ErrorClusterInfo
}
if err := cinfo.Fetch(); err != nil {
fmsg := "%v ##%x cinfo.Fetch(`%v`): %v\n"
logging.Errorf(fmsg, prefix, opaque, bucketn, err)
return "", projC.ErrorClusterInfo
}
// Fetch the clusterInfoCache from clusterInfoClient at projector
cinfo := feed.projector.cinfoClient.GetClusterInfoCache()
cinfo.RLock()
defer cinfo.RUnlock()

kvaddr, err := cinfo.GetLocalServiceAddress("kv")
if err != nil {
fmsg := "%v ##%x cinfo.GetLocalServiceAddress(`kv`): %v\n"
Expand All @@ -1741,22 +1729,12 @@ func (feed *Feed) getLocalVbuckets(
pooln, bucketn string, opaque uint16) ([]uint16, error) {

prefix := feed.logPrefix
// gather vbnos based on colocation policy.
var cinfo *c.ClusterInfoCache
url, err := c.ClusterAuthUrl(feed.config["clusterAddr"].String())
if err == nil {
cinfo, err = c.NewClusterInfoCache(url, pooln)
}
if err != nil {
fmsg := "%v ##%x ClusterInfoCache(`%v`): %v\n"
logging.Errorf(fmsg, prefix, opaque, bucketn, err)
return nil, projC.ErrorClusterInfo
}
if err := cinfo.Fetch(); err != nil {
fmsg := "%v ##%x cinfo.Fetch(`%v`): %v\n"
logging.Errorf(fmsg, prefix, opaque, bucketn, err)
return nil, projC.ErrorClusterInfo
}

// Fetch the clusterInfoCache from clusterInfoClient at projector
cinfo := feed.projector.cinfoClient.GetClusterInfoCache()
cinfo.RLock()
defer cinfo.RUnlock()

nodeID := cinfo.GetCurrentNode()
vbnos32, err := cinfo.GetVBuckets(nodeID, bucketn)
if err != nil {
Expand Down
25 changes: 10 additions & 15 deletions secondary/projector/projector.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ type Projector struct {
cpuProfFd *os.File
logPrefix string

cinfoClient *c.ClusterInfoClient

certFile string
keyFile string
reqch chan ap.Request
Expand Down Expand Up @@ -84,6 +86,11 @@ func NewProjector(maxvbs int, config c.Config, certFile string, keyFile string)
ef := config["projector.routerEndpointFactory"]
config["projector.routerEndpointFactory"] = ef

// Start cluster info client
cic, err := c.NewClusterInfoClient(p.clusterAddr, "default", config)
c.CrashOnError(err)
p.cinfoClient = cic

p.stats = NewProjectorStats()
p.statsMgr = NewStatsManager(p.statsCmdCh, p.statsStopCh, config)
p.UpdateStatsMgr(p.stats.Clone())
Expand Down Expand Up @@ -988,21 +995,9 @@ func (p *Projector) getNodeUUID() (string, error) {
var nodeUUID string
prefix := p.logPrefix
fn := func(r int, err error) error {
var cinfo *c.ClusterInfoCache
url, err := c.ClusterAuthUrl(p.clusterAddr)
if err == nil {
cinfo, err = c.NewClusterInfoCache(url, p.pooln)
}
if err != nil {
fmsg := "%v ClusterInfoCache(): %v\n"
logging.Errorf(fmsg, prefix, err)
return err
}
if err := cinfo.Fetch(); err != nil {
fmsg := "%v cinfo.Fetch(): %v\n"
logging.Errorf(fmsg, prefix, err)
return err
}
cinfo := p.cinfoClient.GetClusterInfoCache()
cinfo.RLock()
defer cinfo.RUnlock()

if nodeUUID = cinfo.GetLocalNodeUUID(); nodeUUID == "" {
fmsg := "%v cinfo.GetLocalNodeUUID(): %v\n"
Expand Down

0 comments on commit ce5323a

Please sign in to comment.