Skip to content

Commit

Permalink
MB-46960 : Get bucketNames in poolsStreaming instead of saslBucketsSt…
Browse files Browse the repository at this point in the history
…reaming

Change-Id: I56fa11938f2893a27eefac7b411ee4626e7b66e7
  • Loading branch information
ksaikrishnateja committed Jun 29, 2021
1 parent 7ed4d4e commit 12a1b0a
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 76 deletions.
103 changes: 42 additions & 61 deletions secondary/common/services_notifier.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,12 @@ package common

import (
"errors"
"github.com/couchbase/indexing/secondary/dcp"
"github.com/couchbase/indexing/secondary/logging"
"strings"
"sync"
"time"

couchbase "github.com/couchbase/indexing/secondary/dcp"
"github.com/couchbase/indexing/secondary/logging"
)

const (
Expand Down Expand Up @@ -65,11 +66,21 @@ func (instance *serviceNotifierInstance) getNotifyCallback(t NotificationType) f
switch (msg).(type) {
case *couchbase.Bucket:
bucket := (msg).(*couchbase.Bucket)
logging.Infof("serviceChangeNotifier: received %s for bucket: %s", notifMsg, bucket.Name)
logging.Debugf("serviceChangeNotifier: received %s for bucket: %s", notifMsg, bucket.Name)
default:
errMsg := "Invalid msg type with CollectionManifestChangeNotification"
return errors.New(errMsg)
}
} else if t == PoolChangeNotification {
switch (msg).(type) {
case *couchbase.Pool:
pool := (msg).(*couchbase.Pool)
instance.bucketsChangeCallback(pool.BucketNames)
logging.Infof("serviceChangeNotifier: received %s", notifMsg)
default:
errMsg := "Invalid msg type with PoolChangeNotification"
return errors.New(errMsg)
}
} else {
logging.Infof("serviceChangeNotifier: received %s", notifMsg)
}
Expand All @@ -78,7 +89,7 @@ func (instance *serviceNotifierInstance) getNotifyCallback(t NotificationType) f
select {
case w <- notifMsg:
case <-time.After(notifyWaitTimeout):
logging.Warnf("servicesChangeNotifier: Consumer for %v took too long to read notification, making the consumer invalid", instance.DebugStr())
logging.Warnf("serviceChangeNotifier: Consumer for %v took too long to read notification, making the consumer invalid", instance.DebugStr())
close(w)
delete(instance.waiters, id)
}
Expand All @@ -89,66 +100,46 @@ func (instance *serviceNotifierInstance) getNotifyCallback(t NotificationType) f
return fn
}

// This method gets invoked whenever there is a change to "pools/default/saslBucketsStreaming" endpoint
// This method gets invoked whenever there is a change to poolsStreaming endpoint
// The serviceChangeNotifier keeps a track of buckets that is it monitoring. When this method is
// invoked, it would check the buckets that have been newly added into the cluster and starts monitoring
// the corresponding buckets streaming endpoints (for manifestUID changes). If a bucket is deleted, then
// the streaming endpoing would get closed with EOF error and the go-routine would terminate. So, there is
// the streaming endpoint would get closed with EOF error and the go-routine would terminate. So, there is
// no need to explicitly close the go-routine monitoring the streaming endpoint of a bucket
func (instance *serviceNotifierInstance) bucketsChangeCallback(t NotificationType) func(interface{}) error {
fn := func(msg interface{}) error {

instance.Lock()
defer instance.Unlock()

if !instance.valid {
return ErrNotifierInvalid
}

var saslBucket *couchbase.SaslBucket
logging.Infof("serviceChangeNotifier: received BucketsChangeNotification")
switch (msg).(type) {
case *couchbase.SaslBucket:
saslBucket = (msg).(*couchbase.SaslBucket)
default:
return errors.New("Invalid message type with BucketsChangeNotification")
}

// Remove all buckets that are present in instances and not in incoming message
for bucket, _ := range instance.buckets {
present := false
for _, b := range saslBucket.Buckets {
if b.Name == bucket {
present = true
break
}
}
if !present {
logging.Infof("serviceChangeNotifier: Removing the bucket: %v from book-keeping", bucket)
delete(instance.buckets, bucket)
func (instance *serviceNotifierInstance) bucketsChangeCallback(bucketNames []couchbase.BucketName) {

// Remove all buckets that are present in instances and not in incoming bucketNames
for bucket := range instance.buckets {
present := false
for _, b := range bucketNames {
if b.Name == bucket {
present = true
break
}
}

for _, bucket := range saslBucket.Buckets {
if _, ok := instance.buckets[bucket.Name]; !ok {
// Bucket is newly added to cluster
logging.Infof("serviceChangeNotifier: Starting to monitor the bucket streaming endpoint for bucket: %v", bucket.Name)
go instance.RunObserveCollectionManifestChanges(bucket.Name)
instance.buckets[bucket.Name] = true
}
if !present {
logging.Infof("serviceChangeNotifier: Removing the bucket: %v from book-keeping", bucket)
delete(instance.buckets, bucket)
}
}

return nil
for _, b := range bucketNames {
if _, ok := instance.buckets[b.Name]; !ok {
// Bucket is newly added to cluster
logging.Infof("serviceChangeNotifier: Starting to monitor the bucket streaming endpoint for bucket: %v", b.Name)
go instance.RunObserveCollectionManifestChanges(b.Name)
instance.buckets[b.Name] = true
}
}

return fn
return
}

func (instance *serviceNotifierInstance) RunPoolObserver() {
poolCallback := instance.getNotifyCallback(PoolChangeNotification)
err := instance.client.RunObservePool(instance.pool, poolCallback, nil)
if err != nil {
logging.Warnf("servicesChangeNotifier: Connection terminated for pool notifier instance of %s, %s (%v)", instance.DebugStr(), instance.pool, err)
logging.Warnf("serviceChangeNotifier: Connection terminated for pool notifier instance of %s, %s (%v)", instance.DebugStr(), instance.pool, err)
}
instance.cleanup()
}
Expand All @@ -157,16 +148,7 @@ func (instance *serviceNotifierInstance) RunServicesObserver() {
servicesCallback := instance.getNotifyCallback(ServiceChangeNotification)
err := instance.client.RunObserveNodeServices(instance.pool, servicesCallback, nil)
if err != nil {
logging.Warnf("servicesChangeNotifier: Connection terminated for services notifier instance of %s, %s (%v)", instance.DebugStr(), instance.pool, err)
}
instance.cleanup()
}

func (instance *serviceNotifierInstance) RunBucketsObserver() {
bucketCallback := instance.bucketsChangeCallback(BucketsChangeNotification)
err := instance.client.RunObserveBuckets(instance.pool, bucketCallback, nil)
if err != nil {
logging.Warnf("servicesChangeNotifier: Connection terminated for buckets notifier instance of %s, %s (%v)", instance.DebugStr(), instance.pool, err)
logging.Warnf("serviceChangeNotifier: Connection terminated for services notifier instance of %s, %s (%v)", instance.DebugStr(), instance.pool, err)
}
instance.cleanup()
}
Expand All @@ -175,7 +157,7 @@ func (instance *serviceNotifierInstance) RunObserveCollectionManifestChanges(buc
collectionChangeCallback := instance.getNotifyCallback(CollectionManifestChangeNotification)
err := instance.client.RunObserveCollectionManifestChanges(instance.pool, bucket, collectionChangeCallback, nil)
if err != nil {
logging.Warnf("servicesChangeNotifier: Connection terminated for collection manifest notifier instance of %s, %s, bucket: %s, (%v)", instance.DebugStr(), instance.pool, bucket, err)
logging.Warnf("serviceChangeNotifier: Connection terminated for collection manifest notifier instance of %s, %s, bucket: %s, (%v)", instance.DebugStr(), instance.pool, bucket, err)
}
instance.cleanup()
}
Expand Down Expand Up @@ -255,12 +237,11 @@ func NewServicesChangeNotifier(clusterUrl, pool string) (*ServicesChangeNotifier
waiters: make(map[int]chan Notification),
buckets: make(map[string]bool),
}
logging.Infof("servicesChangeNotifier: Creating new notifier instance for %s, %s", instance.DebugStr(), pool)
logging.Infof("serviceChangeNotifier: Creating new notifier instance for %s, %s", instance.DebugStr(), pool)

singletonServicesContainer.notifiers[id] = instance
go instance.RunPoolObserver()
go instance.RunServicesObserver()
go instance.RunBucketsObserver()
}

notifier := singletonServicesContainer.notifiers[id]
Expand Down
21 changes: 6 additions & 15 deletions secondary/dcp/pools.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,11 @@ type Node struct {
AddressFamily string `json:"addressFamily,omitempty"`
}

type BucketName struct {
Name string `json:"bucketName"`
UUID string `json:"uuid"`
}

// A Pool of nodes and buckets.
type Pool struct {
BucketMap map[string]Bucket
Expand All @@ -97,6 +102,7 @@ type Pool struct {

BucketURL map[string]string `json:"buckets"`
ServerGroupsUri string `json:"serverGroupsUri"`
BucketNames []BucketName `json:"bucketNames"`

client Client
}
Expand Down Expand Up @@ -381,21 +387,6 @@ func (c *Client) RunObserveNodeServices(pool string, callb func(interface{}) err
return c.runObserveStreamingEndpoint(path, decoder, callb, cancel)
}

func (c *Client) RunObserveBuckets(pool string, callb func(interface{}) error, cancel chan bool) error {

path := "/pools/" + pool + "/saslBucketsStreaming"
decoder := func(bs []byte) (interface{}, error) {
var buckets SaslBucket
var err error
if err = json.Unmarshal(bs, &buckets); err != nil {
logging.Errorf("RunObserveBuckets: Error while decoding the response from path: %s, response body: %s, err: %v", path, string(bs), err)
}
return &buckets, err
}

return c.runObserveStreamingEndpoint(path, decoder, callb, cancel)
}

func (c *Client) RunObserveCollectionManifestChanges(pool, bucket string, callb func(interface{}) error, cancel chan bool) error {

path := "/pools/" + pool + "/bs/" + bucket
Expand Down

0 comments on commit 12a1b0a

Please sign in to comment.