[MB-26859] Don't exit metakv callback functions on Eventing nodes that are going to be rebalanced out

As part of the change to fail rebalance when one or more apps are
undergoing bootstrap, metakv callback functions for settings, topology
and app-code changes were made to exit early if the node isn't part of
the keepNodes list supplied by ns_server. That logic breaks when
Eventing nodes are rebalanced out, because such nodes won't appear in
keepNodes; they are listed in ejectNodes instead. The callbacks still
need to fire on nodes in ejectNodes, since those nodes must give up
their vbucket ownership. Hence this patch triggers the metakv callbacks
on nodes that are about to be ejected from the cluster.

Change-Id: Ia804341105ec337f24baed00e0942ed89c3f6208
Reviewed-on: http://review.couchbase.org/91058
Reviewed-by: Abhishek Singh <abhishek@couchbase.com>
Tested-by: Abhishek Singh <abhishek@couchbase.com>
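
To illustrate the intent, here is a minimal Go sketch of the revised early-exit guard. It is not taken from this commit: the type and field names below are simplified stand-ins, and the real check lives in supervisor/super_supervisor.go. A node missing from keepNodes but still running producers (i.e. one queued for ejection) keeps handling metakv callbacks so it can give up its vbuckets, while a node with nothing deployed can safely skip them.

package main

import "fmt"

// supervisor is a simplified stand-in for the Eventing SuperSupervisor.
type supervisor struct {
	nodeUUID         string
	keepNodes        []string        // node UUIDs ns_server wants to keep after rebalance
	runningProducers map[string]bool // apps currently deployed on this node
}

// checkIfNodeInCluster mirrors the idea of the real helper: the node counts
// as "in cluster" only if ns_server listed it in keepNodes.
func (s *supervisor) checkIfNodeInCluster() bool {
	for _, uuid := range s.keepNodes {
		if uuid == s.nodeUUID {
			return true
		}
	}
	return false
}

// settingsChangeCallback sketches the guard after this patch: exit early only
// when the node is out of the cluster AND has nothing running. A node being
// rebalanced out still owns producers, so it keeps processing the callback.
func (s *supervisor) settingsChangeCallback(path string) error {
	if !s.checkIfNodeInCluster() && len(s.runningProducers) == 0 {
		fmt.Printf("node %s: not in keepNodes and nothing running, exiting callback for %s\n",
			s.nodeUUID, path)
		return nil
	}
	fmt.Printf("node %s: handling callback for %s\n", s.nodeUUID, path)
	// The real code would propagate the change to producers/consumers here,
	// letting an ejected node reach its vbucket give-up phase.
	return nil
}

func main() {
	// Node being ejected: absent from keepNodes but an app is still deployed,
	// so the callback must still run.
	ejected := &supervisor{
		nodeUUID:         "node-b",
		keepNodes:        []string{"node-a"},
		runningProducers: map[string]bool{"credit_score": true},
	}
	ejected.settingsChangeCallback("/eventing/settings/credit_score")

	// Node that was never added (or already cleaned up): safe to skip.
	idle := &supervisor{nodeUUID: "node-c", keepNodes: []string{"node-a"}}
	idle.settingsChangeCallback("/eventing/settings/credit_score")
}

Run as written, node-b still handles the callback while node-c skips it, mirroring the behaviour this patch restores for nodes in ejectNodes.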
abhi-bit committed Mar 16, 2018
1 parent 09800c1 commit b80465a
Showing 7 changed files with 62 additions and 49 deletions.
10 changes: 6 additions & 4 deletions consumer/checkpoint.go
@@ -11,6 +11,8 @@ import (
)

func (c *Consumer) doLastSeqNoCheckpoint() {
logPrefix := "Consumer::doLastSeqNoCheckpoint"

c.checkpointTicker = time.NewTicker(c.checkpointInterval)

var vbBlob vbucketKVBlob
@@ -22,8 +24,8 @@ func (c *Consumer) doLastSeqNoCheckpoint() {
case <-c.checkpointTicker.C:
deployedApps := c.superSup.GetDeployedApps()
if _, ok := deployedApps[c.app.AppName]; !ok {
logging.Infof("CRCH[%s:%s:%s:%d] Returning from checkpoint ticker routine",
c.app.AppName, c.workerName, c.tcpPort, c.Pid())
logging.Infof("%s [%s:%s:%d] Returning from checkpoint ticker routine",
logPrefix, c.workerName, c.tcpPort, c.Pid())
return
}

@@ -41,8 +43,8 @@ func (c *Consumer) doLastSeqNoCheckpoint() {
util.Retry(util.NewFixedBackoff(bucketOpRetryInterval), getOpCallback, c, vbKey, &vbBlob, &cas, true, &isNoEnt)
if isNoEnt {

logging.Infof("CRCH[%s:%s:%s:%d] vb: %d Creating the initial metadata blob entry",
c.app.AppName, c.workerName, c.tcpPort, c.Pid(), vbno)
logging.Infof("%s [%s:%s:%d] vb: %d Creating the initial metadata blob entry",
logPrefix, c.workerName, c.tcpPort, c.Pid(), vbno)

c.updateCheckpointInfo(vbKey, vbno, &vbBlob)
continue
4 changes: 1 addition & 3 deletions consumer/defs.go
@@ -85,9 +85,7 @@ const (

restartVbDcpStreamTickInterval = time.Duration(3000) * time.Millisecond

retryVbsStateUpdateInterval = time.Duration(5000) * time.Millisecond

retryVbMetaStateCheckInterval = time.Duration(100) * time.Millisecond
retryVbMetaStateCheckInterval = time.Duration(1000) * time.Millisecond

vbTakeoverRetryInterval = time.Duration(1000) * time.Millisecond

36 changes: 20 additions & 16 deletions consumer/handle_messages.go
@@ -438,11 +438,13 @@ func (c *Consumer) sendDcpEvent(e *memcached.DcpEvent, sendToDebugger bool) {
}

func (c *Consumer) sendMessageLoop() {
logPrefix := "Consumer::sendMessageLoop"

defer func() {
if r := recover(); r != nil {
trace := debug.Stack()
logging.Errorf("CRHM[%s:%s:%s:%d] sendMessageLoop recover, %r stack trace: %v",
c.app.AppName, c.workerName, c.tcpPort, c.Pid(), r, string(trace))
logging.Errorf("%s [%s:%s:%d] sendMessageLoop recover, %r stack trace: %v",
logPrefix, c.workerName, c.tcpPort, c.Pid(), r, string(trace))
}
}()

@@ -464,8 +466,8 @@ func (c *Consumer) sendMessageLoop() {
defer c.sendMsgBufferRWMutex.Unlock()
err := binary.Write(c.conn, binary.LittleEndian, c.sendMsgBuffer.Bytes())
if err != nil {
logging.Errorf("CRHM[%s:%s:%s:%d] Write to downstream socket failed, err: %v",
c.app.AppName, c.workerName, c.tcpPort, c.Pid(), err)
logging.Errorf("%s [%s:%s:%d] Write to downstream socket failed, err: %v",
logPrefix, c.workerName, c.tcpPort, c.Pid(), err)
c.client.Stop()
}

@@ -483,6 +485,8 @@ func (c *Consumer) sendMessageLoop() {
}

func (c *Consumer) sendMessage(m *msgToTransmit) error {
logPrefix := "Consumer::sendMessage"

defer func() {
if m.headerBuilder != nil {
c.putBuilder(m.headerBuilder)
@@ -499,29 +503,29 @@ func (c *Consumer) sendMessage(m *msgToTransmit) error {
defer c.sendMsgBufferRWMutex.Unlock()
err := binary.Write(&c.sendMsgBuffer, binary.LittleEndian, uint32(len(m.msg.Header)))
if err != nil {
logging.Errorf("CRHM[%s:%s:%s:%d] Failure while writing header size, err : %v",
c.app.AppName, c.workerName, c.tcpPort, c.Pid(), err)
logging.Errorf("%s [%s:%s:%d] Failure while writing header size, err : %v",
logPrefix, c.workerName, c.tcpPort, c.Pid(), err)
return err
}

err = binary.Write(&c.sendMsgBuffer, binary.LittleEndian, uint32(len(m.msg.Payload)))
if err != nil {
logging.Errorf("CRHM[%s:%s:%s:%d] Failure while writing payload size, err: %v",
c.app.AppName, c.workerName, c.tcpPort, c.Pid(), err)
logging.Errorf("%s [%s:%s:%d] Failure while writing payload size, err: %v",
logPrefix, c.workerName, c.tcpPort, c.Pid(), err)
return err
}

err = binary.Write(&c.sendMsgBuffer, binary.LittleEndian, m.msg.Header)
if err != nil {
logging.Errorf("CRHM[%s:%s:%s:%d] Failure while writing encoded header, err: %v",
c.app.AppName, c.workerName, c.tcpPort, c.Pid(), err)
logging.Errorf("%s [%s:%s:%d] Failure while writing encoded header, err: %v",
logPrefix, c.workerName, c.tcpPort, c.Pid(), err)
return err
}

err = binary.Write(&c.sendMsgBuffer, binary.LittleEndian, m.msg.Payload)
if err != nil {
logging.Errorf("CRHM[%s:%s:%s:%d] Failure while writing encoded payload, err: %v",
c.app.AppName, c.workerName, c.tcpPort, c.Pid(), err)
logging.Errorf("%s [%s:%s:%d] Failure while writing encoded payload, err: %v",
logPrefix, c.workerName, c.tcpPort, c.Pid(), err)
return err
}

@@ -536,16 +540,16 @@ func (c *Consumer) sendMessage(m *msgToTransmit) error {

err = binary.Write(c.conn, binary.LittleEndian, c.sendMsgBuffer.Bytes())
if err != nil {
logging.Errorf("CRHM[%s:%s:%s:%d] Write to downstream socket failed, err: %v",
c.app.AppName, c.workerName, c.tcpPort, c.Pid(), err)
logging.Errorf("%s [%s:%s:%d] Write to downstream socket failed, err: %v",
logPrefix, c.workerName, c.tcpPort, c.Pid(), err)
c.client.Stop()
return err
}
} else if c.debugConn != nil {
err = binary.Write(c.debugConn, binary.LittleEndian, c.sendMsgBuffer.Bytes())
if err != nil {
logging.Errorf("CRHM[%s:%s:%s:%d] Write to debug enabled worker socket failed, err: %v",
c.app.AppName, c.workerName, c.debugTCPPort, c.Pid(), err)
logging.Errorf("%s [%s:%s:%d] Write to debug enabled worker socket failed, err: %v",
logPrefix, c.workerName, c.debugTCPPort, c.Pid(), err)
c.debugConn.Close()
return err
}
5 changes: 4 additions & 1 deletion consumer/http_handlers.go
@@ -36,6 +36,8 @@ func (c *Consumer) EventsProcessedPSec() *cm.EventProcessingStats {
}

func (c *Consumer) dcpEventsRemainingToProcess() {
logPrefix := "Consumer::dcpEventsRemainingToProcess"

vbsTohandle := c.vbsToHandle()
if len(vbsTohandle) <= 0 {
return
@@ -47,7 +49,8 @@

seqNos, err := util.BucketSeqnos(c.producer.NsServerHostPort(), "default", c.bucket)
if err != nil {
logging.Errorf("CRVT[%s:%s:%s:%d] Failed to fetch get_all_vb_seqnos, err: %v", c.app.AppName, c.workerName, c.tcpPort, c.Pid(), err)
logging.Errorf("%s [%s:%s:%d] Failed to fetch get_all_vb_seqnos, err: %v",
logPrefix, c.workerName, c.tcpPort, c.Pid(), err)
c.dcpEventsRemaining = 0
return
}
44 changes: 24 additions & 20 deletions consumer/vbucket_takeover.go
@@ -22,6 +22,8 @@ var errVbOwnedByAnotherWorker = errors.New("vbucket is owned by another worker o
var errVbOwnedByAnotherNode = errors.New("vbucket is owned by another node")

func (c *Consumer) reclaimVbOwnership(vb uint16) error {
logPrefix := "Consumer::reclaimVbOwnership"

var vbBlob vbucketKVBlob
var cas gocb.Cas

@@ -31,8 +33,8 @@ func (c *Consumer) reclaimVbOwnership(vb uint16) error {
util.Retry(util.NewFixedBackoff(bucketOpRetryInterval), getOpCallback, c, vbKey, &vbBlob, &cas, false)

if vbBlob.NodeUUID == c.NodeUUID() && vbBlob.AssignedWorker == c.ConsumerName() {
logging.Debugf("CRVT[%s:%s:%d] vb: %v successfully reclaimed ownership",
c.workerName, c.tcpPort, c.Pid(), vb)
logging.Debugf("%s [%s:%s:%d] vb: %v successfully reclaimed ownership",
logPrefix, c.workerName, c.tcpPort, c.Pid(), vb)
return nil
}

@@ -41,19 +43,21 @@

// Vbucket ownership give-up routine
func (c *Consumer) vbGiveUpRoutine(vbsts vbStats, giveupWg *sync.WaitGroup) {
logPrefix := "Consumer::vbGiveUpRoutine"

defer giveupWg.Done()

if len(c.vbsRemainingToGiveUp) == 0 {
logging.Tracef("CRVT[%s:%s:%d] No vbuckets remaining to give up",
c.workerName, c.tcpPort, c.Pid())
logging.Tracef("%s [%s:%s:%d] No vbuckets remaining to give up",
logPrefix, c.workerName, c.tcpPort, c.Pid())
return
}

vbsDistribution := util.VbucketDistribution(c.vbsRemainingToGiveUp, c.vbOwnershipGiveUpRoutineCount)

for k, v := range vbsDistribution {
logging.Tracef("CRVT[%s:%s:%d] vb give up routine id: %v, vbs assigned len: %v dump: %v",
c.workerName, c.tcpPort, c.Pid(), k, len(v), util.Condense(v))
logging.Tracef("%s [%s:%s:%d] vb give up routine id: %v, vbs assigned len: %v dump: %v",
logPrefix, c.workerName, c.tcpPort, c.Pid(), k, len(v), util.Condense(v))
}

signalPlasmaClosedChs := make([]chan uint16, 0)
@@ -78,14 +82,14 @@ func (c *Consumer) vbGiveUpRoutine(vbsts vbStats, giveupWg *sync.WaitGroup) {
util.Retry(util.NewFixedBackoff(bucketOpRetryInterval), getOpCallback, c, vbKey, &vbBlob, &cas, false)

if vbBlob.NodeUUID != c.NodeUUID() && vbBlob.DCPStreamStatus == dcpStreamRunning {
logging.Tracef("CRVT[%s:giveup_r_%d:%s:%d] vb: %v metadata node uuid: %v dcp stream status: %v, skipping give up phase",
c.workerName, i, c.tcpPort, c.Pid(), vb, vbBlob.NodeUUID, vbBlob.DCPStreamStatus)
logging.Tracef("%s [%s:giveup_r_%d:%s:%d] vb: %v metadata node uuid: %v dcp stream status: %v, skipping give up phase",
logPrefix, c.workerName, i, c.tcpPort, c.Pid(), vb, vbBlob.NodeUUID, vbBlob.DCPStreamStatus)

c.RLock()
err := c.vbDcpFeedMap[vb].DcpCloseStream(vb, vb)
if err != nil {
logging.Errorf("CRVT[%s:giveup_r_%d:%s:%d] vb: %v Failed to close dcp stream, err: %v",
c.workerName, i, c.tcpPort, c.Pid(), vb, err)
logging.Errorf("%s [%s:giveup_r_%d:%s:%d] vb: %v Failed to close dcp stream, err: %v",
logPrefix, c.workerName, i, c.tcpPort, c.Pid(), vb, err)
}
c.RUnlock()

@@ -98,8 +102,8 @@ func (c *Consumer) vbGiveUpRoutine(vbsts vbStats, giveupWg *sync.WaitGroup) {
continue
}

logging.Tracef("CRVT[%s:giveup_r_%d:%s:%d] vb: %v uuid: %v vbStat uuid: %v owner node: %r consumer name: %v",
c.workerName, i, c.tcpPort, c.Pid(), vb, c.NodeUUID(),
logging.Tracef("%s [%s:giveup_r_%d:%s:%d] vb: %v uuid: %v vbStat uuid: %v owner node: %r consumer name: %v",
logPrefix, c.workerName, i, c.tcpPort, c.Pid(), vb, c.NodeUUID(),
vbsts.getVbStat(vb, "node_uuid"),
vbsts.getVbStat(vb, "current_vb_owner"),
vbsts.getVbStat(vb, "assigned_worker"))
@@ -121,8 +125,8 @@ func (c *Consumer) vbGiveUpRoutine(vbsts vbStats, giveupWg *sync.WaitGroup) {
c.RLock()
err := c.vbDcpFeedMap[vb].DcpCloseStream(vb, vb)
if err != nil {
logging.Errorf("CRVT[%s:giveup_r_%d:%s:%d] vb: %v Failed to close dcp stream, err: %v",
c.workerName, i, c.tcpPort, c.Pid(), vb, err)
logging.Errorf("%s [%s:giveup_r_%d:%s:%d] vb: %v Failed to close dcp stream, err: %v",
logPrefix, c.workerName, i, c.tcpPort, c.Pid(), vb, err)
}
c.RUnlock()

@@ -137,13 +141,13 @@ func (c *Consumer) vbGiveUpRoutine(vbsts vbStats, giveupWg *sync.WaitGroup) {
retryVbMetaStateCheck:
util.Retry(util.NewFixedBackoff(bucketOpRetryInterval), getOpCallback, c, vbKey, &vbBlob, &cas, false)

logging.Tracef("CRVT[%s:giveup_r_%d:%s:%d] vb: %v vbsStateUpdate MetaState check",
c.workerName, i, c.tcpPort, c.Pid(), vb)
logging.Tracef("%s [%s:giveup_r_%d:%s:%d] vb: %v vbsStateUpdate MetaState check",
logPrefix, c.workerName, i, c.tcpPort, c.Pid(), vb)

select {
case <-c.stopVbOwnerGiveupCh:
logging.Debugf("CRVT[%s:giveup_r_%d:%s:%d] Exiting vb ownership give-up routine, last vb handled: %v",
c.workerName, i, c.tcpPort, c.Pid(), vb)
logging.Debugf("%s [%s:giveup_r_%d:%s:%d] Exiting vb ownership give-up routine, last vb handled: %v",
logPrefix, c.workerName, i, c.tcpPort, c.Pid(), vb)
return

default:
@@ -157,8 +161,8 @@ func (c *Consumer) vbGiveUpRoutine(vbsts vbStats, giveupWg *sync.WaitGroup) {
time.Sleep(retryVbMetaStateCheckInterval)
goto retryVbMetaStateCheck
}
logging.Debugf("CRVT[%s:giveup_r_%d:%s:%d] Gracefully exited vb ownership give-up routine, last vb handled: %v",
c.workerName, i, c.tcpPort, c.Pid(), vb)
logging.Debugf("%s [%s:giveup_r_%d:%s:%d] Gracefully exited vb ownership give-up routine, last vb handled: %v",
logPrefix, c.workerName, i, c.tcpPort, c.Pid(), vb)
}
}
}
6 changes: 4 additions & 2 deletions producer/cluster_ops.go
@@ -13,6 +13,8 @@ import (
)

var getClusterInfoCacheOpCallback = func(args ...interface{}) error {
logPrefix := "Producer::getClusterInfoCacheOpCallback"

p := args[0].(*Producer)
cinfo := args[1].(**util.ClusterInfoCache)

@@ -21,8 +23,8 @@
var err error
*cinfo, err = util.FetchNewClusterInfoCache(hostAddress)
if err != nil {
logging.Errorf("PRCO[%s:%d] Failed to get CIC handle while trying to get kv vbmap, err: %v",
p.appName, p.LenRunningConsumers(), err)
logging.Errorf("%s [%s:%d] Failed to get CIC handle while trying to get kv vbmap, err: %v",
logPrefix, p.appName, p.LenRunningConsumers(), err)
}

return err
6 changes: 3 additions & 3 deletions supervisor/super_supervisor.go
@@ -96,7 +96,7 @@ func (s *SuperSupervisor) EventHandlerLoadCallback(path string, value []byte, re

logging.Infof("%s [%d] path => %s encoded value size => %v", logPrefix, len(s.runningProducers), path, len(value))

if !s.checkIfNodeInCluster() {
if !s.checkIfNodeInCluster() && len(s.runningProducers) == 0 {
logging.Infof("%s [%d] Node not part of cluster. Exiting callback", logPrefix, len(s.runningProducers))
return nil
}
@@ -181,7 +181,7 @@ func (s *SuperSupervisor) EventHandlerLoadCallback(path string, value []byte, re
func (s *SuperSupervisor) SettingsChangeCallback(path string, value []byte, rev interface{}) error {
logPrefix := "SuperSupervisor::SettingsChangeCallback"

if !s.checkIfNodeInCluster() {
if !s.checkIfNodeInCluster() && len(s.runningProducers) == 0 {
logging.Infof("%s [%d] Node not part of cluster. Exiting callback", logPrefix, len(s.runningProducers))
return nil
}
@@ -354,7 +354,7 @@ func (s *SuperSupervisor) TopologyChangeNotifCallback(path string, value []byte,

logging.Infof("%s [%d] Path => %s value => %s", logPrefix, len(s.runningProducers), path, string(value))

if !s.checkIfNodeInCluster() {
if !s.checkIfNodeInCluster() && len(s.runningProducers) == 0 {
logging.Infof("%s [%d] Node not part of cluster. Exiting callback", logPrefix, len(s.runningProducers))
return nil
}
