Skip to content

Commit

Permalink
MB-43395: Cluster info cache refresh optimisations
Browse files Browse the repository at this point in the history
Change-Id: I469f671ae0c5d487a24ebe0e055b2a6115e53143
Reviewed-on: http://review.couchbase.org/c/eventing/+/146974
Well-Formed: Build Bot <build@couchbase.com>
Reviewed-by: CI Bot
Reviewed-by: Jeelan Basha Poola <jeelan.poola@couchbase.com>
Tested-by: <ankit.prabhu@couchbase.com>
  • Loading branch information
AnkitPrabhu committed Mar 2, 2021
1 parent 42c58e9 commit 54ff6d1
Show file tree
Hide file tree
Showing 7 changed files with 242 additions and 65 deletions.
106 changes: 92 additions & 14 deletions dcp/pools.go
Expand Up @@ -17,6 +17,7 @@ import (
"sort"
"strings"
"sync/atomic"
"time"
"unsafe"

"github.com/couchbase/eventing/dcp/transport/client"
Expand Down Expand Up @@ -193,6 +194,14 @@ func (b Bucket) getConnPools() []*connectionPool {
return nil
}

// GetBucketURLVersionHash returns the version hash embedded in the pool's
// bucket URL by delegating to Pool.GetBucketURLVersionHash.
func (b *Bucket) GetBucketURLVersionHash() (string, error) {
	return b.pool.GetBucketURLVersionHash()
}

// SetBucketUri replaces the "uri" entry of the pool's BucketURL map with nUri.
// NOTE(review): this mutates shared pool state without synchronization —
// confirm callers serialize access to BucketURL.
func (b *Bucket) SetBucketUri(nUri string) {
	b.pool.BucketURL["uri"] = nUri
}

func (b *Bucket) replaceConnPools(with []*connectionPool) {
for {
old := atomic.LoadPointer(&b.connPools)
Expand Down Expand Up @@ -530,7 +539,14 @@ func GetBucketList(baseU string) (bInfo []BucketInfo, err error) {
func (b *Bucket) Refresh() error {
pool := b.pool
tmpb := &Bucket{}
err := pool.client.parseURLResponse(b.URI, tmpb)

// Unescape the b.URI as it pool.client.parseURLResponse will again escape it.
bucketURI, err1 := url.PathUnescape(b.URI)
if err1 != nil {
return fmt.Errorf("Malformed bucket URI path %v, error %v", b.URI, err1)
}

err := pool.client.parseURLResponse(bucketURI, tmpb)
if err != nil {
return err
}
Expand All @@ -539,6 +555,18 @@ func (b *Bucket) Refresh() error {
return nil
}

// RefreshWithTerseBucket re-reads this bucket's metadata using only the
// terse bucket endpoint and re-initializes the bucket from the response.
// The endpoint's retry hint is ignored here; any fetch error is returned
// to the caller as-is.
func (b *Bucket) RefreshWithTerseBucket() error {
	_, fresh, err := b.pool.getTerseBucket(b.Name)
	if err != nil {
		return err
	}
	b.init(fresh)
	return nil
}

func (b *Bucket) init(nb *Bucket) {
connHost, _, _ := net.SplitHostPort(b.pool.client.BaseURL.Host)
for i := range nb.NodesJSON {
Expand All @@ -557,8 +585,46 @@ func (b *Bucket) init(nb *Bucket) {
atomic.StorePointer(&b.nodeList, unsafe.Pointer(&nb.NodesJSON))
}

// getTerseBucket fetches bucket metadata for bucketn from the terse buckets
// endpoint. The first return value reports whether the failure is retryable:
// an HTTP 404 means the locally cached bucket list is out of sync with the
// cluster (the bucket may have just been deleted).
func (p *Pool) getTerseBucket(bucketn string) (bool, *Bucket, error) {
	nb := &Bucket{}
	if err := p.client.parseURLResponse(p.BucketURL["terseBucketsBase"]+bucketn, nb); err != nil {
		// A 404 indicates a stale bucket list; signal the caller to retry.
		outOfSync := strings.Contains(err.Error(), "HTTP error 404")
		return outOfSync, nil, err
	}
	return false, nb, nil
}

// refreshBucket calls only the terseBucket endpoint to fetch the bucket info
// and installs the result into p.BucketMap. When the endpoint reports the
// bucket as missing (cluster bucket list momentarily out of sync), the fetch
// is retried a bounded number of times before the error is returned.
func (p *Pool) refreshBucket(bucketn string) error {
	// One initial attempt plus up to 5 retries, matching the prior
	// goto-based loop (6 fetches total before giving up).
	const maxRetries = 5

	for retryCount := 1; ; retryCount++ {
		retry, nb, err := p.getTerseBucket(bucketn)
		if retry {
			if retryCount > maxRetries {
				return err
			}
			logging.Warnf("cluster_info: Out of sync for bucket %s. Retrying to getTerseBucket. retry count %v", bucketn, retryCount)
			time.Sleep(5 * time.Millisecond)
			continue
		}
		if err != nil {
			return err
		}

		nb.pool = p
		nb.init(nb)
		// NOTE(review): assumes p.BucketMap was already initialized (e.g. by
		// refresh()); writing to a nil map would panic — confirm callers.
		p.BucketMap[nb.Name] = *nb
		return nil
	}
}

func (p *Pool) refresh() (err error) {
p.BucketMap = make(map[string]Bucket)
bucketMap := make(map[string]Bucket)

loop:
buckets := []Bucket{}
Expand All @@ -567,29 +633,41 @@ loop:
return err
}
for _, b := range buckets {
nb := &Bucket{}
err = p.client.parseURLResponse(p.BucketURL["terseBucketsBase"]+b.Name, nb)
retry, nb, err := p.getTerseBucket(b.Name)
if retry {
logging.Warnf("cluster_info: Out of sync for bucket %s. Retrying..", b.Name)
goto loop
}
if err != nil {
// bucket list is out of sync with cluster bucket list
// bucket might have got deleted.
if strings.Contains(err.Error(), "HTTP error 404") {
logging.Warnf("cluster_info: Out of sync for bucket %s. Retrying..", b.Name)
goto loop
}
return err
}
b.pool = p
b.init(nb)
p.BucketMap[b.Name] = b
nb.pool = p
nb.init(nb)
bucketMap[nb.Name] = *nb
}

p.BucketMap = bucketMap
return nil
}

// GetServerGroups fetches the cluster's server group topology from the
// pool's server groups URI.
func (p *Pool) GetServerGroups() (ServerGroups, error) {
	var groups ServerGroups
	err := p.client.parseURLResponse(p.ServerGroupsUri, &groups)
	return groups, err
}

// GetBucketURLVersionHash parses p.BucketURL["uri"] and extracts the version
// hash from it. The URI has the shape
// /pools/default/buckets?v=<$ver>&uuid=<$uid>; the function returns $ver.
// An error is returned when the URI cannot be parsed or carries no "v"
// query parameter (the original code would panic with an index out of range
// in that case).
func (p *Pool) GetBucketURLVersionHash() (string, error) {
	b := p.BucketURL["uri"]
	u, err := url.Parse(b)
	if err != nil {
		return "", fmt.Errorf("Unable to parse BucketURL: %v in PoolChangeNotification", b)
	}
	m, err := url.ParseQuery(u.RawQuery)
	if err != nil {
		return "", fmt.Errorf("Unable to extract version hash from BucketURL: %v in PoolChangeNotification", b)
	}
	// Guard against a missing "v" parameter instead of panicking on m["v"][0].
	v, ok := m["v"]
	if !ok || len(v) == 0 {
		return "", fmt.Errorf("Missing version hash in BucketURL: %v in PoolChangeNotification", b)
	}
	return v[0], nil
}

// GetPool gets a pool from within the couchbase cluster (usually
Expand Down
2 changes: 1 addition & 1 deletion producer/producer.go
Expand Up @@ -124,7 +124,7 @@ func (p *Producer) Serve() {
}
p.seqsNoProcessedRWMutex.Unlock()

go p.pollForDeletedVbs()
//go p.pollForDeletedVbs()

p.appLogWriter, err = openAppLog(p.appLogPath, 0640, p.appLogMaxSize, p.appLogMaxFiles)
if err != nil {
Expand Down
7 changes: 5 additions & 2 deletions service_manager/http_handlers.go
Expand Up @@ -2300,13 +2300,16 @@ func (m *ServiceMgr) getKVNodesAddresses(w http.ResponseWriter, r *http.Request)
w.Header().Set("Content-Type", "application/json")

nsServer := net.JoinHostPort(util.Localhost(), m.restPort)
clusterInfo, err := util.FetchNewClusterInfoCache(nsServer)
cic, err := util.FetchClusterInfoClient(nsServer)
if err != nil {
logging.Errorf("%s Failed to get cluster info cache, err : %v", logPrefix, err)
return
}

kvNodes, err := clusterInfo.GetAddressOfActiveKVNodes()
cinfo := cic.GetClusterInfoCache()
cinfo.RLock()
kvNodes, err := cinfo.GetAddressOfActiveKVNodes()
cinfo.RUnlock()
if err != nil {
logging.Errorf("%s Failed to get KV nodes addresses, err : %v", logPrefix, err)
return
Expand Down
3 changes: 3 additions & 0 deletions supervisor/defs.go
@@ -1,6 +1,7 @@
package supervisor

import (
"errors"
"sync"
"time"

Expand Down Expand Up @@ -45,6 +46,8 @@ const (
//TODO: move it to common package
const bucketOpRetryInterval = time.Duration(1000) * time.Millisecond

var NoBucket = errors.New("Bucket not found")

const (
supCmdType int8 = iota
cmdAppDelete
Expand Down
52 changes: 23 additions & 29 deletions supervisor/super_supervisor.go
Expand Up @@ -230,26 +230,26 @@ func (s *SuperSupervisor) SettingsChangeCallback(path string, value []byte, rev

switch processingStatus {
case true:
sourceNodeCount, metaNodeCount, err := s.getSourceAndMetaBucketNodeCount(appName)
if err != nil {
logging.Errorf("%s [%d] getSourceAndMetaBucketNodeCount failed for Function: %s runningProducer: %v",
logPrefix, s.runningFnsCount(), appName, s.runningFns()[appName])
return nil
}
if sourceNodeCount < 1 || metaNodeCount < 1 {
util.Retry(util.NewExponentialBackoff(), &s.retryCount, undeployFunctionCallback, s, appName)
s.appRWMutex.Lock()
s.appDeploymentStatus[appName] = false
s.appProcessingStatus[appName] = false
s.appRWMutex.Unlock()
logging.Errorf("%s [%d] Source bucket or metadata bucket is deleted, Function: %s is undeployed",
logPrefix, s.runningFnsCount(), appName)
return nil
}
logging.Infof("%s [%d] Function: %s begin deployment process", logPrefix, s.runningFnsCount(), appName)
state := s.GetAppState(appName)

if state == common.AppStateUndeployed || state == common.AppStatePaused {
sourceNodeCount, metaNodeCount, err := s.getSourceAndMetaBucketNodeCount(appName)
if err != nil {
logging.Errorf("%s [%d] getSourceAndMetaBucketNodeCount failed for Function: %s runningProducer: %v",
logPrefix, s.runningFnsCount(), appName, s.runningFns()[appName])
return nil
}
if sourceNodeCount < 1 || metaNodeCount < 1 {
util.Retry(util.NewExponentialBackoff(), &s.retryCount, undeployFunctionCallback, s, appName)
s.appRWMutex.Lock()
s.appDeploymentStatus[appName] = false
s.appProcessingStatus[appName] = false
s.appRWMutex.Unlock()
logging.Errorf("%s [%d] Source bucket or metadata bucket is deleted, Function: %s is undeployed",
logPrefix, s.runningFnsCount(), appName)
return nil
}

s.appListRWMutex.Lock()
if _, ok := s.bootstrappingApps[appName]; ok {
Expand All @@ -271,11 +271,6 @@ func (s *SuperSupervisor) SettingsChangeCallback(path string, value []byte, rev
resumed := false
if state == common.AppStatePaused {
if p, ok := s.runningFns()[appName]; ok {
err = s.WatchBucket(p.SourceBucket(), appName)
if err != nil {
return err
}

p.ResumeProducer()
p.NotifySupervisor()
resumed = true
Expand Down Expand Up @@ -342,7 +337,6 @@ func (s *SuperSupervisor) SettingsChangeCallback(path string, value []byte, rev

p.PauseProducer()
p.NotifySupervisor()
s.UnwatchBucket(p.SourceBucket(), appName)
logging.Infof("%s [%d] Function: %s Cleaned up running Eventing.Producer instance", logPrefix, s.runningFnsCount(), appName)

}
Expand Down Expand Up @@ -461,16 +455,16 @@ func (s *SuperSupervisor) TopologyChangeNotifCallback(path string, value []byte,
logging.Infof("%s [%d] Function: %s deployment_status: %t processing_status: %t runningProducer: %v",
logPrefix, s.runningFnsCount(), appName, deploymentStatus, processingStatus, s.runningFns()[appName])

sourceNodeCount, metaNodeCount, err := s.getSourceAndMetaBucketNodeCount(appName)
if err != nil {
logging.Errorf("%s [%d] getSourceAndMetaBucketNodeCount failed for Function: %s runningProducer: %v",
logPrefix, s.runningFnsCount(), appName, s.runningFns()[appName])
continue
}

if _, ok := s.runningFns()[appName]; !ok {

if deploymentStatus && processingStatus {
sourceNodeCount, metaNodeCount, err := s.getSourceAndMetaBucketNodeCount(appName)
if err != nil {
logging.Errorf("%s [%d] getSourceAndMetaBucketNodeCount failed for Function: %s runningProducer: %v",
logPrefix, s.runningFnsCount(), appName, s.runningFns()[appName])
continue
}

if sourceNodeCount < 1 || metaNodeCount < 1 {
util.Retry(util.NewExponentialBackoff(), &s.retryCount, undeployFunctionCallback, s, appName)
logging.Errorf("%s [%d] Source bucket or metadata bucket is deleted, Function: %s is undeployed",
Expand Down

0 comments on commit 54ff6d1

Please sign in to comment.