Skip to content

Commit

Permalink
MB-36095: On pause, send filter messages to all consumers on all assi…
Browse files Browse the repository at this point in the history
…gned VBs

Change-Id: I38c52bca99a0422d24ba1b81fcfce7c8f111d4dc
Reviewed-on: http://review.couchbase.org/116429
Reviewed-by: <satya.nand@couchbase.com>
Tested-by: Jeelan Basha Poola <jeelan.poola@couchbase.com>
  • Loading branch information
jeelanp2003 committed Oct 16, 2019
1 parent 0c43418 commit 02c18cb
Show file tree
Hide file tree
Showing 4 changed files with 17 additions and 3 deletions.
1 change: 1 addition & 0 deletions common/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,7 @@ type EventingConsumer interface {
WorkerVbMapUpdate(map[string][]uint16)

SendAssignedVbs()
PauseConsumer()
}

type EventingSuperSup interface {
Expand Down
11 changes: 11 additions & 0 deletions consumer/exported_functions.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package consumer
import (
"bufio"
"fmt"
"math"
"net"
"os"
"os/exec"
Expand Down Expand Up @@ -747,3 +748,13 @@ func (c *Consumer) GetInsight() *common.Insight {
return nil
}
}

func (c *Consumer) PauseConsumer() {
// send vb filter messages to all assigned VBs to stop processing mutations. Note the use math.MaxUint64 for seqno
assignedVbs, _ := c.getAssignedVbs(c.ConsumerName())
for _, vb := range assignedVbs {
c.sendVbFilterData(vb, math.MaxUint64, false)
}

c.WorkerVbMapUpdate(nil)
}
6 changes: 4 additions & 2 deletions consumer/process_events.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ func (c *Consumer) processDCPEvents() {
for {
if c.cppQueueSizes != nil {
if c.workerQueueCap < c.cppQueueSizes.AggQueueSize ||
c.workerQueueMemCap < (c.sentEventsSize - c.cppQueueSizes.ProcessedEventsSize) {
c.workerQueueMemCap < (c.sentEventsSize-c.cppQueueSizes.ProcessedEventsSize) {
logging.Debugf("%s [%s:%s:%d] Throttling, cpp queue sizes: %+v, event size: %d",
logPrefix, c.workerName, c.tcpPort, c.Pid(), c.cppQueueSizes, c.sentEventsSize)
time.Sleep(10 * time.Millisecond)
Expand Down Expand Up @@ -1313,7 +1313,9 @@ func (c *Consumer) handleStreamEnd(vBucket uint16, last_processed_seqno uint64)
}

c.filterVbEventsRWMutex.Lock()
delete(c.filterVbEvents, vBucket)
if _, ok := c.filterVbEvents[vBucket]; ok {
delete(c.filterVbEvents, vBucket)
}
c.filterVbEventsRWMutex.Unlock()

var vbBlob vbucketKVBlob
Expand Down
2 changes: 1 addition & 1 deletion producer/producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -268,7 +268,7 @@ func (p *Producer) Serve() {
logging.Infof("%s [%s:%d] Pausing processing", logPrefix, p.appName, p.LenRunningConsumers())

for _, c := range p.getConsumers() {
c.WorkerVbMapUpdate(nil)
c.PauseConsumer()
c.ResetBootstrapDone()
c.CloseAllRunningDcpFeeds()
}
Expand Down

0 comments on commit 02c18cb

Please sign in to comment.