Skip to content

Commit

Permalink
Address PR feedback about avoiding external locking
Browse files Browse the repository at this point in the history
  • Loading branch information
Traun Leyden committed May 30, 2018
1 parent 5e884e6 commit 0be977f
Show file tree
Hide file tree
Showing 4 changed files with 31 additions and 42 deletions.
37 changes: 22 additions & 15 deletions db/change_cache.go
Expand Up @@ -91,6 +91,8 @@ type CacheOptions struct {
// Initializes a new changeCache.
// lastSequence is the last known database sequence assigned.
// notifyChange is an optional function that will be called to notify of channel changes.
// After calling Init(), you must call .Start() to start useing the cache, otherwise it will be in a locked state
// and callers will block on trying to obtain the lock.
func (c *changeCache) Init(context *DatabaseContext, notifyChange func(base.Set), options *CacheOptions, indexOptions *ChannelIndexOptions) error {
c.context = context

Expand Down Expand Up @@ -153,6 +155,24 @@ func (c *changeCache) Init(context *DatabaseContext, notifyChange func(base.Set)
}
}()

// Lock the cache -- not usable until .Start() called. This fixes the DCP startup race condition documented in SG #3558.
c.lock.Lock()

return nil
}

func (c *changeCache) Start() error {

// Unlock the cache after this function returns.
defer c.lock.Unlock()

// Find the current global doc sequence and use that for the initial sequence for the change cache
lastSequence, err := c.context.LastSequence()
if err != nil {
return err
}

c._setInitialSequence(lastSequence)
return nil
}

Expand Down Expand Up @@ -760,21 +780,8 @@ func (c *changeCache) getOldestSkippedSequence() uint64 {
}
}

// Lock the cache during startup. While locked, incoming DCP changes will not be processed since they will be waiting for the lock.
func (c *changeCache) StartupLock() {
c.lock.Lock()
}

// Unlock the cache after it's ready to receive DCP changes. Typically called after SetInitialSequence() has
// initialized the cache with the initialSequence
func (c *changeCache) StartupUnlock() {
c.lock.Unlock()
}

// Set the initial sequence.
// Think of this as _SetInitialSequence() -- since it presumes that change chache is locked. It is not safe to
// call c.lock.Lock() here, it will cause a deadlock.
func (c *changeCache) SetInitialSequence(initialSequence uint64) {
// Set the initial sequence. Presumes that change chache is already locked.
func (c *changeCache) _setInitialSequence(initialSequence uint64) {
c.initialSequence = initialSequence
c.nextSequence = initialSequence + 1
}
Expand Down
11 changes: 3 additions & 8 deletions db/change_index.go
Expand Up @@ -32,6 +32,9 @@ type ChangeIndex interface {
// Stop the index
Stop()

// Start the index
Start() error

// Clear the index
Clear() error

Expand Down Expand Up @@ -61,14 +64,6 @@ type ChangeIndex interface {
getOldestSkippedSequence() uint64
getChannelCache(channelName string) *channelCache

// Explicitly lock the index so that DCP changes will not be processed.
StartupLock()
StartupUnlock()

// Set the initial sequence, which represents the current "latest global document sequence number".
// Only valid for non-accel scenarios.
SetInitialSequence(sequence uint64)

// Unit test support
waitForSequence(sequence uint64, maxWaitTime time.Duration)
waitForSequenceWithMissing(sequence uint64, maxWaitTime time.Duration)
Expand Down
21 changes: 5 additions & 16 deletions db/database.go
Expand Up @@ -226,7 +226,7 @@ func NewDatabaseContext(dbName string, bucket base.Bucket, autoImport bool, opti
context.mutationListener.Notify(changedChannels)
}

// Initialize the ChangeCache
// Initialize the ChangeCache. Will be locked and unusable until .Start() is called (SG #3558)
context.changeCache.Init(
context,
notifyChange,
Expand All @@ -250,10 +250,6 @@ func NewDatabaseContext(dbName string, bucket base.Bucket, autoImport bool, opti
if options.IndexOptions == nil || options.TrackDocs {
base.Infof(base.KeyDCP, "Starting mutation feed on bucket %v due to either channel cache mode or doc tracking (auto-import/bucketshadow)", base.MD(bucket.GetName()))

// Lock the change cache so we don't process any incoming DCP events before the
// cache is ready. (SG #3558)
context.changeCache.StartupLock()

err = context.mutationListener.Start(bucket, options.TrackDocs, feedMode, func(bucket string, err error) {

msgFormat := "%v dropped Mutation Feed (TAP/DCP) due to error: %v, taking offline"
Expand Down Expand Up @@ -304,25 +300,18 @@ func NewDatabaseContext(dbName string, bucket base.Bucket, autoImport bool, opti

})

// Check if there is an error starting the DCP feed
if err != nil {
// In the case of an error, unlock the change cache despite the fact it's in an unitialized state.
context.changeCache.StartupUnlock()
context.changeCache = nil
return nil, err
}

// Find the current global doc sequence and use that for the initial sequence for the change cache
lastSequence, err := context.LastSequence()
// Unlock change cache
err := context.changeCache.Start()
if err != nil {
context.changeCache.StartupUnlock()
return nil, err
}

// Set the initial sequence
context.changeCache.SetInitialSequence(lastSequence)

// Unlock change cache
context.changeCache.StartupUnlock()

}

// Load providers into provider map. Does basic validation on the provider definition, and identifies the default provider.
Expand Down
4 changes: 1 addition & 3 deletions db/kv_change_index.go
Expand Up @@ -410,9 +410,7 @@ func (k *kvChangeIndex) generatePartitionStats() (PartitionStats, error) {
}

// The following are no-ops for kvChangeIndex.
func (k *kvChangeIndex) StartupLock() {}
func (k *kvChangeIndex) StartupUnlock() {}
func (k *kvChangeIndex) SetInitialSequence(sequence uint64) {}
func (k *kvChangeIndex) Start() (error) { return nil }

func IsNotFoundError(err error) bool {
return strings.Contains(strings.ToLower(err.Error()), "not found")
Expand Down

0 comments on commit 0be977f

Please sign in to comment.