Skip to content

Commit

Permalink
remove caching from CfgCB due to races
Browse files Browse the repository at this point in the history
See couchbase/sync_gateway#1392

In a cluster using cfg_cb, startup races can occur when multiple
nodes register at the same time and overwrite each other's entries,
because the CAS check was previously done against a local-only cfgMem cache.

This change implements CAS correctly by delegating it to the Couchbase
server, trading some performance for correctness.

Change-Id: I9aeed7ba0a17f98a91203ba68935515f768af2e3
Reviewed-on: http://review.couchbase.org/58359
Reviewed-by: Steve Yen <steve.yen@gmail.com>
Tested-by: Steve Yen <steve.yen@gmail.com>
  • Loading branch information
steveyen committed Jan 12, 2016
1 parent 11f31c1 commit b724ae7
Show file tree
Hide file tree
Showing 3 changed files with 137 additions and 95 deletions.
220 changes: 132 additions & 88 deletions cfg_cb.go
Expand Up @@ -34,11 +34,12 @@ type CfgCB struct {
url *url.URL
bucket string
b *couchbase.Bucket
cfgMem *CfgMem
cfgKey string

options map[string]interface{}

subscriptions map[string][]chan<- CfgEvent // Keyed by key.

bds cbdatasource.BucketDataSource
bdsm sync.Mutex
seqs map[uint16]uint64 // To track max seq #'s we received per vbucketId.
Expand Down Expand Up @@ -81,9 +82,10 @@ func NewCfgCBEx(urlStr, bucket string,
urlStr: urlStr,
url: url,
bucket: bucket,
cfgMem: NewCfgMem(),
cfgKey: keyPrefix + "cfg",
options: options,

subscriptions: make(map[string][]chan<- CfgEvent),
}

b, err := c.getBucket() // TODO: Need to b.Close()?
Expand Down Expand Up @@ -111,141 +113,187 @@ func NewCfgCBEx(urlStr, bucket string,
return c, nil
}

// SetKeyPrefix changes the key prefix that the CfgCB will use as it
// reads/writes its documents to the couchbase bucket (default key
// prefix is ""). Use SetKeyPrefix with care, as you must arrange all
// nodes in the cluster to use the same key prefix. The SetKeyPrefix
// should be used right after NewCfgCB.
//
// Deprecated: Please instead use the NewCfgCBEx() API with an options
// map entry of "keyPrefix".
func (c *CfgCB) SetKeyPrefix(keyPrefix string) {
	c.m.Lock()
	if c.options == nil {
		c.options = map[string]interface{}{}
	}
	c.options["keyPrefix"] = keyPrefix
	// Get/Set/Del address the config document through c.cfgKey, so the
	// new prefix only takes effect once cfgKey is recomputed the same
	// way the constructor builds it (keyPrefix + "cfg").
	c.cfgKey = keyPrefix + "cfg"
	c.m.Unlock()

	// Refresh acquires c.m itself, so it has to run after the unlock.
	// This method has no error return, so a failure is logged instead
	// of being dropped silently.
	if err := c.Refresh(); err != nil {
		log.Printf("cfg_cb: SetKeyPrefix, Refresh err: %v", err)
	}
}

// Get reads the config document stored under c.cfgKey from the bucket
// and returns the value held for key inside it, together with the
// document's CAS as reported by the bucket.
//
// A non-zero cas that differs from the document's CAS yields a
// *CfgCASError. A not-found error from the bucket is tolerated; the
// lookup then runs against a freshly created, empty CfgMem.
func (c *CfgCB) Get(key string, cas uint64) (
	[]byte, uint64, error) {
	c.m.Lock()
	defer c.m.Unlock()

	bucket, err := c.getBucket()
	if err != nil {
		return nil, 0, err
	}

	// Always go to the server: the CAS returned here is the bucket's
	// CAS for the whole config document, not a locally generated one.
	cfgBuf, _, cfgCAS, err := bucket.GetsRaw(c.cfgKey)
	if err != nil && !gomemcached.IsNotFound(err) {
		return nil, 0, err
	}

	if cas != 0 && cas != cfgCAS {
		return nil, 0, &CfgCASError{}
	}

	cfgMem := NewCfgMem()
	if cfgBuf != nil {
		if err = json.Unmarshal(cfgBuf, cfgMem); err != nil {
			return nil, 0, err
		}
	}

	// The CAS check was already done against the document CAS above,
	// so the in-memory lookup is performed without a CAS constraint.
	val, _, err := cfgMem.Get(key, 0)
	if err != nil {
		return nil, 0, err
	}

	return val, cfgCAS, nil
}

func (c *CfgCB) Set(key string, val []byte, cas uint64) (
uint64, error) {
c.m.Lock()
defer c.m.Unlock()

cas, err := c.cfgMem.Set(key, val, cas)
bucket, err := c.getBucket()
if err != nil {
return 0, err
}

err = c.unlockedSave()
if err != nil {
cfgBuf, _, cfgCAS, err := bucket.GetsRaw(c.cfgKey)
if err != nil && !gomemcached.IsNotFound(err) {
return 0, err
}
return cas, err
}

func (c *CfgCB) Del(key string, cas uint64) error {
c.m.Lock()
defer c.m.Unlock()

err := c.cfgMem.Del(key, cas)
if err != nil {
return err
if cas != 0 && cas != cfgCAS {
return 0, &CfgCASError{}
}
return c.unlockedSave()
}

func (c *CfgCB) Load() error {
c.m.Lock()
defer c.m.Unlock()
cfgMem := NewCfgMem()
if cfgBuf != nil {
err = json.Unmarshal(cfgBuf, cfgMem)
if err != nil {
return 0, err
}
}

return c.unlockedLoad()
}
_, err = cfgMem.Set(key, val, CFG_CAS_FORCE)
if err != nil {
return 0, err
}

func (c *CfgCB) getBucket() (*couchbase.Bucket, error) {
if c.b == nil {
b, err := couchbase.GetBucket(c.urlStr, "default", c.bucket)
if err != nil {
return nil, err
nextCAS, err := bucket.Cas(c.cfgKey, 0, cfgCAS, cfgMem)
if err != nil {
if res, ok := err.(*gomemcached.MCResponse); ok {
if res.Status == gomemcached.KEY_EEXISTS {
return 0, &CfgCASError{}
}
}
c.b = b
return 0, err
}
return c.b, nil

c.fireEvent(key, nextCAS, nil)

return nextCAS, err
}

func (c *CfgCB) unlockedLoad() error {
func (c *CfgCB) Del(key string, cas uint64) error {
c.m.Lock()
defer c.m.Unlock()

bucket, err := c.getBucket()
if err != nil {
return err
}

buf, err := bucket.GetRaw(c.cfgKey)
cfgBuf, _, cfgCAS, err := bucket.GetsRaw(c.cfgKey)
if err != nil && !gomemcached.IsNotFound(err) {
return err
}

if cas != 0 && cas != cfgCAS {
return &CfgCASError{}
}

cfgMem := NewCfgMem()
if buf != nil {
err = json.Unmarshal(buf, cfgMem)
if cfgBuf != nil {
err = json.Unmarshal(cfgBuf, cfgMem)
if err != nil {
return err
}
}

cfgMemPrev := c.cfgMem
cfgMemPrev.m.Lock()
defer cfgMemPrev.m.Unlock()
err = cfgMem.Del(key, 0)
if err != nil {
return err
}

cfgMem.subscriptions = cfgMemPrev.subscriptions
nextCAS, err := bucket.Cas(c.cfgKey, 0, cfgCAS, cfgMem)
if err != nil {
if res, ok := err.(*gomemcached.MCResponse); ok {
if res.Status == gomemcached.KEY_EEXISTS {
return &CfgCASError{}
}
}
return err
}

c.cfgMem = cfgMem
c.fireEvent(key, nextCAS, nil)

return nil
return err
}

func (c *CfgCB) unlockedSave() error {
bucket, err := c.getBucket()
if err != nil {
return err
func (c *CfgCB) getBucket() (*couchbase.Bucket, error) {
if c.b == nil {
b, err := couchbase.GetBucket(c.urlStr, "default", c.bucket)
if err != nil {
return nil, err
}
c.b = b
}

return bucket.Set(c.cfgKey, 0, c.cfgMem)
return c.b, nil
}

// Subscribe registers ch to receive the CfgEvents fired for key.
// Subscriptions are kept in c.subscriptions, guarded by c.m. The
// method always returns nil.
func (c *CfgCB) Subscribe(key string, ch chan CfgEvent) error {
	c.m.Lock()
	// append handles a missing or nil entry, so no pre-allocation is
	// needed for the first subscriber of a key.
	c.subscriptions[key] = append(c.subscriptions[key], ch)
	c.m.Unlock()

	return nil
}

func (c *CfgCB) Refresh() error {
func (c *CfgCB) FireEvent(key string, cas uint64, err error) {
c.m.Lock()
defer c.m.Unlock()
c.fireEvent(key, cas, err)
c.m.Unlock()
}

c2, err := NewCfgCBEx(c.urlStr, c.bucket, c.options)
if err != nil {
return err
func (c *CfgCB) fireEvent(key string, cas uint64, err error) {
for _, c := range c.subscriptions[key] {
go func(c chan<- CfgEvent) {
c <- CfgEvent{
Key: key, CAS: cas, Error: err,
}
}(c)
}
}

err = c2.Load()
if err != nil {
return err
func (c *CfgCB) Refresh() error {
c.m.Lock()
for key, cs := range c.subscriptions {
event := CfgEvent{Key: key}
for _, c := range cs {
go func(c chan<- CfgEvent, event CfgEvent) {
c <- event
}(c, event)
}
}

c.cfgMem.CASNext = c2.cfgMem.CASNext
c.cfgMem.Entries = c2.cfgMem.Entries

return c.cfgMem.Refresh()
c.m.Unlock()
return nil
}

// ----------------------------------------------------------------
Expand All @@ -267,17 +315,14 @@ func (r *CfgCB) OnError(err error) {
log.Printf("cfg_cb: OnError, err: %v", err)

go func() {
r.cfgMem.FireEvent("", 0, err)
r.FireEvent("", 0, err)
}()
}

func (r *CfgCB) DataUpdate(vbucketId uint16, key []byte, seq uint64,
req *gomemcached.MCRequest) error {
if string(key) == r.cfgKey {
go func() {
r.Load()
r.cfgMem.Refresh()
}()
go r.Refresh()
}
r.updateSeq(vbucketId, seq)
return nil
Expand All @@ -286,10 +331,7 @@ func (r *CfgCB) DataUpdate(vbucketId uint16, key []byte, seq uint64,
func (r *CfgCB) DataDelete(vbucketId uint16, key []byte, seq uint64,
req *gomemcached.MCRequest) error {
if string(key) == r.cfgKey {
go func() {
r.Load()
r.cfgMem.Refresh()
}()
go r.Refresh()
}
r.updateSeq(vbucketId, seq)
return nil
Expand All @@ -302,20 +344,20 @@ func (r *CfgCB) SnapshotStart(vbucketId uint16,

// SetMetaData records value as the metadata for vbucketId in r.meta,
// guarded by r.bdsm. The map is allocated on first use, and value is
// stored as given (not copied). The method always returns nil.
func (r *CfgCB) SetMetaData(vbucketId uint16, value []byte) error {
	r.bdsm.Lock()

	if r.meta == nil {
		r.meta = make(map[uint16][]byte)
	}
	r.meta[vbucketId] = value

	// Exactly one unlock per lock: a second Unlock on the same mutex
	// would panic at runtime.
	r.bdsm.Unlock()

	return nil
}

func (r *CfgCB) GetMetaData(vbucketId uint16) (
value []byte, lastSeq uint64, err error) {
r.bdsm.Lock()
defer r.bdsm.Unlock()

value = []byte(nil)
if r.meta != nil {
Expand All @@ -326,6 +368,8 @@ func (r *CfgCB) GetMetaData(vbucketId uint16) (
lastSeq = r.seqs[vbucketId]
}

r.bdsm.Unlock()

return value, lastSeq, nil
}

Expand Down
5 changes: 0 additions & 5 deletions cmd/main_cfg.go
Expand Up @@ -97,11 +97,6 @@ func MainCfgCB(baseName, urlStr, bindHttp, register, dataDir string) (
return nil, err
}

err = cfg.Load()
if err != nil {
return nil, err
}

return cfg, nil
}

Expand Down

0 comments on commit b724ae7

Please sign in to comment.