Skip to content

Commit

Permalink
Feature/issue 2626 processentry notification (#2635) (#2751)
Browse files Browse the repository at this point in the history
* Notes from call w/ Adam

* Notes on container/heap usage

* Fixes #2626 processEntry calls can skip notification when it unblocks pending sequences

# Conflicts:
#	db/change_cache.go
  • Loading branch information
tleyden authored and adamcfraser committed Jul 28, 2017
1 parent c484d7b commit e4fd91a
Showing 1 changed file with 35 additions and 8 deletions.
43 changes: 35 additions & 8 deletions db/change_cache.go
Expand Up @@ -231,6 +231,8 @@ func (c *changeCache) CleanSkippedSequenceQueue() bool {
return found, deletes
}()

changedChannelsCombined := base.Set{}

// Add found entries
for _, entry := range foundEntries {
entry.Skipped = true
Expand All @@ -242,7 +244,15 @@ func (c *changeCache) CleanSkippedSequenceQueue() bool {
continue
}
entry.Channels = doc.Channels
c.processEntry(entry)

changedChannels := c.processEntry(entry)
changedChannelsCombined = changedChannelsCombined.Union(changedChannels)
}

// Since the calls to processEntry() above may unblock pending sequences, if there were any changed channels we need
// to notify any change listeners that are working changes feeds for these channels
if c.onChange != nil && len(changedChannelsCombined) > 0 {
c.onChange(changedChannelsCombined)
}

// Purge pending deletes
Expand Down Expand Up @@ -311,8 +321,12 @@ func (c *changeCache) waitForSequenceWithMissing(sequence uint64) {
// The JSON must be the raw document from the bucket, with the metadata and all.
func (c *changeCache) DocChanged(docID string, docJSON []byte, seq uint64, vbNo uint16) {
entryTime := time.Now()

// ** This method does not directly access any state of c, so it doesn't lock.
go func() {

changedChannelsCombined := base.Set{}

// Is this a user/role doc?
if strings.HasPrefix(docID, auth.UserKeyPrefix) {
c.processPrincipalDoc(docID, docJSON, true)
Expand Down Expand Up @@ -351,9 +365,11 @@ func (c *changeCache) DocChanged(docID string, docJSON []byte, seq uint64, vbNo
Sequence: seq,
TimeReceived: time.Now(),
}
c.processEntry(change)
changedChannels := c.processEntry(change)
changedChannelsCombined = changedChannelsCombined.Union(changedChannels)
}


// If the recent sequence history includes any sequences earlier than the current sequence, and
// not already seen by the gateway (more recent than c.nextSequence), add them as empty entries
// so that they are included in sequence buffering.
Expand Down Expand Up @@ -382,7 +398,8 @@ func (c *changeCache) DocChanged(docID string, docJSON []byte, seq uint64, vbNo
change.Channels = channelRemovals
}

c.processEntry(change)
changedChannels := c.processEntry(change)
changedChannelsCombined = changedChannelsCombined.Union(changedChannels)
}
}
}
Expand All @@ -400,8 +417,11 @@ func (c *changeCache) DocChanged(docID string, docJSON []byte, seq uint64, vbNo
base.LogTo("Cache", "Received #%d after %3dms (%q / %q)", change.Sequence, int(tapLag/time.Millisecond), change.DocID, change.RevID)

changedChannels := c.processEntry(change)
if c.onChange != nil && len(changedChannels) > 0 {
c.onChange(changedChannels)
changedChannelsCombined = changedChannelsCombined.Union(changedChannels)

// Notify change listeners for all of the changed channels
if c.onChange != nil && len(changedChannelsCombined) > 0 {
c.onChange(changedChannelsCombined)
}
}()
}
Expand All @@ -417,7 +437,7 @@ func (c *changeCache) unmarshalPrincipal(docJSON []byte, isUser bool) (auth.Prin
}

// Process unused sequence notification. Extracts sequence from docID and sends to cache for buffering
func (c *changeCache) processUnusedSequence(docID string) {
func (c *changeCache) processUnusedSequence(docID string) {
sequenceStr := strings.TrimPrefix(docID, UnusedSequenceKeyPrefix)
sequence, err := strconv.ParseUint(sequenceStr, 10, 64)
if err != nil {
Expand All @@ -429,7 +449,14 @@ func (c *changeCache) processUnusedSequence(docID string) {
TimeReceived: time.Now(),
}
base.LogTo("Cache", "Received #%d (unused sequence)", sequence)
c.processEntry(change)

// Since processEntry may unblock pending sequences, if there were any changed channels we need
// to notify any change listeners that are working changes feeds for these channels
changedChannels := c.processEntry(change)
if c.onChange != nil && len(changedChannels) > 0 {
c.onChange(changedChannels)
}

}

func (c *changeCache) processPrincipalDoc(docID string, docJSON []byte, isUser bool) {
Expand Down Expand Up @@ -673,7 +700,7 @@ func (c *changeCache) getOldestSkippedSequence() uint64 {
}
}

//////// LOG PRIORITY QUEUE
//////// LOG PRIORITY QUEUE -- container/heap callbacks that should not be called directly. Use heap.Init/Push/etc()

func (h LogPriorityQueue) Len() int { return len(h) }
func (h LogPriorityQueue) Less(i, j int) bool { return h[i].Sequence < h[j].Sequence }
Expand Down

0 comments on commit e4fd91a

Please sign in to comment.