Skip to content

Commit

Permalink
MB-50770 (7.1.0 2179) Reduce retries under waitForScheduledIndex()
Browse files Browse the repository at this point in the history
cluster_info.go FetchForBucket() is only called by metadata_provider.go
checkValidKeyspace(), a private function inside waitForScheduledIndex().
FetchForBucket() used a 300-retry RetryHelper, but its caller has its
own retry loop that terminates as soon as an ending condition is
detected. One expected ending condition is that the bucket, scope, or
collection has been dropped, so FetchForBucket() should not retry
hundreds of times; doing so floods query.log with retry messages from
its children.

The fix is to reduce the RetryHelper from 300 retries to 5. In
addition, the child functions (and related functions, while we're at
it) that have their own internal 5-retry loops are changed to log a
single message summarizing all retries instead of one message per
retry.

Change-Id: I3419e8f72d363dcad7ae0ac297a8c05a8ea35057
  • Loading branch information
cherkauer-couchbase committed Feb 11, 2022
1 parent 22e385e commit 7f0af20
Show file tree
Hide file tree
Showing 3 changed files with 75 additions and 67 deletions.
39 changes: 13 additions & 26 deletions secondary/common/cluster_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -234,17 +234,16 @@ func (c *ClusterInfoCache) FetchNodeSvsData() (err error) {
return nil
}

func (c *ClusterInfoCache) FetchForBucket(bucketName string, getNodeSvs bool, getServerGroups bool, getTerseBucketInfo bool,
getBucketManifest bool) error {
// FetchForBucket loads a ClusterInfoCache with bucket-specific info.
func (c *ClusterInfoCache) FetchForBucket(bucketName string, getTerseBucketInfo bool,
getBucketManifest bool, getServerGroups bool, getNodeSvs bool) error {

fn := func(r int, err error) error {
if r > 0 {
fn := func(retryNum int, errPrior error) error {
if errPrior != nil {
logging.Infof("%vError occurred during cluster info update (%v) .. Retrying(%d)",
c.logPrefix, err, r)
c.logPrefix, errPrior, retryNum)
}

vretry := 0
retry:
cl, err := couchbase.Connect(c.url)
if err != nil {
return err
Expand All @@ -256,42 +255,30 @@ func (c *ClusterInfoCache) FetchForBucket(bucketName string, getNodeSvs bool, ge
return err
}

if getNodeSvs {
if err = c.FetchNodeSvsData(); err != nil {
return err
}
}

if getTerseBucketInfo {
if err := c.pool.RefreshBucket(bucketName, true); err != nil {
if err = c.pool.RefreshBucket(bucketName, true); err != nil {
return err
}
}

if getBucketManifest {
if err := c.pool.RefreshManifest(bucketName, true); err != nil {
if err = c.pool.RefreshManifest(bucketName, true); err != nil {
return err
}
}

if getServerGroups {
if err := c.FetchServerGroups(); err != nil {
if err = c.FetchServerGroups(); err != nil {
return err
}
}

if getNodeSvs {
if err = c.FetchNodeSvsData(); err != nil {
return err
}
if !c.validateCache(c.client.Info.IsIPv6) {
if vretry < CLUSTER_INFO_VALIDATION_RETRIES {
vretry++
logging.Infof("%vValidation Failed for cluster info.. Retrying(%d)",
c.logPrefix, vretry)
goto retry
} else {
logging.Infof("%vValidation Failed for cluster info.. %v",
c.logPrefix, c)
return ErrValidationFailed
}
return ErrValidationFailed
}
}

Expand Down
93 changes: 55 additions & 38 deletions secondary/dcp/pools.go
Original file line number Diff line number Diff line change
Expand Up @@ -250,8 +250,8 @@ func (b *Bucket) getConnPool(i int) *connectionPool {

defer func() {
if r := recover(); r != nil {
logging.Errorf("bucket(%v) getConnPool crashed: %v\n", b.Name, r)
logging.Errorf("%s", logging.StackTrace())
logging.Errorf("Bucket::getConnPool: crashed for bucket %v: %v\n%v",
b.Name, r, logging.StackTrace())
}
}()

Expand All @@ -270,8 +270,8 @@ func (b *Bucket) getMasterNode(i int) string {

defer func() {
if r := recover(); r != nil {
logging.Errorf("bucket(%v) getMasterNode crashed: %v\n", b.Name, r)
logging.Errorf("%s", logging.StackTrace())
logging.Errorf("Bucket::getMasterNode: crashed for bucket %v: %v\n%v",
b.Name, r, logging.StackTrace())
}
}()

Expand Down Expand Up @@ -371,7 +371,7 @@ func queryRestAPIOnLocalhost(
responseBody := ioutil.NopCloser(bytes.NewBuffer(bodyBytes))
d := json.NewDecoder(responseBody)
if err = d.Decode(&out); err != nil {
logging.Errorf("queryRestAPI: Error while decoding the response from path: %s, response body: %s, err: %v", path, string(bodyBytes), err)
logging.Errorf("pools::queryRestAPIOnLocalhost: Error while decoding the response from path: %s, response body: %s, err: %v", path, string(bodyBytes), err)
return err
}
return nil
Expand All @@ -385,7 +385,7 @@ func (c *Client) RunObservePool(pool string, callb func(interface{}) error, canc
var pool Pool
var err error
if err = json.Unmarshal(bs, &pool); err != nil {
logging.Errorf("RunObservePool: Error while decoding the response from path: %s, response body: %s, err: %v", path, string(bs), err)
logging.Errorf("Client::RunObservePool: Error while decoding the response from path: %s, response body: %s, err: %v", path, string(bs), err)
}
return &pool, err
}
Expand All @@ -401,7 +401,7 @@ func (c *Client) RunObserveNodeServices(pool string, callb func(interface{}) err
var ps PoolServices
var err error
if err = json.Unmarshal(bs, &ps); err != nil {
logging.Errorf("RunObserveNodeServices: Error while decoding the response from path: %s, response body: %s, err: %v", path, string(bs), err)
logging.Errorf("Client::RunObserveNodeServices: Error while decoding the response from path: %s, response body: %s, err: %v", path, string(bs), err)
}
return &ps, err
}
Expand All @@ -416,7 +416,7 @@ func (c *Client) RunObserveCollectionManifestChanges(pool, bucket string, callb
var b Bucket
var err error
if err = json.Unmarshal(bs, &b); err != nil {
logging.Errorf("RunObserveCollectionManifestChanges: Error while decoding the response from path: %s, response body: %s, err: %v", path, string(bs), err)
logging.Errorf("Client::RunObserveCollectionManifestChanges: Error while decoding the response from path: %s, response body: %s, err: %v", path, string(bs), err)
}
return &b, err
}
Expand Down Expand Up @@ -734,37 +734,45 @@ func (c *Client) GetCollectionManifest(bucketn string) (retry bool,
}

func (c *Client) GetIndexScopeLimit(bucketn, scope string) (uint32, error) {
var err error
var manifest *collections.CollectionManifest
retryCount := 0
loop:
retry, manifest, err := c.GetCollectionManifest(bucketn)
if retry && retryCount <= 5 {
retryCount++
logging.Warnf("cluster_info: Out of sync for bucket %s. Retrying for GetIndexScopeLimit..", bucketn)
time.Sleep(500 * time.Millisecond)
goto loop
for retry := true; retry && retryCount <= 5; retryCount++ {
// retry true implies a potentially transient non-nil error err
retry, manifest, err = c.GetCollectionManifest(bucketn)
if retry {
time.Sleep(500 * time.Millisecond)
}
}
if retryCount > 0 {
logging.Warnf("Client::GetIndexScopeLimit: Retried %v times due to out of sync for"+
" bucket %s. Final err: %v", retryCount, bucketn, err)
}
if err != nil {
return 0, err
}
return manifest.GetIndexScopeLimit(scope), nil
}

// refreshBucket only calls terseBucket endpoint to fetch the bucket info.
// RefreshBucket only calls terseBucket endpoint to fetch the bucket info.
func (p *Pool) RefreshBucket(bucketn string, resetBucketMap bool) error {
if resetBucketMap {
p.BucketMap = make(map[string]Bucket)
}

var err error
var nb *Bucket
retryCount := 0
loop:
retry, nb, err := p.getTerseBucket(bucketn)
if retry {
retryCount++
if retryCount > 5 {
return err
for retry := true; retry && retryCount <= 5; retryCount++ {
// retry true implies a potentially transient non-nil error err
retry, nb, err = p.getTerseBucket(bucketn)
if retry {
time.Sleep(5 * time.Millisecond)
}
logging.Warnf("cluster_info: Out of sync for bucket %s. Retrying to getTerseBucket. retry count %v", bucketn, retryCount)
time.Sleep(5 * time.Millisecond)
goto loop
}
if retryCount > 0 {
logging.Warnf("Pool::RefreshBucket: Retried %v times due to out of sync for"+
" bucket %s. Final err: %v", retryCount, bucketn, err)
}
if err != nil {
return err
Expand All @@ -779,10 +787,11 @@ loop:
// Refresh calls pools/default/buckets to get data and list of buckets
// calls terseBucket and scopes endpoint for bucket info and manifest.
func (p *Pool) Refresh() (err error) {
const _Refresh = "Pool::Refresh:"
p.BucketMap = make(map[string]Bucket)
p.Manifest = make(map[string]*collections.CollectionManifest)

// Compute the minimum version among all the nodes
// Compute the minimum version among all the nodes. Manifest only exists in versions >= 7.
version := p.getVersion()

loop:
Expand All @@ -792,9 +801,10 @@ loop:
return err
}
for _, b := range buckets {
// retry true implies a potentially transient non-nil error err
retry, nb, err := p.getTerseBucket(b.Name)
if retry {
logging.Warnf("cluster_info: Out of sync for bucket %s. Retrying for getTerseBucket..", b.Name)
logging.Warnf("%v Out of sync for bucket %s. Retrying getTerseBucket", _Refresh, b.Name)
time.Sleep(5 * time.Millisecond)
goto loop
}
Expand All @@ -806,9 +816,10 @@ loop:
p.BucketMap[b.Name] = b

if version >= 7 {
// retry true implies a potentially transient non-nil error err
retry, manifest, err := p.getCollectionManifest(b.Name)
if retry {
logging.Warnf("cluster_info: Out of sync for bucket %s. Retrying for getBucketManifest..", b.Name)
logging.Warnf("%v Out of sync for bucket %s. Retrying getCollectionManifest", _Refresh, b.Name)
time.Sleep(5 * time.Millisecond)
goto loop
}
Expand All @@ -823,20 +834,26 @@ loop:
}

func (p *Pool) RefreshManifest(bucket string, resetManifestMap bool) error {
retryCount := 0
if resetManifestMap {
p.Manifest = make(map[string]*collections.CollectionManifest)
}
// Compute the minimum version among all the nodes

// Compute the minimum version among all the nodes. Manifest only exists in versions >= 7.
version := p.getVersion()
retry:
if version >= 7 {
retry, manifest, err := p.getCollectionManifest(bucket)
if retry && retryCount <= 5 {
retryCount++
logging.Warnf("cluster_info: Retrying to getBucketManifest for bucket %s", bucket)
time.Sleep(1 * time.Millisecond)
goto retry
var err error
var manifest *collections.CollectionManifest
retryCount := 0
for retry := true; retry && retryCount <= 5; retryCount++ {
// retry true implies a potentially transient non-nil error err
retry, manifest, err = p.getCollectionManifest(bucket)
if retry {
time.Sleep(1 * time.Millisecond)
}
}
if retryCount > 0 {
logging.Warnf("Pool::RefreshManifest: Retried %v times due to out of sync for"+
" bucket %s. Final err: %v", retryCount, bucket, err)
}
if err != nil {
return err
Expand Down Expand Up @@ -993,7 +1010,7 @@ func (b *Bucket) Close() {

func bucketFinalizer(b *Bucket) {
if b.connPools != nil {
logging.Warnf("Warning: Finalizing a bucket with active connections.")
logging.Warnf("pools::bucketFinalizer: Finalizing a bucket with active connections.")
}
}

Expand Down
10 changes: 7 additions & 3 deletions secondary/manager/client/metadata_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -1022,14 +1022,18 @@ func (o *MetadataProvider) waitForScheduledIndex(idxDefn *c.IndexDefn) error {
if err != nil {
return err
}
cinfo.SetMaxRetries(5) // avoid flooding query.log with retry messages if bucket was dropped

checkValidKeyspace := func() (bool, error) {
//
// Keyspace validation happens before posting shedule create token.
// Keyspace validation happens before posting schedule create token.
// Here, the purpose of keyspace validation is only to check for
// changes in keyspace if any. So, no need to retry.
// continued keyspace existence. If it no longer exists, waitForScheduledIndex
// can terminate.
//
err = cinfo.FetchForBucket(idxDefn.Bucket, false, false, true, true)

// Fetch bucket info in the ClusterInfoCache
err := cinfo.FetchForBucket(idxDefn.Bucket, true, true, false, false)
if err != nil {
return false, err
}
Expand Down

0 comments on commit 7f0af20

Please sign in to comment.