diff --git a/base/dcp_feed.go b/base/dcp_feed.go index f5d63ced8b..c8e92e2d14 100644 --- a/base/dcp_feed.go +++ b/base/dcp_feed.go @@ -52,6 +52,9 @@ const DCPBackfillSeqs = "_sync:dcp_backfill" // Bucket doc used for DCP sequence // DCP will already be sending more documents per snapshot. const kCheckpointThreshold = 1 +// Only persist checkpoint once per kCheckpointTimeThreshold (per vbucket) +const kCheckpointTimeThreshold = 1 * time.Minute + // Persist backfill progress every 10s const kBackfillPersistInterval = 10 * time.Second @@ -94,6 +97,7 @@ type DCPReceiver struct { meta [][]byte // To track metadata blob's per vbucketId. vbuuids map[uint16]uint64 // Map of vbucket uuids, by vbno. Used in cases of manual vbucket metadata creation updatesSinceCheckpoint []uint64 // Number of updates since the last checkpoint. Used to avoid checkpoint persistence feedback loop + lastCheckpointTime []time.Time // Time of last checkpoint persistence, per vbucket. Used to manage checkpoint persistence volume notify sgbucket.BucketNotifyFn // Function to callback when we lose our dcp feed callback sgbucket.FeedEventCallbackFunc // Function to callback for mutation processing backfill backfillStatus // Backfill state and stats @@ -109,6 +113,7 @@ func NewDCPReceiver(callback sgbucket.FeedEventCallbackFunc, bucket Bucket, maxV meta: make([][]byte, maxVbNo), vbuuids: make(map[uint16]uint64, maxVbNo), updatesSinceCheckpoint: make([]uint64, maxVbNo), + lastCheckpointTime: make([]time.Time, maxVbNo), } r.callback = callback @@ -248,11 +253,18 @@ func (r *DCPReceiver) SetMetaData(vbucketId uint16, value []byte) error { // Check persistMeta to avoids persistence if the only feed events we've seen are the DCP echo of DCP checkpoint docs if r.persistCheckpoints && r.updatesSinceCheckpoint[vbucketId] >= kCheckpointThreshold { + + // Don't checkpoint more frequently than kCheckpointTimeThreshold + if time.Since(r.lastCheckpointTime[vbucketId]) < kCheckpointTimeThreshold { + return nil + } + err := r.persistCheckpoint(vbucketId, value) if err != nil { Warnf(KeyAll, "Unable to persist DCP metadata - will retry next snapshot. Error: %v", err) } r.updatesSinceCheckpoint[vbucketId] = 0 + r.lastCheckpointTime[vbucketId] = time.Now() } return nil }