Skip to content

Commit

Permalink
CBG-268 backport to 2.1.3 (#4021)
Browse files Browse the repository at this point in the history
  • Loading branch information
adamcfraser committed Mar 15, 2019
1 parent 4a7326f commit 7143b13
Showing 1 changed file with 12 additions and 0 deletions.
12 changes: 12 additions & 0 deletions base/dcp_feed.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,9 @@ const DCPBackfillSeqs = "_sync:dcp_backfill" // Bucket doc used for DCP sequence
// DCP will already be sending more documents per snapshot.
const kCheckpointThreshold = 1

// Only persist checkpoint once per kCheckpointTimeThreshold (per vbucket)
const kCheckpointTimeThreshold = 1 * time.Minute

// Persist backfill progress every 10s
const kBackfillPersistInterval = 10 * time.Second

Expand Down Expand Up @@ -94,6 +97,7 @@ type DCPReceiver struct {
meta [][]byte // To track metadata blob's per vbucketId.
vbuuids map[uint16]uint64 // Map of vbucket uuids, by vbno. Used in cases of manual vbucket metadata creation
updatesSinceCheckpoint []uint64 // Number of updates since the last checkpoint. Used to avoid checkpoint persistence feedback loop
lastCheckpointTime []time.Time // Time of last checkpoint persistence, per vbucket. Used to manage checkpoint persistence volume
notify sgbucket.BucketNotifyFn // Function to callback when we lose our dcp feed
callback sgbucket.FeedEventCallbackFunc // Function to callback for mutation processing
backfill backfillStatus // Backfill state and stats
Expand All @@ -109,6 +113,7 @@ func NewDCPReceiver(callback sgbucket.FeedEventCallbackFunc, bucket Bucket, maxV
meta: make([][]byte, maxVbNo),
vbuuids: make(map[uint16]uint64, maxVbNo),
updatesSinceCheckpoint: make([]uint64, maxVbNo),
lastCheckpointTime: make([]time.Time, maxVbNo),
}

r.callback = callback
Expand Down Expand Up @@ -248,11 +253,18 @@ func (r *DCPReceiver) SetMetaData(vbucketId uint16, value []byte) error {

// Check persistMeta to avoids persistence if the only feed events we've seen are the DCP echo of DCP checkpoint docs
if r.persistCheckpoints && r.updatesSinceCheckpoint[vbucketId] >= kCheckpointThreshold {

// Don't checkpoint more frequently than kCheckpointTimeThreshold
if time.Since(r.lastCheckpointTime[vbucketId]) < kCheckpointTimeThreshold {
return nil
}

err := r.persistCheckpoint(vbucketId, value)
if err != nil {
Warnf(KeyAll, "Unable to persist DCP metadata - will retry next snapshot. Error: %v", err)
}
r.updatesSinceCheckpoint[vbucketId] = 0
r.lastCheckpointTime[vbucketId] = time.Now()
}
return nil
}
Expand Down

0 comments on commit 7143b13

Please sign in to comment.