Skip to content

Commit

Permalink
Backport CBG-267 to 2.1.3 (#4020)
Browse files Browse the repository at this point in the history
  • Loading branch information
adamcfraser committed Mar 15, 2019
1 parent 1cac85b commit 4a7326f
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 0 deletions.
28 changes: 28 additions & 0 deletions base/dcp_feed.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
package base

import (
"bytes"
"encoding/json"
"errors"
"expvar"
Expand Down Expand Up @@ -151,9 +152,33 @@ func (r *DCPReceiver) OnError(err error) {
dcpExpvars.Add("onError_count", 1)
}

// Only a subset of Sync Gateway's internal documents need to be included during DCP processing: user, role, and
// unused sequence documents. Any other documents with the leading '_sync' prefix can be ignored.
// Returns true for documents that should be processed, false for those that do not need processing.
// TODO: The hardcoded strings here should be changed to constants (CBG-274)
func dcpKeyFilter(key []byte) bool {

// If it's not a _sync doc, process
if !bytes.HasPrefix(key, []byte("_sync")) {
return true
}

// User, role and unused sequence markers should be processed
if bytes.HasPrefix(key, []byte("_sync:unused")) ||
bytes.HasPrefix(key, []byte("_sync:user:")) ||
bytes.HasPrefix(key, []byte("_sync:role:")) {
return true
}

return false
}

func (r *DCPReceiver) DataUpdate(vbucketId uint16, key []byte, seq uint64,
req *gomemcached.MCRequest) error {
dcpExpvars.Add("dataUpdate_count", 1)
if !dcpKeyFilter(key) {
return nil
}
r.updateSeq(vbucketId, seq, true)
shouldPersistCheckpoint := r.callback(makeFeedEvent(req, vbucketId, sgbucket.FeedOpMutation))
if shouldPersistCheckpoint {
Expand All @@ -165,6 +190,9 @@ func (r *DCPReceiver) DataUpdate(vbucketId uint16, key []byte, seq uint64,
func (r *DCPReceiver) DataDelete(vbucketId uint16, key []byte, seq uint64,
req *gomemcached.MCRequest) error {
dcpExpvars.Add("dataDelete_count", 1)
if !dcpKeyFilter(key) {
return nil
}
r.updateSeq(vbucketId, seq, true)
shouldPersistCheckpoint := r.callback(makeFeedEvent(req, vbucketId, sgbucket.FeedOpDeletion))
if shouldPersistCheckpoint {
Expand Down
14 changes: 14 additions & 0 deletions base/dcp_feed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ func TestTransformBucketCredentials(t *testing.T) {
inputPassword,
inputBucketName,
)

assert.Equals(t, username, inputUsername)
assert.Equals(t, password, inputPassword)
assert.Equals(t, bucketname, inputBucketName)
Expand All @@ -38,3 +39,16 @@ func TestTransformBucketCredentials(t *testing.T) {
assert.Equals(t, bucketname2, inputBucketName2)

}

func TestDCPKeyFilter(t *testing.T) {

assert.True(t, dcpKeyFilter([]byte("doc123")))
assert.True(t, dcpKeyFilter([]byte("_sync:user:user1")))
assert.True(t, dcpKeyFilter([]byte("_sync:role:role2")))
assert.True(t, dcpKeyFilter([]byte("_sync:unusedSeq:1234")))

assert.False(t, dcpKeyFilter([]byte("_sync:seq")))
assert.False(t, dcpKeyFilter([]byte("_sync:unusualSeq")))
assert.False(t, dcpKeyFilter([]byte("_sync:syncdata")))
assert.False(t, dcpKeyFilter([]byte("_sync:dcp_ck:12")))
}

0 comments on commit 4a7326f

Please sign in to comment.