From fb66b757f0b7dbed2e57b066c32050655c2bff38 Mon Sep 17 00:00:00 2001 From: Marco Greco Date: Mon, 14 Nov 2022 14:32:05 +0000 Subject: [PATCH] MB-53950 initial commit Change-Id: I0199c9915120eb21d2e210cc97fa331a276574df Reviewed-on: https://review.couchbase.org/c/go-couchbase/+/182721 Reviewed-by: Donald Haggart Reviewed-by: Sitaram Vemulapalli Tested-by: Marco Greco Well-Formed: Restriction Checker --- cbdatasource/example/dump.go | 1 + cbdatasource/example/dump_windows.go | 1 + platform/platform.go | 1 + platform/platform_windows.go | 1 + platform/sync.go | 1 + platform/sync_386.go | 1 + pools.go | 53 ++++++++++++++++++++++++++-- streaming.go | 45 ++++++++++++++++++++++- 8 files changed, 101 insertions(+), 3 deletions(-) diff --git a/cbdatasource/example/dump.go b/cbdatasource/example/dump.go index e467204..a65d387 100644 --- a/cbdatasource/example/dump.go +++ b/cbdatasource/example/dump.go @@ -10,6 +10,7 @@ // implied. See the License for the specific language governing // permissions and limitations under the License. +//go:build !windows // +build !windows package main diff --git a/cbdatasource/example/dump_windows.go b/cbdatasource/example/dump_windows.go index 98d3543..7a24ef5 100644 --- a/cbdatasource/example/dump_windows.go +++ b/cbdatasource/example/dump_windows.go @@ -10,6 +10,7 @@ // implied. See the License for the specific language governing // permissions and limitations under the License. +//go:build windows || !darwin || !freebsd || !linux || !openbsd || !netbsd // +build windows !darwin !freebsd !linux !openbsd !netbsd package main diff --git a/platform/platform.go b/platform/platform.go index 6675802..74707ba 100644 --- a/platform/platform.go +++ b/platform/platform.go @@ -7,6 +7,7 @@ // either express or implied. See the License for the specific language governing permissions // and limitations under the License. +//go:build !windows // +build !windows package platform diff --git a/platform/platform_windows.go b/platform/platform_windows.go index f58fb5b..37decf1 100644 --- a/platform/platform_windows.go +++ b/platform/platform_windows.go @@ -7,6 +7,7 @@ // either express or implied. See the License for the specific language governing permissions // and limitations under the License. +//go:build windows // +build windows package platform diff --git a/platform/sync.go b/platform/sync.go index 0b15246..bf5a022 100644 --- a/platform/sync.go +++ b/platform/sync.go @@ -3,6 +3,7 @@ // This is for 64-bit OS and hence is a no-op effectively. // +//go:build !386 // +build !386 package platform diff --git a/platform/sync_386.go b/platform/sync_386.go index 23ae533..094acf5 100644 --- a/platform/sync_386.go +++ b/platform/sync_386.go @@ -2,6 +2,7 @@ // This is a thin wrapper around sync/atomic to help with alignment issues. // +//go:build 386 // +build 386 package platform diff --git a/pools.go b/pools.go index 92d684f..7fcbd60 100644 --- a/pools.go +++ b/pools.go @@ -188,8 +188,9 @@ type Node struct { // A Pool of nodes and buckets. type Pool struct { - BucketMap map[string]*Bucket - Nodes []Node + sync.RWMutex // for BucketMap + BucketMap map[string]*Bucket + Nodes []Node BucketURL map[string]string `json:"buckets"` @@ -258,6 +259,8 @@ type Bucket struct { closed bool dedicatedPool bool // Set if the pool instance above caters to this Bucket alone + + updater io.ReadCloser } // PoolServices is all the bucket-independent services in a pool @@ -311,6 +314,45 @@ func (b *Bucket) VBServerMap() *VBucketServerMap { return ret } +func (b *Bucket) ChangedVBServerMap(new *VBucketServerMap) bool { + b.RLock() + defer b.RUnlock() + return b.changedVBServerMap(new) +} + +func (b *Bucket) changedVBServerMap(new *VBucketServerMap) bool { + old := (*VBucketServerMap)(b.vBucketServerMap) + if new.NumReplicas != old.NumReplicas { + return true + } + if len(new.ServerList) != len(old.ServerList) { + return true + } + + // this will also catch the same server list in different order, + // but better safe than sorry + for i, s := range new.ServerList { + if s != old.ServerList[i] { + return true + } + } + + if len(new.VBucketMap) != len(old.VBucketMap) { + return true + } + for i, v := range new.VBucketMap { + if len(v) != len(old.VBucketMap[i]) { + return true + } + for j, n := range v { + if old.VBucketMap[i][j] != n { + return true + } + } + } + return false +} + func (b *Bucket) GetVBmap(addrs []string) (map[string][]uint16, error) { vbmap := b.VBServerMap() servers := vbmap.ServerList @@ -1640,6 +1682,13 @@ func (b *Bucket) Close() { } } +func (b *Bucket) StopUpdater() { + if b.updater != nil { + b.updater.Close() + b.updater = nil + } +} + func bucketFinalizer(b *Bucket) { if b.connPools != nil { if !b.closed { diff --git a/streaming.go b/streaming.go index 2bfc566..4287938 100644 --- a/streaming.go +++ b/streaming.go @@ -7,6 +7,7 @@ import ( "io/ioutil" "net" "net/http" + "strings" "time" "unsafe" @@ -65,11 +66,27 @@ func (b *Bucket) RunBucketUpdater(notify NotifyFn) { } func (b *Bucket) RunBucketUpdater2(streamingFn StreamingFn, notify NotifyFn) { + + b.Lock() + if b.updater != nil { + b.updater.Close() + b.updater = nil + } + b.Unlock() go func() { err := b.UpdateBucket2(streamingFn) if err != nil { if notify != nil { - notify(b.GetName(), err) + name := b.GetName() + notify(name, err) + + // MB-49772 get rid of the deleted bucket + p := b.pool + b.Close() + p.Lock() + p.BucketMap[name] = nil + delete(p.BucketMap, name) + p.Unlock() } logging.Errorf(" Bucket Updater exited with err %v", err) } @@ -101,6 +118,15 @@ func (b *Bucket) UpdateBucket2(streamingFn StreamingFn) error { var failures int var returnErr error var poolServices PoolServices + var updater io.ReadCloser + + defer func() { + b.Lock() + if b.updater == updater { + b.updater = nil + } + b.Unlock() + }() for { @@ -158,6 +184,18 @@ func (b *Bucket) UpdateBucket2(streamingFn StreamingFn) error { failures++ continue } + b.Lock() + if b.updater == updater { + b.updater = res.Body + updater = b.updater + } else { + // another updater is running and we should exit cleanly + b.Unlock() + res.Body.Close() + logging.Debugf("Bucket updater: New updater found for bucket: %v", b.GetName()) + return nil + } + b.Unlock() dec := json.NewDecoder(res.Body) @@ -168,6 +206,11 @@ func (b *Bucket) UpdateBucket2(streamingFn StreamingFn) error { if err != nil { returnErr = err res.Body.Close() + // if this was closed under us it means a new updater is starting so exit cleanly + if strings.Contains(err.Error(), "use of closed network connection") { + logging.Debugf("Bucket updater: Notified of new updater for bucket: %v", b.GetName()) + return nil + } break }