Skip to content

Commit

Permalink
MB-53950 initial commit
Browse files Browse the repository at this point in the history
Change-Id: I0199c9915120eb21d2e210cc97fa331a276574df
Reviewed-on: https://review.couchbase.org/c/go-couchbase/+/182721
Reviewed-by: Donald Haggart <donald.haggart@couchbase.com>
Reviewed-by: Sitaram Vemulapalli <sitaram.vemulapalli@couchbase.com>
Tested-by: Marco Greco <marco.greco@couchbase.com>
Well-Formed: Restriction Checker
  • Loading branch information
Marco Greco committed Dec 2, 2022
1 parent 4f3f882 commit fb66b75
Show file tree
Hide file tree
Showing 8 changed files with 101 additions and 3 deletions.
1 change: 1 addition & 0 deletions cbdatasource/example/dump.go
Expand Up @@ -10,6 +10,7 @@
// implied. See the License for the specific language governing
// permissions and limitations under the License.

//go:build !windows
// +build !windows

package main
Expand Down
1 change: 1 addition & 0 deletions cbdatasource/example/dump_windows.go
Expand Up @@ -10,6 +10,7 @@
// implied. See the License for the specific language governing
// permissions and limitations under the License.

//go:build windows || !darwin || !freebsd || !linux || !openbsd || !netbsd
// +build windows !darwin !freebsd !linux !openbsd !netbsd

package main
Expand Down
1 change: 1 addition & 0 deletions platform/platform.go
Expand Up @@ -7,6 +7,7 @@
// either express or implied. See the License for the specific language governing permissions
// and limitations under the License.

//go:build !windows
// +build !windows

package platform
Expand Down
1 change: 1 addition & 0 deletions platform/platform_windows.go
Expand Up @@ -7,6 +7,7 @@
// either express or implied. See the License for the specific language governing permissions
// and limitations under the License.

//go:build windows
// +build windows

package platform
Expand Down
1 change: 1 addition & 0 deletions platform/sync.go
Expand Up @@ -3,6 +3,7 @@
// This is for 64-bit OS and hence is a no-op effectively.
//

//go:build !386
// +build !386

package platform
Expand Down
1 change: 1 addition & 0 deletions platform/sync_386.go
Expand Up @@ -2,6 +2,7 @@
// This is a thin wrapper around sync/atomic to help with alignment issues.
//

//go:build 386
// +build 386

package platform
Expand Down
53 changes: 51 additions & 2 deletions pools.go
Expand Up @@ -188,8 +188,9 @@ type Node struct {

// A Pool of nodes and buckets.
type Pool struct {
BucketMap map[string]*Bucket
Nodes []Node
sync.RWMutex // for BucketMap
BucketMap map[string]*Bucket
Nodes []Node

BucketURL map[string]string `json:"buckets"`

Expand Down Expand Up @@ -258,6 +259,8 @@ type Bucket struct {
closed bool

dedicatedPool bool // Set if the pool instance above caters to this Bucket alone

updater io.ReadCloser
}

// PoolServices is all the bucket-independent services in a pool
Expand Down Expand Up @@ -311,6 +314,45 @@ func (b *Bucket) VBServerMap() *VBucketServerMap {
return ret
}

func (b *Bucket) ChangedVBServerMap(new *VBucketServerMap) bool {
b.RLock()
defer b.RUnlock()
return b.changedVBServerMap(new)
}

func (b *Bucket) changedVBServerMap(new *VBucketServerMap) bool {
old := (*VBucketServerMap)(b.vBucketServerMap)
if new.NumReplicas != old.NumReplicas {
return true
}
if len(new.ServerList) != len(old.ServerList) {
return true
}

// this will also catch the same server list in different order,
// but better safe than sorry
for i, s := range new.ServerList {
if s != old.ServerList[i] {
return true
}
}

if len(new.VBucketMap) != len(old.VBucketMap) {
return true
}
for i, v := range new.VBucketMap {
if len(v) != len(old.VBucketMap[i]) {
return true
}
for j, n := range v {
if old.VBucketMap[i][j] != n {
return true
}
}
}
return false
}

func (b *Bucket) GetVBmap(addrs []string) (map[string][]uint16, error) {
vbmap := b.VBServerMap()
servers := vbmap.ServerList
Expand Down Expand Up @@ -1640,6 +1682,13 @@ func (b *Bucket) Close() {
}
}

func (b *Bucket) StopUpdater() {
if b.updater != nil {
b.updater.Close()
b.updater = nil
}
}

func bucketFinalizer(b *Bucket) {
if b.connPools != nil {
if !b.closed {
Expand Down
45 changes: 44 additions & 1 deletion streaming.go
Expand Up @@ -7,6 +7,7 @@ import (
"io/ioutil"
"net"
"net/http"
"strings"
"time"
"unsafe"

Expand Down Expand Up @@ -65,11 +66,27 @@ func (b *Bucket) RunBucketUpdater(notify NotifyFn) {
}

func (b *Bucket) RunBucketUpdater2(streamingFn StreamingFn, notify NotifyFn) {

b.Lock()
if b.updater != nil {
b.updater.Close()
b.updater = nil
}
b.Unlock()
go func() {
err := b.UpdateBucket2(streamingFn)
if err != nil {
if notify != nil {
notify(b.GetName(), err)
name := b.GetName()
notify(name, err)

// MB-49772 get rid of the deleted bucket
p := b.pool
b.Close()
p.Lock()
p.BucketMap[name] = nil
delete(p.BucketMap, name)
p.Unlock()
}
logging.Errorf(" Bucket Updater exited with err %v", err)
}
Expand Down Expand Up @@ -101,6 +118,15 @@ func (b *Bucket) UpdateBucket2(streamingFn StreamingFn) error {
var failures int
var returnErr error
var poolServices PoolServices
var updater io.ReadCloser

defer func() {
b.Lock()
if b.updater == updater {
b.updater = nil
}
b.Unlock()
}()

for {

Expand Down Expand Up @@ -158,6 +184,18 @@ func (b *Bucket) UpdateBucket2(streamingFn StreamingFn) error {
failures++
continue
}
b.Lock()
if b.updater == updater {
b.updater = res.Body
updater = b.updater
} else {
// another updater is running and we should exit cleanly
b.Unlock()
res.Body.Close()
logging.Debugf("Bucket updater: New updater found for bucket: %v", b.GetName())
return nil
}
b.Unlock()

dec := json.NewDecoder(res.Body)

Expand All @@ -168,6 +206,11 @@ func (b *Bucket) UpdateBucket2(streamingFn StreamingFn) error {
if err != nil {
returnErr = err
res.Body.Close()
// if this was closed under us it means a new updater is starting so exit cleanly
if strings.Contains(err.Error(), "use of closed network connection") {
logging.Debugf("Bucket updater: Notified of new updater for bucket: %v", b.GetName())
return nil
}
break
}

Expand Down

0 comments on commit fb66b75

Please sign in to comment.