MB-56318 Ensure bucket updater is stopped on bucket close
Have the streaming function directly report whether the updater needs to
exit, rather than indicating this via the StopUpdater function. This
ensures that even if a new updater has been kicked off, the existing
updater will exit.

Ensure that bucket close stops the updater, and that the updater is
stopped when the pool is closed.

Don't use loadNamespace, as it duplicates the namespace and its buckets
(which can lead to leaked updaters).

Also enhance the bucket updater messages so we can more easily tie each
message to an individual bucket object instance.
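
In outline, the streaming callback's return value becomes the stop signal for the updater loop. Below is a minimal, self-contained sketch of that pattern; the Config type, watch loop, and channel plumbing are illustrative stand-ins, not the actual primitives/couchbase machinery.

package main

import "fmt"

// Config stands in for a streamed bucket configuration (hypothetical).
type Config struct{ Rev int }

// StreamingFn mirrors the new signature in this commit: returning false
// asks the updater to stop.
type StreamingFn func(cfg *Config) bool

// watch models the updater loop: it exits as soon as the callback
// reports that this stream is no longer needed, with no separate
// StopUpdater call required.
func watch(updates <-chan *Config, fn StreamingFn) {
	for cfg := range updates {
		if !fn(cfg) {
			fmt.Println("updater: exiting on callback request")
			return
		}
	}
}

func main() {
	updates := make(chan *Config, 5)
	done := make(chan struct{})
	go func() {
		watch(updates, func(cfg *Config) bool {
			fmt.Println("got config revision", cfg.Rev)
			return cfg.Rev < 3 // stop once the bucket will be reloaded anyway
		})
		close(done)
	}()
	for i := 1; i <= 5; i++ {
		updates <- &Config{Rev: i}
	}
	<-done
}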

Change-Id: I7fde24040fd07be9d44b8bb27ca01293fd9fa896
Reviewed-on: https://review.couchbase.org/c/query/+/189285
Reviewed-by: Sitaram Vemulapalli <sitaram.vemulapalli@couchbase.com>
Reviewed-by: Marco Greco <marco.greco@couchbase.com>
Well-Formed: Restriction Checker
Tested-by: Donald Haggart <donald.haggart@couchbase.com>
dhaggart committed Apr 6, 2023
1 parent ade030e commit 8abd741
Showing 4 changed files with 52 additions and 35 deletions.
15 changes: 10 additions & 5 deletions datastore/couchbase/couchbase.go
@@ -1414,8 +1414,9 @@ func (p *namespace) KeyspaceDeleteCallback(name string, err error) {
}

// Called by primitives/couchbase if a configured keyspace is updated
-func (p *namespace) KeyspaceUpdateCallback(bucket *cb.Bucket) {
+func (p *namespace) KeyspaceUpdateCallback(bucket *cb.Bucket) bool {

+ret := true
checkSysBucket := false

p.lock.Lock()
@@ -1426,9 +1427,11 @@ func (p *namespace) KeyspaceUpdateCallback(bucket *cb.Bucket) {
uid, _ := strconv.ParseUint(bucket.CollectionsManifestUid, 16, 64)
if ks.cbKeyspace.collectionsManifestUid != uid {
if ks.cbKeyspace.collectionsManifestUid == _INVALID_MANIFEST_UID {
-logging.Infof("Bucket updater: received first manifest id %v for bucket %v", uid, bucket.Name)
+logging.Infof("[%p] Bucket updater: received first manifest id %v for bucket %v",
+ks.cbKeyspace.cbbucket, uid, bucket.Name)
} else {
-logging.Infof("Bucket updater: switching manifest id from %v to %v for bucket %v", ks.cbKeyspace.collectionsManifestUid, uid, bucket.Name)
+logging.Infof("[%p] Bucket updater: switching manifest id from %v to %v for bucket %v",
+ks.cbKeyspace.cbbucket, ks.cbKeyspace.collectionsManifestUid, uid, bucket.Name)
}
ks.cbKeyspace.flags |= _NEEDS_MANIFEST
ks.cbKeyspace.newCollectionsManifestUid = uid
@@ -1439,11 +1442,11 @@ func (p *namespace) KeyspaceUpdateCallback(bucket *cb.Bucket) {

// the KV nodes list has changed, force a refresh on next use
if ks.cbKeyspace.cbbucket.ChangedVBServerMap(&bucket.VBSMJson) {
-logging.Infof("Bucket updater: vbMap changed for bucket %v", bucket.Name)
+logging.Infof("[%p] Bucket updater: vbMap changed for bucket %v", ks.cbKeyspace.cbbucket, bucket.Name)
ks.cbKeyspace.flags |= _NEEDS_REFRESH

// bucket will be reloaded, we don't need an updater anymore
-ks.cbKeyspace.cbbucket.StopUpdater()
+ret = false
}
ks.cbKeyspace.Unlock()
} else {
@@ -1455,6 +1458,8 @@ func (p *namespace) KeyspaceUpdateCallback(bucket *cb.Bucket) {
if checkSysBucket {
chkSysBucket()
}

+return ret
}

func (b *keyspace) NamespaceId() string {
5 changes: 3 additions & 2 deletions datastore/couchbase/system_keyspace_ee.go
@@ -32,10 +32,11 @@ const (
)

func (s *store) CreateSystemCBOStats(requestId string) errors.Error {
-defaultPool, er := loadNamespace(s, "default")
+dPool, er := s.NamespaceByName("default") // so we're using the cached namespace always
if er != nil {
return er
}
+defaultPool := dPool.(*namespace)

// create/get system bucket/scope/collection
sysBucket, er := defaultPool.keyspaceByName(_N1QL_SYSTEM_BUCKET)
@@ -236,7 +237,7 @@
}

func (s *store) HasSystemCBOStats() (bool, errors.Error) {
-defaultPool, er := loadNamespace(s, "default")
+defaultPool, er := s.NamespaceByName("default") // so we're using the cached namespace always
if er != nil {
return false, er
}
11 changes: 8 additions & 3 deletions primitives/couchbase/ns_server.go
@@ -1248,7 +1248,6 @@ func (c *Client) GetPoolServices(name string) (ps PoolServices, err error) {
// may have open.
func (b *Bucket) Close() {
b.Lock()
-defer b.Unlock()
if b.connPools != nil {
for _, c := range b.getConnPools(true /* already locked */) {
if c != nil {
@@ -1257,14 +1256,18 @@ func (b *Bucket) Close() {
}
b.connPools = nil
}
+if b.updater != nil {
+b.updater.Close()
+b.updater = nil
+}
+b.Unlock()
}

func (b *Bucket) StopUpdater() {
b.Lock()
if b.updater != nil {
+bu := b.updater
-b.updater.Close()
b.updater = nil
+bu.Close()
}
b.Unlock()
}
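
Both Close and StopUpdater now use the same detach-then-close idiom: take the lock, clear the shared b.updater reference, and only then close the detached handle, so the shared state no longer points at the handle by the time Close (which may block or have side effects) runs. A simplified sketch of the idiom and of the check on the updater's side; the bucket type and names below are hypothetical, not the primitives/couchbase ones.

package main

import (
	"fmt"
	"io"
	"strings"
	"sync"
)

type bucket struct {
	sync.RWMutex
	updater io.ReadCloser
}

// stopUpdater detaches the shared reference under the write lock first,
// so a concurrent updater goroutine can no longer observe itself as
// current, then closes the detached handle.
func (b *bucket) stopUpdater() {
	b.Lock()
	if b.updater != nil {
		bu := b.updater
		b.updater = nil // detach first
		bu.Close()      // then close the detached handle
	}
	b.Unlock()
}

// isCurrent is the check an updater goroutine performs before doing
// more work with its own handle.
func (b *bucket) isCurrent(mine io.ReadCloser) bool {
	b.RLock()
	defer b.RUnlock()
	return b.updater == mine
}

func main() {
	b := &bucket{updater: io.NopCloser(strings.NewReader(""))}
	mine := b.updater
	fmt.Println(b.isCurrent(mine)) // true: still the active updater
	b.stopUpdater()
	fmt.Println(b.isCurrent(mine)) // false: a goroutine holding mine should exit
}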
@@ -1322,6 +1325,8 @@ func (p *Pool) Close() {
bucket.Unlock()
if needClose {
bucket.Close()
+} else {
+bucket.StopUpdater()
}
}
}
56 changes: 31 additions & 25 deletions primitives/couchbase/streaming.go
@@ -157,7 +157,7 @@ const DISCONNECT_PERIOD = 120 * time.Second
const STREAM_RETRY_PERIOD = 100 * time.Millisecond

type NotifyFn func(bucket string, err error)
-type StreamingFn func(bucket *Bucket)
+type StreamingFn func(bucket *Bucket) bool

// Use TCP keepalive to detect half close sockets
var updaterTransport http.RoundTripper = &http.Transport{
@@ -198,9 +198,8 @@ func (b *Bucket) RunBucketUpdater2(streamingFn StreamingFn, notify NotifyFn) {

b.Lock()
if b.updater != nil {
+bu := b.updater
-b.updater.Close()
b.updater = nil
+bu.Close()
}
b.Unlock()
go func() {
@@ -218,7 +217,7 @@ func (b *Bucket) RunBucketUpdater2(streamingFn StreamingFn, notify NotifyFn) {
delete(p.BucketMap, name)
p.Unlock()
}
-logging.Errorf("Bucket Updater exited with err %v", err)
+logging.Errorf("[%p] Bucket Updater exited with err %v", b, err)
}
}()
}
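
From the caller's side, the contract now looks roughly like this. This is a hedged sketch built from the signatures visible in this diff; the import alias and path and the vbMapChanged helper are assumptions for illustration, not part of the commit.

package config

import (
	"log"

	cb "github.com/couchbase/query/primitives/couchbase" // import path assumed
)

// startWatching wires a bucket to its updater using the new
// bool-returning StreamingFn. vbMapChanged is a hypothetical helper
// standing in for the caller's staleness check.
func startWatching(b *cb.Bucket, vbMapChanged func(*cb.Bucket) bool) {
	b.RunBucketUpdater2(
		func(nb *cb.Bucket) bool {
			if vbMapChanged(nb) {
				// The bucket will be reloaded, so this updater is no longer
				// needed; returning false now stops it (previously the
				// callback called StopUpdater instead).
				return false
			}
			return true
		},
		func(name string, err error) {
			log.Printf("updater for bucket %s exited: %v", name, err)
		},
	)
}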
@@ -266,13 +265,13 @@ func (b *Bucket) UpdateBucket2(streamingFn StreamingFn) error {
}

streamUrl := fmt.Sprintf("%s/pools/default/bucketsStreaming/%s", b.pool.client.BaseURL, uriAdj(b.GetName()))
-logging.Infof("Bucket updater: Trying with %s", streamUrl)
+logging.Infof("[%p] Bucket updater: [%p,%d] Trying with %s", b, updater, failures, streamUrl)
req, err := http.NewRequest("GET", streamUrl, nil)
if err != nil {
if isConnError(err) {
failures++
if failures < MAX_RETRY_COUNT {
-logging.Infof("Bucket updater : %v (Retrying %v)", err, failures)
+logging.Infof("[%p] Bucket updater : %v (Retrying %v)", b, err, failures)
time.Sleep(time.Duration(failures) * STREAM_RETRY_PERIOD)
} else {
returnErr = errors.NewBucketUpdaterStreamingError(err)
@@ -295,7 +294,7 @@ func (b *Bucket) UpdateBucket2(streamingFn StreamingFn) error {
if isConnError(err) {
failures++
if failures < MAX_RETRY_COUNT {
-logging.Infof("Bucket updater : %v (Retrying %v)", err, failures)
+logging.Infof("[%p] Bucket updater : %v (Retrying %v)", b, err, failures)
time.Sleep(time.Duration(failures) * STREAM_RETRY_PERIOD)
} else {
returnErr = errors.NewBucketUpdaterStreamingError(err)
@@ -307,45 +306,50 @@ func (b *Bucket) UpdateBucket2(streamingFn StreamingFn) error {

if res.StatusCode != 200 {
bod, _ := ioutil.ReadAll(io.LimitReader(res.Body, 512))
-logging.Errorf("Bucket updater: Failed to connect to host, unexpected status code: %v. Body %s", res.StatusCode, bod)
+logging.Errorf("[%p] Bucket updater: Failed to connect to host, unexpected status code: %v. Body %s",
+b, res.StatusCode, bod)
res.Body.Close()
returnErr = errors.NewBucketUpdaterFailedToConnectToHost(res.StatusCode, bod)
failures++
continue
}
b.Lock()
-if b.updater == updater {
-b.updater = res.Body
-updater = b.updater
-} else {
+if b.updater != updater {
// another updater is running and we should exit cleanly
b.Unlock()
res.Body.Close()
-logging.Debugf("Bucket updater: New updater found for bucket: %v", b.GetName())
+logging.Debugf("[%p] Bucket updater: New updater found for bucket: %v", b, b.GetName())
return nil
+} else if b.closed {
+b.Unlock()
+res.Body.Close()
+logging.Debugf("[%p] Bucket updater: Bucket closed: %v", b, b.GetName())
+return nil
}
+b.updater = res.Body
+updater = b.updater
b.Unlock()

dec := json.NewDecoder(res.Body)

tmpb := &Bucket{}
for {
-b.Lock()
-bu := b.updater
-b.Unlock()
-if bu != updater {
+b.RLock()
+terminate := b.updater != updater || b.closed
+b.RUnlock()
+if terminate {
res.Body.Close()
-logging.Debugf("Bucket updater: Detected new updater for bucket: %v", b.GetName())
+logging.Debugf("[%p] Bucket updater: Stopping for bucket: %v", b, b.GetName())
return nil
}

err := dec.Decode(&tmpb)

-b.Lock()
-bu = b.updater
-b.Unlock()
-if bu != updater {
-logging.Debugf("Bucket updater: Detected new updater for bucket: %v", b.GetName())
+b.RLock()
+terminate = b.updater != updater || b.closed
+b.RUnlock()
+if terminate {
+logging.Debugf("[%p] Bucket updater: Stopping for bucket: %v", b, b.GetName())
return nil
}

@@ -416,9 +420,11 @@ func (b *Bucket) UpdateBucket2(streamingFn StreamingFn) error {
b.Unlock()

if streamingFn != nil {
-streamingFn(tmpb)
+if !streamingFn(tmpb) {
+return nil
+}
}
-logging.Debugf("Bucket updater: Got new configuration for bucket %s", b.GetName())
+logging.Debugf("[%p] Bucket updater: Got new configuration for bucket %s", b, b.GetName())

}
// we are here because of an error
