Merge remote-tracking branch 'couchbase/unstable' into HEAD
http://ci-eventing-unstable.northscale.in/eventing-02.10.2018-05.30.pass.html

Change-Id: I1d611afb6b495b5e975986baedd89fd645bfdcb3
jeelanp2003 committed Oct 2, 2018
2 parents 8836d2b + 51b65b8 commit 0d3c46f
Showing 6 changed files with 37 additions and 11 deletions.
5 changes: 4 additions & 1 deletion consumer/handle_messages.go
@@ -480,6 +480,7 @@ func (c *Consumer) sendMessageLoop() {
 		case <-c.socketWriteTicker.C:
 			if c.sendMsgCounter > 0 && c.conn != nil {
 				if atomic.LoadUint32(&c.isTerminateRunning) == 1 || c.stoppingConsumer {
+					c.socketWriteLoopStopAckCh <- struct{}{}
 					return
 				}

@@ -490,8 +491,9 @@ func (c *Consumer) sendMessageLoop() {
 			defer c.sendMsgBufferRWMutex.Unlock()
 
 			if c.conn == nil {
-				logging.Infof("%s [%s:%s:%d] connection socket closed, bailing out",
+				logging.Infof("%s [%s:%s:%d] stoppingConsumer: %t connection socket closed, bailing out",
 					logPrefix, c.workerName, c.tcpPort, c.Pid(), c.stoppingConsumer)
+				c.socketWriteLoopStopAckCh <- struct{}{}
 				return
 			}

@@ -501,6 +503,7 @@ func (c *Consumer) sendMessageLoop() {
 				logPrefix, c.workerName, c.tcpPort, c.Pid(), c.stoppingConsumer, err)
 
 			if atomic.LoadUint32(&c.isTerminateRunning) == 1 || c.stoppingConsumer {
+				c.socketWriteLoopStopAckCh <- struct{}{}
 				return
 			}

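The three c.socketWriteLoopStopAckCh sends above give every exit path of sendMessageLoop() an acknowledgement: before returning, the goroutine signals the stop-ack channel so whoever requested the stop can wait until the write loop has actually terminated, rather than racing it for the socket. A minimal sketch of this stop/stop-ack handshake, using illustrative names (writeLoop, stopCh, stopAckCh) instead of the real Consumer fields:

package main

import "fmt"

// writeLoop drains work until told to stop, then acknowledges on stopAckCh
// so the stopper knows no further write can happen once the ack arrives.
func writeLoop(work <-chan string, stopCh <-chan struct{}, stopAckCh chan<- struct{}) {
	for {
		select {
		case msg := <-work:
			fmt.Println("sent:", msg)
		case <-stopCh:
			stopAckCh <- struct{}{} // every exit path must ack exactly once
			return
		}
	}
}

func main() {
	work := make(chan string)
	stopCh := make(chan struct{})
	stopAckCh := make(chan struct{}, 1) // buffered so the ack never blocks

	go writeLoop(work, stopCh, stopAckCh)
	work <- "hello"

	close(stopCh)
	<-stopAckCh // only now is it safe to tear down the connection
	fmt.Println("write loop fully stopped")
}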
10 changes: 1 addition & 9 deletions consumer/http_handlers.go
@@ -46,9 +46,8 @@ func (c *Consumer) RebalanceTaskProgress() *cm.RebalanceProgress {
 			progress.VbsRemainingToShuffle = vbsToMove
 			progress.CloseStreamVbsLen = vbsToMove
 			return progress
-		} else {
-			c.timerQueuesAreDrained = true
 		}
+		c.timerQueuesAreDrained = true
 	}
 
 	if len(vbsRemainingToCloseStream) == 0 && len(vbsRemainingToStreamReq) == 0 {
@@ -70,13 +69,6 @@ func (c *Consumer) checkIfTimerQueuesAreDrained() error {
 	if util.Contains(c.NodeUUID(), c.ejectNodesUUIDs) {
 
-		vbsFilterAckYetToCome := c.getVbsFilterAckYetToCome()
-		if len(vbsFilterAckYetToCome) > 0 {
-			logging.Infof("%s [%s:%s:%d] vbsFilterAckYetToCome dump: %s len: %d",
-				logPrefix, c.workerName, c.tcpPort, c.Pid(), util.Condense(vbsFilterAckYetToCome), len(vbsFilterAckYetToCome))
-			return errTimerQueueNotDrained
-		}
-
 		if c.cppQueueSizes.AggQueueSize > 0 {
 			c.GetExecutionStats()
 			logging.Infof("%s [%s:%s:%d] AggQueueSize: %d",
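With the vbsFilterAckYetToCome guard deleted, checkIfTimerQueuesAreDrained() now decides drain status from the C++ worker queue sizes alone (AggQueueSize onwards). The surrounding pattern is: report a sentinel error while work remains, and let the caller poll until nil comes back. A sketch of that pattern under illustrative names (the worker struct and polling loop are not the real code; only the sentinel-error shape mirrors it):

package main

import (
	"errors"
	"fmt"
	"time"
)

var errTimerQueueNotDrained = errors.New("timer queues not drained")

type worker struct{ aggQueueSize int64 }

// checkDrained mirrors the shape of checkIfTimerQueuesAreDrained: a sentinel
// error while the queue holds items, nil once it is observed empty.
func (w *worker) checkDrained() error {
	if w.aggQueueSize > 0 {
		return errTimerQueueNotDrained
	}
	return nil
}

func main() {
	w := &worker{aggQueueSize: 3}
	for errors.Is(w.checkDrained(), errTimerQueueNotDrained) {
		w.aggQueueSize-- // stand-in for the C++ worker draining its queue
		time.Sleep(10 * time.Millisecond)
	}
	fmt.Println("queues drained, safe to hand off vbuckets")
}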
4 changes: 4 additions & 0 deletions service_manager/manager.go
@@ -562,6 +562,10 @@ func (m *ServiceMgr) getActiveNodeAddrs() ([]string, error) {
 	var data []byte
 	util.Retry(util.NewFixedBackoff(time.Second), nil, metakvGetCallback, metakvConfigKeepNodes, &data)
 
+	if len(data) == 0 {
+		return nodeAddrs, nil
+	}
+
 	var keepNodes []string
 	err = json.Unmarshal(data, &keepNodes)
 	if err != nil {
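The early return handles the case where metakv has no value stored under the keep-nodes key yet: json.Unmarshal treats a zero-length payload as a syntax error, not as an empty list, so without the guard getActiveNodeAddrs would fail on a fresh cluster. A small self-contained demonstration (decodeKeepNodes is a hypothetical stand-in, not the real function):

package main

import (
	"encoding/json"
	"fmt"
)

// decodeKeepNodes mirrors the added guard: treat "no data yet" as an empty
// result rather than a decode failure.
func decodeKeepNodes(data []byte) ([]string, error) {
	if len(data) == 0 {
		return nil, nil
	}
	var keepNodes []string
	err := json.Unmarshal(data, &keepNodes)
	return keepNodes, err
}

func main() {
	// Without the guard, an empty payload is a hard error:
	var nodes []string
	fmt.Println(json.Unmarshal([]byte{}, &nodes)) // unexpected end of JSON input

	// With the guard, it degrades to an empty node list:
	nodes, err := decodeKeepNodes(nil)
	fmt.Println(nodes, err) // [] <nil>
}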
22 changes: 22 additions & 0 deletions service_manager/service_manager.go
@@ -2,6 +2,7 @@ package servicemanager
 
 import (
 	"bytes"
+	"fmt"
 	"os"
 	"time"

@@ -177,6 +178,27 @@ func (m *ServiceMgr) StartTopologyChange(change service.TopologyChange) error {
 		m.failoverNotif = true
 
 	case service.TopologyChangeTypeRebalance:
+		nodeAddrs, err := m.getActiveNodeAddrs()
+		logging.Infof("%s Active Eventing nodes in the cluster: %rs", logPrefix, nodeAddrs)
+
+		if len(nodeAddrs) > 0 && err == nil {
+
+			logging.Infof("%s Querying nodes: %rs for bootstrap status", logPrefix, nodeAddrs)
+
+			// Fail rebalance if some apps are undergoing bootstrap
+			appsBootstrapping, err := util.GetAggBootstrappingApps("/getBootstrappingApps", nodeAddrs)
+			logging.Infof("%s Status of app bootstrap across all Eventing nodes: %v", logPrefix, appsBootstrapping)
+			if err != nil {
+				logging.Warnf("%s Some apps are undergoing bootstrap on some/all Eventing nodes, err: %v", logPrefix, err)
+				return err
+			}
+		}
+
+		if err != nil {
+			logging.Warnf("%s Error encountered while fetching active Eventing nodes, err: %v", logPrefix, err)
+			return fmt.Errorf("failed to get active eventing nodes in the cluster")
+		}
+
 		util.Retry(util.NewFixedBackoff(time.Second), nil, storeKeepNodesCallback, m.keepNodeUUIDs)
 
 		m.startRebalance(change)
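The new TopologyChangeTypeRebalance branch is a pre-flight check: fetch the active Eventing nodes, ask all of them whether any apps are still bootstrapping, and reject the rebalance early if any are, or if the node list itself could not be fetched. A rough sketch of such an aggregate check, assuming each node exposes an HTTP endpoint returning a JSON list of bootstrapping apps (aggBootstrappingApps and the port below are illustrative, not the real util.GetAggBootstrappingApps):

package main

import (
	"encoding/json"
	"fmt"
	"net/http"
)

// aggBootstrappingApps asks every node for its bootstrapping apps and
// returns an error naming the first node that still has work in flight.
func aggBootstrappingApps(path string, nodeAddrs []string) error {
	for _, addr := range nodeAddrs {
		resp, err := http.Get("http://" + addr + path)
		if err != nil {
			return fmt.Errorf("node %s unreachable: %w", addr, err)
		}
		var apps []string
		err = json.NewDecoder(resp.Body).Decode(&apps)
		resp.Body.Close()
		if err != nil {
			return fmt.Errorf("node %s: bad response: %w", addr, err)
		}
		if len(apps) > 0 {
			return fmt.Errorf("node %s still bootstrapping apps %v", addr, apps)
		}
	}
	return nil
}

func main() {
	nodes := []string{"127.0.0.1:8096"} // hypothetical Eventing admin port
	if err := aggBootstrappingApps("/getBootstrappingApps", nodes); err != nil {
		fmt.Println("rejecting rebalance:", err)
		return
	}
	fmt.Println("no apps bootstrapping, rebalance may proceed")
}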
4 changes: 4 additions & 0 deletions tests/functional_tests/clusterSetup.go
@@ -272,6 +272,10 @@ func waitForRebalanceFinish() {
 		}
 		for _, v := range tasks {
 			task := v.(map[string]interface{})
+			if task["errorMessage"] != nil {
+				log.Println(task["errorMessage"].(string))
+				return
+			}
 			if task["type"].(string) == "rebalance" && task["status"].(string) == "running" {
 				rebalanceRunning = true
 				log.Println("Rebalance progress:", task["progress"])
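On the test side, waitForRebalanceFinish polls the cluster's task list; the added lines make it bail out as soon as a task carries an errorMessage instead of spinning until the harness times out. A compressed sketch of that polling loop with the task feed stubbed out (pollTasks and fetch are illustrative, not the harness API):

package main

import (
	"fmt"
	"time"
)

// pollTasks mimics the loop in waitForRebalanceFinish over decoded task maps.
func pollTasks(fetch func() []interface{}) {
	for {
		rebalanceRunning := false
		for _, v := range fetch() {
			task := v.(map[string]interface{})
			if task["errorMessage"] != nil {
				fmt.Println(task["errorMessage"].(string))
				return // bail out instead of waiting for a timeout
			}
			if task["type"] == "rebalance" && task["status"] == "running" {
				rebalanceRunning = true
				fmt.Println("Rebalance progress:", task["progress"])
			}
		}
		if !rebalanceRunning {
			return
		}
		time.Sleep(100 * time.Millisecond)
	}
}

func main() {
	calls := 0
	pollTasks(func() []interface{} {
		calls++
		status := "running"
		if calls > 2 {
			status = "completed"
		}
		return []interface{}{map[string]interface{}{
			"type": "rebalance", "status": status, "progress": calls * 40,
		}}
	})
	fmt.Println("rebalance finished or failed")
}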
3 changes: 2 additions & 1 deletion v8_consumer/src/client.cc
@@ -713,10 +713,11 @@ void AppWorker::RouteMessageWithResponse(header_t *parsed_header,
       }
       break;
     default:
-      LOG(logError) << "Opcode " << getTimerOpcode(parsed_header->opcode)
+      LOG(logError) << "Opcode " << getFilterOpcode(parsed_header->opcode)
                     << "is not implemented for filtering" << std::endl;
       break;
     }
+    break;
   case eTimer:
     switch (getTimerOpcode(parsed_header->opcode)) {
     case oTimer:
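Two fixes land in the C++ worker's message router: the default branch of the filter-message switch now decodes the unknown opcode with getFilterOpcode instead of getTimerOpcode, so the logged value matches the message class actually being handled, and the newly added break after that inner switch appears to stop the filter case from falling through into the eTimer case below it.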
